source: XIOS3/trunk/src/manager/ressources_manager.cpp @ 2547

Last change on this file since 2547 was 2517, checked in by ymipsl, 13 months ago

New way to manage locks in window manager. Windows is locked with MPI_Win_lock_all at creation (shared mode), and lock is manage by software way in the class (using MPI_swap_and_compare and MPI_Fetch_op). We get in this case a better control of lock, with controled latency between each attemp of lock.

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.0 KB
Line 
1#include "ressources_manager.hpp"
2#include "server.hpp"
3#include "servers_ressource.hpp"
4#include "token_manager.hpp"
5#include "timer.hpp"
6
7
8
9
10
11namespace xios
12{
13  using namespace std;
14
15  CRessourcesManager::CRessourcesManager(bool isXiosServer) 
16  {
17   
18    xiosComm_ = CXios::getXiosComm()  ;
19   
20    int commRank ; 
21    MPI_Comm_rank(xiosComm_, &commRank) ;
22    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
23    else commRank=0 ;
24    tokenManager_ = new CTokenManager(xiosComm_,commRank) ;
25
26    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
27
28    MPI_Comm_rank(xiosComm_, &commRank) ;
29    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
30   
31
32    winRessources_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
33    winRessources_->lockWindow(commRank,0) ;
34    serverLeader_=-1 ;
35    winRessources_->updateToWindow(commRank, this, &CRessourcesManager::ressourcesDumpOut) ;
36    winRessources_->unlockWindow(commRank,0) ;
37
38    MPI_Barrier(xiosComm_)  ;   
39  }
40 
41  CRessourcesManager::~CRessourcesManager()
42  {
43    delete winNotify_ ;
44    delete winRessources_ ;
45  } 
46
47  void CRessourcesManager::createPool(const string& poolId, int size)
48  {
49    info(40)<<"CRessourcesManager::createPool : calling createPool : "<<poolId<<"  of size"<<size<<endl ;
50    info(40)<<"send notification to leader : "<<serverLeader_<<endl ;
51    winRessources_->updateFromExclusiveWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
52   
53    notifyType_=NOTIFY_CREATE_POOL ;
54    notifyCreatePool_=make_tuple(poolId, size) ;
55    info(40)<<"CRessourcesManager::createPool : send notification creating pool to server leader "<<serverLeader_<<endl ;
56    sendNotification(serverLeader_) ; 
57  }
58 
59  void CRessourcesManager::finalize(void)
60  {
61    winRessources_->updateFromExclusiveWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
62   
63    if (serverLeader_!=-1)
64    {
65      notifyType_=NOTIFY_FINALIZE ;
66      info(40)<<"CRessourcesManager::finalize : send notification finalize to server leader "<<serverLeader_<<endl ;
67      sendNotification(serverLeader_) ;
68    } 
69  }
70
71  void CRessourcesManager::sendNotification(int rank)
72  {
73    winNotify_->lockWindowExclusive(rank) ;
74    winNotify_->pushToLockedWindow(rank, this, &CRessourcesManager::notificationsDumpOut) ;
75    winNotify_->unlockWindowExclusive(rank) ;
76  }
77
78 
79  void CRessourcesManager::notificationsDumpOut(CBufferOut& buffer)
80  {
81   
82    buffer.realloc(maxBufferSize_) ;
83   
84    if (notifyType_==NOTIFY_CREATE_POOL)
85    {
86      auto& arg=notifyCreatePool_ ;
87      buffer << notifyType_<< get<0>(arg) << get<1>(arg) ;
88    }
89    else if (notifyType_==NOTIFY_FINALIZE)
90    {
91      buffer << notifyType_ ;
92    }
93  }
94
95  void CRessourcesManager::notificationsDumpIn(CBufferIn& buffer)
96  {
97    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
98    else
99    {
100      buffer>>notifyType_;
101      if (notifyType_==NOTIFY_CREATE_POOL)
102      {
103        auto& arg=notifyCreatePool_ ;
104        buffer >> get<0>(arg) >> get<1>(arg)  ;
105      }
106      else if (notifyType_==NOTIFY_FINALIZE) { /*nothing to do*/ }
107    }
108
109  }
110
111  void CRessourcesManager::eventLoop(void)
112  {
113    CTimer::get("CRessourcesManager::eventLoop").resume();
114    int flag ;
115    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
116    double time=MPI_Wtime() ;
117    if (time-lastEventLoop_ > eventLoopLatency_) 
118    {
119      checkNotifications() ;
120      lastEventLoop_=time ;
121    }
122
123    CTimer::get("CRessourcesManager::eventLoop").suspend();
124  }
125 
126  void CRessourcesManager::checkNotifications(void)
127  {
128    int commRank ;
129    MPI_Comm_rank(xiosComm_, &commRank) ;
130    CTimer::get("CRessourcesManager::checkNotifications lock").resume();
131    winNotify_->lockWindowExclusive(commRank) ;
132    CTimer::get("CRessourcesManager::checkNotifications lock").suspend();
133    CTimer::get("CRessourcesManager::checkNotifications pop").resume();
134    winNotify_->popFromLockedWindow(commRank, this, &CRessourcesManager::notificationsDumpIn) ;
135    CTimer::get("CRessourcesManager::checkNotifications pop").suspend();
136    CTimer::get("CRessourcesManager::checkNotifications unlock").resume();
137    winNotify_->unlockWindowExclusive(commRank) ;
138    CTimer::get("CRessourcesManager::checkNotifications unlock").suspend();
139    if (notifyType_==NOTIFY_CREATE_POOL) createPool() ;
140    else if (notifyType_==NOTIFY_FINALIZE) finalizeSignal() ;
141  }
142
143  void CRessourcesManager::createPool(void)
144  {
145   
146    auto& arg=notifyCreatePool_ ;
147    string poolId=get<0>(arg) ;
148    int size=get<1>(arg) ;
149    info(40)<<"CRessourcesManager::createPool : receive create pool notification : "<< poolId<<"  of size "<<size<<endl ;
150    CServer::getServersRessource()->createPool(poolId,size) ;
151  } 
152
153  void CRessourcesManager::finalizeSignal(void)
154  {
155    info(40)<<"CRessourcesManager::createPool : receive finalize notification"<<endl ;
156    CServer::getServersRessource()->finalize() ;
157  }
158
159  void CRessourcesManager::ressourcesDumpOut(CBufferOut& buffer)
160  {
161   
162    buffer.realloc(maxBufferSize_) ;
163   
164    buffer<<ressourcesSize_<<freeRessourcesSize_<<serverLeader_ ; 
165    buffer<<(int) pools_.size();
166    for(auto it=pools_.begin();it!=pools_.end(); ++it)
167    { 
168      auto key = it->first ;
169      auto val = it->second ; 
170      buffer << key<<std::get<0>(val)  << std::get<1>(val)  << std::get<2>(val);
171    }
172  }
173
174  void CRessourcesManager::ressourcesDumpIn(CBufferIn& buffer)
175  {
176    std::string poolId ;
177    int size ;
178    int freeSize ;
179    int leader ;
180   
181    buffer>>ressourcesSize_>>freeRessourcesSize_>>serverLeader_ ;
182    pools_.clear() ;
183    int nbPools ;
184    buffer>>nbPools ;
185    for(int i=0;i<nbPools;i++) 
186    {
187      buffer>>poolId>>size>>freeSize>>leader ;
188      pools_[poolId]=std::make_tuple(size, freeSize, leader) ;
189    }
190  }
191 
192  void CRessourcesManager::registerServerLeader(int serverLeaderRank)
193  {
194    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
195    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
196    serverLeader_ = serverLeaderRank ;
197    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
198    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
199  }
200 
201  void CRessourcesManager::registerRessourcesSize(int size)
202  {
203    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
204    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
205    ressourcesSize_ = size ;
206    freeRessourcesSize_ = size ;
207    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
208    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
209  }
210
211 
212  void CRessourcesManager::registerPoolClient(const string& poolId, int size, int leader)
213  {
214    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
215    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
216    pools_[poolId] = make_tuple(size, size, leader) ;
217    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
218    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
219  }
220
221  void CRessourcesManager::registerPoolServer(const string& poolId, int size, int leader)
222  {
223    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
224    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
225    pools_[poolId] = make_tuple(size, size, leader) ;
226    freeRessourcesSize_-=size ;
227    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
228    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
229  }
230
231  bool CRessourcesManager::getPoolInfo(const string& poolId, int& size, int& freeSize, int& leader)
232  {
233    winRessources_->updateFromSharedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
234
235    auto it=pools_.find(poolId) ;
236    if ( it == pools_.end()) return false ;
237    else
238    {
239      size=get<0>(it->second) ;
240      freeSize=get<1>(it->second) ;
241      leader=get<2>(it->second) ;
242      return true ;
243    }
244  }
245
246  bool CRessourcesManager::decreasePoolFreeSize(const string& poolId, int size)
247  {
248    bool ret ;
249
250    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
251    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
252   
253
254    auto it=pools_.find(poolId) ;
255   
256    if ( it == pools_.end()) ret=false ;
257    else 
258    {
259      get<1>(it->second)-=size ;
260      ret=true ;
261    }
262    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
263    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ; 
264
265    return ret ;   
266  }
267
268  int CRessourcesManager::getRessourcesSize(void)
269  {
270    winRessources_->updateFromSharedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
271   
272    return ressourcesSize_ ;
273  }
274
275  int CRessourcesManager::getFreeRessourcesSize(void)
276  {
277    winRessources_->updateFromSharedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
278
279    return freeRessourcesSize_ ;
280  } 
281
282  bool CRessourcesManager::getPoolLeader(const string& poolId, int& leader)
283  {
284    int size, freeSize ;
285    return getPoolInfo(poolId, size, freeSize, leader) ;
286  }
287
288  bool CRessourcesManager::getPoolSize(const string& poolId, int& size)
289  {
290    int leader,freeSize ;
291    return getPoolInfo(poolId, size, freeSize, leader) ;
292  }
293
294  bool CRessourcesManager::getPoolFreeSize(const string& poolId, int& freeSize)
295  {
296    int leader,size ;
297    return getPoolInfo(poolId, size, freeSize, leader) ;
298  }
299
300  bool CRessourcesManager::hasPool(const string& poolId)
301  {
302    int leader,size,freeSize ;
303    return getPoolInfo(poolId, size, freeSize, leader) ;
304  }
305
306  void CRessourcesManager::waitPoolRegistration(const string& poolId)
307  {
308    while(!hasPool(poolId)) CXios::getDaemonsManager()->servicesEventLoop() ;
309  }
310}
Note: See TracBrowser for help on using the repository browser.