source: XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp @ 2130

Last change on this file since 2130 was 2130, checked in by ymipsl, 3 years ago

New management of client-server buffers.

  • buffers can grow automatically during the initialization phase
  • the buffer size is evaluated after the close-context-definition phase and fixed at its optimal value.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.0 KB
Line 
1#include "xios_spl.hpp"
2#include "exception.hpp"
3#include "log.hpp"
4#include "buffer_out.hpp"
5#include "buffer_client.hpp"
6#include "cxios.hpp"
7#include "mpi.hpp"
8#include "tracer.hpp"
9#include "timeline_events.hpp"
10
11namespace xios
12{
  // Static member definition. isBufferFree() raises it to the requested size whenever that
  // request exceeds both the buffer's own maxEventSize and the current value stored here.
  size_t CClientBuffer::maxRequestSize = 0;
14
15  CClientBuffer::CClientBuffer(MPI_Comm interComm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
16    : interComm(interComm)
17    , clientRank_(clientRank)
18    , serverRank(serverRank)
19    , bufferSize(bufferSize)
20    , estimatedMaxEventSize(estimatedMaxEventSize)
21    , maxEventSize(0)
22    , current(0)
23    , count(0)
24    , pending(false)
25    , hasWindows(false) 
26    , windows_(windows)
27  {
28    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
29    else hasWindows=true ;
30
31      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ;
32      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ;
33      buffer[0] = bufferHeader[0]+headerSize ;
34      buffer[1] = bufferHeader[1]+headerSize ;
35      firstTimeLine[0]=(size_t*)bufferHeader[0] ;
36      firstTimeLine[1]=(size_t*)bufferHeader[1] ;
37      bufferCount[0]=(size_t*)bufferHeader[0] +1 ;
38      bufferCount[1]=(size_t*)bufferHeader[1] +1 ;
39      control[0]=(size_t*)bufferHeader[0] +2 ;
40      control[1]=(size_t*)bufferHeader[1] +2 ;
41      finalize[0]=(size_t*)bufferHeader[0] +3 ;
42      finalize[1]=(size_t*)bufferHeader[1] +3 ;
43
44      *firstTimeLine[0]=0 ;
45      *firstTimeLine[1]=0 ;
46      *bufferCount[0]=0 ;
47      *bufferCount[1]=0 ;
48      *control[0]=0 ;
49      *control[1]=0 ;
50      *finalize[0]=0 ;
51      *finalize[1]=0 ;
52      winState[0]=false ;
53      winState[1]=false ;
54
55
56    if (hasWindows)
57    { 
58   
59      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ;
60      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ;
61   
62      MPI_Group group ;
63      int groupSize,groupRank ;
64      MPI_Win_get_group(windows_[0], &group) ;
65      MPI_Group_size(group, &groupSize) ;
66      MPI_Group_rank(group, &groupRank) ;
67      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
68
69      MPI_Win_get_group(windows_[1], &group) ;
70      MPI_Group_size(group, &groupSize) ;
71      MPI_Group_rank(group, &groupRank) ;
72      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
73
74      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
75      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
76
77      MPI_Win_unlock(clientRank_, windows_[1]) ;
78      MPI_Win_unlock(clientRank_, windows_[0]) ;
79    } 
80    retBuffer = new CBufferOut(buffer[current], bufferSize);
81    info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl;
82  }
83
84  MPI_Aint CClientBuffer::getWinAddress(int i)
85  {
86     MPI_Aint address ;
87     
88     if (hasWindows) MPI_Get_address(bufferHeader[i], &address) ;
89     else address=0 ;
90
91     return address ;
92  }
93
94  CClientBuffer::~CClientBuffer()
95  {
96     //freeWindows() ;
97     if (hasWindows)
98     {
99       MPI_Win_detach(windows_[0],bufferHeader[0]);
100       MPI_Win_detach(windows_[1],bufferHeader[1]);
101       MPI_Free_mem(bufferHeader[0]) ;
102       MPI_Free_mem(bufferHeader[1]) ;
103     }
104     delete retBuffer;
105  }
106
107/*  void CClientBuffer::createWindows(MPI_Comm oneSidedComm)
108  {
109    MPI_Barrier(oneSidedComm) ;
110    MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;
111    MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;
112
113    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ;
114    *firstTimeLine[0]=0 ;
115    *bufferCount[0]=0 ;
116    *control[0]=0 ;
117    MPI_Win_unlock(0, windows[0]) ;
118
119    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ;
120    *firstTimeLine[1]=0 ;
121    *bufferCount[1]=0 ;
122    *control[1]=0 ;
123    MPI_Win_unlock(0, windows[1]) ;
124    winState[0]=false ;
125    winState[1]=false ;
126    MPI_Barrier(oneSidedComm) ;
127    hasWindows=true ;
128  }
129*/
130
131/* 
132  void CClientBuffer::freeWindows()
133  {
134    if (hasWindows)
135    {
136      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[0]) ;
137      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[1]) ;
138      *control[0]=2 ;
139      *control[1]=2 ;
140      MPI_Win_unlock(0, windows_[1]) ;
141      MPI_Win_unlock(0, windows_[0]) ;
142     
143      MPI_Win_free(&windows_[0]) ;
144      MPI_Win_free(&windows_[1]) ;
145      hasWindows=false ;
146    }
147  }
148*/ 
149  void CClientBuffer::lockBuffer(void)
150  {
151    if (hasWindows)
152    {
153   //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[current]) ;
154      long long int lock=1 ;
155      long long int zero=0, one=1 ;
156     
157      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ;
158     
159      while(lock!=0)
160      {
161        MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)),
162                             windows_[current]) ;
163        MPI_Win_flush(clientRank_, windows_[current]) ;
164      }
165
166//      info(100)<<"Buffer locked "<<&windows_<<"  "<<current<<endl ;
167      winState[current]=true ;
168    }
169  }
170
171  void CClientBuffer::unlockBuffer(void)
172  {
173    if (hasWindows)
174    {
175      long long int lock=1 ;
176      long long int zero=0, one=1 ;
177
178      MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)),
179                             windows_[current]) ;
180      MPI_Win_unlock(clientRank_, windows_[current]) ;
181
182 //     info(100)<<"Buffer unlocked "<<&windows_<<"  "<<current<<endl ;
183      winState[current]=false ;
184    }
185  }
186
187  StdSize CClientBuffer::remain(void)
188  {
189    return bufferSize - count;
190  }
191
192  bool CClientBuffer::isBufferFree(StdSize size)
193  {
194//    bool loop=true ;
195//    while (loop)
196//    {
197//      lockBuffer();
198//      if (*control[current]==0) loop=false ; // attemp to read from server ?
199//      else unlockBuffer() ;
200//    }
201 
202    lockBuffer();
203    count=*bufferCount[current] ;
204   
205    if (resizingBufferStep_ > 0 ) return false ;
206
207    if (size > bufferSize)
208    {
209      // ERROR("bool CClientBuffer::isBufferFree(StdSize size)",
210      //      << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl);
211      resizingBufferStep_=1 ;
212      newBufferSize_=size ;
213      return false ;
214    }
215
216    if (size > maxEventSize)
217    {
218      maxEventSize = size;
219
220      if (size > estimatedMaxEventSize)
221        error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank
222                 << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl;
223
224      if (size > maxRequestSize) maxRequestSize = size;
225    }
226   
227    if (size > remain())
228    {
229      if (isGrowableBuffer_)
230      {
231        resizingBufferStep_ = 1 ;
232        newBufferSize_ = (count+size)*growFactor_ ;
233      } 
234      return false ;
235    }
236    else return true ;
237  }
238
239
240  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size)
241  {
242    if (size <= remain())
243    {
244      retBuffer->realloc(buffer[current] + count, size);
245      count += size;
246      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ;
247      *bufferCount[current]=count ;
248/*      info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current
249              <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;
250      if (!winState[current]) info(40)<<"CClientBuffer::getBuffer "<<" Windows Not Locked... "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current
251              <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;*/
252      return retBuffer;
253    }
254    else
255    {
256      ERROR("CBufferOut* CClientBuffer::getBuffer(StdSize size)",
257            << "Not enough space in buffer, this should not have happened...");
258      return NULL;
259    }
260  }
261
262  void CClientBuffer::infoBuffer(void)
263  {
264     
265      char checksum=0 ;
266      for(size_t i=0;i<*bufferCount[current];i++) checksum=checksum+buffer[current][i] ;
267 
268      char checksumFirst=0 ;
269      for(size_t i=5; i<10 && i<*bufferCount[current] ;i++) checksumFirst=checksumFirst+buffer[current][i] ;
270 
271      char checksumLast=0 ;
272      for(size_t i=(*bufferCount[current]<10)?0:*bufferCount[current]-10; i<*bufferCount[current] ; i++) checksumLast=checksumLast+buffer[current][i] ;
273 
274      info(45)<<"CClientBuffer::infoBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current<<" WinState "<<winState[current]
275              <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<" checksum "<<(int)checksum<<" "
276              <<(int)buffer[current][0]<<" "<<(int)buffer[current][1]<<" "<<(int)buffer[current][2]<<" "<<(int)buffer[current][3]<<" "<<(int)buffer[current][4]<<" "<<(int)buffer[current][5]<<" "
277              <<(int)buffer[current][6]<<" "<<(int)buffer[current][7]<<" "<<(int)buffer[current][8]<<" "<<(int)buffer[current][9]<<" "<<(int)buffer[current][10]<<" "<<(int)buffer[current][11]<<endl ;
278
279  }
280
281  bool CClientBuffer::checkBuffer(bool send)
282  {
283    MPI_Status status;
284    int flag;
285
286    if (pending)
287    {
288      traceOff();
289      MPI_Test(&request, &flag, &status);
290      traceOn();
291      if (flag == true) pending = false;
292    }
293
294    if (!pending)
295    {
296      if (!send && resizingBufferStep_==0 ) return false ;
297
298      if (count > 0)
299      {
300        lockBuffer() ;
301 //       if (*control[current]==0 && bufferCount[current] > 0)
302        if (*bufferCount[current] > 0)
303        {
304          MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request);
305          if (resizingBufferStep_==3) resizingBufferStep_=0 ;
306          pending = true;
307//          *control[current]=0 ;
308          *firstTimeLine[current]=0 ;
309          *bufferCount[current]=0 ;
310
311           unlockBuffer() ;
312
313          if (current == 1) current = 0;
314          else current = 1;
315          count = 0;
316        }
317        else 
318        {
319          unlockBuffer() ;
320        }
321      }
322      else
323      {
324        if (resizingBufferStep_==2) resizeBuffer(newBufferSize_) ;
325        if (resizingBufferStep_==1) resizeBufferNotify() ;
326      }
327    }
328
329    return pending;
330  }
331
332  void CClientBuffer::resizeBufferNotify(void)
333  {
334    // notify server of changing buffers size
335    lockBuffer() ;
336    int size=sizeof(int)+sizeof(size_t) ;
337    CBufferOut* bufOut = this->getBuffer(timelineEventNotifyChangeBufferSize, size);
338    bufOut->put(size);
339    bufOut->put(timelineEventNotifyChangeBufferSize);
340    resizingBufferStep_ = 2 ;
341    unlockBuffer() ;
342  }
343
344  void CClientBuffer::resizeBuffer(size_t newSize)
345  {
346    if (hasWindows)
347    { 
348      MPI_Win_detach(windows_[0], bufferHeader[0]) ;
349      MPI_Win_detach(windows_[1], bufferHeader[1]) ;
350    }
351    MPI_Free_mem(bufferHeader[0]) ;
352    MPI_Free_mem(bufferHeader[1]) ;
353
354    bufferSize=newSize ;
355    MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ;
356    MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ;
357    buffer[0] = bufferHeader[0]+headerSize ;
358    buffer[1] = bufferHeader[1]+headerSize ;
359    firstTimeLine[0]=(size_t*)bufferHeader[0] ;
360    firstTimeLine[1]=(size_t*)bufferHeader[1] ;
361    bufferCount[0]=(size_t*)bufferHeader[0] +1 ;
362    bufferCount[1]=(size_t*)bufferHeader[1] +1 ;
363    control[0]=(size_t*)bufferHeader[0] +2 ;
364    control[1]=(size_t*)bufferHeader[1] +2 ;
365    finalize[0]=(size_t*)bufferHeader[0] +3 ;
366    finalize[1]=(size_t*)bufferHeader[1] +3 ;
367
368    *firstTimeLine[0]=0 ;
369    *firstTimeLine[1]=0 ;
370    *bufferCount[0]=0 ;
371    *bufferCount[1]=0 ;
372    *control[0]=0 ;
373    *control[1]=0 ;
374    *finalize[0]=0 ;
375    *finalize[1]=0 ;
376    winState[0]=false ;
377    winState[1]=false ;
378    current=0 ;
379   
380    if (hasWindows)
381    { 
382   
383      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ;
384      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ;
385         
386      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
387      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
388
389      MPI_Win_unlock(clientRank_, windows_[1]) ;
390      MPI_Win_unlock(clientRank_, windows_[0]) ;
391    } 
392
393    lockBuffer() ;
394 
395    int size=sizeof(int)+2*sizeof(size_t)+2*sizeof(MPI_AINT) ;
396    CBufferOut* bufOut = this->getBuffer(timelineEventChangeBufferSize, size);
397    bufOut->put(size);
398    bufOut->put(timelineEventChangeBufferSize);
399    bufOut->put(newBufferSize_);
400    bufOut->put(this->getWinAddress(0));
401    bufOut->put(this->getWinAddress(1));
402
403    resizingBufferStep_=3;
404    unlockBuffer() ;
405  }
406
407  bool CClientBuffer::hasPendingRequest(void)
408  {
409   
410    lockBuffer() ;
411    count=*bufferCount[current] ;
412    unlockBuffer() ;
413
414    return (pending || count > 0);
415  }
416
417  bool CClientBuffer::isNotifiedFinalized(void)
418  {
419   
420    bool ret ;
421    lockBuffer() ;
422    ret=*finalize[current] == 1 ? true : false ;
423    unlockBuffer() ;
424
425    return ret;
426  }
427
428}
Note: See TracBrowser for help on using the repository browser.