source: XIOS3/trunk/src/manager/server_context.cpp

Last change on this file was 2628, checked in by jderouillat, 3 months ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.5 KB
RevLine 
[1761]1#include "server_context.hpp"
2#include "contexts_manager.hpp"
3#include "cxios.hpp"
4#include "mpi.hpp"
5#include "context.hpp"
6#include "register_context_info.hpp"
7#include "services.hpp"
[2547]8#include "thread_manager.hpp"
[2246]9#include "timer.hpp"
[1761]10
11
12namespace xios
13{
14  using namespace std ;
[2628]15  extern CLogType logTimers ;
[1761]16
17  map<string, tuple<bool,MPI_Comm,MPI_Comm> > CServerContext::overlapedComm_ ;
18
19  CServerContext::CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 
[1764]20                                 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService),
21                                 hasNotification_(false)
[1761]22  {
[2246]23   info(40)<<"CCServerContext::CServerContext  : new context creation ; contextId : "<<contextId<<endl ;
[1761]24   int localRank, globalRank, commSize ;
25
[2589]26    xios::MPI_Comm_dup(contextComm, &contextComm_) ;
[2580]27    CXios::getMpiGarbageCollector().registerCommunicator(contextComm_) ;
[1761]28    xiosComm_=CXios::getXiosComm() ;
29 
30    MPI_Comm_rank(xiosComm_,&globalRank) ;
31    MPI_Comm_rank(contextComm_,&localRank) ;
32 
[2580]33    winNotify_ = new CWindowManager(contextComm_, maxBufferSize_,"CServerContext::winNotify_") ;
[2220]34    MPI_Barrier(contextComm_) ;
[1761]35   
36    int type;
37    if (localRank==localLeader_) 
38    {
39      globalLeader_=globalRank ;
40      MPI_Comm_rank(contextComm_,&commSize) ;
41     
42      CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
43      SRegisterContextInfo contextInfo = {poolId, serviceId, partitionId, type, contextId, commSize, globalLeader_} ;
44      name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
45      CXios::getContextsManager()->registerContext(name_, contextInfo) ;
46    }
47    MPI_Bcast(&type, 1, MPI_INT, localLeader_,contextComm_) ;
48    name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
49    context_=CContext::create(name_);
50
51    context_->init(this, contextComm, type) ;
52
53    info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank
54                        <<" and global rank "<<globalRank<<endl  ;
[2547]55   
56    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServerContext::threadEventLoop, this) ;
[1761]57  }
58
[1764]59  CServerContext::~CServerContext()
60  {
[2580]61    delete winNotify_ ;
[2274]62    cout<<"Server Context destructor"<<endl;
[1764]63  } 
64
[1761]65  bool CServerContext::createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, 
66                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait)
67  {
[2246]68    info(40)<<"CServerContext::createIntercomm  : context intercomm creation ; contextId : "<<contextId<<endl ;
[1761]69    int intraCommRank ;
70    MPI_Comm_rank(intraComm, &intraCommRank) ;
71    int contextLeader ;
72
73    bool ok ;
74    int type ;
[2588]75   
[1761]76
77    if (intraCommRank==0)
78    {
79      ok=CXios::getContextsManager()->createServerContextIntercomm(poolId, serviceId, partitionId, contextId, name_, wait) ;
80      if (ok) 
81      {
82        CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
83        string name=CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
84        CXios::getContextsManager()->getContextLeader(name, contextLeader) ;
85      }
86    }
87   
[2547]88    if (wait)
89    {
90      MPI_Request req ;
91      MPI_Status status ;
92      MPI_Ibarrier(intraComm,&req) ;
[1764]93   
[2547]94      int flag=false ;
95      while(!flag) 
96      {
97        CXios::getDaemonsManager()->servicesEventLoop() ;
98        MPI_Test(&req,&flag,&status) ;
99      }
[1764]100    }
[2547]101   
[1761]102    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
103
104    if (ok) 
105    {
[2588]106      MPI_Comm newInterCommClient, newInterCommServer ;
[2589]107      xios::MPI_Comm_dup(contextComm_,&newInterCommClient) ;
108      xios::MPI_Comm_dup(contextComm_,&newInterCommServer) ;
[2588]109      overlapedComm_[name_]=tuple<bool, MPI_Comm, MPI_Comm>(false, newInterCommClient, newInterCommServer) ;
110      MPI_Barrier(contextComm_) ;
111
[1764]112      int globalRank ;
113      MPI_Comm_rank(xiosComm_,&globalRank) ;
114      MPI_Bcast(&contextLeader, 1, MPI_INT, 0, intraComm) ;
115     
116      int overlap, nOverlap ;
117      if (contextLeader==globalRank) overlap=1 ;
118      else overlap=0 ;
119      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
120/*
[1761]121      int overlap  ;
122      if (get<0>(overlapedComm_[name_])) overlap=1 ;
123      else overlap=0 ;
124
125      int nOverlap ; 
126      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
127      int commSize ;
128      MPI_Comm_size(contextComm_,&commSize ) ;
[1764]129*/
[2547]130      if (nOverlap==0)
[1761]131      { 
[2589]132        xios::MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ;
[2580]133        CXios::getMpiGarbageCollector().registerCommunicator(interCommClient) ;
[2589]134        xios::MPI_Comm_dup(interCommClient, &interCommServer) ;
[2580]135        CXios::getMpiGarbageCollector().registerCommunicator(interCommServer) ;
[2589]136        xios::MPI_Comm_free(&newInterCommClient) ;
137        xios::MPI_Comm_free(&newInterCommServer) ;
[1761]138      }
139      else
140      {
[2547]141        ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
[1761]142      }
143    }
144    overlapedComm_.erase(name_) ;
145    return ok ;
146  }
147
148
149  void CServerContext::createIntercomm(int remoteLeader, const string& sourceContext)
150  {
151     int commSize ;
152     MPI_Comm_size(contextComm_,&commSize) ;
[2246]153     info(40)<<"CServerContext::createIntercomm  : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ;
154   
[1761]155     for(int rank=0; rank<commSize; rank++)
156     {
[1764]157       notifyOutType_=NOTIFY_CREATE_INTERCOMM ;
158       notifyOutCreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ;
[1761]159       sendNotification(rank) ;
160     }
161  }
162 
163  void CServerContext::sendNotification(int rank)
164  {
[2517]165    winNotify_->pushToExclusiveWindow(rank, this, &CServerContext::notificationsDumpOut) ;
[1761]166  }
167
168 
169  void CServerContext::notificationsDumpOut(CBufferOut& buffer)
170  {
171   
172    buffer.realloc(maxBufferSize_) ;
173   
[1764]174    if (notifyOutType_==NOTIFY_CREATE_INTERCOMM)
[1761]175    {
[1764]176      auto& arg=notifyOutCreateIntercomm_ ;
177      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) ;
[1761]178    }
179  }
180
181  void CServerContext::notificationsDumpIn(CBufferIn& buffer)
182  {
[1764]183    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
[1761]184    else
185    {
[1764]186      buffer>>notifyInType_;
187      if (notifyInType_==NOTIFY_CREATE_INTERCOMM)
[1761]188      {
[1764]189        auto& arg=notifyInCreateIntercomm_ ;
[1761]190        buffer >> std::get<0>(arg)>> std::get<1>(arg) ;
191      }
192    }
193  }
194
195  void CServerContext::checkNotifications(void)
196  {
[1764]197    if (!hasNotification_)
198    {
[2246]199      double time=MPI_Wtime() ;
200      if (time-lastEventLoop_ > eventLoopLatency_) 
201      {
202        int commRank ;
203        MPI_Comm_rank(contextComm_, &commRank) ;
[2517]204        winNotify_->popFromExclusiveWindow(commRank, this, &CServerContext::notificationsDumpIn) ;
205       
[2246]206        if (notifyInType_!= NOTIFY_NOTHING)
207        {
208          hasNotification_=true ;
209          auto eventScheduler=parentService_->getEventScheduler() ;
210          std::hash<string> hashString ;
211          size_t hashId = hashString(name_) ;
212          size_t currentTimeLine=0 ;
213          eventScheduler->registerEvent(currentTimeLine,hashId); 
214        }
215        lastEventLoop_=time ;
[1764]216      }
217    }
218   
219    if (hasNotification_)
220    {
221      auto eventScheduler=parentService_->getEventScheduler() ;
222      std::hash<string> hashString ;
223      size_t hashId = hashString(name_) ;
224      size_t currentTimeLine=0 ;
225      if (eventScheduler->queryEvent(currentTimeLine,hashId))
226      {
[2230]227        eventScheduler->popEvent() ;
[1764]228        if (notifyInType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ;
229        hasNotification_=false ;
230      }
231    }
[1761]232  }
233
[1764]234  bool CServerContext::eventLoop(bool serviceOnly)
[1761]235  {
[2628]236    if (info.isActive(logTimers)) CTimer::get("CServerContext::eventLoop").resume();
[1761]237    bool finished=false ;
[2260]238    int flag ;
239    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
240
[2246]241//    double time=MPI_Wtime() ;
242//    if (time-lastEventLoop_ > eventLoopLatency_)
243//    {
244      if (winNotify_!=nullptr) checkNotifications() ;
245//      lastEventLoop_=time ;
246//    }
247
248
[1764]249    if (!serviceOnly && context_!=nullptr) 
[1761]250    {
251      if (context_->eventLoop())
252      {
[2265]253        info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
254        CContext::removeContext(context_->getId()) ;
[1761]255        context_=nullptr ;
256        // destroy context ??? --> later
257      }
258    }
[2628]259    if (info.isActive(logTimers)) CTimer::get("CServerContext::eventLoop").suspend();
[1761]260    if (context_==nullptr && finalizeSignal_) finished=true ;
261    return finished ;
262  }
263
[2547]264  void CServerContext::threadEventLoop(void)
265  {
266   
[2628]267    if (info.isActive(logTimers)) CTimer::get("CServerContext::eventLoop").resume();
[2547]268    info(100)<<"Launch Thread for CServerContext::threadEventLoop, context id = "<<context_->getId()<<endl ;
269    CThreadManager::threadInitialize() ; 
270    do
271    {
272      int flag ;
273      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
274
275      if (winNotify_!=nullptr) checkNotifications() ;
276
277
278      if (context_!=nullptr) 
279      {
280        if (context_->eventLoop())
281        {
282          info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
283          CContext::removeContext(context_->getId()) ;
284          context_=nullptr ;
285          // destroy context ??? --> later
286        }
287      }
288      if (context_==nullptr && finalizeSignal_) finished_=true ;
289 
290      if (!finished_) CThreadManager::yield() ;
291    }
292    while (!finished_) ;
293   
294    CThreadManager::threadFinalize() ;
295    info(100)<<"Close thread for CServerContext::threadEventLoop"<<endl ;
[2628]296    if (info.isActive(logTimers)) CTimer::get("CServerContext::eventLoop").suspend();
[2547]297  }
298
[1761]299  void CServerContext::createIntercomm(void)
300  {
[2246]301    info(40)<<"CServerContext::createIntercomm  : received createIntercomm notification"<<endl ;
302
[1761]303     MPI_Comm interCommServer, interCommClient ;
[1764]304     auto& arg=notifyInCreateIntercomm_ ;
305     int remoteLeader=get<0>(arg) ;
306     string sourceContext=get<1>(arg) ;
[1761]307
308     auto it=overlapedComm_.find(sourceContext) ;
309     int overlap=0 ;
310     if (it!=overlapedComm_.end())
311     {
312       get<0>(it->second)=true ;
313       overlap=1 ;
314     }
315     int nOverlap ; 
316     MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
317     int commSize ;
318     MPI_Comm_size(contextComm_,&commSize ) ;
319
[2547]320    if (nOverlap==0)
[1761]321    { 
[1764]322      info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
[2589]323      xios::MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ;
[2580]324      CXios::getMpiGarbageCollector().registerCommunicator(interCommServer) ;
[2589]325      xios::MPI_Comm_dup(interCommServer,&interCommClient) ;
[2580]326      CXios::getMpiGarbageCollector().registerCommunicator(interCommClient) ;
[1761]327      context_ -> createClientInterComm(interCommClient,interCommServer) ;
328      clientsInterComm_.push_back(interCommClient) ;
329      clientsInterComm_.push_back(interCommServer) ;
330    }
331    else
332    {
[2547]333      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
[1761]334    }
335   
336  }
337
[1764]338  void CServerContext::freeComm(void)
339  {
[2580]340    //delete winNotify_ ;
341    //winNotify_=nullptr ;
[2589]342    //xios::MPI_Comm_free(&contextComm_) ;
[1764]343    // don't forget intercomm -> later
344  }
345 
[1761]346  void CServerContext::finalizeSignal(void)
347  {
348    finalizeSignal_=true ;
349  }
350
[2589]351}
Note: See TracBrowser for help on using the repository browser.