source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.cpp @ 2230

Last change on this file since 2230 was 2230, checked in by ymipsl, 3 years ago

Fix some Dead-lock issue...
YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.5 KB
RevLine 
[1761]1#include "server_context.hpp"
2#include "contexts_manager.hpp"
3#include "cxios.hpp"
4#include "mpi.hpp"
5#include "context.hpp"
6#include "register_context_info.hpp"
7#include "services.hpp"
8
9
10namespace xios
11{
12  using namespace std ;
13
14  map<string, tuple<bool,MPI_Comm,MPI_Comm> > CServerContext::overlapedComm_ ;
15
16  CServerContext::CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 
[1764]17                                 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService),
18                                 hasNotification_(false)
[1761]19  {
20   int localRank, globalRank, commSize ;
21
22    MPI_Comm_dup(contextComm, &contextComm_) ;
23    xiosComm_=CXios::getXiosComm() ;
24 
25    MPI_Comm_rank(xiosComm_,&globalRank) ;
26    MPI_Comm_rank(contextComm_,&localRank) ;
27 
28    winNotify_ = new CWindowManager(contextComm_, maxBufferSize_) ;
[2220]29    MPI_Barrier(contextComm_) ;
[1761]30   
31    int type;
32    if (localRank==localLeader_) 
33    {
34      globalLeader_=globalRank ;
35      MPI_Comm_rank(contextComm_,&commSize) ;
36     
37      CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
38      SRegisterContextInfo contextInfo = {poolId, serviceId, partitionId, type, contextId, commSize, globalLeader_} ;
39      name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
40      CXios::getContextsManager()->registerContext(name_, contextInfo) ;
41    }
42    MPI_Bcast(&type, 1, MPI_INT, localLeader_,contextComm_) ;
43    name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
44    context_=CContext::create(name_);
45
46    context_->init(this, contextComm, type) ;
47
48    info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank
49                        <<" and global rank "<<globalRank<<endl  ;
50  }
51
[1764]52  CServerContext::~CServerContext()
53  {
54
55  } 
56
[1761]57  bool CServerContext::createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, 
58                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait)
59  {
60    int intraCommRank ;
61    MPI_Comm_rank(intraComm, &intraCommRank) ;
62    int contextLeader ;
63
64    bool ok ;
65    int type ;
66    MPI_Comm newInterCommClient, newInterCommServer ;
67    MPI_Comm_dup(contextComm_,&newInterCommClient) ;
68    MPI_Comm_dup(contextComm_,&newInterCommServer) ;
69    overlapedComm_[name_]=tuple<bool, MPI_Comm, MPI_Comm>(false, newInterCommClient, newInterCommServer) ;
70    MPI_Barrier(contextComm_) ;
71
72    if (intraCommRank==0)
73    {
74      ok=CXios::getContextsManager()->createServerContextIntercomm(poolId, serviceId, partitionId, contextId, name_, wait) ;
75      if (ok) 
76      {
77        CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
78        string name=CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
79        CXios::getContextsManager()->getContextLeader(name, contextLeader) ;
80      }
81    }
82   
[1764]83    MPI_Request req ;
84    MPI_Status status ;
85    MPI_Ibarrier(intraComm,&req) ;
86   
87    int flag=false ;
88    while(!flag) 
[1761]89    {
[1764]90      CXios::getDaemonsManager()->servicesEventLoop() ;
91      MPI_Test(&req,&flag,&status) ;
92    }
[1765]93
[1761]94    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
95
96    if (ok) 
97    {
[1764]98      int globalRank ;
99      MPI_Comm_rank(xiosComm_,&globalRank) ;
100      MPI_Bcast(&contextLeader, 1, MPI_INT, 0, intraComm) ;
101     
102      int overlap, nOverlap ;
103      if (contextLeader==globalRank) overlap=1 ;
104      else overlap=0 ;
105      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
106/*
[1761]107      int overlap  ;
108      if (get<0>(overlapedComm_[name_])) overlap=1 ;
109      else overlap=0 ;
110
111      int nOverlap ; 
112      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
113      int commSize ;
114      MPI_Comm_size(contextComm_,&commSize ) ;
[1764]115*/
116      if (nOverlap> 0 )
[1761]117      {
[1764]118        while (get<0>(overlapedComm_[name_])==false) CXios::getDaemonsManager()->servicesEventLoop() ;
119        isAttachedMode_=true ;
[1761]120        cout<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ;
121        interCommClient=newInterCommClient ;
122        interCommServer=newInterCommServer ;
123      }
124      else if (nOverlap==0)
125      { 
126        cout<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
[1764]127        isAttachedMode_=false ;
[1761]128        MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ;
129        MPI_Comm_dup(interCommClient, &interCommServer) ;
130        MPI_Comm_free(&newInterCommClient) ;
131        MPI_Comm_free(&newInterCommServer) ;
132      }
133      else
134      {
135        cout<<"CServerContext::createIntercomm : partial overlap ==> not managed"<<endl ;
136      }
137    }
138    overlapedComm_.erase(name_) ;
139    return ok ;
140  }
141
142
143  void CServerContext::createIntercomm(int remoteLeader, const string& sourceContext)
144  {
145     int commSize ;
146     MPI_Comm_size(contextComm_,&commSize) ;
147     for(int rank=0; rank<commSize; rank++)
148     {
[1764]149       notifyOutType_=NOTIFY_CREATE_INTERCOMM ;
150       notifyOutCreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ;
[1761]151       sendNotification(rank) ;
152     }
153  }
154 
155  void CServerContext::sendNotification(int rank)
156  {
157    winNotify_->lockWindow(rank,0) ;
158    winNotify_->pushToWindow(rank, this, &CServerContext::notificationsDumpOut) ;
159    winNotify_->unlockWindow(rank,0) ;
160  }
161
162 
163  void CServerContext::notificationsDumpOut(CBufferOut& buffer)
164  {
165   
166    buffer.realloc(maxBufferSize_) ;
167   
[1764]168    if (notifyOutType_==NOTIFY_CREATE_INTERCOMM)
[1761]169    {
[1764]170      auto& arg=notifyOutCreateIntercomm_ ;
171      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) ;
[1761]172    }
173  }
174
175  void CServerContext::notificationsDumpIn(CBufferIn& buffer)
176  {
[1764]177    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
[1761]178    else
179    {
[1764]180      buffer>>notifyInType_;
181      if (notifyInType_==NOTIFY_CREATE_INTERCOMM)
[1761]182      {
[1764]183        auto& arg=notifyInCreateIntercomm_ ;
[1761]184        buffer >> std::get<0>(arg)>> std::get<1>(arg) ;
185      }
186    }
187  }
188
189  void CServerContext::checkNotifications(void)
190  {
[1764]191    if (!hasNotification_)
192    {
193      int commRank ;
194      MPI_Comm_rank(contextComm_, &commRank) ;
195      winNotify_->lockWindow(commRank,0) ;
196      winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ;
197      winNotify_->unlockWindow(commRank,0) ;
198     
199      if (notifyInType_!= NOTIFY_NOTHING)
200      {
201        hasNotification_=true ;
202        auto eventScheduler=parentService_->getEventScheduler() ;
203        std::hash<string> hashString ;
204        size_t hashId = hashString(name_) ;
205        size_t currentTimeLine=0 ;
206        eventScheduler->registerEvent(currentTimeLine,hashId); 
207      }
208    }
209   
210    if (hasNotification_)
211    {
212      auto eventScheduler=parentService_->getEventScheduler() ;
213      std::hash<string> hashString ;
214      size_t hashId = hashString(name_) ;
215      size_t currentTimeLine=0 ;
216      if (eventScheduler->queryEvent(currentTimeLine,hashId))
217      {
[2230]218        eventScheduler->popEvent() ;
[1764]219        if (notifyInType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ;
220        hasNotification_=false ;
221      }
222    }
[1761]223  }
224
[1764]225  bool CServerContext::eventLoop(bool serviceOnly)
[1761]226  {
227    bool finished=false ;
[1764]228    if (winNotify_!=nullptr) checkNotifications() ;
229    if (!serviceOnly && context_!=nullptr) 
[1761]230    {
231      if (context_->eventLoop())
232      {
233        context_=nullptr ;
234        // destroy context ??? --> later
235      }
236    }
237
238    if (context_==nullptr && finalizeSignal_) finished=true ;
239    return finished ;
240  }
241
242  void CServerContext::createIntercomm(void)
243  {
244     MPI_Comm interCommServer, interCommClient ;
[1764]245     auto& arg=notifyInCreateIntercomm_ ;
246     int remoteLeader=get<0>(arg) ;
247     string sourceContext=get<1>(arg) ;
[1761]248
249     auto it=overlapedComm_.find(sourceContext) ;
250     int overlap=0 ;
251     if (it!=overlapedComm_.end())
252     {
253       get<0>(it->second)=true ;
254       overlap=1 ;
255     }
256     int nOverlap ; 
257     MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
258     int commSize ;
259     MPI_Comm_size(contextComm_,&commSize ) ;
260
261    if (nOverlap==commSize)
262    {
[1764]263      info(10)<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ;
264      isAttachedMode_=true ;
[1761]265      interCommClient=get<2>(it->second) ;
266      interCommServer=get<1>(it->second) ;
267      context_ -> createClientInterComm(interCommClient, interCommServer ) ;
268      clientsInterComm_.push_back(interCommClient) ;
269      clientsInterComm_.push_back(interCommServer) ;
270    }
271    else if (nOverlap==0)
272    { 
[1764]273      info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
274      isAttachedMode_=false ;
[1761]275      MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ;
276      MPI_Comm_dup(interCommServer,&interCommClient) ;
277      context_ -> createClientInterComm(interCommClient,interCommServer) ;
278      clientsInterComm_.push_back(interCommClient) ;
279      clientsInterComm_.push_back(interCommServer) ;
280    }
281    else
282    {
[1764]283      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : partial overlap ==> not managed") ;
[1761]284    }
285   
286  }
287
[1764]288  void CServerContext::freeComm(void)
289  {
290    delete winNotify_ ;
291    winNotify_=nullptr ;
292    MPI_Comm_free(&contextComm_) ;
293    // don't forget intercomm -> later
294  }
295 
[1761]296  void CServerContext::finalizeSignal(void)
297  {
298    finalizeSignal_=true ;
299  }
300
301}
Note: See TracBrowser for help on using the repository browser.