source: XIOS3/trunk/src/manager/window_base.hpp @ 2557

Last change on this file since 2557 was 2547, checked in by ymipsl, 10 months ago

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

  • Property svn:executable set to *
File size: 4.6 KB
Line 
1#ifndef __WINDOW_BASE_HPP__
2#define __WINDOW_BASE_HPP__
3
4#include <map>
5#include "exception.hpp"
6#include "mpi.hpp"
7
8namespace xios
9{
10
11
12  class CWindowBase
13  {
14    private:
15      void * winBuffer_ ;   
16      const MPI_Aint OFFSET_LOCK=0 ;
17      const int SIZE_LOCK=sizeof(long) ;
18      const MPI_Aint OFFSET_BUFFER =  SIZE_LOCK ;
19      MPI_Aint bufferSize_ ;
20      MPI_Aint windowSize_ ;
21      const double maxLatency_ = 1e-3 ; // 1ms latency maximum
22      MPI_Win window_ ;
23
24    public :
25
26    CWindowBase(MPI_Comm winComm, size_t bufferSize)
27    {
28      bufferSize_ = bufferSize ;
29      windowSize_ = bufferSize_ + OFFSET_BUFFER ;
30      MPI_Win_allocate(windowSize_, 1, MPI_INFO_NULL, winComm, &winBuffer_, &window_) ;
31      MPI_Aint& lock = *((MPI_Aint*)((char*)winBuffer_+OFFSET_LOCK)) ;
32      lock=0 ;
33      MPI_Win_lock_all(0, window_) ;
34      MPI_Barrier(winComm) ;
35    }
36
37    bool tryLockExclusive(int rank)
38    {
39      long lock = 1;
40      long unlock = 0;
41      long state;
42
43      int flag ;
44      MPI_Compare_and_swap(&lock, &unlock, &state, MPI_LONG, rank, OFFSET_LOCK, window_) ;
45      MPI_Win_flush(rank, window_);
46
47      bool locked = (state == unlock) ;
48      return locked ;
49    }
50
51    bool tryLockShared(int rank, MPI_Op op)
52    {
53      long one = 0x100000000;
54      long res;
55
56      MPI_Fetch_and_op(&one, &res, MPI_LONG, rank, OFFSET_LOCK, op, window_);
57      MPI_Win_flush(rank, window_);
58     
59      bool locked =  ! (res & 1) ;
60      return locked ;
61    }
62
63    void unlockExclusive(int rank)
64    {
65      int lock = 1;
66      int unlock = 0;
67      int state;
68
69      MPI_Win_flush(rank, window_);
70      MPI_Compare_and_swap(&unlock, &lock, &state, MPI_INT, rank, OFFSET_LOCK, window_) ;
71      MPI_Win_flush(rank, window_);
72      if (lock != state) ERROR("CWindowBase::unlockWindowExclusive",<<"unlockWindow failed: bad state"<<endl) ; 
73    }
74
75    void unlockShared(int rank)
76    {
77      int minusone = -1;
78      int res;
79      MPI_Fetch_and_op(&minusone, &res, MPI_INT, rank, OFFSET_LOCK+4, MPI_SUM, window_);
80      MPI_Win_flush(rank, window_);
81    }
82
83    void lockExclusive(int rank)
84    {
85      double time =  MPI_Wtime() ;
86      bool locked = tryLockExclusive(rank);
87      double lastTime = MPI_Wtime() ;
88      double delta = lastTime-time ;
89     
90      while (!locked)
91      {
92        time = MPI_Wtime() ;
93        if (delta > maxLatency_) delta = maxLatency_ ;
94        if (time >= lastTime+delta)
95        { 
96          locked = tryLockExclusive(rank);
97          delta=delta*2.;
98          lastTime = time ;     
99        }
100      } 
101    }
102
103    void lockShared(int rank)
104    {
105      double time =  MPI_Wtime() ;
106      bool locked = tryLockShared(rank, MPI_SUM);
107      double lastTime = MPI_Wtime() ;
108      double delta = lastTime-time ;
109     
110      while (!locked)
111      {
112        time = MPI_Wtime() ;
113        if (delta > maxLatency_) delta = maxLatency_ ;
114        if (time >= lastTime+delta)
115        { 
116          locked = tryLockShared(rank, MPI_NO_OP);
117          delta=delta*2.;
118          lastTime = time ;     
119        }
120      } 
121    }
122
123    int flush(int rank)
124    {
125      return MPI_Win_flush(rank, window_) ;
126    }
127
128    int put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp,
129            int target_count, MPI_Datatype target_datatype)
130    {
131      return MPI_Put(origin_addr, origin_count, origin_datatype, target_rank,  target_disp + OFFSET_BUFFER, target_count, target_datatype, window_) ;
132    }
133
134    int get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp,
135            int target_count, MPI_Datatype target_datatype)
136    {
137      return MPI_Get(origin_addr, origin_count, origin_datatype, target_rank, target_disp + OFFSET_BUFFER, target_count, target_datatype, window_) ;
138    }
139   
140    int fetchAndOp(const void *origin_addr, void *result_addr, MPI_Datatype datatype, int target_rank, MPI_Aint target_disp, MPI_Op op)
141    {
142      return MPI_Fetch_and_op(origin_addr, result_addr, datatype, target_rank, target_disp + OFFSET_BUFFER, op, window_ ) ;
143    }
144   
145    int compareAndSwap(const void *origin_addr, const void *compare_addr, void *result_addr, MPI_Datatype datatype,
146                       int target_rank, MPI_Aint target_disp)
147    {
148      return MPI_Compare_and_swap(origin_addr, compare_addr, result_addr, datatype, target_rank, target_disp + OFFSET_BUFFER, window_) ;
149    }
150
151    ~CWindowBase()
152    {
153      MPI_Win_unlock_all(window_);
154      MPI_Win_free(&window_) ;
155    }
156
157  } ;
158}
159
160
161
162#endif
Note: See TracBrowser for help on using the repository browser.