source: XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/contexts_manager.cpp @ 1764

Last change on this file since 1764 was 1764, checked in by ymipsl, 5 years ago

Some Update on XIOS services
Seems to work on Irène for :

  • first level of servers
  • fisrt + second level of servers
  • attached mode

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.1 KB
Line 
1#include "contexts_manager.hpp"
2#include "cxios.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "services.hpp"
6#include "server_context.hpp"
7#include "servers_ressource.hpp"
8#include "server.hpp"
9#include <functional>
10
11
12namespace xios
13{
14  using namespace std ;
15
16  CContextsManager::CContextsManager(bool isXiosServer)
17  {
18    xiosComm_ = CXios::getXiosComm()  ;
19   
20    int commRank ; 
21    MPI_Comm_rank(xiosComm_, &commRank) ;
22    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
23    else commRank=0 ;
24    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
25
26    MPI_Comm_rank(xiosComm_, &commRank) ;
27    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
28   
29
30    winContexts_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
31    winContexts_->lockWindow(commRank,0) ;
32    winContexts_->updateToWindow(commRank, this, &CContextsManager::contextsDumpOut) ;
33    winContexts_->unlockWindow(commRank,0) ;
34
35    MPI_Barrier(xiosComm_)  ;   
36  }
37
38
39  CContextsManager::~CContextsManager()
40  {
41    delete winNotify_ ;
42    delete winContexts_ ;
43  }
44
45  bool CContextsManager::createServerContext(const std::string& poolId, const std::string& serviceId, const int& partitionId,
46                                             const string& contextId, bool wait)
47  {
48    int serviceLeader ;
49    auto servicesManager = CXios::getServicesManager() ;
50   
51    bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
52
53    if (wait)
54    {
55      while (!ok) 
56      {
57        CXios::getDaemonsManager()->servicesEventLoop() ;
58        ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
59      }
60    }
61
62    if (ok) 
63    {
64      notifyType_=NOTIFY_CREATE_CONTEXT ;
65      notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ;
66      sendNotification(serviceLeader) ;
67      return true ;
68    }
69    else return false ;
70  }
71
72
73  bool CContextsManager::createServerContextIntercomm(const string& poolId, const string& serviceId, const int& partitionId,
74                                                      const string& contextId, const string& sourceContext, bool wait)
75  {
76    int contextLeader ;
77    bool ok ;
78    int remoteLeader ;
79    MPI_Comm_rank(xiosComm_, &remoteLeader) ;
80   
81    int type ;
82    ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
83    if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
84    if (wait)
85    {
86      while (!ok) 
87      {
88        CXios::getDaemonsManager()->servicesEventLoop() ;
89        ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
90        if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
91      }
92    }
93   
94    if (ok) 
95    {
96      notifyType_=NOTIFY_CREATE_INTERCOMM ;
97      notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ;
98      sendNotification(contextLeader) ;
99      return true ;
100    }
101    else return false ;
102  }
103   
104/*
105    auto eventScheduler=CServer::getServersRessource()->getPoolRessource()->getService(serviceId,partitionId)->getEventScheduler() ;
106    std::hash<string> hashString ;
107    size_t hashId = hashString(getServerContextName(poolId, serviceId, partitionId, contextId)) ;
108    size_t currentTimeLine=0 ;
109    CServer::eventScheduler->registerEvent(currentTimeLine,hashId);
110    while (!eventScheduler->queryEvent(currentTimeLine,hashId))
111    {
112       CXios::getDaemonsManager()->eventLoop() ;
113    } 
114   
115    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
116
117    if (ok)
118    {
119
120      MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interComm) ;
121      return true ;
122    }
123    else return false ;
124  }
125*/
126
127
128  void CContextsManager::sendNotification(int rank)
129  {
130    winNotify_->lockWindow(rank,0) ;
131    winNotify_->pushToWindow(rank, this, &CContextsManager::notificationsDumpOut) ;
132    winNotify_->unlockWindow(rank,0) ;
133  }
134
135 
136  void CContextsManager::notificationsDumpOut(CBufferOut& buffer)
137  {
138   
139    buffer.realloc(maxBufferSize_) ;
140   
141    if (notifyType_==NOTIFY_CREATE_CONTEXT)
142    {
143      auto& arg=notifyCreateContext_ ;
144      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
145    }
146    else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
147    {
148      auto& arg=notifyCreateIntercomm_ ;
149      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg)<< get<5>(arg) ;
150    }
151  }
152
153  void CContextsManager::notificationsDumpIn(CBufferIn& buffer)
154  {
155    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
156    else
157    {
158      buffer>>notifyType_;
159      if (notifyType_==NOTIFY_CREATE_CONTEXT)
160      {
161        auto& arg=notifyCreateContext_ ;
162        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
163      }
164      else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
165      {
166        auto& arg=notifyCreateIntercomm_ ;
167        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg) >> get<3>(arg) >> get<4>(arg) >> get<5>(arg);
168      }
169    }
170
171  }
172
173  void CContextsManager::eventLoop(void)
174  {
175    checkNotifications() ;
176  }
177 
178  void CContextsManager::checkNotifications(void)
179  {
180    int commRank ;
181    MPI_Comm_rank(xiosComm_, &commRank) ;
182    winNotify_->lockWindow(commRank,0) ;
183    winNotify_->popFromWindow(commRank, this, &CContextsManager::notificationsDumpIn) ;
184    winNotify_->unlockWindow(commRank,0) ;
185    if (notifyType_==NOTIFY_CREATE_CONTEXT) createServerContext() ;
186    else if (notifyType_==NOTIFY_CREATE_INTERCOMM) createServerContextIntercomm() ;
187
188  }
189
190  void CContextsManager::createServerContext(void)
191  {
192    auto arg=notifyCreateContext_ ;
193    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
194                             ->createContext(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
195 
196  }
197
198  void CContextsManager::createServerContextIntercomm(void)
199  {
200    auto arg=notifyCreateIntercomm_ ;
201    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
202                             ->getServerContext(get<3>(arg))
203                             ->createIntercomm(get<4>(arg), get<5>(arg)) ;
204  }             
205
206  string CContextsManager::getServerContextName(const string& poolId, const string& serviceId, const int& partitionId, 
207                                                const int& type, const string& contextId)
208  {
209    if (type==CServicesManager::CLIENT) return contextId;
210    else
211    {
212      ostringstream oss;
213      oss<<partitionId;
214      return poolId+"::"+serviceId+"_"+oss.str()+"::"+contextId;
215    }
216  }
217
218  void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo)
219  {
220    winContexts_->lockWindow(managerGlobalLeader_,0) ;
221    winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
222    contexts_[fullContextId] = contextInfo ;
223    winContexts_->updateToWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ;
224    winContexts_->unlockWindow(managerGlobalLeader_,0) ;   
225  }
226
227  bool CContextsManager::getContextInfo(const string& fullContextId, SRegisterContextInfo& contextInfo, MPI_Comm comm)
228  {
229    bool ret ;
230    int commRank=0 ;
231    if (comm!=MPI_COMM_NULL) MPI_Comm_rank(comm, &commRank) ;
232
233    if (commRank==0)
234    {
235
236      winContexts_->lockWindow(managerGlobalLeader_,0) ;
237      winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
238      winContexts_->unlockWindow(managerGlobalLeader_,0) ;
239
240      auto it=contexts_.find(fullContextId) ;
241      if ( it == contexts_.end()) ret=false ;
242      else
243      {
244        contextInfo=it->second ; 
245        ret=true ;
246      }
247    }
248   
249    if (comm!=MPI_COMM_NULL) 
250    {
251      MPI_Bcast(&ret,1,MPI_INT,0,comm) ;
252      if (ret)
253      {
254        MPI_Bcast(&contextInfo.leader,1,MPI_INT,0,comm) ;
255        MPI_Bcast(&contextInfo.size,1,MPI_INT,0,comm) ;
256        MPI_Bcast_string(contextInfo.poolId,0,comm) ;
257        MPI_Bcast_string(contextInfo.serviceId,0,comm) ;
258        MPI_Bcast(&contextInfo.serviceType,1,MPI_INT,0,comm) ;
259        MPI_Bcast(&contextInfo.partitionId,1,MPI_INT,0,comm) ;
260        MPI_Bcast_string(contextInfo.id,0,comm) ;
261      }
262    }
263    return ret ;
264  }
265
266  bool CContextsManager::getContextLeader(const string& fullContextId, int& leader, MPI_Comm comm)
267  {
268    SRegisterContextInfo contextInfo ;
269    bool ret=getContextInfo(fullContextId, contextInfo) ;
270    if (ret) leader=contextInfo.leader ;
271    return ret ;
272  }
273
274  bool CContextsManager::getContextSize(const string& fullContextId, int& size, MPI_Comm comm)
275  {
276
277    SRegisterContextInfo contextInfo ;
278    bool ret=getContextInfo(fullContextId, contextInfo) ;
279    if (ret) size=contextInfo.size ;
280    return ret ;
281  }
282
283  bool CContextsManager::getContextPoolId(const string& fullContextId, string& poolId, MPI_Comm comm)
284  {
285    SRegisterContextInfo contextInfo ;
286    bool ret=getContextInfo(fullContextId, contextInfo) ;
287    if (ret) poolId=contextInfo.poolId ;
288    return ret ;
289  }
290
291  bool CContextsManager::getContextServiceId(const string& fullContextId, string& serviceId, MPI_Comm comm)
292  {
293    SRegisterContextInfo contextInfo ;
294    bool ret=getContextInfo(fullContextId, contextInfo) ;
295    if (ret) serviceId=contextInfo.serviceId ;
296    return ret ;
297  }
298
299  bool CContextsManager::getContextPartitionId(const string& fullContextId, int& partitionId, MPI_Comm comm)
300  {
301    SRegisterContextInfo contextInfo ;
302    bool ret=getContextInfo(fullContextId, contextInfo) ;
303    if (ret) partitionId=contextInfo.partitionId ;
304    return ret ;
305  }
306 
307  bool CContextsManager::getContextServiceType(const string& fullContextId, int& serviceType, MPI_Comm comm)
308  {
309    SRegisterContextInfo contextInfo ;
310    bool ret=getContextInfo(fullContextId, contextInfo) ;
311    if (ret) serviceType=contextInfo.serviceType ;
312    return ret ;
313  }
314
315  bool CContextsManager::getContextId(const string& fullContextId, string& contextId, MPI_Comm comm)
316  {
317    SRegisterContextInfo contextInfo ;
318    bool ret=getContextInfo(fullContextId, contextInfo) ;
319    if (ret) contextId=contextInfo.id ;
320    return ret ;
321  }
322
323
324  bool CContextsManager::hasContext(const string& fullContextId, MPI_Comm comm)
325  {
326    SRegisterContextInfo contextInfo ;
327    return getContextInfo(fullContextId, contextInfo) ;
328  }
329
330  void CContextsManager::contextsDumpOut(CBufferOut& buffer)
331  {
332    buffer.realloc(maxBufferSize_) ;
333    buffer<<(int)contexts_.size();
334   
335    for(auto it=contexts_.begin();it!=contexts_.end(); ++it)
336    { 
337      auto key = it->first ;
338      auto val = it->second ; 
339      buffer << key << val.poolId<<val.serviceId<<val.partitionId<<val.serviceType<<val.id<<val.size<<val.leader  ;
340    }
341  } 
342
343  void CContextsManager::contextsDumpIn(CBufferIn& buffer)
344  {
345    std::string contextId ;
346    SRegisterContextInfo ci;
347    int size; 
348    int leader ;
349
350    contexts_.clear() ;
351    int nbContexts ;
352    buffer>>nbContexts ;
353    for(int i=0;i<nbContexts;i++) 
354    {
355      buffer>>contextId>>ci.poolId>>ci.serviceId>>ci.partitionId>>ci.serviceType>>ci.id>>ci.size>>ci.leader ;
356      contexts_[contextId]=ci ;
357    }
358
359  }
360}
Note: See TracBrowser for help on using the repository browser.