source: XIOS3/trunk/src/manager/ressources_manager.cpp @ 2564

Last change on this file since 2564 was 2562, checked in by jderouillat, 10 months ago

Add the missing delete tokenManager_

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.0 KB
RevLine 
[1761]1#include "ressources_manager.hpp"
2#include "server.hpp"
3#include "servers_ressource.hpp"
[2458]4#include "token_manager.hpp"
[2246]5#include "timer.hpp"
[1761]6
7
8
9
10
11namespace xios
12{
13  using namespace std;
14
15  CRessourcesManager::CRessourcesManager(bool isXiosServer) 
16  {
17   
18    xiosComm_ = CXios::getXiosComm()  ;
19   
20    int commRank ; 
21    MPI_Comm_rank(xiosComm_, &commRank) ;
22    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
23    else commRank=0 ;
[2458]24    tokenManager_ = new CTokenManager(xiosComm_,commRank) ;
25
[1761]26    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
27
28    MPI_Comm_rank(xiosComm_, &commRank) ;
29    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
30   
31
32    winRessources_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
33    winRessources_->lockWindow(commRank,0) ;
34    serverLeader_=-1 ;
35    winRessources_->updateToWindow(commRank, this, &CRessourcesManager::ressourcesDumpOut) ;
36    winRessources_->unlockWindow(commRank,0) ;
37
38    MPI_Barrier(xiosComm_)  ;   
39  }
[1764]40 
41  CRessourcesManager::~CRessourcesManager()
42  {
43    delete winNotify_ ;
44    delete winRessources_ ;
[2562]45    delete tokenManager_ ;
[1764]46  } 
[1761]47
48  void CRessourcesManager::createPool(const string& poolId, int size)
49  {
[2246]50    info(40)<<"CRessourcesManager::createPool : calling createPool : "<<poolId<<"  of size"<<size<<endl ;
51    info(40)<<"send notification to leader : "<<serverLeader_<<endl ;
[2517]52    winRessources_->updateFromExclusiveWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
53   
[1761]54    notifyType_=NOTIFY_CREATE_POOL ;
55    notifyCreatePool_=make_tuple(poolId, size) ;
[2246]56    info(40)<<"CRessourcesManager::createPool : send notification creating pool to server leader "<<serverLeader_<<endl ;
[1761]57    sendNotification(serverLeader_) ; 
58  }
59 
60  void CRessourcesManager::finalize(void)
61  {
[2517]62    winRessources_->updateFromExclusiveWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[1761]63   
64    if (serverLeader_!=-1)
65    {
66      notifyType_=NOTIFY_FINALIZE ;
[2246]67      info(40)<<"CRessourcesManager::finalize : send notification finalize to server leader "<<serverLeader_<<endl ;
[1761]68      sendNotification(serverLeader_) ;
69    } 
70  }
71
72  void CRessourcesManager::sendNotification(int rank)
73  {
[2258]74    winNotify_->lockWindowExclusive(rank) ;
75    winNotify_->pushToLockedWindow(rank, this, &CRessourcesManager::notificationsDumpOut) ;
[2517]76    winNotify_->unlockWindowExclusive(rank) ;
[1761]77  }
78
79 
80  void CRessourcesManager::notificationsDumpOut(CBufferOut& buffer)
81  {
82   
83    buffer.realloc(maxBufferSize_) ;
84   
85    if (notifyType_==NOTIFY_CREATE_POOL)
86    {
87      auto& arg=notifyCreatePool_ ;
88      buffer << notifyType_<< get<0>(arg) << get<1>(arg) ;
89    }
90    else if (notifyType_==NOTIFY_FINALIZE)
91    {
92      buffer << notifyType_ ;
93    }
94  }
95
96  void CRessourcesManager::notificationsDumpIn(CBufferIn& buffer)
97  {
98    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
99    else
100    {
101      buffer>>notifyType_;
102      if (notifyType_==NOTIFY_CREATE_POOL)
103      {
104        auto& arg=notifyCreatePool_ ;
105        buffer >> get<0>(arg) >> get<1>(arg)  ;
106      }
107      else if (notifyType_==NOTIFY_FINALIZE) { /*nothing to do*/ }
108    }
109
110  }
111
112  void CRessourcesManager::eventLoop(void)
113  {
[2246]114    CTimer::get("CRessourcesManager::eventLoop").resume();
[2260]115    int flag ;
116    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
[2246]117    double time=MPI_Wtime() ;
118    if (time-lastEventLoop_ > eventLoopLatency_) 
119    {
120      checkNotifications() ;
121      lastEventLoop_=time ;
122    }
123
124    CTimer::get("CRessourcesManager::eventLoop").suspend();
[1761]125  }
126 
127  void CRessourcesManager::checkNotifications(void)
128  {
129    int commRank ;
130    MPI_Comm_rank(xiosComm_, &commRank) ;
[2246]131    CTimer::get("CRessourcesManager::checkNotifications lock").resume();
[2258]132    winNotify_->lockWindowExclusive(commRank) ;
[2246]133    CTimer::get("CRessourcesManager::checkNotifications lock").suspend();
134    CTimer::get("CRessourcesManager::checkNotifications pop").resume();
[2258]135    winNotify_->popFromLockedWindow(commRank, this, &CRessourcesManager::notificationsDumpIn) ;
[2246]136    CTimer::get("CRessourcesManager::checkNotifications pop").suspend();
137    CTimer::get("CRessourcesManager::checkNotifications unlock").resume();
[2517]138    winNotify_->unlockWindowExclusive(commRank) ;
[2246]139    CTimer::get("CRessourcesManager::checkNotifications unlock").suspend();
[1761]140    if (notifyType_==NOTIFY_CREATE_POOL) createPool() ;
141    else if (notifyType_==NOTIFY_FINALIZE) finalizeSignal() ;
142  }
143
144  void CRessourcesManager::createPool(void)
145  {
[2246]146   
[1761]147    auto& arg=notifyCreatePool_ ;
148    string poolId=get<0>(arg) ;
149    int size=get<1>(arg) ;
[2246]150    info(40)<<"CRessourcesManager::createPool : receive create pool notification : "<< poolId<<"  of size "<<size<<endl ;
[1761]151    CServer::getServersRessource()->createPool(poolId,size) ;
152  } 
153
154  void CRessourcesManager::finalizeSignal(void)
155  {
[2246]156    info(40)<<"CRessourcesManager::createPool : receive finalize notification"<<endl ;
[1761]157    CServer::getServersRessource()->finalize() ;
158  }
159
160  void CRessourcesManager::ressourcesDumpOut(CBufferOut& buffer)
161  {
162   
163    buffer.realloc(maxBufferSize_) ;
164   
[2458]165    buffer<<ressourcesSize_<<freeRessourcesSize_<<serverLeader_ ; 
[1761]166    buffer<<(int) pools_.size();
167    for(auto it=pools_.begin();it!=pools_.end(); ++it)
168    { 
169      auto key = it->first ;
170      auto val = it->second ; 
[2458]171      buffer << key<<std::get<0>(val)  << std::get<1>(val)  << std::get<2>(val);
[1761]172    }
173  }
174
175  void CRessourcesManager::ressourcesDumpIn(CBufferIn& buffer)
176  {
177    std::string poolId ;
178    int size ;
[2458]179    int freeSize ;
[1761]180    int leader ;
181   
[2458]182    buffer>>ressourcesSize_>>freeRessourcesSize_>>serverLeader_ ;
[1761]183    pools_.clear() ;
184    int nbPools ;
185    buffer>>nbPools ;
186    for(int i=0;i<nbPools;i++) 
187    {
[2458]188      buffer>>poolId>>size>>freeSize>>leader ;
189      pools_[poolId]=std::make_tuple(size, freeSize, leader) ;
[1761]190    }
191  }
192 
193  void CRessourcesManager::registerServerLeader(int serverLeaderRank)
194  {
[2517]195    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
196    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[1761]197    serverLeader_ = serverLeaderRank ;
[2517]198    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
199    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
[1761]200  }
201 
202  void CRessourcesManager::registerRessourcesSize(int size)
203  {
[2517]204    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
205    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[1761]206    ressourcesSize_ = size ;
207    freeRessourcesSize_ = size ;
[2517]208    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
209    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
[1761]210  }
211
212 
[2458]213  void CRessourcesManager::registerPoolClient(const string& poolId, int size, int leader)
[1761]214  {
[2517]215    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
216    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[2458]217    pools_[poolId] = make_tuple(size, size, leader) ;
[2517]218    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
219    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
[2458]220  }
221
222  void CRessourcesManager::registerPoolServer(const string& poolId, int size, int leader)
223  {
[2517]224    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
225    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[2458]226    pools_[poolId] = make_tuple(size, size, leader) ;
[1761]227    freeRessourcesSize_-=size ;
[2517]228    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
229    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
[1761]230  }
231
[2458]232  bool CRessourcesManager::getPoolInfo(const string& poolId, int& size, int& freeSize, int& leader)
[1761]233  {
[2517]234    winRessources_->updateFromSharedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[1761]235
236    auto it=pools_.find(poolId) ;
237    if ( it == pools_.end()) return false ;
238    else
239    {
240      size=get<0>(it->second) ;
[2458]241      freeSize=get<1>(it->second) ;
242      leader=get<2>(it->second) ;
[1761]243      return true ;
244    }
245  }
246
[2458]247  bool CRessourcesManager::decreasePoolFreeSize(const string& poolId, int size)
248  {
249    bool ret ;
250
[2517]251    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
252    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[2458]253   
254
255    auto it=pools_.find(poolId) ;
256   
257    if ( it == pools_.end()) ret=false ;
258    else 
259    {
260      get<1>(it->second)-=size ;
261      ret=true ;
262    }
[2517]263    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
264    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ; 
[2458]265
266    return ret ;   
267  }
268
[1761]269  int CRessourcesManager::getRessourcesSize(void)
270  {
[2517]271    winRessources_->updateFromSharedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
272   
[1761]273    return ressourcesSize_ ;
274  }
275
276  int CRessourcesManager::getFreeRessourcesSize(void)
277  {
[2517]278    winRessources_->updateFromSharedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[1761]279
280    return freeRessourcesSize_ ;
281  } 
282
283  bool CRessourcesManager::getPoolLeader(const string& poolId, int& leader)
284  {
[2458]285    int size, freeSize ;
286    return getPoolInfo(poolId, size, freeSize, leader) ;
[1761]287  }
288
289  bool CRessourcesManager::getPoolSize(const string& poolId, int& size)
290  {
[2458]291    int leader,freeSize ;
292    return getPoolInfo(poolId, size, freeSize, leader) ;
[1761]293  }
294
[2458]295  bool CRessourcesManager::getPoolFreeSize(const string& poolId, int& freeSize)
[1761]296  {
297    int leader,size ;
[2458]298    return getPoolInfo(poolId, size, freeSize, leader) ;
[1761]299  }
[2458]300
301  bool CRessourcesManager::hasPool(const string& poolId)
302  {
303    int leader,size,freeSize ;
304    return getPoolInfo(poolId, size, freeSize, leader) ;
305  }
306
307  void CRessourcesManager::waitPoolRegistration(const string& poolId)
308  {
309    while(!hasPool(poolId)) CXios::getDaemonsManager()->servicesEventLoop() ;
310  }
311}
Note: See TracBrowser for help on using the repository browser.