source: XIOS3/branches/xios-3.0-beta/src/buffer_server.cpp

Last change on this file was 2539, checked in by jderouillat, 11 months ago

Set an incremental latency in window access of the legacy protocol (read operations)

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 8.4 KB
Line 
1#include "xios_spl.hpp"
2#include "exception.hpp"
3#include "buffer_server.hpp"
4#include "timer.hpp"
5
6
7namespace xios
8{
9
10  CServerBuffer::CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize) 
11  : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress)
12  {
13    size = 3 * buffSize;
14    first = 0;
15    current = 1;
16    end = size;
17    used=0 ;
18    MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer) ;
19    currentWindows=1 ;
20    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
21  }
22
23  CServerBuffer::~CServerBuffer()
24  {
25    MPI_Free_mem(buffer) ;
26  }
27
28  void CServerBuffer::updateCurrentWindows(void)
29  {
30    if (currentWindows==0) currentWindows=1 ;
31    else currentWindows=0 ;
32  }
33
34
35  bool CServerBuffer::isBufferFree(size_t count)
36  {
37    bool ret ;
38
39    if (count==0) return true ;
40
41    if (current>first)
42    {
43      if (current+count<size)
44      {
45        ret=true ;
46      }
47      else if (current+count==size)
48      {
49        if (first>0)
50        {
51          ret=true ;
52        }
53        else
54        {
55          ret=false ;
56        }
57      }
58      else
59      {
60        if (count<first)
61        {
62          ret=true ;
63        }
64        else
65        {
66          ret=false ;
67        }
68      }
69    }
70    else
71    {
72      if (current+count<first)
73      {
74        ret=true ;
75      }
76      else
77      {
78         ret=false ;
79      }
80    }
81
82    return ret ;
83  }
84
85  bool CServerBuffer::isBufferEmpty(void)
86  {
87    if (used==0) return true ;
88    else return false;
89  }
90
91  void* CServerBuffer::getBuffer(size_t count)
92  {
93    char* ret ;
94
95    if (count==0) return buffer+current ;
96
97    if (current>first)
98    {
99      if (current+count<size)
100      {
101        ret=buffer+current ;
102        current+=count ;
103      }
104      else if (current+count==size)
105      {
106        if (first>0)
107        {
108          ret=buffer+current ;
109          current=0 ;
110        }
111        else
112        {
113          ERROR("void* CServerBuffer::getBuffer(size_t count)",
114                 <<"cannot allocate required size in buffer") ;
115        }
116      }
117      else
118      {
119        end=current ;
120        if (count<first)
121        {
122          ret=buffer ;
123          current=count ;
124        }
125        else
126        {
127          ERROR("void* CServerBuffer::getBuffer(size_t count)",
128                 <<"cannot allocate required size in buffer") ;
129        }
130      }
131    }
132    else
133    {
134      if (current+count<first)
135      {
136        ret=buffer+current ;
137        current+=count ;
138      }
139      else
140      {
141          ERROR("void* CServerBuffer::getBuffer(size_t count)",
142                 <<"cannot allocate required size in buffer") ;
143      }
144    }
145
146    used+=count ;
147    return ret ;
148  }
149
150  void CServerBuffer::freeBuffer(size_t count)
151  {
152    if (count==0) return ;
153
154    if (first==end-1)
155    {
156      first=0 ;
157      count-- ;
158      used-- ;
159      end=size ;
160    }
161
162    if (first<=current)
163    {
164      if (first+count <current)
165      {
166        first+=count ;
167      }
168      else
169      {
170          ERROR("void CServerBuffer::freeBuffer(size_t count)",
171                 <<"cannot free required size in buffer") ;
172      }
173
174    }
175    else
176    {
177      if (first+count<end)
178      {
179        first+=count ;
180      }
181      else
182      {
183          ERROR("void CServerBuffer::freeBuffer(size_t count)",
184                 <<"cannot free required size in buffer") ;
185      }
186    }
187    used-=count ;
188  }
189
190  void CServerBuffer::popBuffer(size_t count)
191  {
192    if (count==0) return ;
193
194    if (current==0) 
195    {
196      current = end ;
197      end=size ;
198    }
199   
200
201    if (first<=current)
202    {
203      if (current-count >first)
204      {
205        current-=count ;
206      }
207      else
208      {
209          ERROR("void CServerBuffer::popBuffer(size_t count)",
210                 <<"cannot pop required size in buffer") ;
211      }
212
213    }
214    else
215    {
216      if (current-count>=0)
217      {
218        current-=count ;
219      }
220      else
221      {
222          ERROR("void CServerBuffer::freeBuffer(size_t count)",
223                 <<"cannot pop required size in buffer") ;
224      }
225    }
226    used-=count ;
227  }
228
229  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count)
230  {
231    count = -1 ;
232    if (!hasWindows || resizingBuffer_) return false ;
233    double time=MPI_Wtime() ;
234    if (time-bufferFromClientTime_ < bufferFromClientLatency_ ) return false;
235    bufferFromClientTime_ = time ;
236    if ( bufferFromClientLatency_ < LATENCY_MAX ) bufferFromClientLatency_ *= 2; // Done to reduce pressure on windows access if not necessary (especially for read on JZ, ex : transect)
237    CTimer::get("getBufferFromClient").resume() ;   
238    size_t clientTimeline ;
239    size_t clientCount ;
240    bool ok=false ;
241   
242    MPI_Group group ;
243    int groupSize,groupRank ;
244    MPI_Win_get_group(windows_[currentWindows], &group) ;
245    MPI_Group_size(group, &groupSize) ;
246    MPI_Group_rank(group, &groupRank) ;
247   
248    lockBuffer(); 
249    CTimer::get("getBufferFromClient_locked").resume() ;   
250// lock is acquired
251
252    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
253    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
254    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
255
256    if (timeLine==clientTimeline)
257    {
258      bufferFromClientLatency_ = LATENCY_DEFAULT; // Reset latency if windows access is used
259      buffer=(char*)getBuffer(clientCount) ;
260      count=clientCount ;
261      MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ;
262      clientTimeline = 0 ;
263      clientCount = 0 ;
264      MPI_Put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
265      MPI_Put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
266
267// release lock
268      CTimer::get("getBufferFromClient_locked").suspend() ;   
269      unlockBuffer() ;
270
271      ok=true ;
272      char checksum=0 ;
273      for(size_t i=0;i<count;i++) checksum=checksum+buffer[i] ;
274      char checksumFirst=0 ;
275      for(size_t i=5; i<10 && i<count ;i++) checksumFirst=checksumFirst+buffer[i] ;
276      char checksumLast=0 ;
277      for(size_t i=(count<10)?0:count-10; i<count ; i++) checksumLast=checksumLast+buffer[i] ;
278     
279      info(40)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "
280              <<clientTimeline<<" clientCount "<<count<<" checksum "<<(int)checksum<<" "
281              <<(int)buffer[0]<<" "<<(int)buffer[1]<<" "<<(int)buffer[2]<<" "<<(int)buffer[3]<<" "<<(int)buffer[4]<<" "<<(int)buffer[5]<<" " 
282              <<(int)buffer[6]<<" "<<(int)buffer[7]<<" "<<(int)buffer[8]<<" "<<(int)buffer[9]<<" "<<(int)buffer[10]<<" "<<(int)buffer[11]<<endl ;
283
284    }
285    else
286    {
287      count=0 ;
288 
289 // release lock
290      CTimer::get("getBufferFromClient_locked").suspend() ; 
291      unlockBuffer() ;
292    }
293    CTimer::get("getBufferFromClient").suspend() ;   
294    if (ok) return true ;
295
296    return false ;
297  }
298 
299  void CServerBuffer::lockBuffer(void)
300  {
301    if (!hasWindows) return ;
302    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
303  }
304
305  void CServerBuffer::unlockBuffer(void)
306  {
307    if (!hasWindows) return ;
308    MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ;
309  }
310 
311  void CServerBuffer::notifyClientFinalize(void)
312  {
313    if (!hasWindows) return ;
314    size_t notify=notifyFinalize_ ;
315    lockBuffer(); 
316// lock is acquired
317    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
318    unlockBuffer() ;
319  }
320
321  void CServerBuffer::notifyBufferResizing(void)
322  {
323    resizingBuffer_=true ;
324    if (!hasWindows) return ;
325    size_t notify=notifyResizeBuffer_ ;
326    lockBuffer(); 
327// lock is acquired
328    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
329    unlockBuffer() ;
330  }
331}
Note: See TracBrowser for help on using the repository browser.