source: XIOS3/trunk/src/event_scheduler.cpp

Last change on this file was 2589, checked in by jderouillat, 9 months ago

Specify the usage of the xios namespace to overload the MPI funtions

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 10.8 KB
RevLine 
[492]1#include "event_scheduler.hpp"
[591]2#include "xios_spl.hpp"
[492]3#include "mpi.hpp"
[1224]4#include "tracer.hpp"
[2564]5#include "cxios.hpp"
[492]6
7namespace xios
8{
9 
10 
[1639]11  CEventScheduler::CEventScheduler(const MPI_Comm& comm) 
[492]12  {
[2522]13     schedulerLevel_=0 ;
14     parentScheduler_.reset();
15     childScheduler_.reset();
16     initialize(comm) ;
17  }
18 
19  CEventScheduler::CEventScheduler(const MPI_Comm& comm, size_t schedulerLevel) 
20  {
21     schedulerLevel_=schedulerLevel ;
22     parentScheduler_.reset();
23     childScheduler_.reset();
24     initialize(comm) ;
25  }
26 
27  void CEventScheduler::initialize(const MPI_Comm& comm) 
28  {
[2589]29    xios::MPI_Comm_dup(comm, &communicator_) ;
[2564]30    CXios::getMpiGarbageCollector().registerCommunicator(communicator_) ;
[2518]31    MPI_Comm_size(communicator_,&mpiSize_) ;
32    MPI_Comm_rank(communicator_,&mpiRank_);
[492]33
34
35    int maxChild=1 ;
36
37    int m ;
38    do
39    {
40      m=1 ;
41      maxChild=maxChild+1 ;
42      for(int i=0;i<maxChild;i++) m=m*maxChild ;
[2518]43     } while(m<mpiSize_) ;
[492]44   
45   
46    int maxLevel=0 ;
[2518]47    for(int size=1; size<=mpiSize_; size*=maxChild) maxLevel++ ; 
[492]48
49    int begin, end, nb ;
50    int pos, n ;
51 
[2518]52    parent_=vector<int>(maxLevel+1) ;
53    child_=vector<vector<int> >(maxLevel+1,vector<int>(maxChild)) ;
54    nbChild_=vector<int> (maxLevel+1) ;
[492]55   
[2518]56    level_=0 ;
[492]57    begin=0 ;
[2518]58    end=mpiSize_-1 ;     
[492]59    nb=end-begin+1 ;
60     
61    do
62    {
63      n=0 ;
64      pos=begin ;
[2518]65      nbChild_[level_]=0 ;
66      parent_[level_+1]=begin ;
[492]67      for(int i=0;i<maxChild && i<nb ;i++)
68      {
69        if (i<nb%maxChild) n = nb/maxChild + 1 ;
70        else n = nb/maxChild ;
71     
[2518]72        if (mpiRank_>=pos && mpiRank_<pos+n)
[492]73        {
74          begin=pos ;
75          end=pos+n-1 ;
76        }
[2518]77        child_[level_][i]=pos ;
[492]78        pos=pos+n ;
[2518]79        nbChild_[level_]++ ;
[492]80      } 
81      nb=end-begin+1 ;
[2518]82      level_=level_+1 ;
[492]83    } while (nb>1) ;
84
85   
86  }
87
88  CEventScheduler::~CEventScheduler()
89  {
[2518]90    while (!pendingSentParentRequest_.empty() || !pendingRecvParentRequest_.empty() || !pendingRecvChildRequest_.empty() ||  !pendingSentChildRequest_.empty())
[2274]91    {
[2522]92      checkEvent_() ;
[2274]93    } 
[2569]94  }
95   
96  void CEventScheduler::cleanSplitSchedulers()
97  {
98    // Cleaning is operated recursively going from parent to child
99    if (parentScheduler_)
100    {
101      if (parentScheduler_->childScheduler_.get() == this)
102      {
103        parentScheduler_.reset();
104      }
105      else // if orphan (due to splitScheduler) : clean parent tree (it does not have child)
106      {
107        parentScheduler_->cleanSplitSchedulers();
108        parentScheduler_.reset();
109      }
110    }                   
111    if (childScheduler_)
112    {
113      childScheduler_->cleanSplitSchedulers();
114      childScheduler_.reset();
115    }
[492]116  } 
117
[2522]118  void CEventScheduler::splitScheduler(const MPI_Comm& splittedComm, shared_ptr<CEventScheduler>& parent, shared_ptr<CEventScheduler>& child)
119  {
120    int color ;
121    MPI_Comm newComm ;
122    child = make_shared<CEventScheduler>(splittedComm, schedulerLevel_+ 1) ;
123    if (child->isRoot()) color=1 ;
124    else color=0 ;
[2589]125    xios::MPI_Comm_split(communicator_, color, mpiRank_, &newComm) ;
[2564]126    CXios::getMpiGarbageCollector().registerCommunicator(newComm) ;
[2522]127
128    parent = make_shared<CEventScheduler>(newComm , schedulerLevel_) ;
129    child->setParentScheduler(parent) ;
130    parent->setChildScheduler(child) ;
131    if (parentScheduler_) 
132    {
133      parentScheduler_->setChildScheduler(parent) ;
134      parent->setParentScheduler(parentScheduler_) ;
135    }
136
137  }
138
[492]139  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId)
140  {
[2522]141    getBaseScheduler()->registerEvent(timeLine, contextHashId, schedulerLevel_) ;
142    checkEvent_() ;
[492]143  }
144 
[2522]145  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId, const size_t schedulerLevel)
[492]146  {
[2522]147    registerEvent(timeLine, contextHashId, schedulerLevel, level_) ;
148    checkEvent_() ;
149  }
150
151  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId, const size_t schedulerLevel, const size_t lev)
152  {
[492]153       
[1224]154    traceOff() ;
[492]155    SPendingRequest* sentRequest=new SPendingRequest ;
156    sentRequest->buffer[0]=timeLine ;
157    sentRequest->buffer[1]=contextHashId ;
[2522]158    sentRequest->buffer[2]=schedulerLevel ;
159    sentRequest->buffer[3]=lev-1 ;
[492]160
[2518]161    pendingSentParentRequest_.push(sentRequest) ;
[2522]162//    info(100)<<"CEventScheduler::registerEvent => send event to parent "<<parent_[lev]<<" of level" <<lev-1<<endl ;
163    MPI_Isend(sentRequest->buffer,4, MPI_UNSIGNED_LONG, parent_[lev], 0, communicator_, &sentRequest->request) ;
[1224]164    traceOn() ;
[492]165  } 
166
[2522]167 
168  bool CEventScheduler::queryEvent_(const size_t timeLine, const size_t contextHashId)
[492]169  {
[2522]170    checkEvent_() ;
171
[2518]172    if (! eventStack_.empty() && eventStack_.front().first==timeLine && eventStack_.front().second==contextHashId)
[492]173    {
174      return true ;
175    }
176    else return false ; 
177  } 
[2230]178 
[2522]179  void CEventScheduler::checkEvent_(void)
[492]180  {
[2522]181   
182    if (parentScheduler_) parentScheduler_->checkEvent_() ;
[1224]183    traceOff() ;
[492]184    checkChildRequest() ;
185    checkParentRequest() ;
[1224]186    traceOn() ;
[492]187   
188  }
189 
190  void CEventScheduler::checkParentRequest(void)
191  {
192    int completed ;
[1639]193    MPI_Status status ;
[492]194    int received ;
195    SPendingRequest* recvRequest ;
196    completed=true ;
197   
198    // check sent request to parent
[2518]199    while (! pendingSentParentRequest_.empty() && completed)
[492]200    {
[2518]201      MPI_Test( & pendingSentParentRequest_.front()->request, &completed, &status) ;
[492]202      if (completed) 
203      {
[2518]204        delete pendingSentParentRequest_.front() ;
205        pendingSentParentRequest_.pop() ;
[492]206      }
207    }
208   
209    // probe if a message is coming from parent
210    received=true ;
211    while(received)
212    {
[2518]213      MPI_Iprobe(MPI_ANY_SOURCE,1,communicator_,&received, &status) ;
[492]214      if (received)
215      {
216        recvRequest=new SPendingRequest ;
[2522]217        MPI_Irecv(recvRequest->buffer, 4, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 1, communicator_, &(recvRequest->request)) ;
[2518]218        pendingRecvParentRequest_.push(recvRequest) ;
[492]219      }
220    }
221   
222     // check sent request from parent
223    completed=true ;
[2518]224    while (! pendingRecvParentRequest_.empty() && completed)
[492]225    {
[2518]226      recvRequest=pendingRecvParentRequest_.front() ;
[1639]227      MPI_Test( &(recvRequest->request), &completed, &status) ;
[2522]228
[492]229      if (completed) 
230      {
231        size_t timeLine=recvRequest->buffer[0] ;
232        size_t hashId=recvRequest->buffer[1] ;
[2522]233        size_t schedulerLevel=recvRequest->buffer[2] ;
234        size_t lev=recvRequest->buffer[3] ;
[1158]235        delete recvRequest ;
[2518]236        pendingRecvParentRequest_.pop() ;       
[2522]237       
238//        info(100)<<"CEventScheduler::checkParentRequest => receive event from parent "<< status.MPI_SOURCE<<"at level"<< lev<< endl ;
239       
240        if (lev==level_) 
241        {
242          if (childScheduler_)
243          {
244//            info(100)<<"CEventScheduler::checkParentRequest => bcast event to child scheduler "<<endl;
245            childScheduler_->bcastEvent(timeLine, hashId, schedulerLevel, 0) ;
246          }
247          else
248          { 
249//            info(100)<<"CEventScheduler::checkParentRequest => put event to stack : timeLine : "<<timeLine<<"  hashId : "<<hashId<<endl;
250            eventStack_.push(pair<size_t,size_t>(timeLine,hashId)) ;
251          }
252        }
253        else 
254        {
255//          info(100)<<"CEventScheduler::checkParentRequest => bcast event to child process "<<endl;
256          bcastEvent(timeLine, hashId, schedulerLevel, lev) ;
257        }
[492]258      }
259    }   
260   
261  }
262
263  void CEventScheduler::checkChildRequest(void)
264  {
265// function call only by parent mpi process
266
[1639]267    MPI_Status status ; 
[492]268    int received ;
269    received=true ;
270    SPendingRequest* recvRequest ;
271   
272    // check for posted requests and make the corresponding receive
273    while(received)
274    {
[2518]275      MPI_Iprobe(MPI_ANY_SOURCE,0,communicator_,&received, &status) ;
[492]276      if (received)
277      {
278        recvRequest=new SPendingRequest ;
[2522]279        MPI_Irecv(recvRequest->buffer, 4, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 0, communicator_, &recvRequest->request) ;
[2518]280        pendingRecvChildRequest_.push_back(recvRequest) ;
[492]281      }
282    }
283   
284    // check if receive request is achieved
285   
[2518]286    for(list<SPendingRequest*>::iterator it=pendingRecvChildRequest_.begin(); it!=pendingRecvChildRequest_.end() ; )
[492]287    {
[1639]288      MPI_Test(&((*it)->request),&received,&status) ;
[492]289      if (received)
290      {
291        size_t timeLine=(*it)->buffer[0] ;
292        size_t hashId=(*it)->buffer[1] ;
[2522]293        size_t schedulerLevel=(*it)->buffer[2] ;
294        size_t lev=(*it)->buffer[3] ;
[492]295       
[2522]296//        info(100)<<"CEventScheduler::checkChildRequest => received event from child "<<status.MPI_SOURCE<<" at level "<<lev<<endl;
297
298        SEvent event={timeLine, hashId, schedulerLevel, lev} ;
[492]299        delete *it ; // free mem
[2518]300        it=pendingRecvChildRequest_.erase(it) ; // get out of the list
[492]301       
[2518]302        map< SEvent,int>::iterator itEvent=recvEvent_.find(event) ;
303        if (itEvent==recvEvent_.end()) 
[492]304        {
[2518]305          itEvent=(recvEvent_.insert(pair< SEvent ,int > (event,1))).first ;
[492]306 
307        }
308        else (itEvent->second)++ ;
[2518]309        if (itEvent->second==nbChild_[lev])
[492]310        {
311          if (lev==0)
312          {
[2522]313            if (schedulerLevel==schedulerLevel_) 
314            { 
315//              info(100)<<"CEventScheduler::checkChildRequest => bcastEvent to child"<<endl ;
316              bcastEvent(timeLine, hashId, schedulerLevel, lev) ;
317            }
318            else 
319            { 
320//              info(100)<<"CEventScheduler::checkChildRequest => register event to parent scheduler"<<endl ;
321              parentScheduler_->registerEvent(timeLine, hashId, schedulerLevel) ;
322            }
[2518]323            recvEvent_.erase(itEvent) ;
[492]324          }
325          else
326          {
[2522]327//            info(100)<<"CEventScheduler::checkChildRequest => register event to parent process"<<endl ;
328            registerEvent( timeLine,hashId, schedulerLevel, lev) ;
[2518]329            recvEvent_.erase(itEvent) ;
[492]330          }
331        }
332      }
333      else ++it ;
334    }
335   
336    // check if bcast request is achieved
337
[2518]338    for(list<SPendingRequest*>::iterator it=pendingSentChildRequest_.begin(); it!=pendingSentChildRequest_.end() ; )
[492]339    {
[1639]340      MPI_Test(&(*it)->request,&received,&status) ;
[492]341      if (received)
342      {
343        delete *it ;    // free memory
[2518]344        it = pendingSentChildRequest_.erase(it) ;          // get out of the list
[492]345
346      }
347      else ++it ;
348       
349    }
350  }
351 
[2522]352  void CEventScheduler::bcastEvent(const size_t timeLine, const size_t contextHashId, const size_t schedulerLevel, const size_t lev)
[492]353  {
354    SPendingRequest* sentRequest ;
355     
356   
[2518]357    for(int i=0; i<nbChild_[lev];i++)
[492]358    {
359      sentRequest=new SPendingRequest ;
360      sentRequest->buffer[0]=timeLine ;
361      sentRequest->buffer[1]=contextHashId ;
[2522]362      sentRequest->buffer[2]=schedulerLevel ;
363      sentRequest->buffer[3]=lev+1 ;
364      MPI_Isend(sentRequest->buffer,4, MPI_UNSIGNED_LONG, child_[lev][i], 1, communicator_, & sentRequest->request) ;
[2518]365      pendingSentChildRequest_.push_back(sentRequest) ;
[492]366    }
367  }
368   
369
370}
Note: See TracBrowser for help on using the repository browser.