source: XIOS3/trunk/src/buffer_client.cpp

Last change on this file was 2547, checked in by ymipsl, 10 months ago

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on the server side to manage deadlocks more efficiently (similar to coroutines, which will be available in a future C++ standard), based on C++ threads
  • Suppression of the old "attached mode", which is replaced by online writer and reader filters

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 11.6 KB
Line 
1#include "xios_spl.hpp"
2#include "exception.hpp"
3#include "log.hpp"
4#include "buffer_out.hpp"
5#include "buffer_client.hpp"
6#include "cxios.hpp"
7#include "mpi.hpp"
8#include "tracer.hpp"
9#include "timeline_events.hpp"
10#include "timer.hpp"
11
12namespace xios
13{
14  size_t CClientBuffer::maxRequestSize = 0;
15
16  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, bool hasWindows)
17    : interComm(interComm)
18    , clientRank_(0)
19    , serverRank(serverRank)
20    , bufferSize(bufferSize)
21    , maxEventSize(0)
22    , current(0)
23    , count(0)
24    , pending(false)
25    , hasWindows_(hasWindows) 
26  {
27      if (hasWindows_)
28      { 
29        windows_.resize(2) ;
30        windows_[0] = new CWindowDynamic() ;
31        windows_[0]->allocateBuffer(bufferSize+headerSize_) ;
32        bufferHeader[0] = (char*) windows_[0]->getBufferAddress() ;
33        windows_[1] = new CWindowDynamic() ;
34        windows_[1]->allocateBuffer(bufferSize+headerSize_) ;
35        bufferHeader[1] = (char*) windows_[1]->getBufferAddress() ;
36      }
37      else
38      {
39        MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
40        MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ;
41      }
42
43      buffer[0] = bufferHeader[0]+headerSize_ ;
44      buffer[1] = bufferHeader[1]+headerSize_ ;
45      firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_ ;
46      firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_ ;
47      bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ;
48      bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ;
49      control[0]=(size_t*)bufferHeader[0] + controlOffset_ ;
50      control[1]=(size_t*)bufferHeader[1] + controlOffset_ ;
51      notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ;
52      notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ;
53
54      *firstTimeLine[0]=0 ;
55      *firstTimeLine[1]=0 ;
56      *bufferCount[0]=0 ;
57      *bufferCount[1]=0 ;
58      *control[0]=0 ;
59      *control[1]=0 ;
60      *notify[0]=notifyNothing_ ;
61      *notify[1]=notifyNothing_ ;
62      winState[0]=false ;
63      winState[1]=false ;
64
65   
66      retBuffer = new CBufferOut(buffer[current], bufferSize);
67      info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl;
68  }
69
70  MPI_Aint CClientBuffer::getWinAddress(int i)
71  {
72    MPI_Aint address ;
73    MPI_Get_address(bufferHeader[i], &address) ;
74    return address ;
75  }
76 
77  MPI_Aint CClientBuffer::getWinBufferAddress(int i)
78  {
79    return windows_[i]->getWinBufferAddress() ;
80  }
81
82  void CClientBuffer::attachWindows(MPI_Comm& winComm)
83  {
84    isAttachedWindows_=true ;
85
86    if (hasWindows_)
87    { 
88      MPI_Aint buffSize=bufferSize+headerSize_ ;
89      windows_[0]->create(winComm) ;
90      windows_[0]->attach() ;
91      windows_[1]->create(winComm) ;
92      windows_[1]->attach() ;
93
94      windows_[0]->lockExclusive(clientRank_) ;
95      windows_[1]->lockExclusive(clientRank_) ;
96
97      windows_[0]->unlockExclusive(clientRank_) ;
98      windows_[1]->unlockExclusive(clientRank_) ;
99
100    } 
101
102  }
103
104
105  CClientBuffer::~CClientBuffer()
106  {
107    if (hasWindows_)
108    {
109      windows_[0]->detach() ;
110      windows_[1]->detach() ;
111      delete windows_[0] ;
112      delete windows_[1] ;
113    }
114    else 
115    {
116      MPI_Free_mem(bufferHeader[0]) ;
117      MPI_Free_mem(bufferHeader[1]) ;
118    }
119    delete retBuffer;
120  }
121
122  void CClientBuffer::lockBuffer(void)
123  {
124    CTimer::get("lock buffer").resume();
125    if (isAttachedWindows_)   
126    {
127      if (winState[current]==true) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo lock client buffer but winState said it is already locked") ;
128      //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ;
129      windows_[current]->lockExclusive(clientRank_) ;
130      winState[current]=true ;
131    }
132    CTimer::get("lock buffer").suspend();
133  }
134
135  void CClientBuffer::unlockBuffer(void)
136  {
137    CTimer::get("unlock buffer").resume();
138    if (isAttachedWindows_)
139    {
140      if (winState[current]==false) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo unlock client buffer but winState said it is already unlocked") ;
141      //MPI_Win_unlock(clientRank_, windows_[current]) ;
142      windows_[current]->unlockExclusive(clientRank_) ;
143      winState[current]=false ;
144    }
145    CTimer::get("unlock buffer").suspend();
146  }
147
148  StdSize CClientBuffer::remain(void)
149  {
150    return bufferSize - count;
151  }
152
153  bool CClientBuffer::isBufferFree(StdSize size)
154  {
155    if (!isAttachedWindows_) return false;
156
157    lockBuffer();
158    count=*bufferCount[current] ;
159   
160    if (resizingBufferStep_ > 0 ) return false ;
161
162    if (size > bufferSize)
163    {
164      resizingBufferStep_=1 ;
165      *firstTimeLine[current]=0 ;
166      newBufferSize_=size ;
167      return false ;
168    }
169
170    if (size > maxEventSize)
171    {
172      maxEventSize = size;
173
174      if (size > maxRequestSize) maxRequestSize = size;
175    }
176   
177    if (size > remain())
178    {
179      if (isGrowableBuffer_)
180      {
181        resizingBufferStep_ = 1 ;
182        *firstTimeLine[current]=0 ;
183        newBufferSize_ = (count+size)*growFactor_ ;
184      } 
185      return false ;
186    }
187    else return true ;
188  }
189
190
191  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size)
192  {
193    if (size <= remain())
194    {
195      retBuffer->realloc(buffer[current] + count, size);
196      count += size;
197      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ;
198      *bufferCount[current]=count ;
199      return retBuffer;
200    }
201    else
202    {
203      ERROR("CBufferOut* CClientBuffer::getBuffer(StdSize size)",
204            << "Not enough space in buffer, this should not have happened...");
205      return NULL;
206    }
207  }
208
209  void CClientBuffer::infoBuffer(void)
210  {
211     
212      char checksum=0 ;
213      for(size_t i=0;i<*bufferCount[current];i++) checksum=checksum+buffer[current][i] ;
214 
215      char checksumFirst=0 ;
216      for(size_t i=5; i<10 && i<*bufferCount[current] ;i++) checksumFirst=checksumFirst+buffer[current][i] ;
217 
218      char checksumLast=0 ;
219      for(size_t i=(*bufferCount[current]<10)?0:*bufferCount[current]-10; i<*bufferCount[current] ; i++) checksumLast=checksumLast+buffer[current][i] ;
220 
221      info(45)<<"CClientBuffer::infoBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current<<" WinState "<<winState[current]
222              <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<" checksum "<<(int)checksum<<" "
223              <<(int)buffer[current][0]<<" "<<(int)buffer[current][1]<<" "<<(int)buffer[current][2]<<" "<<(int)buffer[current][3]<<" "<<(int)buffer[current][4]<<" "<<(int)buffer[current][5]<<" "
224              <<(int)buffer[current][6]<<" "<<(int)buffer[current][7]<<" "<<(int)buffer[current][8]<<" "<<(int)buffer[current][9]<<" "<<(int)buffer[current][10]<<" "<<(int)buffer[current][11]<<endl ;
225
226  }
227
228  bool CClientBuffer::checkBuffer(bool send)
229  {
230    MPI_Status status;
231    int flag;
232    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
233 
234    if (pending)
235    {
236      traceOff();
237      MPI_Test(&request, &flag, &status);
238      traceOn();
239      if (flag == true) pending = false;
240    }
241
242    if (!pending)
243    {
244      if (!send && resizingBufferStep_==0 ) return false ;
245
246      if (count > 0)
247      {
248        double time=MPI_Wtime() ;
249        if (time - lastCheckedWithNothing_ > latency_)
250        {
251          lockBuffer() ;
252          if (*bufferCount[current] > 0)
253          {
254            MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request);
255            if (resizingBufferStep_==4) resizingBufferStep_=0 ;
256            pending = true;
257            *firstTimeLine[current]=0 ;
258            *bufferCount[current]=0 ;
259
260             unlockBuffer() ;
261
262            if (current == 1) current = 0;
263            else current = 1;
264            count = 0;
265          }
266          else 
267          {
268            unlockBuffer() ;
269            lastCheckedWithNothing_ = time ;
270          }
271        }
272      }
273      else
274      {
275        if (resizingBufferStep_==1) resizeBufferNotify() ;
276        else if (resizingBufferStep_==2) isNotifiedChangeBufferSize() ;
277        else if (resizingBufferStep_==3) resizeBuffer(newBufferSize_) ;
278      }
279    }
280
281    return pending;
282  }
283
284  void CClientBuffer::resizeBufferNotify(void)
285  {
286    // notify server of changing buffers size
287    lockBuffer() ;
288    int size=sizeof(int)+sizeof(size_t) ;
289    CBufferOut* bufOut = this->getBuffer(timelineEventNotifyChangeBufferSize, size);
290    bufOut->put(size);
291    bufOut->put(timelineEventNotifyChangeBufferSize);
292    resizingBufferStep_ = 2 ;
293    unlockBuffer() ;
294  }
295
296  void CClientBuffer::resizeBuffer(size_t newSize)
297  {
298
299    bufferSize=newSize ;
300
301    if (hasWindows_)
302    { 
303      windows_[0]->detach();
304      windows_[1]->detach();
305     
306      windows_[0]->attach(bufferSize+headerSize_) ;
307      bufferHeader[0] = (char*) windows_[0] -> getBufferAddress() ;
308      windows_[1]->attach(bufferSize+headerSize_) ;
309      bufferHeader[1] = (char*) windows_[1] -> getBufferAddress() ;
310    }
311   
312   
313    buffer[0] = bufferHeader[0]+headerSize_ ;
314    buffer[1] = bufferHeader[1]+headerSize_ ;
315    firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_;
316    firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_;
317    bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ;
318    bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ;
319    control[0]=(size_t*)bufferHeader[0] + controlOffset_ ;  // control=0 => nothing ; control=1 => changeBufferSize
320    control[1]=(size_t*)bufferHeader[1] + controlOffset_ ;
321    notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ;
322    notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ;
323
324    *firstTimeLine[0]=0 ;
325    *firstTimeLine[1]=0 ;
326    *bufferCount[0]=0 ;
327    *bufferCount[1]=0 ;
328    *control[0]=0 ;
329    *control[1]=0 ;
330    *notify[0] = notifyNothing_ ;
331    *notify[1] = notifyNothing_ ;
332    winState[0]=false ;
333    winState[1]=false ;
334    current=0 ;
335   
336    if (hasWindows_)
337    { 
338
339      windows_[0]->lockExclusive(clientRank_) ;
340      windows_[1]->lockExclusive(clientRank_) ;
341     
342      windows_[1]->unlockExclusive(clientRank_) ;
343      windows_[0]->unlockExclusive(clientRank_) ;
344    } 
345
346    lockBuffer() ;
347 
348    int size=sizeof(int)+2*sizeof(size_t)+2*sizeof(MPI_Aint) ;
349    CBufferOut* bufOut = this->getBuffer(timelineEventChangeBufferSize, size);
350    bufOut->put(size);
351    bufOut->put(timelineEventChangeBufferSize);
352    bufOut->put(newBufferSize_);
353   
354    bufOut->put(this->getWinBufferAddress(0));
355    bufOut->put(this->getWinBufferAddress(1));
356
357    resizingBufferStep_=4;
358    unlockBuffer() ;
359    info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinBufferAddress(0)<<" winAdress[1] "<<this->getWinBufferAddress(1)<<endl;
360  }
361
362  bool CClientBuffer::hasPendingRequest(void)
363  {
364   
365    lockBuffer() ;
366    count=*bufferCount[current] ;
367    unlockBuffer() ;
368
369    return (pending || count > 0);
370  }
371
372  bool CClientBuffer::isNotifiedChangeBufferSize(void)
373  {
374   
375    bool ret ;
376    lockBuffer() ;
377    ret=*notify[current] == notifyResizeBuffer_ ? true : false ;
378    if (ret || !hasWindows_) 
379    {
380      *notify[current] = notifyNothing_ ;
381      resizingBufferStep_=3; 
382    }
383    unlockBuffer() ;
384
385    return ret;
386  }
387
388  bool CClientBuffer::isNotifiedFinalized(void)
389  {
390    if (!isFinalized_)
391    {
392      double time=MPI_Wtime() ;
393//      if (time - lastCheckedNotify_ > latency_)
394      {
395        int flag ;
396        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
397        lockBuffer() ;
398        isFinalized_=*notify[current] == notifyFinalize_ ? true : false ;
399        unlockBuffer() ;
400        lastCheckedNotify_=time ;
401      }
402    }
403    return isFinalized_ ;
404  }
405
406}
Note: See TracBrowser for help on using the repository browser.