source: XIOS3/trunk/src/manager/window_manager.hpp @ 2403

Last change on this file since 2403 was 2263, checked in by ymipsl, 3 years ago

Fix memory problem.
YM

  • Property svn:executable set to *
File size: 14.4 KB
Line 
1#ifndef __WINDOW_MANAGER_HPP__
2#define __WINDOW_MANAGER_HPP__
3
4#include <map>
5#include "mpi.hpp"
6#include "buffer_in.hpp"
7#include "buffer_out.hpp"
8#include "message.hpp"
9
10namespace xios
11{
12
13
14  class CWindowManager
15  {
16
17    private :
18    const MPI_Aint OFFSET_LOCK=0 ;
19    const int SIZE_LOCK=sizeof(int) ;
20    const MPI_Aint OFFSET_BUFFER_SIZE=OFFSET_LOCK+SIZE_LOCK ;
21    const int SIZE_BUFFER_SIZE=sizeof(size_t) ;
22    const MPI_Aint OFFSET_BUFFER=OFFSET_BUFFER_SIZE+SIZE_BUFFER_SIZE ;
23    const int WINDOWS_LOCKED=-1 ;
24
25    MPI_Win window_ ;
26    void * winBuffer_ ;
27    map<int,double> lastTimeLock_ ;
28    const double latency_=0e-2 ; 
29
30    public :
31
32    CWindowManager(MPI_Comm winComm, size_t bufferSize)
33    {
34      const MPI_Aint windowSize=bufferSize+OFFSET_BUFFER ;
35      MPI_Win_allocate(windowSize, 1, MPI_INFO_NULL, winComm, &winBuffer_, &window_) ;
36      int lock=0 ;
37      size_t size=0 ;
38      int commRank ;
39      MPI_Comm_rank(winComm, &commRank) ;
40      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, commRank, 0, window_) ;
41      MPI_Put(&lock, SIZE_LOCK, MPI_CHAR, commRank, OFFSET_LOCK, SIZE_LOCK, MPI_CHAR, window_) ;
42      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, commRank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
43      MPI_Win_unlock(commRank, window_) ;
44      MPI_Barrier(winComm) ;
45    }
46   
47    void lockWindow(int rank, int state )
48    {
49      int lock=state ;
50      double time ;
51      auto it=lastTimeLock_.find(rank) ;
52      if (it == lastTimeLock_.end()) 
53      { 
54        lastTimeLock_[rank] = 0. ; 
55        it=lastTimeLock_.find(rank) ;
56      }
57      double& lastTime = it->second ;
58
59      do 
60      {
61        time=MPI_Wtime() ;
62        while(time-lastTime < latency_) time=MPI_Wtime() ;
63        int flag ;
64        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
65        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
66        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
67        MPI_Win_unlock(rank, window_) ;
68        lastTime=MPI_Wtime() ;
69      } while (lock!=state) ;
70     
71     
72    }
73
74    void lockWindowExclusive(int rank, int state )
75    {
76      int lock=state ;
77      double time ;
78      auto it=lastTimeLock_.find(rank) ;
79      if (it == lastTimeLock_.end()) 
80      { 
81        lastTimeLock_[rank] = 0. ; 
82        it=lastTimeLock_.find(rank) ;
83      }
84      double& lastTime = it->second ;
85
86      do 
87      {
88        time=MPI_Wtime() ;
89        while(time-lastTime < latency_) time=MPI_Wtime() ;
90        int flag ;
91        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
92        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
93        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
94        MPI_Win_unlock(rank, window_) ;
95        lastTime=MPI_Wtime() ;
96      } while (lock!=state) ;
97    }
98
99    void lockWindowExclusive(int rank)
100    {
101      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
102    }
103
104    void lockWindowShared(int rank)
105    {
106      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
107    }
108
109    void unlockWindow(int rank)
110    {
111      MPI_Win_unlock(rank, window_) ;
112    }
113
114    void flushWindow(int rank)
115    {
116      MPI_Win_flush(rank, window_) ;
117    }
118
119    void unlockWindow(int rank, int state )
120    {
121      int lock ;
122      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
123      MPI_Compare_and_swap(&state, &WINDOWS_LOCKED, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
124      MPI_Win_unlock(rank, window_) ;
125    }
126   
127    template< class T >
128    void updateToWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
129    {
130      CBufferOut buffer ;
131      (object->*dumpOut)(buffer) ;
132      size_t size=buffer.count() ;
133      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
134      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
135      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
136      MPI_Win_unlock(rank, window_) ;
137    }
138
139    template< class T >
140    void updateToExclusiveWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
141    {
142      CBufferOut buffer ;
143      (object->*dumpOut)(buffer) ;
144      size_t size=buffer.count() ;
145      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
146      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
147      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
148      MPI_Win_unlock(rank, window_) ;
149    }
150
151    template< class T >
152    void updateTosharedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
153    {
154      CBufferOut buffer ;
155      (object->*dumpOut)(buffer) ;
156      size_t size=buffer.count() ;
157      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
158      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
159      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
160      MPI_Win_unlock(rank, window_) ;
161    }
162
163    template< class T >
164    void updateToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
165    {
166      CBufferOut buffer ;
167      (object->*dumpOut)(buffer) ;
168      size_t size=buffer.count() ;
169//      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
170      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
171      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
172      MPI_Win_flush(rank, window_) ;
173//      MPI_Win_unlock(rank, window_) ;
174    }
175
176    template< typename T >
177    void updateFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
178    {
179      size_t size ;
180      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
181      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
182      MPI_Win_flush(rank,window_) ;
183      CBufferIn buffer(size) ;
184      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
185      MPI_Win_unlock(rank, window_) ;
186      (object->*dumpIn)(buffer) ;
187    }
188
189    template< typename T >
190    void updateFromExclusiveWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
191    {
192      size_t size ;
193      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
194      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
195      MPI_Win_flush(rank,window_) ;
196      CBufferIn buffer(size) ;
197      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
198      MPI_Win_unlock(rank, window_) ;
199      (object->*dumpIn)(buffer) ;
200    }
201
202    template< typename T >
203    void updateFromSharedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
204    {
205      size_t size ;
206      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
207      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
208      MPI_Win_flush(rank,window_) ;
209      CBufferIn buffer(size) ;
210      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
211      MPI_Win_unlock(rank, window_) ;
212      (object->*dumpIn)(buffer) ;
213    }
214
215    template< typename T >
216    void updateFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
217    {
218      size_t size ;
219//      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
220      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
221      MPI_Win_flush(rank,window_) ;
222      CBufferIn buffer(size) ;
223      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
224//      MPI_Win_unlock(rank, window_) ;
225      MPI_Win_flush(rank, window_) ;
226      (object->*dumpIn)(buffer) ;
227    }
228
229
230    template< class T >
231    void pushToWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
232    {
233      size_t size ;
234      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
235      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
236      MPI_Win_flush(rank,window_) ;
237      CBufferOut buffer ;
238      (object->*dumpOut)(buffer) ;
239      size_t bufferSize=buffer.count() ;
240      size_t newSize = size + bufferSize;
241      MPI_Put(&newSize, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
242      MPI_Put(buffer.start(), bufferSize, MPI_CHAR, rank, OFFSET_BUFFER+size, bufferSize, MPI_CHAR, window_) ;
243      MPI_Win_unlock(rank, window_) ;
244    }
245
246    template< class T >
247    void pushToExclusiveWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
248    {
249      size_t size ;
250      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
251      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
252      MPI_Win_flush(rank,window_) ;
253      CBufferOut buffer ;
254      (object->*dumpOut)(buffer) ;
255      size_t bufferSize=buffer.count() ;
256      size_t newSize = size + bufferSize;
257      MPI_Put(&newSize, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
258      MPI_Put(buffer.start(), bufferSize, MPI_CHAR, rank, OFFSET_BUFFER+size, bufferSize, MPI_CHAR, window_) ;
259      MPI_Win_unlock(rank, window_) ;
260    }
261
262    template< class T >
263    void pushToSharedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
264    {
265      size_t size ;
266      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
267      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
268      MPI_Win_flush(rank,window_) ;
269      CBufferOut buffer ;
270      (object->*dumpOut)(buffer) ;
271      size_t bufferSize=buffer.count() ;
272      size_t newSize = size + bufferSize;
273      MPI_Put(&newSize, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
274      MPI_Put(buffer.start(), bufferSize, MPI_CHAR, rank, OFFSET_BUFFER+size, bufferSize, MPI_CHAR, window_) ;
275      MPI_Win_unlock(rank, window_) ;
276    }
277
278    template< class T >
279    void pushToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
280    {
281      size_t size ;
282      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
283      MPI_Win_flush(rank,window_) ;
284      CBufferOut buffer ;
285      (object->*dumpOut)(buffer) ;
286      size_t bufferSize=buffer.count() ;
287      size_t newSize = size + bufferSize;
288      MPI_Put(&newSize, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
289      MPI_Put(buffer.start(), bufferSize, MPI_CHAR, rank, OFFSET_BUFFER+size, bufferSize, MPI_CHAR, window_) ;
290      MPI_Win_flush(rank, window_) ;
291    }
292
293    template< typename T >
294    void popFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
295    {
296      size_t size ;
297      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
298      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
299      MPI_Win_flush(rank,window_) ;
300      CBufferIn buffer(size) ;
301      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
302      MPI_Win_flush(rank,window_) ;
303      (object->*dumpIn)(buffer) ;
304     
305      size=buffer.remain() ;
306      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
307      MPI_Put(buffer.ptr(),buffer.remain(), MPI_CHAR, rank, OFFSET_BUFFER, buffer.remain(), MPI_CHAR, window_) ;
308      MPI_Win_unlock(rank, window_) ;
309     
310    }
311
312    template< typename T >
313    void popFromExclusiveWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
314    {
315      size_t size ;
316      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
317      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
318      MPI_Win_flush(rank,window_) ;
319      CBufferIn buffer(size) ;
320      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
321      MPI_Win_flush(rank,window_) ;
322      (object->*dumpIn)(buffer) ;
323     
324      size=buffer.remain() ;
325      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
326      MPI_Put(buffer.ptr(),buffer.remain(), MPI_CHAR, rank, OFFSET_BUFFER, buffer.remain(), MPI_CHAR, window_) ;
327      MPI_Win_unlock(rank, window_) ;
328     
329    }
330
331    template< typename T >
332    void popFromSharedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
333    {
334      size_t size ;
335      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
336      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
337      MPI_Win_flush(rank,window_) ;
338      CBufferIn buffer(size) ;
339      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
340      MPI_Win_flush(rank,window_) ;
341      (object->*dumpIn)(buffer) ;
342     
343      size=buffer.remain() ;
344      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
345      MPI_Put(buffer.ptr(),buffer.remain(), MPI_CHAR, rank, OFFSET_BUFFER, buffer.remain(), MPI_CHAR, window_) ;
346      MPI_Win_unlock(rank, window_) ;
347     
348    }
349
350    template< typename T >
351    void popFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
352    {
353      size_t size ;
354      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
355      MPI_Win_flush(rank,window_) ;
356      CBufferIn buffer(size) ;
357      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
358      MPI_Win_flush(rank,window_) ;
359      (object->*dumpIn)(buffer) ;
360     
361      size=buffer.remain() ;
362      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
363      MPI_Put(buffer.ptr(),buffer.remain(), MPI_CHAR, rank, OFFSET_BUFFER, buffer.remain(), MPI_CHAR, window_) ;
364      MPI_Win_flush(rank, window_) ;
365    }
366
367    ~CWindowManager()
368    {
369      MPI_Win_free(&window_) ;
370    }
371  } ;
372}
373
374
375
376#endif
Note: See TracBrowser for help on using the repository browser.