source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/window_manager.hpp @ 2260

Last change on this file since 2260 was 2260, checked in by ymipsl, 3 years ago

Improvment of one sided protocol

  • removed latency
  • solve dead-lock

YM

  • Property svn:executable set to *
File size: 8.9 KB
Line 
1#ifndef __WINDOW_MANAGER_HPP__
2#define __WINDOW_MANAGER_HPP__
3
4#include <map>
5#include "mpi.hpp"
6#include "buffer_in.hpp"
7#include "buffer_out.hpp"
8#include "message.hpp"
9
10namespace xios
11{
12
13
14  class CWindowManager
15  {
16
17    private :
18    const MPI_Aint OFFSET_LOCK=0 ;
19    const int SIZE_LOCK=sizeof(int) ;
20    const MPI_Aint OFFSET_BUFFER_SIZE=OFFSET_LOCK+SIZE_LOCK ;
21    const int SIZE_BUFFER_SIZE=sizeof(size_t) ;
22    const MPI_Aint OFFSET_BUFFER=OFFSET_BUFFER_SIZE+SIZE_BUFFER_SIZE ;
23    const int WINDOWS_LOCKED=-1 ;
24
25    MPI_Win window_ ;
26    void * winBuffer_ ;
27    map<int,double> lastTimeLock_ ;
28    const double latency_=0e-2 ; 
29
30    public :
31
32    CWindowManager(MPI_Comm winComm, size_t bufferSize)
33    {
34      const MPI_Aint windowSize=bufferSize+OFFSET_BUFFER ;
35      MPI_Win_allocate(windowSize, 1, MPI_INFO_NULL, winComm, &winBuffer_, &window_) ;
36      int lock=0 ;
37      size_t size=0 ;
38      int commRank ;
39      MPI_Comm_rank(winComm, &commRank) ;
40      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, commRank, 0, window_) ;
41      MPI_Put(&lock, SIZE_LOCK, MPI_CHAR, commRank, OFFSET_LOCK, SIZE_LOCK, MPI_CHAR, window_) ;
42      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, commRank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
43      MPI_Win_unlock(commRank, window_) ;
44      MPI_Barrier(winComm) ;
45    }
46   
47    void lockWindow(int rank, int state )
48    {
49      int lock=state ;
50      double time ;
51      auto it=lastTimeLock_.find(rank) ;
52      if (it == lastTimeLock_.end()) 
53      { 
54        lastTimeLock_[rank] = 0. ; 
55        it=lastTimeLock_.find(rank) ;
56      }
57      double& lastTime = it->second ;
58
59      do 
60      {
61        time=MPI_Wtime() ;
62        while(time-lastTime < latency_) time=MPI_Wtime() ;
63        int flag ;
64        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
65        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
66        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
67        MPI_Win_unlock(rank, window_) ;
68        lastTime=MPI_Wtime() ;
69      } while (lock!=state) ;
70     
71     
72    }
73
74    void lockWindowExclusive(int rank, int state )
75    {
76      int lock=state ;
77      double time ;
78      auto it=lastTimeLock_.find(rank) ;
79      if (it == lastTimeLock_.end()) 
80      { 
81        lastTimeLock_[rank] = 0. ; 
82        it=lastTimeLock_.find(rank) ;
83      }
84      double& lastTime = it->second ;
85
86      do 
87      {
88        time=MPI_Wtime() ;
89        while(time-lastTime < latency_) time=MPI_Wtime() ;
90        int flag ;
91        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
92        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
93        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
94        MPI_Win_unlock(rank, window_) ;
95        lastTime=MPI_Wtime() ;
96      } while (lock!=state) ;
97    }
98
99    void lockWindowExclusive(int rank)
100    {
101      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
102    }
103
104    void lockWindowShared(int rank)
105    {
106      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
107    }
108
109    void unlockWindow(int rank)
110    {
111      MPI_Win_unlock(rank, window_) ;
112    }
113
114    void flushWindow(int rank)
115    {
116      MPI_Win_flush(rank, window_) ;
117    }
118
119    void unlockWindow(int rank, int state )
120    {
121      int lock ;
122      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
123      MPI_Compare_and_swap(&state, &WINDOWS_LOCKED, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ;
124      MPI_Win_unlock(rank, window_) ;
125    }
126   
127    template< class T >
128    void updateToWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
129    {
130      CBufferOut buffer ;
131      (object->*dumpOut)(buffer) ;
132      size_t size=buffer.count() ;
133      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
134      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
135      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
136      MPI_Win_unlock(rank, window_) ;
137    }
138
139    template< class T >
140    void updateToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
141    {
142      CBufferOut buffer ;
143      (object->*dumpOut)(buffer) ;
144      size_t size=buffer.count() ;
145//      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
146      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
147      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
148//      MPI_Win_unlock(rank, window_) ;
149    }
150
151    template< typename T >
152    void updateFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
153    {
154      size_t size ;
155      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
156      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
157      MPI_Win_flush(rank,window_) ;
158      CBufferIn buffer(size) ;
159      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
160      MPI_Win_unlock(rank, window_) ;
161      (object->*dumpIn)(buffer) ;
162    }
163
164    template< typename T >
165    void updateFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
166    {
167      size_t size ;
168//      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ;
169      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
170      MPI_Win_flush(rank,window_) ;
171      CBufferIn buffer(size) ;
172      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
173//      MPI_Win_unlock(rank, window_) ;
174      MPI_Win_flush(rank, window_) ;
175      (object->*dumpIn)(buffer) ;
176    }
177
178
179    template< class T >
180    void pushToWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
181    {
182      size_t size ;
183      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
184      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
185      MPI_Win_flush(rank,window_) ;
186      CBufferOut buffer ;
187      (object->*dumpOut)(buffer) ;
188      size_t bufferSize=buffer.count() ;
189      size_t newSize = size + bufferSize;
190      MPI_Put(&newSize, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
191      MPI_Put(buffer.start(), bufferSize, MPI_CHAR, rank, OFFSET_BUFFER+size, bufferSize, MPI_CHAR, window_) ;
192      MPI_Win_unlock(rank, window_) ;
193    }
194
195    template< class T >
196    void pushToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) )
197    {
198      size_t size ;
199      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
200      MPI_Win_flush(rank,window_) ;
201      CBufferOut buffer ;
202      (object->*dumpOut)(buffer) ;
203      size_t bufferSize=buffer.count() ;
204      size_t newSize = size + bufferSize;
205      MPI_Put(&newSize, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
206      MPI_Put(buffer.start(), bufferSize, MPI_CHAR, rank, OFFSET_BUFFER+size, bufferSize, MPI_CHAR, window_) ;
207    }
208
209    template< typename T >
210    void popFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
211    {
212      size_t size ;
213      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ;
214      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
215      MPI_Win_flush(rank,window_) ;
216      CBufferIn buffer(size) ;
217      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
218      MPI_Win_flush(rank,window_) ;
219      (object->*dumpIn)(buffer) ;
220     
221      size=buffer.remain() ;
222      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
223      MPI_Put(buffer.ptr(),buffer.remain(), MPI_CHAR, rank, OFFSET_BUFFER, buffer.remain(), MPI_CHAR, window_) ;
224      MPI_Win_unlock(rank, window_) ;
225     
226    }
227
228    template< typename T >
229    void popFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 
230    {
231      size_t size ;
232      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
233      MPI_Win_flush(rank,window_) ;
234      CBufferIn buffer(size) ;
235      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ;
236      MPI_Win_flush(rank,window_) ;
237      (object->*dumpIn)(buffer) ;
238     
239      size=buffer.remain() ;
240      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ;
241      MPI_Put(buffer.ptr(),buffer.remain(), MPI_CHAR, rank, OFFSET_BUFFER, buffer.remain(), MPI_CHAR, window_) ;
242    }
243
244    ~CWindowManager()
245    {
246      MPI_Win_free(&window_) ;
247    }
248  } ;
249}
250
251
252
253#endif
Note: See TracBrowser for help on using the repository browser.