source: XIOS3/trunk/src/manager/ressources_manager.cpp

Last change on this file was 2628, checked in by jderouillat, 3 months ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.3 KB
RevLine 
[1761]1#include "ressources_manager.hpp"
2#include "server.hpp"
3#include "servers_ressource.hpp"
[2458]4#include "token_manager.hpp"
[2246]5#include "timer.hpp"
[1761]6
7
8
9
10
11namespace xios
12{
13  using namespace std;
[2628]14  extern CLogType logTimers ;
[1761]15
16  CRessourcesManager::CRessourcesManager(bool isXiosServer) 
17  {
18   
19    xiosComm_ = CXios::getXiosComm()  ;
20   
21    int commRank ; 
22    MPI_Comm_rank(xiosComm_, &commRank) ;
23    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
24    else commRank=0 ;
[2580]25    //tokenManager_ = new CTokenManager(xiosComm_,commRank) ;
[2458]26
[1761]27    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
28
29    MPI_Comm_rank(xiosComm_, &commRank) ;
[2580]30    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_,"CRessourcesManager::winNotify_") ;
[1761]31   
32
[2580]33    winRessources_ = new CWindowManager(xiosComm_, maxBufferSize_,"CRessourcesManager::winRessources_") ;
[1761]34    winRessources_->lockWindow(commRank,0) ;
35    serverLeader_=-1 ;
36    winRessources_->updateToWindow(commRank, this, &CRessourcesManager::ressourcesDumpOut) ;
37    winRessources_->unlockWindow(commRank,0) ;
38
39    MPI_Barrier(xiosComm_)  ;   
40  }
[1764]41 
42  CRessourcesManager::~CRessourcesManager()
43  {
44    delete winNotify_ ;
45    delete winRessources_ ;
[2580]46    //delete tokenManager_ ;
[1764]47  } 
[1761]48
49  void CRessourcesManager::createPool(const string& poolId, int size)
50  {
[2246]51    info(40)<<"CRessourcesManager::createPool : calling createPool : "<<poolId<<"  of size"<<size<<endl ;
52    info(40)<<"send notification to leader : "<<serverLeader_<<endl ;
[2517]53    winRessources_->updateFromExclusiveWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
54   
[1761]55    notifyType_=NOTIFY_CREATE_POOL ;
56    notifyCreatePool_=make_tuple(poolId, size) ;
[2246]57    info(40)<<"CRessourcesManager::createPool : send notification creating pool to server leader "<<serverLeader_<<endl ;
[1761]58    sendNotification(serverLeader_) ; 
59  }
60 
61  void CRessourcesManager::finalize(void)
62  {
[2517]63    winRessources_->updateFromExclusiveWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[1761]64   
65    if (serverLeader_!=-1)
66    {
67      notifyType_=NOTIFY_FINALIZE ;
[2246]68      info(40)<<"CRessourcesManager::finalize : send notification finalize to server leader "<<serverLeader_<<endl ;
[1761]69      sendNotification(serverLeader_) ;
70    } 
71  }
72
73  void CRessourcesManager::sendNotification(int rank)
74  {
[2258]75    winNotify_->lockWindowExclusive(rank) ;
76    winNotify_->pushToLockedWindow(rank, this, &CRessourcesManager::notificationsDumpOut) ;
[2517]77    winNotify_->unlockWindowExclusive(rank) ;
[1761]78  }
79
80 
81  void CRessourcesManager::notificationsDumpOut(CBufferOut& buffer)
82  {
83   
84    buffer.realloc(maxBufferSize_) ;
85   
86    if (notifyType_==NOTIFY_CREATE_POOL)
87    {
88      auto& arg=notifyCreatePool_ ;
89      buffer << notifyType_<< get<0>(arg) << get<1>(arg) ;
90    }
91    else if (notifyType_==NOTIFY_FINALIZE)
92    {
93      buffer << notifyType_ ;
94    }
95  }
96
97  void CRessourcesManager::notificationsDumpIn(CBufferIn& buffer)
98  {
99    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
100    else
101    {
102      buffer>>notifyType_;
103      if (notifyType_==NOTIFY_CREATE_POOL)
104      {
105        auto& arg=notifyCreatePool_ ;
106        buffer >> get<0>(arg) >> get<1>(arg)  ;
107      }
108      else if (notifyType_==NOTIFY_FINALIZE) { /*nothing to do*/ }
109    }
110
111  }
112
113  void CRessourcesManager::eventLoop(void)
114  {
[2628]115    if (info.isActive(logTimers)) CTimer::get("CRessourcesManager::eventLoop").resume();
[2260]116    int flag ;
117    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
[2246]118    double time=MPI_Wtime() ;
119    if (time-lastEventLoop_ > eventLoopLatency_) 
120    {
121      checkNotifications() ;
122      lastEventLoop_=time ;
123    }
124
[2628]125    if (info.isActive(logTimers)) CTimer::get("CRessourcesManager::eventLoop").suspend();
[1761]126  }
127 
128  void CRessourcesManager::checkNotifications(void)
129  {
130    int commRank ;
131    MPI_Comm_rank(xiosComm_, &commRank) ;
[2628]132    if (info.isActive(logTimers)) CTimer::get("CRessourcesManager::checkNotifications lock").resume();
[2258]133    winNotify_->lockWindowExclusive(commRank) ;
[2628]134    if (info.isActive(logTimers)) CTimer::get("CRessourcesManager::checkNotifications lock").suspend();
135    if (info.isActive(logTimers)) CTimer::get("CRessourcesManager::checkNotifications pop").resume();
[2258]136    winNotify_->popFromLockedWindow(commRank, this, &CRessourcesManager::notificationsDumpIn) ;
[2628]137    if (info.isActive(logTimers)) CTimer::get("CRessourcesManager::checkNotifications pop").suspend();
138    if (info.isActive(logTimers)) CTimer::get("CRessourcesManager::checkNotifications unlock").resume();
[2517]139    winNotify_->unlockWindowExclusive(commRank) ;
[2628]140    if (info.isActive(logTimers)) CTimer::get("CRessourcesManager::checkNotifications unlock").suspend();
[1761]141    if (notifyType_==NOTIFY_CREATE_POOL) createPool() ;
142    else if (notifyType_==NOTIFY_FINALIZE) finalizeSignal() ;
143  }
144
145  void CRessourcesManager::createPool(void)
146  {
[2246]147   
[1761]148    auto& arg=notifyCreatePool_ ;
149    string poolId=get<0>(arg) ;
150    int size=get<1>(arg) ;
[2246]151    info(40)<<"CRessourcesManager::createPool : receive create pool notification : "<< poolId<<"  of size "<<size<<endl ;
[1761]152    CServer::getServersRessource()->createPool(poolId,size) ;
153  } 
154
155  void CRessourcesManager::finalizeSignal(void)
156  {
[2246]157    info(40)<<"CRessourcesManager::createPool : receive finalize notification"<<endl ;
[1761]158    CServer::getServersRessource()->finalize() ;
159  }
160
161  void CRessourcesManager::ressourcesDumpOut(CBufferOut& buffer)
162  {
163   
164    buffer.realloc(maxBufferSize_) ;
165   
[2458]166    buffer<<ressourcesSize_<<freeRessourcesSize_<<serverLeader_ ; 
[1761]167    buffer<<(int) pools_.size();
168    for(auto it=pools_.begin();it!=pools_.end(); ++it)
169    { 
170      auto key = it->first ;
171      auto val = it->second ; 
[2458]172      buffer << key<<std::get<0>(val)  << std::get<1>(val)  << std::get<2>(val);
[1761]173    }
174  }
175
176  void CRessourcesManager::ressourcesDumpIn(CBufferIn& buffer)
177  {
178    std::string poolId ;
179    int size ;
[2458]180    int freeSize ;
[1761]181    int leader ;
182   
[2458]183    buffer>>ressourcesSize_>>freeRessourcesSize_>>serverLeader_ ;
[1761]184    pools_.clear() ;
185    int nbPools ;
186    buffer>>nbPools ;
187    for(int i=0;i<nbPools;i++) 
188    {
[2458]189      buffer>>poolId>>size>>freeSize>>leader ;
190      pools_[poolId]=std::make_tuple(size, freeSize, leader) ;
[1761]191    }
192  }
193 
194  void CRessourcesManager::registerServerLeader(int serverLeaderRank)
195  {
[2517]196    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
197    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[1761]198    serverLeader_ = serverLeaderRank ;
[2517]199    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
200    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
[1761]201  }
202 
203  void CRessourcesManager::registerRessourcesSize(int size)
204  {
[2517]205    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
206    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[1761]207    ressourcesSize_ = size ;
208    freeRessourcesSize_ = size ;
[2517]209    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
210    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
[1761]211  }
212
213 
[2458]214  void CRessourcesManager::registerPoolClient(const string& poolId, int size, int leader)
[1761]215  {
[2517]216    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
217    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[2458]218    pools_[poolId] = make_tuple(size, size, leader) ;
[2517]219    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
220    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
[2458]221  }
222
223  void CRessourcesManager::registerPoolServer(const string& poolId, int size, int leader)
224  {
[2517]225    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
226    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[2458]227    pools_[poolId] = make_tuple(size, size, leader) ;
[1761]228    freeRessourcesSize_-=size ;
[2517]229    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
230    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ;   
[1761]231  }
232
[2458]233  bool CRessourcesManager::getPoolInfo(const string& poolId, int& size, int& freeSize, int& leader)
[1761]234  {
[2517]235    winRessources_->updateFromSharedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[1761]236
237    auto it=pools_.find(poolId) ;
238    if ( it == pools_.end()) return false ;
239    else
240    {
241      size=get<0>(it->second) ;
[2458]242      freeSize=get<1>(it->second) ;
243      leader=get<2>(it->second) ;
[1761]244      return true ;
245    }
246  }
247
[2458]248  bool CRessourcesManager::decreasePoolFreeSize(const string& poolId, int size)
249  {
250    bool ret ;
251
[2517]252    winRessources_->lockWindowExclusive(managerGlobalLeader_) ;
253    winRessources_->updateFromLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[2458]254   
255
256    auto it=pools_.find(poolId) ;
257   
258    if ( it == pools_.end()) ret=false ;
259    else 
260    {
261      get<1>(it->second)-=size ;
262      ret=true ;
263    }
[2517]264    winRessources_->updateToLockedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
265    winRessources_->unlockWindowExclusive(managerGlobalLeader_) ; 
[2458]266
267    return ret ;   
268  }
269
[1761]270  int CRessourcesManager::getRessourcesSize(void)
271  {
[2517]272    winRessources_->updateFromSharedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
273   
[1761]274    return ressourcesSize_ ;
275  }
276
277  int CRessourcesManager::getFreeRessourcesSize(void)
278  {
[2517]279    winRessources_->updateFromSharedWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
[1761]280
281    return freeRessourcesSize_ ;
282  } 
283
284  bool CRessourcesManager::getPoolLeader(const string& poolId, int& leader)
285  {
[2458]286    int size, freeSize ;
287    return getPoolInfo(poolId, size, freeSize, leader) ;
[1761]288  }
289
290  bool CRessourcesManager::getPoolSize(const string& poolId, int& size)
291  {
[2458]292    int leader,freeSize ;
293    return getPoolInfo(poolId, size, freeSize, leader) ;
[1761]294  }
295
[2458]296  bool CRessourcesManager::getPoolFreeSize(const string& poolId, int& freeSize)
[1761]297  {
298    int leader,size ;
[2458]299    return getPoolInfo(poolId, size, freeSize, leader) ;
[1761]300  }
[2458]301
302  bool CRessourcesManager::hasPool(const string& poolId)
303  {
304    int leader,size,freeSize ;
305    return getPoolInfo(poolId, size, freeSize, leader) ;
306  }
307
308  void CRessourcesManager::waitPoolRegistration(const string& poolId)
309  {
310    while(!hasPool(poolId)) CXios::getDaemonsManager()->servicesEventLoop() ;
311  }
312}
Note: See TracBrowser for help on using the repository browser.