source: XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp @ 2338

Last change on this file since 2338 was 2324, checked in by ymipsl, 2 years ago

Solve deadlock or crash occuring when activate second levels of servers.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.2 KB
RevLine 
[591]1#include "xios_spl.hpp"
[300]2#include "exception.hpp"
[380]3#include "log.hpp"
[300]4#include "buffer_out.hpp"
5#include "buffer_client.hpp"
[317]6#include "cxios.hpp"
[382]7#include "mpi.hpp"
[347]8#include "tracer.hpp"
[2130]9#include "timeline_events.hpp"
[2246]10#include "timer.hpp"
[300]11
[335]12namespace xios
[300]13{
[732]14  size_t CClientBuffer::maxRequestSize = 0;
[509]15
[2259]16  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
[917]17    : interComm(interComm)
[2258]18    , clientRank_(0)
[917]19    , serverRank(serverRank)
20    , bufferSize(bufferSize)
[1201]21    , estimatedMaxEventSize(estimatedMaxEventSize)
22    , maxEventSize(0)
[917]23    , current(0)
24    , count(0)
25    , pending(false)
[1757]26    , hasWindows(false) 
[300]27  {
[2259]28     /*
29      if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
30      else hasWindows=true ;
31     */
[1757]32
[2246]33      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
34      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ;
35      buffer[0] = bufferHeader[0]+headerSize_ ;
36      buffer[1] = bufferHeader[1]+headerSize_ ;
37      firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_ ;
38      firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_ ;
39      bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ;
40      bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ;
41      control[0]=(size_t*)bufferHeader[0] + controlOffset_ ;
42      control[1]=(size_t*)bufferHeader[1] + controlOffset_ ;
43      notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ;
44      notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ;
[1757]45
46      *firstTimeLine[0]=0 ;
47      *firstTimeLine[1]=0 ;
48      *bufferCount[0]=0 ;
49      *bufferCount[1]=0 ;
50      *control[0]=0 ;
51      *control[1]=0 ;
[2246]52      *notify[0]=notifyNothing_ ;
53      *notify[1]=notifyNothing_ ;
[1757]54      winState[0]=false ;
55      winState[1]=false ;
56
57
58    if (hasWindows)
59    { 
60   
[2246]61      MPI_Aint buffSize=bufferSize+headerSize_ ;
[2221]62      MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ;
63      MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ;
[1757]64   
65      MPI_Group group ;
66      int groupSize,groupRank ;
67      MPI_Win_get_group(windows_[0], &group) ;
68      MPI_Group_size(group, &groupSize) ;
69      MPI_Group_rank(group, &groupRank) ;
70      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
71
72      MPI_Win_get_group(windows_[1], &group) ;
73      MPI_Group_size(group, &groupSize) ;
74      MPI_Group_rank(group, &groupRank) ;
75      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
76
77      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
78      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
79
80      MPI_Win_unlock(clientRank_, windows_[1]) ;
81      MPI_Win_unlock(clientRank_, windows_[0]) ;
82    } 
[732]83    retBuffer = new CBufferOut(buffer[current], bufferSize);
[1757]84    info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl;
[300]85  }
[509]86
[1757]87  MPI_Aint CClientBuffer::getWinAddress(int i)
88  {
[2259]89    MPI_Aint address ;
90    MPI_Get_address(bufferHeader[i], &address) ;
91    return address ;
92  }
[1757]93
[2259]94  void CClientBuffer::attachWindows(vector<MPI_Win>& windows)
95  {
96    windows_=windows ;
97    if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ;
98    else hasWindows=true ;
99
100    if (hasWindows)
101    { 
102      MPI_Aint buffSize=bufferSize+headerSize_ ;
103      MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ;
104      MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ;
105   
106      MPI_Group group ;
107      int groupSize,groupRank ;
108      MPI_Win_get_group(windows_[0], &group) ;
109      MPI_Group_size(group, &groupSize) ;
110      MPI_Group_rank(group, &groupRank) ;
111      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
112
113      MPI_Win_get_group(windows_[1], &group) ;
114      MPI_Group_size(group, &groupSize) ;
115      MPI_Group_rank(group, &groupRank) ;
116      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
117
118      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
119      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
120
121      MPI_Win_unlock(clientRank_, windows_[1]) ;
122      MPI_Win_unlock(clientRank_, windows_[0]) ;
123    } 
124
[1757]125  }
126
[2259]127
[300]128  CClientBuffer::~CClientBuffer()
129  {
[1757]130     //freeWindows() ;
131     if (hasWindows)
132     {
133       MPI_Win_detach(windows_[0],bufferHeader[0]);
134       MPI_Win_detach(windows_[1],bufferHeader[1]);
135       MPI_Free_mem(bufferHeader[0]) ;
136       MPI_Free_mem(bufferHeader[1]) ;
137     }
138     delete retBuffer;
[300]139  }
[509]140
[1757]141  void CClientBuffer::lockBuffer(void)
142  {
[2246]143    CTimer::get("lock buffer").resume();
[1757]144    if (hasWindows)
145    {
[2324]146      if (winState[current]==true) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo lock client buffer but winState said it is already locked") ;
[1757]147      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ;
148      winState[current]=true ;
149    }
[2246]150    CTimer::get("lock buffer").suspend();
[1757]151  }
152
153  void CClientBuffer::unlockBuffer(void)
154  {
[2246]155    CTimer::get("unlock buffer").resume();
[1757]156    if (hasWindows)
157    {
[2324]158      if (winState[current]==false) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo unlock client buffer but winState said it is already unlocked") ;
[1757]159      MPI_Win_unlock(clientRank_, windows_[current]) ;
160      winState[current]=false ;
161    }
[2246]162    CTimer::get("unlock buffer").suspend();
[1757]163  }
164
[1227]165  StdSize CClientBuffer::remain(void)
[300]166  {
[732]167    return bufferSize - count;
[300]168  }
[509]169
[1227]170  bool CClientBuffer::isBufferFree(StdSize size)
[300]171  {
[1757]172 
173    lockBuffer();
[2130]174    count=*bufferCount[current] ;
175   
176    if (resizingBufferStep_ > 0 ) return false ;
177
[732]178    if (size > bufferSize)
[2130]179    {
180      resizingBufferStep_=1 ;
[2246]181      *firstTimeLine[current]=0 ;
[2130]182      newBufferSize_=size ;
183      return false ;
184    }
[509]185
[1201]186    if (size > maxEventSize)
187    {
188      maxEventSize = size;
189
190      if (size > estimatedMaxEventSize)
191        error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank
192                 << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl;
193
194      if (size > maxRequestSize) maxRequestSize = size;
195    }
[2130]196   
197    if (size > remain())
198    {
199      if (isGrowableBuffer_)
200      {
201        resizingBufferStep_ = 1 ;
[2246]202        *firstTimeLine[current]=0 ;
[2130]203        newBufferSize_ = (count+size)*growFactor_ ;
204      } 
205      return false ;
206    }
207    else return true ;
[300]208  }
[509]209
210
[1757]211  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size)
[300]212  {
[732]213    if (size <= remain())
[300]214    {
[732]215      retBuffer->realloc(buffer[current] + count, size);
216      count += size;
[1757]217      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ;
218      *bufferCount[current]=count ;
[732]219      return retBuffer;
[300]220    }
221    else
222    {
[1227]223      ERROR("CBufferOut* CClientBuffer::getBuffer(StdSize size)",
[732]224            << "Not enough space in buffer, this should not have happened...");
225      return NULL;
[300]226    }
[509]227  }
228
[1757]229  void CClientBuffer::infoBuffer(void)
[300]230  {
[1757]231     
232      char checksum=0 ;
233      for(size_t i=0;i<*bufferCount[current];i++) checksum=checksum+buffer[current][i] ;
234 
235      char checksumFirst=0 ;
236      for(size_t i=5; i<10 && i<*bufferCount[current] ;i++) checksumFirst=checksumFirst+buffer[current][i] ;
237 
238      char checksumLast=0 ;
239      for(size_t i=(*bufferCount[current]<10)?0:*bufferCount[current]-10; i<*bufferCount[current] ; i++) checksumLast=checksumLast+buffer[current][i] ;
240 
241      info(45)<<"CClientBuffer::infoBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current<<" WinState "<<winState[current]
242              <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<" checksum "<<(int)checksum<<" "
243              <<(int)buffer[current][0]<<" "<<(int)buffer[current][1]<<" "<<(int)buffer[current][2]<<" "<<(int)buffer[current][3]<<" "<<(int)buffer[current][4]<<" "<<(int)buffer[current][5]<<" "
244              <<(int)buffer[current][6]<<" "<<(int)buffer[current][7]<<" "<<(int)buffer[current][8]<<" "<<(int)buffer[current][9]<<" "<<(int)buffer[current][10]<<" "<<(int)buffer[current][11]<<endl ;
245
246  }
247
248  bool CClientBuffer::checkBuffer(bool send)
249  {
[1639]250    MPI_Status status;
[732]251    int flag;
[2260]252    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
253 
[300]254    if (pending)
255    {
[732]256      traceOff();
[1639]257      MPI_Test(&request, &flag, &status);
[732]258      traceOn();
259      if (flag == true) pending = false;
[300]260    }
261
262    if (!pending)
263    {
[2130]264      if (!send && resizingBufferStep_==0 ) return false ;
265
[732]266      if (count > 0)
[300]267      {
[2246]268        double time=MPI_Wtime() ;
269        if (time - lastCheckedWithNothing_ > latency_)
[1757]270        {
[2246]271          lockBuffer() ;
272          if (*bufferCount[current] > 0)
273          {
274            MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request);
275            if (resizingBufferStep_==4) resizingBufferStep_=0 ;
276            pending = true;
277            *firstTimeLine[current]=0 ;
278            *bufferCount[current]=0 ;
[1757]279
[2246]280             unlockBuffer() ;
[1757]281
[2246]282            if (current == 1) current = 0;
283            else current = 1;
284            count = 0;
285          }
286          else 
287          {
288            unlockBuffer() ;
289            lastCheckedWithNothing_ = time ;
290          }
[1757]291        }
[300]292      }
[2130]293      else
294      {
295        if (resizingBufferStep_==1) resizeBufferNotify() ;
[2246]296        else if (resizingBufferStep_==2) isNotifiedChangeBufferSize() ;
297        else if (resizingBufferStep_==3) resizeBuffer(newBufferSize_) ;
[2130]298      }
[300]299    }
[732]300
301    return pending;
[300]302  }
[509]303
[2130]304  void CClientBuffer::resizeBufferNotify(void)
305  {
306    // notify server of changing buffers size
307    lockBuffer() ;
308    int size=sizeof(int)+sizeof(size_t) ;
309    CBufferOut* bufOut = this->getBuffer(timelineEventNotifyChangeBufferSize, size);
310    bufOut->put(size);
311    bufOut->put(timelineEventNotifyChangeBufferSize);
312    resizingBufferStep_ = 2 ;
313    unlockBuffer() ;
314  }
315
316  void CClientBuffer::resizeBuffer(size_t newSize)
317  {
[2246]318
[2130]319    if (hasWindows)
320    { 
321      MPI_Win_detach(windows_[0], bufferHeader[0]) ;
322      MPI_Win_detach(windows_[1], bufferHeader[1]) ;
323    }
324    MPI_Free_mem(bufferHeader[0]) ;
325    MPI_Free_mem(bufferHeader[1]) ;
326
327    bufferSize=newSize ;
[2246]328    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
329    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ;
330    buffer[0] = bufferHeader[0]+headerSize_ ;
331    buffer[1] = bufferHeader[1]+headerSize_ ;
332    firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_;
333    firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_;
334    bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ;
335    bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ;
336    control[0]=(size_t*)bufferHeader[0] + controlOffset_ ;  // control=0 => nothing ; control=1 => changeBufferSize
337    control[1]=(size_t*)bufferHeader[1] + controlOffset_ ;
338    notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ;
339    notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ;
[2130]340
341    *firstTimeLine[0]=0 ;
342    *firstTimeLine[1]=0 ;
343    *bufferCount[0]=0 ;
344    *bufferCount[1]=0 ;
345    *control[0]=0 ;
346    *control[1]=0 ;
[2246]347    *notify[0] = notifyNothing_ ;
348    *notify[1] = notifyNothing_ ;
[2130]349    winState[0]=false ;
350    winState[1]=false ;
351    current=0 ;
352   
353    if (hasWindows)
354    { 
355   
[2246]356      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ;
357      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ;
[2130]358         
359      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
360      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
361
362      MPI_Win_unlock(clientRank_, windows_[1]) ;
363      MPI_Win_unlock(clientRank_, windows_[0]) ;
364    } 
365
366    lockBuffer() ;
367 
[2221]368    int size=sizeof(int)+2*sizeof(size_t)+2*sizeof(MPI_Aint) ;
[2130]369    CBufferOut* bufOut = this->getBuffer(timelineEventChangeBufferSize, size);
370    bufOut->put(size);
371    bufOut->put(timelineEventChangeBufferSize);
372    bufOut->put(newBufferSize_);
373    bufOut->put(this->getWinAddress(0));
374    bufOut->put(this->getWinAddress(1));
375
[2246]376    resizingBufferStep_=4;
[2130]377    unlockBuffer() ;
[2246]378    info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinAddress(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl;
[2130]379  }
380
[300]381  bool CClientBuffer::hasPendingRequest(void)
382  {
[1757]383   
384    lockBuffer() ;
385    count=*bufferCount[current] ;
386    unlockBuffer() ;
387
[732]388    return (pending || count > 0);
[300]389  }
[1757]390
[2246]391  bool CClientBuffer::isNotifiedChangeBufferSize(void)
392  {
393   
394    bool ret ;
395    lockBuffer() ;
396    ret=*notify[current] == notifyResizeBuffer_ ? true : false ;
[2298]397    if (ret || !hasWindows) 
[2246]398    {
399      *notify[current] = notifyNothing_ ;
400      resizingBufferStep_=3; 
401    }
402    unlockBuffer() ;
403
404    return ret;
405  }
406
[1757]407  bool CClientBuffer::isNotifiedFinalized(void)
408  {
[2258]409    if (!isFinalized_)
410    {
411      double time=MPI_Wtime() ;
412//      if (time - lastCheckedNotify_ > latency_)
413      {
414        int flag ;
415        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
416        lockBuffer() ;
417        isFinalized_=*notify[current] == notifyFinalize_ ? true : false ;
418        unlockBuffer() ;
419        lastCheckedNotify_=time ;
420      }
421    }
422    return isFinalized_ ;
[1757]423  }
424
[509]425}
Note: See TracBrowser for help on using the repository browser.