source: XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp @ 2247

Last change on this file since 2247 was 2246, checked in by ymipsl, 3 years ago
  • Update of the transfer protocol using one-sided communication
  • Introduce MPI_Improbe/MPI_Mrecv to listen for incoming requests
  • Introducing latency when looping over managers

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 11.6 KB
RevLine 
[591]1#include "xios_spl.hpp"
[300]2#include "exception.hpp"
[380]3#include "log.hpp"
[300]4#include "buffer_out.hpp"
5#include "buffer_client.hpp"
[317]6#include "cxios.hpp"
[382]7#include "mpi.hpp"
[347]8#include "tracer.hpp"
[2130]9#include "timeline_events.hpp"
[2246]10#include "timer.hpp"
[300]11
[335]12namespace xios
[300]13{
[732]14  size_t CClientBuffer::maxRequestSize = 0;
[509]15
[1757]16  CClientBuffer::CClientBuffer(MPI_Comm interComm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
[917]17    : interComm(interComm)
[1757]18    , clientRank_(clientRank)
[917]19    , serverRank(serverRank)
20    , bufferSize(bufferSize)
[1201]21    , estimatedMaxEventSize(estimatedMaxEventSize)
22    , maxEventSize(0)
[917]23    , current(0)
24    , count(0)
25    , pending(false)
[1757]26    , hasWindows(false) 
27    , windows_(windows)
[300]28  {
[1757]29    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
30    else hasWindows=true ;
31
[2246]32      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
33      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ;
34      buffer[0] = bufferHeader[0]+headerSize_ ;
35      buffer[1] = bufferHeader[1]+headerSize_ ;
36      firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_ ;
37      firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_ ;
38      bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ;
39      bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ;
40      control[0]=(size_t*)bufferHeader[0] + controlOffset_ ;
41      control[1]=(size_t*)bufferHeader[1] + controlOffset_ ;
42      notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ;
43      notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ;
[1757]44
45      *firstTimeLine[0]=0 ;
46      *firstTimeLine[1]=0 ;
47      *bufferCount[0]=0 ;
48      *bufferCount[1]=0 ;
49      *control[0]=0 ;
50      *control[1]=0 ;
[2246]51      *notify[0]=notifyNothing_ ;
52      *notify[1]=notifyNothing_ ;
[1757]53      winState[0]=false ;
54      winState[1]=false ;
55
56
57    if (hasWindows)
58    { 
59   
[2246]60      MPI_Aint buffSize=bufferSize+headerSize_ ;
[2221]61      MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ;
62      MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ;
[1757]63   
64      MPI_Group group ;
65      int groupSize,groupRank ;
66      MPI_Win_get_group(windows_[0], &group) ;
67      MPI_Group_size(group, &groupSize) ;
68      MPI_Group_rank(group, &groupRank) ;
69      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
70
71      MPI_Win_get_group(windows_[1], &group) ;
72      MPI_Group_size(group, &groupSize) ;
73      MPI_Group_rank(group, &groupRank) ;
74      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
75
76      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
77      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
78
79      MPI_Win_unlock(clientRank_, windows_[1]) ;
80      MPI_Win_unlock(clientRank_, windows_[0]) ;
81    } 
[732]82    retBuffer = new CBufferOut(buffer[current], bufferSize);
[1757]83    info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl;
[300]84  }
[509]85
[1757]86  MPI_Aint CClientBuffer::getWinAddress(int i)
87  {
88     MPI_Aint address ;
89     
90     if (hasWindows) MPI_Get_address(bufferHeader[i], &address) ;
91     else address=0 ;
92
93     return address ;
94  }
95
[300]96  CClientBuffer::~CClientBuffer()
97  {
[1757]98     //freeWindows() ;
99     if (hasWindows)
100     {
101       MPI_Win_detach(windows_[0],bufferHeader[0]);
102       MPI_Win_detach(windows_[1],bufferHeader[1]);
103       MPI_Free_mem(bufferHeader[0]) ;
104       MPI_Free_mem(bufferHeader[1]) ;
105     }
106     delete retBuffer;
[300]107  }
[509]108
[1757]109  void CClientBuffer::lockBuffer(void)
110  {
[2246]111    CTimer::get("lock buffer").resume();
[1757]112    if (hasWindows)
113    {
114      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ;
115      winState[current]=true ;
116    }
[2246]117    CTimer::get("lock buffer").suspend();
[1757]118  }
119
120  void CClientBuffer::unlockBuffer(void)
121  {
[2246]122    CTimer::get("unlock buffer").resume();
[1757]123    if (hasWindows)
124    {
125      MPI_Win_unlock(clientRank_, windows_[current]) ;
126      winState[current]=false ;
127    }
[2246]128    CTimer::get("unlock buffer").suspend();
[1757]129  }
130
[1227]131  StdSize CClientBuffer::remain(void)
[300]132  {
[732]133    return bufferSize - count;
[300]134  }
[509]135
[1227]136  bool CClientBuffer::isBufferFree(StdSize size)
[300]137  {
[1757]138 
139    lockBuffer();
[2130]140    count=*bufferCount[current] ;
141   
142    if (resizingBufferStep_ > 0 ) return false ;
143
[732]144    if (size > bufferSize)
[2130]145    {
146      resizingBufferStep_=1 ;
[2246]147      *firstTimeLine[current]=0 ;
[2130]148      newBufferSize_=size ;
149      return false ;
150    }
[509]151
[1201]152    if (size > maxEventSize)
153    {
154      maxEventSize = size;
155
156      if (size > estimatedMaxEventSize)
157        error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank
158                 << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl;
159
160      if (size > maxRequestSize) maxRequestSize = size;
161    }
[2130]162   
163    if (size > remain())
164    {
165      if (isGrowableBuffer_)
166      {
167        resizingBufferStep_ = 1 ;
[2246]168        *firstTimeLine[current]=0 ;
[2130]169        newBufferSize_ = (count+size)*growFactor_ ;
170      } 
171      return false ;
172    }
173    else return true ;
[300]174  }
[509]175
176
[1757]177  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size)
[300]178  {
[732]179    if (size <= remain())
[300]180    {
[732]181      retBuffer->realloc(buffer[current] + count, size);
182      count += size;
[1757]183      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ;
184      *bufferCount[current]=count ;
[732]185      return retBuffer;
[300]186    }
187    else
188    {
[1227]189      ERROR("CBufferOut* CClientBuffer::getBuffer(StdSize size)",
[732]190            << "Not enough space in buffer, this should not have happened...");
191      return NULL;
[300]192    }
[509]193  }
194
[1757]195  void CClientBuffer::infoBuffer(void)
[300]196  {
[1757]197     
198      char checksum=0 ;
199      for(size_t i=0;i<*bufferCount[current];i++) checksum=checksum+buffer[current][i] ;
200 
201      char checksumFirst=0 ;
202      for(size_t i=5; i<10 && i<*bufferCount[current] ;i++) checksumFirst=checksumFirst+buffer[current][i] ;
203 
204      char checksumLast=0 ;
205      for(size_t i=(*bufferCount[current]<10)?0:*bufferCount[current]-10; i<*bufferCount[current] ; i++) checksumLast=checksumLast+buffer[current][i] ;
206 
207      info(45)<<"CClientBuffer::infoBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current<<" WinState "<<winState[current]
208              <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<" checksum "<<(int)checksum<<" "
209              <<(int)buffer[current][0]<<" "<<(int)buffer[current][1]<<" "<<(int)buffer[current][2]<<" "<<(int)buffer[current][3]<<" "<<(int)buffer[current][4]<<" "<<(int)buffer[current][5]<<" "
210              <<(int)buffer[current][6]<<" "<<(int)buffer[current][7]<<" "<<(int)buffer[current][8]<<" "<<(int)buffer[current][9]<<" "<<(int)buffer[current][10]<<" "<<(int)buffer[current][11]<<endl ;
211
212  }
213
214  bool CClientBuffer::checkBuffer(bool send)
215  {
[1639]216    MPI_Status status;
[732]217    int flag;
[2246]218   
219    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
220    MPI_Win_unlock(clientRank_, windows_[0]) ;
[509]221
[2246]222    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
223    MPI_Win_unlock(clientRank_, windows_[1]) ;
224
[300]225    if (pending)
226    {
[732]227      traceOff();
[1639]228      MPI_Test(&request, &flag, &status);
[732]229      traceOn();
230      if (flag == true) pending = false;
[300]231    }
232
233    if (!pending)
234    {
[2130]235      if (!send && resizingBufferStep_==0 ) return false ;
236
[732]237      if (count > 0)
[300]238      {
[2246]239        double time=MPI_Wtime() ;
240        if (time - lastCheckedWithNothing_ > latency_)
[1757]241        {
[2246]242          lockBuffer() ;
243          if (*bufferCount[current] > 0)
244          {
245            MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request);
246            if (resizingBufferStep_==4) resizingBufferStep_=0 ;
247            pending = true;
248            *firstTimeLine[current]=0 ;
249            *bufferCount[current]=0 ;
[1757]250
[2246]251             unlockBuffer() ;
[1757]252
[2246]253            if (current == 1) current = 0;
254            else current = 1;
255            count = 0;
256          }
257          else 
258          {
259            unlockBuffer() ;
260            lastCheckedWithNothing_ = time ;
261          }
[1757]262        }
[300]263      }
[2130]264      else
265      {
266        if (resizingBufferStep_==1) resizeBufferNotify() ;
[2246]267        else if (resizingBufferStep_==2) isNotifiedChangeBufferSize() ;
268        else if (resizingBufferStep_==3) resizeBuffer(newBufferSize_) ;
[2130]269      }
[300]270    }
[732]271
272    return pending;
[300]273  }
[509]274
[2130]275  void CClientBuffer::resizeBufferNotify(void)
276  {
277    // notify server of changing buffers size
278    lockBuffer() ;
279    int size=sizeof(int)+sizeof(size_t) ;
280    CBufferOut* bufOut = this->getBuffer(timelineEventNotifyChangeBufferSize, size);
281    bufOut->put(size);
282    bufOut->put(timelineEventNotifyChangeBufferSize);
283    resizingBufferStep_ = 2 ;
284    unlockBuffer() ;
285  }
286
287  void CClientBuffer::resizeBuffer(size_t newSize)
288  {
[2246]289
[2130]290    if (hasWindows)
291    { 
292      MPI_Win_detach(windows_[0], bufferHeader[0]) ;
293      MPI_Win_detach(windows_[1], bufferHeader[1]) ;
294    }
295    MPI_Free_mem(bufferHeader[0]) ;
296    MPI_Free_mem(bufferHeader[1]) ;
297
298    bufferSize=newSize ;
[2246]299    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
300    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ;
301    buffer[0] = bufferHeader[0]+headerSize_ ;
302    buffer[1] = bufferHeader[1]+headerSize_ ;
303    firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_;
304    firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_;
305    bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ;
306    bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ;
307    control[0]=(size_t*)bufferHeader[0] + controlOffset_ ;  // control=0 => nothing ; control=1 => changeBufferSize
308    control[1]=(size_t*)bufferHeader[1] + controlOffset_ ;
309    notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ;
310    notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ;
[2130]311
312    *firstTimeLine[0]=0 ;
313    *firstTimeLine[1]=0 ;
314    *bufferCount[0]=0 ;
315    *bufferCount[1]=0 ;
316    *control[0]=0 ;
317    *control[1]=0 ;
[2246]318    *notify[0] = notifyNothing_ ;
319    *notify[1] = notifyNothing_ ;
[2130]320    winState[0]=false ;
321    winState[1]=false ;
322    current=0 ;
323   
324    if (hasWindows)
325    { 
326   
[2246]327      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ;
328      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ;
[2130]329         
330      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
331      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
332
333      MPI_Win_unlock(clientRank_, windows_[1]) ;
334      MPI_Win_unlock(clientRank_, windows_[0]) ;
335    } 
336
337    lockBuffer() ;
338 
[2221]339    int size=sizeof(int)+2*sizeof(size_t)+2*sizeof(MPI_Aint) ;
[2130]340    CBufferOut* bufOut = this->getBuffer(timelineEventChangeBufferSize, size);
341    bufOut->put(size);
342    bufOut->put(timelineEventChangeBufferSize);
343    bufOut->put(newBufferSize_);
344    bufOut->put(this->getWinAddress(0));
345    bufOut->put(this->getWinAddress(1));
346
[2246]347    resizingBufferStep_=4;
[2130]348    unlockBuffer() ;
[2246]349    info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinAddress(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl;
[2130]350  }
351
[300]352  bool CClientBuffer::hasPendingRequest(void)
353  {
[1757]354   
355    lockBuffer() ;
356    count=*bufferCount[current] ;
357    unlockBuffer() ;
358
[732]359    return (pending || count > 0);
[300]360  }
[1757]361
[2246]362  bool CClientBuffer::isNotifiedChangeBufferSize(void)
363  {
364   
365    bool ret ;
366    lockBuffer() ;
367    ret=*notify[current] == notifyResizeBuffer_ ? true : false ;
368    if (ret) 
369    {
370      *notify[current] = notifyNothing_ ;
371      resizingBufferStep_=3; 
372    }
373    unlockBuffer() ;
374
375    return ret;
376  }
377
[1757]378  bool CClientBuffer::isNotifiedFinalized(void)
379  {
380   
381    bool ret ;
382    lockBuffer() ;
[2246]383    ret=*notify[current] == notifyFinalize_ ? true : false ;
[1757]384    unlockBuffer() ;
385
386    return ret;
387  }
388
[509]389}
Note: See TracBrowser for help on using the repository browser.