source: XIOS3/trunk/src/manager/server_context.cpp @ 2580

Last change on this file since 2580 was 2580, checked in by ymipsl, 9 months ago

Tracking unfree MPI windows and communicators.

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.3 KB
Line 
1#include "server_context.hpp"
2#include "contexts_manager.hpp"
3#include "cxios.hpp"
4#include "mpi.hpp"
5#include "context.hpp"
6#include "register_context_info.hpp"
7#include "services.hpp"
8#include "thread_manager.hpp"
9#include "timer.hpp"
10
11
12namespace xios
13{
14  using namespace std ;
15
16  map<string, tuple<bool,MPI_Comm,MPI_Comm> > CServerContext::overlapedComm_ ;
17
18  CServerContext::CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 
19                                 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService),
20                                 hasNotification_(false)
21  {
22   info(40)<<"CCServerContext::CServerContext  : new context creation ; contextId : "<<contextId<<endl ;
23   int localRank, globalRank, commSize ;
24
25    MPI_Comm_dup(contextComm, &contextComm_) ;
26    CXios::getMpiGarbageCollector().registerCommunicator(contextComm_) ;
27    xiosComm_=CXios::getXiosComm() ;
28 
29    MPI_Comm_rank(xiosComm_,&globalRank) ;
30    MPI_Comm_rank(contextComm_,&localRank) ;
31 
32    winNotify_ = new CWindowManager(contextComm_, maxBufferSize_,"CServerContext::winNotify_") ;
33    MPI_Barrier(contextComm_) ;
34   
35    int type;
36    if (localRank==localLeader_) 
37    {
38      globalLeader_=globalRank ;
39      MPI_Comm_rank(contextComm_,&commSize) ;
40     
41      CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
42      SRegisterContextInfo contextInfo = {poolId, serviceId, partitionId, type, contextId, commSize, globalLeader_} ;
43      name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
44      CXios::getContextsManager()->registerContext(name_, contextInfo) ;
45    }
46    MPI_Bcast(&type, 1, MPI_INT, localLeader_,contextComm_) ;
47    name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
48    context_=CContext::create(name_);
49
50    context_->init(this, contextComm, type) ;
51
52    info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank
53                        <<" and global rank "<<globalRank<<endl  ;
54   
55    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServerContext::threadEventLoop, this) ;
56  }
57
58  CServerContext::~CServerContext()
59  {
60    delete winNotify_ ;
61    cout<<"Server Context destructor"<<endl;
62  } 
63
64  bool CServerContext::createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, 
65                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait)
66  {
67    info(40)<<"CServerContext::createIntercomm  : context intercomm creation ; contextId : "<<contextId<<endl ;
68    int intraCommRank ;
69    MPI_Comm_rank(intraComm, &intraCommRank) ;
70    int contextLeader ;
71
72    bool ok ;
73    int type ;
74    MPI_Comm newInterCommClient, newInterCommServer ;
75    MPI_Comm_dup(contextComm_,&newInterCommClient) ;
76    MPI_Comm_dup(contextComm_,&newInterCommServer) ;
77    overlapedComm_[name_]=tuple<bool, MPI_Comm, MPI_Comm>(false, newInterCommClient, newInterCommServer) ;
78    MPI_Barrier(contextComm_) ;
79
80    if (intraCommRank==0)
81    {
82      ok=CXios::getContextsManager()->createServerContextIntercomm(poolId, serviceId, partitionId, contextId, name_, wait) ;
83      if (ok) 
84      {
85        CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
86        string name=CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
87        CXios::getContextsManager()->getContextLeader(name, contextLeader) ;
88      }
89    }
90   
91    if (wait)
92    {
93      MPI_Request req ;
94      MPI_Status status ;
95      MPI_Ibarrier(intraComm,&req) ;
96   
97      int flag=false ;
98      while(!flag) 
99      {
100        CXios::getDaemonsManager()->servicesEventLoop() ;
101        MPI_Test(&req,&flag,&status) ;
102      }
103    }
104   
105    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
106
107    if (ok) 
108    {
109      int globalRank ;
110      MPI_Comm_rank(xiosComm_,&globalRank) ;
111      MPI_Bcast(&contextLeader, 1, MPI_INT, 0, intraComm) ;
112     
113      int overlap, nOverlap ;
114      if (contextLeader==globalRank) overlap=1 ;
115      else overlap=0 ;
116      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
117/*
118      int overlap  ;
119      if (get<0>(overlapedComm_[name_])) overlap=1 ;
120      else overlap=0 ;
121
122      int nOverlap ; 
123      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
124      int commSize ;
125      MPI_Comm_size(contextComm_,&commSize ) ;
126*/
127      if (nOverlap==0)
128      { 
129        MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ;
130        CXios::getMpiGarbageCollector().registerCommunicator(interCommClient) ;
131        MPI_Comm_dup(interCommClient, &interCommServer) ;
132        CXios::getMpiGarbageCollector().registerCommunicator(interCommServer) ;
133        MPI_Comm_free(&newInterCommClient) ;
134        MPI_Comm_free(&newInterCommServer) ;
135      }
136      else
137      {
138        ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
139      }
140    }
141    overlapedComm_.erase(name_) ;
142    return ok ;
143  }
144
145
146  void CServerContext::createIntercomm(int remoteLeader, const string& sourceContext)
147  {
148     int commSize ;
149     MPI_Comm_size(contextComm_,&commSize) ;
150     info(40)<<"CServerContext::createIntercomm  : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ;
151   
152     for(int rank=0; rank<commSize; rank++)
153     {
154       notifyOutType_=NOTIFY_CREATE_INTERCOMM ;
155       notifyOutCreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ;
156       sendNotification(rank) ;
157     }
158  }
159 
160  void CServerContext::sendNotification(int rank)
161  {
162    winNotify_->pushToExclusiveWindow(rank, this, &CServerContext::notificationsDumpOut) ;
163  }
164
165 
166  void CServerContext::notificationsDumpOut(CBufferOut& buffer)
167  {
168   
169    buffer.realloc(maxBufferSize_) ;
170   
171    if (notifyOutType_==NOTIFY_CREATE_INTERCOMM)
172    {
173      auto& arg=notifyOutCreateIntercomm_ ;
174      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) ;
175    }
176  }
177
178  void CServerContext::notificationsDumpIn(CBufferIn& buffer)
179  {
180    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
181    else
182    {
183      buffer>>notifyInType_;
184      if (notifyInType_==NOTIFY_CREATE_INTERCOMM)
185      {
186        auto& arg=notifyInCreateIntercomm_ ;
187        buffer >> std::get<0>(arg)>> std::get<1>(arg) ;
188      }
189    }
190  }
191
192  void CServerContext::checkNotifications(void)
193  {
194    if (!hasNotification_)
195    {
196      double time=MPI_Wtime() ;
197      if (time-lastEventLoop_ > eventLoopLatency_) 
198      {
199        int commRank ;
200        MPI_Comm_rank(contextComm_, &commRank) ;
201        winNotify_->popFromExclusiveWindow(commRank, this, &CServerContext::notificationsDumpIn) ;
202       
203        if (notifyInType_!= NOTIFY_NOTHING)
204        {
205          hasNotification_=true ;
206          auto eventScheduler=parentService_->getEventScheduler() ;
207          std::hash<string> hashString ;
208          size_t hashId = hashString(name_) ;
209          size_t currentTimeLine=0 ;
210          eventScheduler->registerEvent(currentTimeLine,hashId); 
211        }
212        lastEventLoop_=time ;
213      }
214    }
215   
216    if (hasNotification_)
217    {
218      auto eventScheduler=parentService_->getEventScheduler() ;
219      std::hash<string> hashString ;
220      size_t hashId = hashString(name_) ;
221      size_t currentTimeLine=0 ;
222      if (eventScheduler->queryEvent(currentTimeLine,hashId))
223      {
224        eventScheduler->popEvent() ;
225        if (notifyInType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ;
226        hasNotification_=false ;
227      }
228    }
229  }
230
231  bool CServerContext::eventLoop(bool serviceOnly)
232  {
233    CTimer::get("CServerContext::eventLoop").resume();
234    bool finished=false ;
235    int flag ;
236    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
237
238//    double time=MPI_Wtime() ;
239//    if (time-lastEventLoop_ > eventLoopLatency_)
240//    {
241      if (winNotify_!=nullptr) checkNotifications() ;
242//      lastEventLoop_=time ;
243//    }
244
245
246    if (!serviceOnly && context_!=nullptr) 
247    {
248      if (context_->eventLoop())
249      {
250        info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
251        CContext::removeContext(context_->getId()) ;
252        context_=nullptr ;
253        // destroy context ??? --> later
254      }
255    }
256    CTimer::get("CServerContext::eventLoop").suspend();
257    if (context_==nullptr && finalizeSignal_) finished=true ;
258    return finished ;
259  }
260
261  void CServerContext::threadEventLoop(void)
262  {
263   
264    info(100)<<"Launch Thread for CServerContext::threadEventLoop, context id = "<<context_->getId()<<endl ;
265    CThreadManager::threadInitialize() ; 
266    do
267    {
268      CTimer::get("CServerContext::eventLoop").resume();
269      int flag ;
270      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
271
272      if (winNotify_!=nullptr) checkNotifications() ;
273
274
275      if (context_!=nullptr) 
276      {
277        if (context_->eventLoop())
278        {
279          info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
280          CContext::removeContext(context_->getId()) ;
281          context_=nullptr ;
282          // destroy context ??? --> later
283        }
284      }
285      CTimer::get("CServerContext::eventLoop").suspend();
286      if (context_==nullptr && finalizeSignal_) finished_=true ;
287 
288      if (!finished_) CThreadManager::yield() ;
289    }
290    while (!finished_) ;
291   
292    CThreadManager::threadFinalize() ;
293    info(100)<<"Close thread for CServerContext::threadEventLoop"<<endl ;
294  }
295
296  void CServerContext::createIntercomm(void)
297  {
298    info(40)<<"CServerContext::createIntercomm  : received createIntercomm notification"<<endl ;
299
300     MPI_Comm interCommServer, interCommClient ;
301     auto& arg=notifyInCreateIntercomm_ ;
302     int remoteLeader=get<0>(arg) ;
303     string sourceContext=get<1>(arg) ;
304
305     auto it=overlapedComm_.find(sourceContext) ;
306     int overlap=0 ;
307     if (it!=overlapedComm_.end())
308     {
309       get<0>(it->second)=true ;
310       overlap=1 ;
311     }
312     int nOverlap ; 
313     MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
314     int commSize ;
315     MPI_Comm_size(contextComm_,&commSize ) ;
316
317    if (nOverlap==0)
318    { 
319      info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
320      MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ;
321      CXios::getMpiGarbageCollector().registerCommunicator(interCommServer) ;
322      MPI_Comm_dup(interCommServer,&interCommClient) ;
323      CXios::getMpiGarbageCollector().registerCommunicator(interCommClient) ;
324      context_ -> createClientInterComm(interCommClient,interCommServer) ;
325      clientsInterComm_.push_back(interCommClient) ;
326      clientsInterComm_.push_back(interCommServer) ;
327    }
328    else
329    {
330      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
331    }
332   
333  }
334
335  void CServerContext::freeComm(void)
336  {
337    //delete winNotify_ ;
338    //winNotify_=nullptr ;
339    //MPI_Comm_free(&contextComm_) ;
340    // don't forget intercomm -> later
341  }
342 
343  void CServerContext::finalizeSignal(void)
344  {
345    finalizeSignal_=true ;
346  }
347
348}
Note: See TracBrowser for help on using the repository browser.