source: XIOS3/trunk/src/transport/p2p_client_buffer.cpp @ 2563

Last change on this file since 2563 was 2563, checked in by jderouillat, 10 months ago

Delete debugging code lines pushed by mistake

  • Property svn:executable set to *
File size: 10.4 KB
Line 
1#include "p2p_client_buffer.hpp"
2#include "event_client.hpp"
3#include "timer.hpp"
4
5namespace xios
6{
7 
8  extern CLogType logProtocol;
9
10  CP2pClientBuffer::CP2pClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) : interComm_(interComm), serverRank_(serverRank), interCommMerged_(interCommMerged), intraServerRank_(intraServerRank)
11  {
12   
13    //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
14    //control_[CONTROL_ADDR] = 0 ;
15    //control_[CONTROL_FINALIZE] = 0 ;
16    sendNewBuffer() ;
17    createWindow(commSelf, interCommMerged, intraServerRank ) ;
18    char dummy ;
19    MPI_Irecv(&dummy, 0, MPI_CHAR, intraServerRank, 22, interCommMerged, &finalizeRequest_) ;
20  }
21
22  void CP2pClientBuffer::createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank )
23  {
24    CTimer::get("create Windows").resume() ;
25    //MPI_Comm interComm ;
26    //MPI_Intercomm_create(commSelf, 0, interCommMerged, intraServerRank, 0, &interComm) ;
27    //MPI_Intercomm_merge(interComm, false, &winComm_) ;
28    //int rank ;
29    //MPI_Comm_rank(winComm_,&rank) ;
30    //info(logProtocol)<<"Windows rank="<<rank<<endl ;
31    //CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
32    //MPI_Comm_free(&interComm) ;
33   
34    maxWindows_=MAX_WINDOWS ;   
35    windows_.resize(maxWindows_) ;
36    usedWindows_.resize(maxWindows_,false) ;
37    //for(int i=0;i<maxWindows_;++i)
38    //{
39    //  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
40    //  CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
41    //}
42    currentWindow_=-1 ;
43   
44    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
45    //CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
46
47    //MPI_Barrier(winComm_) ;
48    //MPI_Win_attach(winControl_, control_, controlSize_*sizeof(MPI_Aint)) ;
49    //MPI_Barrier(winComm_) ;
50    CTimer::get("create Windows").suspend() ;
51 
52 //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, winControl_) ;
53 //   MPI_Win_unlock(0,winControl_) ;
54
55  } 
56
57
58
59  void CP2pClientBuffer::newBuffer(size_t size, bool fixed)
60  { 
61    currentWindow_=(currentWindow_+1)%maxWindows_ ;
62    if (usedWindows_[currentWindow_]) 
63    {
64      ERROR("void CP2pClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ;
65    }
66    else usedWindows_[currentWindow_]=true ;
67    buffers_.push_back(new CBuffer(windows_[currentWindow_], size, fixed)); 
68    currentBuffer_=buffers_.back() ;
69    info(logProtocol)<<"   Nb attached memory blocs="<<buffers_.size()<<endl ;
70  }
71
72  bool CP2pClientBuffer::isBufferFree(size_t size)
73  {
74    if (sentBlocRequest_.size()> maxSentBlocRequests_) return false ;
75    if (buffers_.size()>maxWindows_-1) return false ; 
76    CBuffer* buffer ;
77    if (buffers_.size()==0) return true ;
78    else if (!fixed) return true ;
79    else 
80    {
81      buffer = buffers_.back() ;
82      if (buffer->remain()>=size) return true ;
83      else
84      {
85        if (buffer->isFixed()) 
86        {
87          if ( size > buffer->getSize()) return true ;
88          else return false ;
89        }
90        else return true ;
91      }
92    }
93  }
94 
95  int CP2pClientBuffer::writeBuffer(char* buffer, size_t size)
96  {
97    MPI_Aint addr ;
98    size_t start ;
99    size_t count ;
100    int nbBlocs=0 ;
101
102    if (isBufferFree(size))
103    {
104      while (size > 0)
105      {
106        if (buffers_.empty())
107        {
108          if (fixed_) newBuffer(fixedSize_,fixed_) ;
109          else
110          { 
111            if (currentBufferSize_==0) currentBufferSize_=size ;
112            newBuffer(currentBufferSize_, fixed_) ;
113          }
114        }
115        CBuffer* currentBuffer = buffers_.back() ;
116
117        //MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, windows_[currentWindow_]) ;
118        currentBuffer->write(&buffer, size, addr, start, count) ;
119        if (count > 0) 
120        {
121          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ;
122          nbBlocs++ ; 
123        }
124
125        currentBuffer->write(&buffer, size, addr, start, count) ;
126        //MPI_Win_unlock(0,windows_[currentWindow_]) ;
127
128        if (count > 0) 
129        {
130          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ;
131          nbBlocs++ ; 
132        }
133
134        if (size>0) 
135        {
136          if (fixed_) 
137          {
138            currentBufferSize_ = fixedSize_ ;
139            newBuffer(currentBufferSize_,fixed_) ;
140          }
141          else
142          {
143            currentBufferSize_ = max((size_t)(currentBufferSize_*growingFactor_), size) ;
144            newBuffer(currentBufferSize_,fixed_) ;
145          }
146        } 
147      }     
148      // send message here ?
149      return nbBlocs ;
150    }
151    else return 0 ;
152  }
153
154  void CP2pClientBuffer::freeBuffer(MPI_Aint addr)
155  {
156    if (addr != 0)
157    {
158      while(freeBloc(addr)) ;
159    }
160   
161    if (isFinalized_ && !buffers_.empty() && buffers_.front()->getCount()==0) 
162    {
163      delete buffers_.front() ;
164      buffers_.pop_front() ;
165    }
166  }
167 
168  bool CP2pClientBuffer::freeBloc(MPI_Aint addr)
169  {
170    SBloc& bloc = blocs_.front() ;
171   
172    if (info.isActive(logProtocol))
173    {
174      size_t checksum=0 ;
175      for(size_t j=0;j<bloc.count;j++) checksum+=((unsigned char*)(bloc.addr))[j] ;
176      info(logProtocol)<<"free bloc sent to server rank "<<serverRank_<<" : addr="<<bloc.addr<<"  start="<<bloc.start<<"  count="<<bloc.count<<"  checksum="<<checksum<<endl ;
177    }
178
179    bloc.buffer->free(bloc.start, bloc.count) ;
180    if (bloc.buffer->getCount()==0) 
181      if (buffers_.size()>1) 
182      { 
183        usedWindows_[bloc.window]=false ;
184        delete buffers_.front() ;
185        buffers_.pop_front() ;
186      }
187   
188    if (addr != bloc.addr) 
189    {
190      blocs_.pop_front() ;
191      return true ;
192    }
193    else 
194    {
195      blocs_.pop_front() ;
196      return false ;
197    }
198
199  }
200
201  bool CP2pClientBuffer::writeEvent(size_t timeline, CEventClient& event)
202  {
203    size_t size = event.getSize() ;
204    if (isBufferFree(size))
205    {
206      CBufferOut buffer(size) ;
207      event.send(timeline, size, &buffer) ;
208      size_t bufferSizeBefore = currentBufferSize_ ; 
209      int nbBlocs = writeBuffer((char*)buffer.start(),buffer.count()) ;
210      if (currentBufferSize_!=bufferSizeBefore) sendResizeBufferEvent(timeline,currentBufferSize_) ;
211      sendTimelineEvent(timeline, event.getNbSender(), nbBlocs) ;
212      return true ;
213    }
214    else return false ;
215  }
216
217  void CP2pClientBuffer::eventLoop(void)
218  {
219    // check to free requests
220    int flag ;
221    bool out = true;
222    SRequest request ; 
223    while (!requests_.empty() && out) 
224    {
225      request = requests_.front() ;
226      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").resume() ;
227      MPI_Test(&request.mpiRequest, &flag, MPI_STATUS_IGNORE) ;
228      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").suspend() ;
229      if (flag==true)
230      {
231        delete request.buffer ;
232        requests_.pop_front() ;
233      }
234      else out=false; 
235    }
236   
237    // check to free blocs
238//    MPI_Aint addr ;
239//    MPI_Aint finalize ;
240//    MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, winControl_) ;
241//    addr = control_[CONTROL_ADDR] ;
242//    control_[CONTROL_ADDR] = 0 ;
243//    finalize = control_[CONTROL_FINALIZE] ;
244//    MPI_Win_unlock(0, winControl_) ;
245    MPI_Aint addr=0 ;
246    auto it=sentBlocRequest_.begin() ;
247    for( ; it!=sentBlocRequest_.end() ; ++it)
248    {
249      int flag=true ;
250      MPI_Status status ; 
251      MPI_Test(&it->mpiRequest, &flag, &status) ;
252      if (flag==false) 
253      {
254//        ++it ;
255        break ;
256      }
257      else addr = it->addr;
258    }
259    if (addr!=0) sentBlocRequest_.erase(sentBlocRequest_.begin(), it) ;
260    freeBuffer(addr) ;
261
262    // if (finalize==1) isFinalized_=true ;
263    listenFinalize() ;
264  }
265
266  void CP2pClientBuffer::listenFinalize(void)
267  {
268    if (!isFinalized_)
269    {
270      int flag ;
271      MPI_Status status ;
272      MPI_Test(&finalizeRequest_,&flag, &status) ;
273      if (flag) isFinalized_=true;
274    }
275  }
276
277  void CP2pClientBuffer::sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs)
278  {
279    ostringstream outStr ;
280    SRequest request ;
281    request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int))*nbBlocs) ; 
282    *(request.buffer)<<timeline<<nbSenders<<nbBlocs ;
283    if (info.isActive(logProtocol))  outStr<<"New timeline event sent to server rank "<<serverRank_<<" : timeLine="<<timeline<<"  nbSenders="<<nbSenders<<"  nbBlocs="<<nbBlocs<<endl ;
284    auto it = blocs_.end() ;
285    for(int i=0 ; i<nbBlocs; ++i,--it) ;
286    for(int i=0 ; i<nbBlocs; ++i,++it) 
287    {
288      *(request.buffer) << it->addr << it->count << it->window;
289   
290      if (info.isActive(logProtocol))
291      {
292        size_t checksum=0 ;
293        for(size_t j=0;j<it->count;j++) checksum+=((unsigned char*)(it->addr))[j] ;
294        outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  ;  " ;
295      }
296
297      sentBlocRequest_.emplace_back() ;
298      sentBlocRequest_.back().addr = it->addr ; 
299      MPI_Issend((void*)(it->addr), it->count, MPI_CHAR, intraServerRank_, 21, interCommMerged_, &sentBlocRequest_.back().mpiRequest) ;
300    }
301    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ;
302    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
303    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ;
304    info(logProtocol)<<outStr.str()<<endl ;
305    requests_.push_back(request) ;
306  }
307
308  void CP2pClientBuffer::sendResizeBufferEvent(size_t timeline, size_t size)
309  {
310    SRequest request ;
311    request.buffer = new CBufferOut(sizeof(EVENT_BUFFER_RESIZE)+sizeof(timeline)+sizeof(size)) ; 
312    *(request.buffer)<<EVENT_BUFFER_RESIZE<<timeline<<size ;
313    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
314    requests_.push_back(request) ;
315  }
316
317  void CP2pClientBuffer::sendNewBuffer(void)
318  {
319    MPI_Aint controlAddr ;
320//    MPI_Get_address(control_, &controlAddr) ;
321    MPI_Send(&controlAddr, 1, MPI_AINT, intraServerRank_, 20, interCommMerged_) ;
322  }
323
324}
Note: See TracBrowser for help on using the repository browser.