source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/window_manager.hpp @ 2258

Last change on this file since 2258 was 2258, checked in by ymipsl, 3 years ago

One sided protocol improvment.
YM

  • Property svn:executable set to *
File size: 8.7 KB
Line 
1#ifndef __WINDOW_MANAGER_HPP__
2#define __WINDOW_MANAGER_HPP__
3
4#include <map>
5#include "mpi.hpp"
6#include "buffer_in.hpp"
7#include "buffer_out.hpp"
8#include "message.hpp"
9
10namespace xios
11{
12
13
14  class CWindowManager
15  {
16
17    private :
18    const MPI_Aint OFFSET_LOCK=0 ;
19    const int SIZE_LOCK=sizeof(int) ;
20    const MPI_Aint OFFSET_BUFFER_SIZE=OFFSET_LOCK+SIZE_LOCK ;
21    const int SIZE_BUFFER_SIZE=sizeof(size_t) ;
22    const MPI_Aint OFFSET_BUFFER=OFFSET_BUFFER_SIZE+SIZE_BUFFER_SIZE ;
23    const int WINDOWS_LOCKED=-1 ;
24
25    MPI_Win window_ ;
26    void * winBuffer_ ;
27    map<int,double> lastTimeLock_ ;
28    const double latency_=0e-2 ; 
29
30    public :
31
32    CWindowManager(MPI_Comm winComm, size_t bufferSize)
33    {
34      const MPI_Aint windowSize=bufferSize+OFFSET_BUFFER ;
35      MPI_Win_allocate(windowSize, 1, MPI_INFO_NULL, winComm, &winBuffer_, &window_) ;
36      int lock=0 ;
37      size_t size=0 ;
38      int commRank ;
39      MPI_Comm_rank(winComm, &commRank) ;
40      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, commRank, 0, window_) ;
41      MPI_Put(&lock, SIZE_LOCK, MPI_CHAR, commRank, OFFSET_LOCK, SIZE_LOCK, MPI_CHAR, window_) ;
42      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, commRank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
43      MPI_Win_unlock(commRank, window_) ;
44      MPI_Barrier(winComm) ;
45    }
46   
47    void lockWindow(int rank, int state )
48    {
49      int lock=state ;
50      double time ;
51      auto it=lastTimeLock_.find(rank) ;
52      if (it == lastTimeLock_.end()) 
53      { 
54        lastTimeLock_[rank] = 0. ; 
55        it=lastTimeLock_.find(rank) ;
56      }
57      double& lastTime = it->second ;
58
59      do 
60      {
61        time=MPI_Wtime() ;
62        while(time-lastTime < latency_) time=MPI_Wtime() ;
63        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
64        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
65        MPI_Win_unlock(rank, window_) ;
66        lastTime=MPI_Wtime() ;
67      } while (lock!=state) ;
68     
69     
70    }
71
72    void lockWindowExclusive(int rank, int state )
73    {
74      int lock=state ;
75      double time ;
76      auto it=lastTimeLock_.find(rank) ;
77      if (it == lastTimeLock_.end()) 
78      { 
79        lastTimeLock_[rank] = 0. ; 
80        it=lastTimeLock_.find(rank) ;
81      }
82      double& lastTime = it->second ;
83
84      do 
85      {
86        time=MPI_Wtime() ;
87        while(time-lastTime < latency_) time=MPI_Wtime() ;
88        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
89        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
90        MPI_Win_unlock(rank, window_) ;
91        lastTime=MPI_Wtime() ;
92      } while (lock!=state) ;
93    }
94
95    void lockWindowExclusive(int rank)
96    {
97      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
98    }
99
100    void lockWindowShared(int rank)
101    {
102      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
103    }
104
105    void unlockWindow(int rank)
106    {
107      MPI_Win_unlock(rank, window_) ;
108    }
109
110    void flushWindow(int rank)
111    {
112      MPI_Win_flush(rank, window_) ;
113    }
114
115    void unlockWindow(int rank, int state )
116    {
117      int lock ;
118      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
119      MPI_Compare_and_swap(&state, &WINDOWS_LOCKED, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
120      MPI_Win_unlock(rank, window_) ;
121    }
122   
123    template< class T >
124    void updateToWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
125    {
126      CBufferOut buffer ;
127      (object->*dumpOut)(buffer) ;
128      size_t size=buffer.count() ;
129      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
130      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
131      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
132      MPI_Win_unlock(rank, window_) ;
133    }
134
135    template< class T >
136    void updateToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
137    {
138      CBufferOut buffer ;
139      (object->*dumpOut)(buffer) ;
140      size_t size=buffer.count() ;
141//      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
142      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
143      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
144//      MPI_Win_unlock(rank, window_) ;
145    }
146
147    template< typename T >
148    void updateFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
149    {
150      size_t size ;
151      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
152      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
153      MPI_Win_flush(rank,window_) ;
154      CBufferIn buffer(size) ;
155      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
156      MPI_Win_unlock(rank, window_) ;
157      (object->*dumpIn)(buffer) ;
158    }
159
160    template< typename T >
161    void updateFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
162    {
163      size_t size ;
164//      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
165      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
166      MPI_Win_flush(rank,window_) ;
167      CBufferIn buffer(size) ;
168      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
169//      MPI_Win_unlock(rank, window_) ;
170      MPI_Win_flush(rank, window_) ;
171      (object->*dumpIn)(buffer) ;
172    }
173
174
175    template< class T >
176    void pushToWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
177    {
178      size_t size ;
179      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
180      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
181      MPI_Win_flush(rank,window_) ;
182      CBufferOut buffer ;
183      (object->*dumpOut)(buffer) ;
184      size_t bufferSize=buffer.count() ;
185      size_t newSize = size + bufferSize;
186      MPI_Put(&newSize, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
187      MPI_Put(buffer.start(), bufferSize, MPI_CHAR, rank, OFFSET_BUFFER+size, bufferSize, MPI_CHAR, window_) ;
188      MPI_Win_unlock(rank, window_) ;
189    }
190
191    template< class T >
192    void pushToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
193    {
194      size_t size ;
195      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
196      MPI_Win_flush(rank,window_) ;
197      CBufferOut buffer ;
198      (object->*dumpOut)(buffer) ;
199      size_t bufferSize=buffer.count() ;
200      size_t newSize = size + bufferSize;
201      MPI_Put(&newSize, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
202      MPI_Put(buffer.start(), bufferSize, MPI_CHAR, rank, OFFSET_BUFFER+size, bufferSize, MPI_CHAR, window_) ;
203    }
204
205    template< typename T >
206    void popFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
207    {
208      size_t size ;
209      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
210      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
211      MPI_Win_flush(rank,window_) ;
212      CBufferIn buffer(size) ;
213      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
214      MPI_Win_flush(rank,window_) ;
215      (object->*dumpIn)(buffer) ;
216     
217      size=buffer.remain() ;
218      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
219      MPI_Put(buffer.ptr(),buffer.remain(), MPI_CHAR, rank, OFFSET_BUFFER, buffer.remain(), MPI_CHAR, window_) ;
220      MPI_Win_unlock(rank, window_) ;
221     
222    }
223
224    template< typename T >
225    void popFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
226    {
227      size_t size ;
228      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
229      MPI_Win_flush(rank,window_) ;
230      CBufferIn buffer(size) ;
231      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
232      MPI_Win_flush(rank,window_) ;
233      (object->*dumpIn)(buffer) ;
234     
235      size=buffer.remain() ;
236      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
237      MPI_Put(buffer.ptr(),buffer.remain(), MPI_CHAR, rank, OFFSET_BUFFER, buffer.remain(), MPI_CHAR, window_) ;
238    }
239
240    ~CWindowManager()
241    {
242      MPI_Win_free(&window_) ;
243    }
244  } ;
245}
246
247
248
249#endif
Note: See TracBrowser for help on using the repository browser.