source: XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/contexts_manager.cpp @ 1761

Last change on this file since 1761 was 1761, checked in by ymipsl, 5 years ago

implementing first guess for service functionnalities.

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.0 KB
Line 
1#include "contexts_manager.hpp"
2#include "cxios.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "services.hpp"
6#include "server_context.hpp"
7#include "servers_ressource.hpp"
8#include "server.hpp"
9#include <functional>
10
11
12namespace xios
13{
14  using namespace std ;
15
16  CContextsManager::CContextsManager(bool isXiosServer)
17  {
18    xiosComm_ = CXios::getXiosComm()  ;
19   
20    int commRank ; 
21    MPI_Comm_rank(xiosComm_, &commRank) ;
22    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
23    else commRank=0 ;
24    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
25
26    MPI_Comm_rank(xiosComm_, &commRank) ;
27    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
28   
29
30    winContexts_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
31    winContexts_->lockWindow(commRank,0) ;
32    winContexts_->updateToWindow(commRank, this, &CContextsManager::contextsDumpOut) ;
33    winContexts_->unlockWindow(commRank,0) ;
34
35    MPI_Barrier(xiosComm_)  ;   
36  }
37
38  bool CContextsManager::createServerContext(const std::string& poolId, const std::string& serviceId, const int& partitionId,
39                                             const string& contextId, bool wait)
40  {
41    int serviceLeader ;
42    auto servicesManager = CXios::getServicesManager() ;
43   
44    bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
45
46    if (wait)
47    {
48      while (!ok) 
49      {
50        CXios::getDaemonsManager()->eventLoop() ;
51        ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
52      }
53    }
54
55    if (ok) 
56    {
57      notifyType_=NOTIFY_CREATE_CONTEXT ;
58      notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ;
59      sendNotification(serviceLeader) ;
60      return true ;
61    }
62    else return false ;
63  }
64
65
66  bool CContextsManager::createServerContextIntercomm(const string& poolId, const string& serviceId, const int& partitionId,
67                                                      const string& contextId, const string& sourceContext, bool wait)
68  {
69    int contextLeader ;
70    bool ok ;
71    int remoteLeader ;
72    MPI_Comm_rank(xiosComm_, &remoteLeader) ;
73   
74    int type ;
75    ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
76    if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
77    if (wait)
78    {
79      while (!ok) 
80      {
81        CXios::getDaemonsManager()->eventLoop() ;
82        ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
83        if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
84      }
85    }
86   
87    if (ok) 
88    {
89      notifyType_=NOTIFY_CREATE_INTERCOMM ;
90      notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ;
91      sendNotification(contextLeader) ;
92      return true ;
93    }
94    else return false ;
95  }
96   
97/*
98    auto eventScheduler=CServer::getServersRessource()->getPoolRessource()->getService(serviceId,partitionId)->getEventScheduler() ;
99    std::hash<string> hashString ;
100    size_t hashId = hashString(getServerContextName(poolId, serviceId, partitionId, contextId)) ;
101    size_t currentTimeLine=0 ;
102    CServer::eventScheduler->registerEvent(currentTimeLine,hashId);
103    while (!eventScheduler->queryEvent(currentTimeLine,hashId))
104    {
105       CXios::getDaemonsManager()->eventLoop() ;
106    } 
107   
108    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
109
110    if (ok)
111    {
112
113      MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interComm) ;
114      return true ;
115    }
116    else return false ;
117  }
118*/
119
120
121  void CContextsManager::sendNotification(int rank)
122  {
123    winNotify_->lockWindow(rank,0) ;
124    winNotify_->pushToWindow(rank, this, &CContextsManager::notificationsDumpOut) ;
125    winNotify_->unlockWindow(rank,0) ;
126  }
127
128 
129  void CContextsManager::notificationsDumpOut(CBufferOut& buffer)
130  {
131   
132    buffer.realloc(maxBufferSize_) ;
133   
134    if (notifyType_==NOTIFY_CREATE_CONTEXT)
135    {
136      auto& arg=notifyCreateContext_ ;
137      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
138    }
139    else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
140    {
141      auto& arg=notifyCreateIntercomm_ ;
142      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg)<< get<5>(arg) ;
143    }
144  }
145
146  void CContextsManager::notificationsDumpIn(CBufferIn& buffer)
147  {
148    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
149    else
150    {
151      buffer>>notifyType_;
152      if (notifyType_==NOTIFY_CREATE_CONTEXT)
153      {
154        auto& arg=notifyCreateContext_ ;
155        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
156      }
157      else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
158      {
159        auto& arg=notifyCreateIntercomm_ ;
160        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg) >> get<3>(arg) >> get<4>(arg) >> get<5>(arg);
161      }
162    }
163
164  }
165
166  void CContextsManager::eventLoop(void)
167  {
168    checkNotifications() ;
169  }
170 
171  void CContextsManager::checkNotifications(void)
172  {
173    int commRank ;
174    MPI_Comm_rank(xiosComm_, &commRank) ;
175    winNotify_->lockWindow(commRank,0) ;
176    winNotify_->popFromWindow(commRank, this, &CContextsManager::notificationsDumpIn) ;
177    winNotify_->unlockWindow(commRank,0) ;
178    if (notifyType_==NOTIFY_CREATE_CONTEXT) createServerContext() ;
179    else if (notifyType_==NOTIFY_CREATE_INTERCOMM) createServerContextIntercomm() ;
180
181  }
182
183  void CContextsManager::createServerContext(void)
184  {
185    auto arg=notifyCreateContext_ ;
186    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
187                             ->createContext(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
188 
189  }
190
191  void CContextsManager::createServerContextIntercomm(void)
192  {
193    auto arg=notifyCreateIntercomm_ ;
194    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
195                             ->getServerContext(get<3>(arg))
196                             ->createIntercomm(get<4>(arg), get<5>(arg)) ;
197  }             
198
199  string CContextsManager::getServerContextName(const string& poolId, const string& serviceId, const int& partitionId, 
200                                                const int& type, const string& contextId)
201  {
202    if (type==CServicesManager::CLIENT) return contextId;
203    else
204    {
205      ostringstream oss;
206      oss<<partitionId;
207      return poolId+"::"+serviceId+"_"+oss.str()+"::"+contextId;
208    }
209  }
210
211  void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo)
212  {
213    winContexts_->lockWindow(managerGlobalLeader_,0) ;
214    winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
215    contexts_[fullContextId] = contextInfo ;
216    winContexts_->updateToWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ;
217    winContexts_->unlockWindow(managerGlobalLeader_,0) ;   
218  }
219
220  bool CContextsManager::getContextInfo(const string& fullContextId, SRegisterContextInfo& contextInfo, MPI_Comm comm)
221  {
222    bool ret ;
223    int commRank=0 ;
224    if (comm!=MPI_COMM_NULL) MPI_Comm_rank(comm, &commRank) ;
225
226    if (commRank==0)
227    {
228
229      winContexts_->lockWindow(managerGlobalLeader_,0) ;
230      winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
231      winContexts_->unlockWindow(managerGlobalLeader_,0) ;
232
233      auto it=contexts_.find(fullContextId) ;
234      if ( it == contexts_.end()) ret=false ;
235      else
236      {
237        contextInfo=it->second ; 
238        ret=true ;
239      }
240    }
241   
242    if (comm!=MPI_COMM_NULL) 
243    {
244      MPI_Bcast(&ret,1,MPI_INT,0,comm) ;
245      if (ret)
246      {
247        MPI_Bcast(&contextInfo.leader,1,MPI_INT,0,comm) ;
248        MPI_Bcast(&contextInfo.size,1,MPI_INT,0,comm) ;
249        MPI_Bcast_string(contextInfo.poolId,0,comm) ;
250        MPI_Bcast_string(contextInfo.serviceId,0,comm) ;
251        MPI_Bcast(&contextInfo.serviceType,1,MPI_INT,0,comm) ;
252        MPI_Bcast(&contextInfo.partitionId,1,MPI_INT,0,comm) ;
253        MPI_Bcast_string(contextInfo.id,0,comm) ;
254      }
255    }
256    return ret ;
257  }
258
259  bool CContextsManager::getContextLeader(const string& fullContextId, int& leader, MPI_Comm comm)
260  {
261    SRegisterContextInfo contextInfo ;
262    bool ret=getContextInfo(fullContextId, contextInfo) ;
263    if (ret) leader=contextInfo.leader ;
264    return ret ;
265  }
266
267  bool CContextsManager::getContextSize(const string& fullContextId, int& size, MPI_Comm comm)
268  {
269
270    SRegisterContextInfo contextInfo ;
271    bool ret=getContextInfo(fullContextId, contextInfo) ;
272    if (ret) size=contextInfo.size ;
273    return ret ;
274  }
275
276  bool CContextsManager::getContextPoolId(const string& fullContextId, string& poolId, MPI_Comm comm)
277  {
278    SRegisterContextInfo contextInfo ;
279    bool ret=getContextInfo(fullContextId, contextInfo) ;
280    if (ret) poolId=contextInfo.poolId ;
281    return ret ;
282  }
283
284  bool CContextsManager::getContextServiceId(const string& fullContextId, string& serviceId, MPI_Comm comm)
285  {
286    SRegisterContextInfo contextInfo ;
287    bool ret=getContextInfo(fullContextId, contextInfo) ;
288    if (ret) serviceId=contextInfo.serviceId ;
289    return ret ;
290  }
291
292  bool CContextsManager::getContextPartitionId(const string& fullContextId, int& partitionId, MPI_Comm comm)
293  {
294    SRegisterContextInfo contextInfo ;
295    bool ret=getContextInfo(fullContextId, contextInfo) ;
296    if (ret) partitionId=contextInfo.partitionId ;
297    return ret ;
298  }
299 
300  bool CContextsManager::getContextServiceType(const string& fullContextId, int& serviceType, MPI_Comm comm)
301  {
302    SRegisterContextInfo contextInfo ;
303    bool ret=getContextInfo(fullContextId, contextInfo) ;
304    if (ret) serviceType=contextInfo.serviceType ;
305    return ret ;
306  }
307
308  bool CContextsManager::getContextId(const string& fullContextId, string& contextId, MPI_Comm comm)
309  {
310    SRegisterContextInfo contextInfo ;
311    bool ret=getContextInfo(fullContextId, contextInfo) ;
312    if (ret) contextId=contextInfo.id ;
313    return ret ;
314  }
315
316
317  bool CContextsManager::hasContext(const string& fullContextId, MPI_Comm comm)
318  {
319    SRegisterContextInfo contextInfo ;
320    return getContextInfo(fullContextId, contextInfo) ;
321  }
322
323  void CContextsManager::contextsDumpOut(CBufferOut& buffer)
324  {
325    buffer.realloc(maxBufferSize_) ;
326    buffer<<(int)contexts_.size();
327   
328    for(auto it=contexts_.begin();it!=contexts_.end(); ++it)
329    { 
330      auto key = it->first ;
331      auto val = it->second ; 
332      buffer << key << val.poolId<<val.serviceId<<val.partitionId<<val.serviceType<<val.id<<val.size<<val.leader  ;
333    }
334  } 
335
336  void CContextsManager::contextsDumpIn(CBufferIn& buffer)
337  {
338    std::string contextId ;
339    SRegisterContextInfo ci;
340    int size; 
341    int leader ;
342
343    contexts_.clear() ;
344    int nbContexts ;
345    buffer>>nbContexts ;
346    for(int i=0;i<nbContexts;i++) 
347    {
348      buffer>>contextId>>ci.poolId>>ci.serviceId>>ci.partitionId>>ci.serviceType>>ci.id>>ci.size>>ci.leader ;
349      contexts_[contextId]=ci ;
350    }
351
352  }
353}
Note: See TracBrowser for help on using the repository browser.