source: XIOS3/trunk/src/transport/p2p_client_buffer.cpp @ 2556

Last change on this file since 2556 was 2556, checked in by ymipsl, 10 months ago

First version on the point to point transport protocol, activated by the variable : transport_protocol="p2p"

YM

  • Property svn:executable set to *
File size: 10.0 KB
Line 
1#include "p2p_client_buffer.hpp"
2#include "event_client.hpp"
3#include "timer.hpp"
4
5namespace xios
6{
7 
8  extern CLogType logProtocol;
9
10  CP2pClientBuffer::CP2pClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) : interComm_(interComm), serverRank_(serverRank), interCommMerged_(interCommMerged), intraServerRank_(intraServerRank)
11  {
12   
13    //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
14    //control_[CONTROL_ADDR] = 0 ;
15    //control_[CONTROL_FINALIZE] = 0 ;
16    //sendNewBuffer() ;
17    createWindow(commSelf, interCommMerged, intraServerRank ) ;
18    char dummy ;
19    MPI_Irecv(&dummy, 0, MPI_CHAR, intraServerRank, 22, interCommMerged, &finalizeRequest_) ;
20  }
21
22  void CP2pClientBuffer::createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank )
23  {
24    CTimer::get("create Windows").resume() ;
25    //MPI_Comm interComm ;
26    //MPI_Intercomm_create(commSelf, 0, interCommMerged, intraServerRank, 0, &interComm) ;
27    //MPI_Intercomm_merge(interComm, false, &winComm_) ;
28    //int rank ;
29    //MPI_Comm_rank(winComm_,&rank) ;
30    //info(logProtocol)<<"Windows rank="<<rank<<endl ;
31    //CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
32    //MPI_Comm_free(&interComm) ;
33   
34    maxWindows_=MAX_WINDOWS ;   
35    windows_.resize(maxWindows_) ;
36    usedWindows_.resize(maxWindows_,false) ;
37    //for(int i=0;i<maxWindows_;++i)
38    //{
39    //  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
40    //  CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
41    //}
42    currentWindow_=-1 ;
43   
44    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
45    //CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
46
47    //MPI_Barrier(winComm_) ;
48    //MPI_Win_attach(winControl_, control_, controlSize_*sizeof(MPI_Aint)) ;
49    //MPI_Barrier(winComm_) ;
50    CTimer::get("create Windows").suspend() ;
51 
52 //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, winControl_) ;
53 //   MPI_Win_unlock(0,winControl_) ;
54
55  } 
56
57
58
59  void CP2pClientBuffer::newBuffer(size_t size, bool fixed)
60  { 
61    currentWindow_=(currentWindow_+1)%maxWindows_ ;
62    if (usedWindows_[currentWindow_]) 
63    {
64      ERROR("void CP2pClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ;
65    }
66    else usedWindows_[currentWindow_]=true ;
67    buffers_.push_back(new CBuffer(windows_[currentWindow_], size, fixed)); 
68    currentBuffer_=buffers_.back() ;
69    info(logProtocol)<<"   Nb attached memory blocs="<<buffers_.size()<<endl ;
70  }
71
72  bool CP2pClientBuffer::isBufferFree(size_t size)
73  {
74    if (buffers_.size()>maxWindows_-1) return false ; 
75    CBuffer* buffer ;
76    if (buffers_.size()==0) return true ;
77    else if (!fixed) return true ;
78    else 
79    {
80      buffer = buffers_.back() ;
81      if (buffer->remain()>=size) return true ;
82      else
83      {
84        if (buffer->isFixed()) 
85        {
86          if ( size > buffer->getSize()) return true ;
87          else return false ;
88        }
89        else return true ;
90      }
91    }
92  }
93 
94  int CP2pClientBuffer::writeBuffer(char* buffer, size_t size)
95  {
96    MPI_Aint addr ;
97    size_t start ;
98    size_t count ;
99    int nbBlocs=0 ;
100
101    if (isBufferFree(size))
102    {
103      while (size > 0)
104      {
105        if (buffers_.empty())
106        {
107          if (fixed_) newBuffer(fixedSize_,fixed_) ;
108          else
109          { 
110            if (currentBufferSize_==0) currentBufferSize_=size ;
111            newBuffer(currentBufferSize_, fixed_) ;
112          }
113        }
114        CBuffer* currentBuffer = buffers_.back() ;
115
116        //MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, windows_[currentWindow_]) ;
117        currentBuffer->write(&buffer, size, addr, start, count) ;
118        if (count > 0) 
119        {
120          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ;
121          nbBlocs++ ; 
122        }
123
124        currentBuffer->write(&buffer, size, addr, start, count) ;
125        //MPI_Win_unlock(0,windows_[currentWindow_]) ;
126
127        if (count > 0) 
128        {
129          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ;
130          nbBlocs++ ; 
131        }
132
133        if (size>0) 
134        {
135          if (fixed_) 
136          {
137            currentBufferSize_ = fixedSize_ ;
138            newBuffer(currentBufferSize_,fixed_) ;
139          }
140          else
141          {
142            currentBufferSize_ = max((size_t)(currentBufferSize_*growingFactor_), size) ;
143            newBuffer(currentBufferSize_,fixed_) ;
144          }
145        } 
146      }     
147      // send message here ?
148      return nbBlocs ;
149    }
150    else return 0 ;
151  }
152
153  void CP2pClientBuffer::freeBuffer(MPI_Aint addr)
154  {
155    if (addr != 0)
156    {
157      while(freeBloc(addr)) ;
158    }
159   
160    if (isFinalized_ && !buffers_.empty() && buffers_.front()->getCount()==0) 
161    {
162      delete buffers_.front() ;
163      buffers_.pop_front() ;
164    }
165  }
166 
167  bool CP2pClientBuffer::freeBloc(MPI_Aint addr)
168  {
169    SBloc& bloc = blocs_.front() ;
170    bloc.buffer->free(bloc.start, bloc.count) ;
171    if (bloc.buffer->getCount()==0) 
172      if (buffers_.size()>1) 
173      { 
174        usedWindows_[bloc.window]=false ;
175        delete buffers_.front() ;
176        buffers_.pop_front() ;
177      }
178   
179    if (addr != bloc.addr) 
180    {
181      blocs_.pop_front() ;
182      return true ;
183    }
184    else 
185    {
186      blocs_.pop_front() ;
187      return false ;
188    }
189
190  }
191
192  bool CP2pClientBuffer::writeEvent(size_t timeline, CEventClient& event)
193  {
194    size_t size = event.getSize() ;
195    if (isBufferFree(size))
196    {
197      CBufferOut buffer(size) ;
198      event.send(timeline, size, &buffer) ;
199      size_t bufferSizeBefore = currentBufferSize_ ; 
200      int nbBlocs = writeBuffer((char*)buffer.start(),buffer.count()) ;
201      if (currentBufferSize_!=bufferSizeBefore) sendResizeBufferEvent(timeline,currentBufferSize_) ;
202      sendTimelineEvent(timeline, event.getNbSender(), nbBlocs) ;
203      return true ;
204    }
205    else return false ;
206  }
207
208  void CP2pClientBuffer::eventLoop(void)
209  {
210    // check to free requests
211    int flag ;
212    bool out = true;
213    SRequest request ; 
214    while (!requests_.empty() && out) 
215    {
216      request = requests_.front() ;
217      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").resume() ;
218      MPI_Test(&request.mpiRequest, &flag, MPI_STATUS_IGNORE) ;
219      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").suspend() ;
220      if (flag==true)
221      {
222        delete request.buffer ;
223        requests_.pop_front() ;
224      }
225      else out=false; 
226    }
227   
228    // check to free blocs
229//    MPI_Aint addr ;
230//    MPI_Aint finalize ;
231//    MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, winControl_) ;
232//    addr = control_[CONTROL_ADDR] ;
233//    control_[CONTROL_ADDR] = 0 ;
234//    finalize = control_[CONTROL_FINALIZE] ;
235//    MPI_Win_unlock(0, winControl_) ;
236    MPI_Aint addr=0 ;
237    auto it=sentBlocRequest_.begin() ;
238    for( ; it!=sentBlocRequest_.end() ; ++it)
239    {
240      int flag=true ;
241      MPI_Status status ; 
242      MPI_Test(&it->mpiRequest, &flag, &status) ;
243      if (flag==false) 
244      {
245        ++it ;
246        break ;
247      }
248      else addr = it->addr;
249    }
250    if (addr!=0) sentBlocRequest_.erase(sentBlocRequest_.begin(), it) ;
251    freeBuffer(addr) ;
252
253    // if (finalize==1) isFinalized_=true ;
254    listenFinalize() ;
255  }
256
257  void CP2pClientBuffer::listenFinalize(void)
258  {
259    if (!isFinalized_)
260    {
261      int flag ;
262      MPI_Status status ;
263      MPI_Test(&finalizeRequest_,&flag, &status) ;
264      if (flag) isFinalized_=true;
265    }
266  }
267
268  void CP2pClientBuffer::sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs)
269  {
270    ostringstream outStr ;
271    SRequest request ;
272    request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int))*nbBlocs) ; 
273    *(request.buffer)<<timeline<<nbSenders<<nbBlocs ;
274    if (info.isActive(logProtocol))  outStr<<"New timeline event sent to server rank "<<serverRank_<<" : timeLine="<<timeline<<"  nbSenders="<<nbSenders<<"  nbBlocs="<<nbBlocs<<endl ;
275    auto it = blocs_.end() ;
276    for(int i=0 ; i<nbBlocs; ++i,--it) ;
277    for(int i=0 ; i<nbBlocs; ++i,++it) 
278    {
279      *(request.buffer) << it->addr << it->count << it->window;
280   
281      if (info.isActive(logProtocol))
282      {
283        size_t checksum=0 ;
284        for(size_t j=0;j<it->count;j++) checksum+=((unsigned char*)(it->addr))[j] ;
285        outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  ;  " ;
286      }
287
288      sentBlocRequest_.emplace_back() ;
289      sentBlocRequest_.back().addr = it->addr ; 
290      MPI_Issend((void*)(it->addr), it->count, MPI_CHAR, intraServerRank_, 21, interCommMerged_, &sentBlocRequest_.back().mpiRequest) ;
291    }
292    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ;
293    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
294    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ;
295    info(logProtocol)<<outStr.str()<<endl ;
296    requests_.push_back(request) ;
297  }
298
299  void CP2pClientBuffer::sendResizeBufferEvent(size_t timeline, size_t size)
300  {
301    SRequest request ;
302    request.buffer = new CBufferOut(sizeof(EVENT_BUFFER_RESIZE)+sizeof(timeline)+sizeof(size)) ; 
303    *(request.buffer)<<EVENT_BUFFER_RESIZE<<timeline<<size ;
304    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
305    requests_.push_back(request) ;
306  }
307
308  void CP2pClientBuffer::sendNewBuffer(void)
309  {
310//    MPI_Aint controlAddr ;
311//    MPI_Get_address(control_, &controlAddr) ;
312//    MPI_Send(&controlAddr, 1, MPI_AINT, intraServerRank_, 20, interCommMerged_) ;
313  }
314
315}
Note: See TracBrowser for help on using the repository browser.