source: XIOS3/trunk/src/manager/window_manager.hpp

Last change on this file was 2580, checked in by ymipsl, 9 months ago

Tracking unfree MPI windows and communicators.

YM

  • Property svn:executable set to *
File size: 8.3 KB
RevLine 
[1761]1#ifndef __WINDOW_MANAGER_HPP__
2#define __WINDOW_MANAGER_HPP__
3
4#include <map>
5#include "mpi.hpp"
6#include "buffer_in.hpp"
7#include "buffer_out.hpp"
8#include "message.hpp"
[2517]9#include "window_base.hpp"
[1761]10
11namespace xios
12{
13
14
[2517]15  class CWindowManager : public CWindowBase
[1761]16  {
17
18    private :
[2517]19    static const MPI_Aint OFFSET_LOCK=0 ;
20    static const int SIZE_LOCK=sizeof(int) ;
21    static const MPI_Aint OFFSET_BUFFER_SIZE=OFFSET_LOCK+SIZE_LOCK ;
22    static const int SIZE_BUFFER_SIZE=sizeof(size_t) ;
23    static const MPI_Aint OFFSET_BUFFER=OFFSET_BUFFER_SIZE+SIZE_BUFFER_SIZE ;
24    static const int WINDOWS_LOCKED=-1 ;
[1761]25
26    MPI_Win window_ ;
27    void * winBuffer_ ;
[2246]28    map<int,double> lastTimeLock_ ;
29    const double latency_=0e-2 ; 
[1761]30
31    public :
32
[2580]33    CWindowManager(MPI_Comm winComm, size_t bufferSize, const string name) : CWindowBase(winComm, bufferSize + OFFSET_BUFFER_SIZE, name)
[1761]34    {
35      int lock=0 ;
36      size_t size=0 ;
37      int commRank ;
38      MPI_Comm_rank(winComm, &commRank) ;
[2517]39      lockExclusive(commRank) ;
40      put(&lock, SIZE_LOCK, MPI_CHAR, commRank, OFFSET_LOCK, SIZE_LOCK, MPI_CHAR) ;
41      put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, commRank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR) ;
42      unlockExclusive(commRank) ;
[1761]43      MPI_Barrier(winComm) ;
44    }
45   
46    void lockWindow(int rank, int state )
47    {
48      int lock=state ;
[2246]49      double time ;
50      auto it=lastTimeLock_.find(rank) ;
51      if (it == lastTimeLock_.end()) 
52      { 
53        lastTimeLock_[rank] = 0. ; 
54        it=lastTimeLock_.find(rank) ;
55      }
56      double& lastTime = it->second ;
57
[1761]58      do 
59      {
[2246]60        time=MPI_Wtime() ;
61        while(time-lastTime < latency_) time=MPI_Wtime() ;
[2260]62        int flag ;
63        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
[2517]64        compareAndSwap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK) ;
65        flush(rank) ;
[2246]66        lastTime=MPI_Wtime() ;
[1761]67      } while (lock!=state) ;
68     
[2246]69     
[1761]70    }
71
[2246]72    void lockWindowExclusive(int rank, int state )
73    {
74      int lock=state ;
75      double time ;
76      auto it=lastTimeLock_.find(rank) ;
77      if (it == lastTimeLock_.end()) 
78      { 
79        lastTimeLock_[rank] = 0. ; 
80        it=lastTimeLock_.find(rank) ;
81      }
82      double& lastTime = it->second ;
[1761]83
[2246]84      do 
85      {
86        time=MPI_Wtime() ;
87        while(time-lastTime < latency_) time=MPI_Wtime() ;
[2260]88        int flag ;
89        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
[2517]90        lockExclusive(rank) ;
91        compareAndSwap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK) ;
92        unlockExclusive(rank) ;
[2246]93        lastTime=MPI_Wtime() ;
94      } while (lock!=state) ;
95    }
96
97    void lockWindowExclusive(int rank)
98    {
[2517]99      lockExclusive(rank) ;
[2246]100    }
101
102    void lockWindowShared(int rank)
103    {
[2517]104      lockShared(rank) ;
[2246]105    }
106
[2517]107    void unlockWindowExclusive(int rank)
108    {
109      unlockExclusive(rank) ;
110    }
111
112    void unlockWindowShared(int rank)
113    {
114      unlockShared(rank) ;
115    }
116
117    void lockWindow(int rank)
118    {
119      lockWindowExclusive(rank) ;
120    }
121   
[2246]122    void unlockWindow(int rank)
123    {
[2517]124      unlockWindowExclusive(rank) ;
[2246]125    }
126
127    void flushWindow(int rank)
128    {
[2517]129      flush(rank) ;
[2246]130    }
131
[1761]132    void unlockWindow(int rank, int state )
133    {
134      int lock ;
[2517]135      compareAndSwap(&state, &WINDOWS_LOCKED, &lock, MPI_INT, rank, OFFSET_LOCK) ;
136      flush(rank) ;
[1761]137    }
138   
139    template< class T >
140    void updateToWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
141    {
[2517]142      updateToExclusiveWindow(rank, object, dumpOut) ;
[1761]143    }
[2246]144
145    template< class T >
[2263]146    void updateToExclusiveWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
147    {
[2517]148      lockExclusive(rank) ;
149      updateToLockedWindow(rank, object, dumpOut) ;
150      unlockExclusive(rank) ;
151     }
[2263]152
153    template< class T >
154    void updateTosharedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
155    {
[2517]156      lockShared(rank) ;
157      updateToLockedWindow(rank, object, dumpOut) ;
158      unlockShared(rank) ;
[2263]159    }
160
161    template< class T >
[2246]162    void updateToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
163    {
164      CBufferOut buffer ;
165      (object->*dumpOut)(buffer) ;
166      size_t size=buffer.count() ;
[2517]167
168      put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR) ;
169      put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR) ;
170      flush(rank) ;
[2246]171    }
172
[1761]173    template< typename T >
174    void updateFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
175    {
[2517]176      updateFromExclusiveWindow(rank,object, dumpIn) ;
[1761]177    }
178
[2246]179    template< typename T >
[2263]180    void updateFromExclusiveWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
181    {
182      size_t size ;
[2517]183      lockExclusive(rank) ;
184      updateFromLockedWindow(rank,object, dumpIn) ;
185      unlockExclusive(rank) ;
[2263]186    }
187
188    template< typename T >
189    void updateFromSharedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
190    {
191      size_t size ;
[2517]192      lockShared(rank) ;
193      updateFromLockedWindow(rank,object, dumpIn) ;
194      unlockShared(rank) ;
[2263]195    }
196
197    template< typename T >
[2246]198    void updateFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
199    {
200      size_t size ;
[2517]201
202      get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR) ;
203      flush(rank) ;
[2246]204      CBufferIn buffer(size) ;
[2517]205      get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR) ;
206      flush(rank) ;
207
[2246]208      (object->*dumpIn)(buffer) ;
209    }
210
211
[2517]212
[1761]213    template< class T >
214    void pushToWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
215    {
[2517]216      pushToExclusiveWindow(rank, object, dumpOut) ;
[1761]217    }
218
[2258]219    template< class T >
[2263]220    void pushToExclusiveWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
221    {
[2517]222      lockExclusive(rank) ;
223      pushToLockedWindow(rank, object, dumpOut) ;
224      unlockExclusive(rank) ;
[2263]225    }
226
227    template< class T >
228    void pushToSharedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
229    {
[2517]230      lockShared(rank) ;
231      pushToLockedWindow(rank, object, dumpOut) ;
232      unlockShared(rank) ;
[2263]233    }
234
235    template< class T >
[2258]236    void pushToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
237    {
238      size_t size ;
[2517]239      get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR) ;
240      flush(rank) ;
[2258]241      CBufferOut buffer ;
242      (object->*dumpOut)(buffer) ;
243      size_t bufferSize=buffer.count() ;
244      size_t newSize = size + bufferSize;
[2517]245      put(&newSize, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR) ;
246      put(buffer.start(), bufferSize, MPI_CHAR, rank, OFFSET_BUFFER+size, bufferSize, MPI_CHAR) ;
247      flush(rank) ;
[2258]248    }
249
[2517]250
251
[1761]252    template< typename T >
253    void popFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
254    {
[2517]255      popFromExclusiveWindow(rank,object, dumpIn) ; 
[1761]256    }
257
[2258]258    template< typename T >
[2263]259    void popFromExclusiveWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
260    {
[2517]261      lockExclusive(rank) ;
262      popFromLockedWindow(rank,object, dumpIn) ; 
263      unlockExclusive(rank) ;
[2263]264     
265    }
266
267    template< typename T >
268    void popFromSharedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
269    {
[2517]270      lockShared(rank) ;
271      popFromLockedWindow(rank,object, dumpIn) ; 
272      unlockShared(rank) ;
[2263]273    }
274
275    template< typename T >
[2258]276    void popFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
277    {
278      size_t size ;
[2517]279      get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR) ;
280      flush(rank) ;
[2258]281      CBufferIn buffer(size) ;
[2517]282      get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR) ;
283      flush(rank) ;
[2258]284      (object->*dumpIn)(buffer) ;
285     
286      size=buffer.remain() ;
[2517]287      put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR) ;
288      put(buffer.ptr(),buffer.remain(), MPI_CHAR, rank, OFFSET_BUFFER, buffer.remain(), MPI_CHAR) ;
289      flush(rank) ;
[2258]290    }
291
[1764]292    ~CWindowManager()
293    {
294    }
[1761]295  } ;
296}
297
298
299
300#endif
Note: See TracBrowser for help on using the repository browser.