source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/contexts_manager.cpp @ 2260

Last change on this file since 2260 was 2260, checked in by ymipsl, 3 years ago

Improvment of one sided protocol

  • removed latency
  • solve dead-lock

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.6 KB
Line 
1#include "contexts_manager.hpp"
2#include "cxios.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "services.hpp"
6#include "server_context.hpp"
7#include "servers_ressource.hpp"
8#include "server.hpp"
9#include "timer.hpp"
10#include <functional>
11
12
13namespace xios
14{
15  using namespace std ;
16
17  CContextsManager::CContextsManager(bool isXiosServer)
18  {
19    xiosComm_ = CXios::getXiosComm()  ;
20   
21    int commRank ; 
22    MPI_Comm_rank(xiosComm_, &commRank) ;
23    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
24    else commRank=0 ;
25    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
26
27    MPI_Comm_rank(xiosComm_, &commRank) ;
28    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
29   
30
31    winContexts_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
32    winContexts_->lockWindow(commRank,0) ;
33    winContexts_->updateToWindow(commRank, this, &CContextsManager::contextsDumpOut) ;
34    winContexts_->unlockWindow(commRank,0) ;
35
36    MPI_Barrier(xiosComm_)  ;   
37  }
38
39
40  CContextsManager::~CContextsManager()
41  {
42    delete winNotify_ ;
43    delete winContexts_ ;
44  }
45
46  bool CContextsManager::createServerContext(const std::string& poolId, const std::string& serviceId, const int& partitionId,
47                                             const string& contextId, bool wait)
48  {
49    int serviceLeader ;
50    auto servicesManager = CXios::getServicesManager() ;
51   
52    bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
53
54    info(40)<<"CContextsManager::createServerContext : waiting for service leader ;  serviceId : "<<serviceId<<endl ;
55    if (wait)
56    {
57      while (!ok) 
58      {
59        CXios::getDaemonsManager()->servicesEventLoop() ;
60        ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
61      }
62    }
63
64    if (ok) 
65    {
66      notifyType_=NOTIFY_CREATE_CONTEXT ;
67      notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ;
68      info(40)<<"CContextsManager::createServerContext : notification create_context to service leader "<<serviceLeader<<", serviceId : "<<serviceId<<", contextId "<<contextId<<endl ;
69      sendNotification(serviceLeader) ;
70      return true ;
71    }
72    else return false ;
73  }
74
75
76  bool CContextsManager::createServerContextIntercomm(const string& poolId, const string& serviceId, const int& partitionId,
77                                                      const string& contextId, const string& sourceContext, bool wait)
78  {
79    int contextLeader ;
80    bool ok ;
81    int remoteLeader ;
82    MPI_Comm_rank(xiosComm_, &remoteLeader) ;
83   
84    int type ;
85    info(40)<<"CContextsManager::createServerContextIntercomm : waiting for context leader ;  contextId : "<<contextId<<endl ;
86    ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
87    if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
88    if (wait)
89    {
90      while (!ok) 
91      {
92        CXios::getDaemonsManager()->servicesEventLoop() ;
93        ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
94        if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
95      }
96    }
97   
98    if (ok) 
99    {
100      notifyType_=NOTIFY_CREATE_INTERCOMM ;
101      notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ;
102      info(40)<<"CContextsManager::createServerContextIntercomm : notification create_intercomm to context leader : "<<contextLeader<<", contextId :"<<contextId<<endl ;
103      sendNotification(contextLeader) ;
104      return true ;
105    }
106    else return false ;
107  }
108
109  void CContextsManager::sendNotification(int rank)
110  {
111    winNotify_->lockWindowExclusive(rank) ;
112    winNotify_->pushToLockedWindow(rank, this, &CContextsManager::notificationsDumpOut) ;
113    winNotify_->unlockWindow(rank) ;
114  }
115
116 
117  void CContextsManager::notificationsDumpOut(CBufferOut& buffer)
118  {
119   
120    buffer.realloc(maxBufferSize_) ;
121   
122    if (notifyType_==NOTIFY_CREATE_CONTEXT)
123    {
124      auto& arg=notifyCreateContext_ ;
125      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
126    }
127    else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
128    {
129      auto& arg=notifyCreateIntercomm_ ;
130      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg)<< get<5>(arg) ;
131    }
132  }
133
134  void CContextsManager::notificationsDumpIn(CBufferIn& buffer)
135  {
136    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
137    else
138    {
139      buffer>>notifyType_;
140      if (notifyType_==NOTIFY_CREATE_CONTEXT)
141      {
142        auto& arg=notifyCreateContext_ ;
143        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
144      }
145      else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
146      {
147        auto& arg=notifyCreateIntercomm_ ;
148        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg) >> get<3>(arg) >> get<4>(arg) >> get<5>(arg);
149      }
150    }
151
152  }
153
154  void CContextsManager::eventLoop(void)
155  {
156    CTimer::get("CContextsManager::eventLoop").resume();
157    int flag ;
158    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
159    double time=MPI_Wtime() ;
160    if (time-lastEventLoop_ > eventLoopLatency_) 
161    {
162      checkNotifications() ;
163      lastEventLoop_=time ;
164    }
165    CTimer::get("CContextsManager::eventLoop").suspend();
166  }
167 
168  void CContextsManager::checkNotifications(void)
169  {
170    int commRank ;
171    MPI_Comm_rank(xiosComm_, &commRank) ;
172    winNotify_->lockWindowExclusive(commRank) ;
173    winNotify_->popFromLockedWindow(commRank, this, &CContextsManager::notificationsDumpIn) ;
174    winNotify_->unlockWindow(commRank) ;
175    if (notifyType_==NOTIFY_CREATE_CONTEXT) createServerContext() ;
176    else if (notifyType_==NOTIFY_CREATE_INTERCOMM) createServerContextIntercomm() ;
177
178  }
179
180  void CContextsManager::createServerContext(void)
181  {
182    info(40)<<"CContextsManager::createServerContext : receive create server context notification"<<endl ;
183    auto arg=notifyCreateContext_ ;
184    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
185                             ->createContext(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
186 
187  }
188
189  void CContextsManager::createServerContextIntercomm(void)
190  {
191    info(40)<<"CContextsManager::createServerContext : receive create intercomm context notification"<<endl ;
192    auto arg=notifyCreateIntercomm_ ;
193    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
194                             ->getServerContext(get<3>(arg))
195                             ->createIntercomm(get<4>(arg), get<5>(arg)) ;
196  }             
197
198  string CContextsManager::getServerContextName(const string& poolId, const string& serviceId, const int& partitionId, 
199                                                const int& type, const string& contextId)
200  {
201    if (type==CServicesManager::CLIENT) return contextId;
202    else
203    {
204      ostringstream oss;
205      oss<<partitionId;
206      return poolId+"::"+serviceId+"_"+oss.str()+"::"+contextId;
207    }
208  }
209
210  void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo)
211  {
212    winContexts_->lockWindowExclusive(managerGlobalLeader_) ;
213    winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
214    winContexts_->flushWindow(managerGlobalLeader_) ;
215    contexts_[fullContextId] = contextInfo ;
216    winContexts_->updateToLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ;
217    winContexts_->unlockWindow(managerGlobalLeader_) ;
218  }
219
220  bool CContextsManager::getContextInfo(const string& fullContextId, SRegisterContextInfo& contextInfo, MPI_Comm comm)
221  {
222    bool ret ;
223    int commRank=0 ;
224    if (comm!=MPI_COMM_NULL) MPI_Comm_rank(comm, &commRank) ;
225
226    if (commRank==0)
227    {
228
229      winContexts_->lockWindowShared(managerGlobalLeader_) ;
230      winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
231      winContexts_->unlockWindow(managerGlobalLeader_) ;
232
233      auto it=contexts_.find(fullContextId) ;
234      if ( it == contexts_.end()) ret=false ;
235      else
236      {
237        contextInfo=it->second ; 
238        ret=true ;
239      }
240    }
241   
242    if (comm!=MPI_COMM_NULL) 
243    {
244      MPI_Bcast(&ret,1,MPI_INT,0,comm) ;
245      if (ret)
246      {
247        MPI_Bcast(&contextInfo.leader,1,MPI_INT,0,comm) ;
248        MPI_Bcast(&contextInfo.size,1,MPI_INT,0,comm) ;
249        MPI_Bcast_string(contextInfo.poolId,0,comm) ;
250        MPI_Bcast_string(contextInfo.serviceId,0,comm) ;
251        MPI_Bcast(&contextInfo.serviceType,1,MPI_INT,0,comm) ;
252        MPI_Bcast(&contextInfo.partitionId,1,MPI_INT,0,comm) ;
253        MPI_Bcast_string(contextInfo.id,0,comm) ;
254      }
255    }
256    return ret ;
257  }
258
259  bool CContextsManager::getContextLeader(const string& fullContextId, int& leader, MPI_Comm comm)
260  {
261    SRegisterContextInfo contextInfo ;
262    bool ret=getContextInfo(fullContextId, contextInfo) ;
263    if (ret) leader=contextInfo.leader ;
264    return ret ;
265  }
266
267  bool CContextsManager::getContextSize(const string& fullContextId, int& size, MPI_Comm comm)
268  {
269
270    SRegisterContextInfo contextInfo ;
271    bool ret=getContextInfo(fullContextId, contextInfo) ;
272    if (ret) size=contextInfo.size ;
273    return ret ;
274  }
275
276  bool CContextsManager::getContextPoolId(const string& fullContextId, string& poolId, MPI_Comm comm)
277  {
278    SRegisterContextInfo contextInfo ;
279    bool ret=getContextInfo(fullContextId, contextInfo) ;
280    if (ret) poolId=contextInfo.poolId ;
281    return ret ;
282  }
283
284  bool CContextsManager::getContextServiceId(const string& fullContextId, string& serviceId, MPI_Comm comm)
285  {
286    SRegisterContextInfo contextInfo ;
287    bool ret=getContextInfo(fullContextId, contextInfo) ;
288    if (ret) serviceId=contextInfo.serviceId ;
289    return ret ;
290  }
291
292  bool CContextsManager::getContextPartitionId(const string& fullContextId, int& partitionId, MPI_Comm comm)
293  {
294    SRegisterContextInfo contextInfo ;
295    bool ret=getContextInfo(fullContextId, contextInfo) ;
296    if (ret) partitionId=contextInfo.partitionId ;
297    return ret ;
298  }
299 
300  bool CContextsManager::getContextServiceType(const string& fullContextId, int& serviceType, MPI_Comm comm)
301  {
302    SRegisterContextInfo contextInfo ;
303    bool ret=getContextInfo(fullContextId, contextInfo) ;
304    if (ret) serviceType=contextInfo.serviceType ;
305    return ret ;
306  }
307
308  bool CContextsManager::getContextId(const string& fullContextId, string& contextId, MPI_Comm comm)
309  {
310    SRegisterContextInfo contextInfo ;
311    bool ret=getContextInfo(fullContextId, contextInfo) ;
312    if (ret) contextId=contextInfo.id ;
313    return ret ;
314  }
315
316
317  bool CContextsManager::hasContext(const string& fullContextId, MPI_Comm comm)
318  {
319    SRegisterContextInfo contextInfo ;
320    return getContextInfo(fullContextId, contextInfo) ;
321  }
322
323  void CContextsManager::contextsDumpOut(CBufferOut& buffer)
324  {
325    buffer.realloc(maxBufferSize_) ;
326    buffer<<(int)contexts_.size();
327   
328    for(auto it=contexts_.begin();it!=contexts_.end(); ++it)
329    { 
330      auto key = it->first ;
331      auto val = it->second ; 
332      buffer << key << val.poolId<<val.serviceId<<val.partitionId<<val.serviceType<<val.id<<val.size<<val.leader  ;
333    }
334  } 
335
336  void CContextsManager::contextsDumpIn(CBufferIn& buffer)
337  {
338    std::string contextId ;
339    SRegisterContextInfo ci;
340    int size; 
341    int leader ;
342
343    contexts_.clear() ;
344    int nbContexts ;
345    buffer>>nbContexts ;
346    for(int i=0;i<nbContexts;i++) 
347    {
348      buffer>>contextId>>ci.poolId>>ci.serviceId>>ci.partitionId>>ci.serviceType>>ci.id>>ci.size>>ci.leader ;
349      contexts_[contextId]=ci ;
350    }
351
352  }
353}
Note: See TracBrowser for help on using the repository browser.