source: XIOS3/trunk/src/event_scheduler.cpp @ 2551

Last change on this file since 2551 was 2522, checked in by ymipsl, 12 months ago

Improvement of the event scheduler. A hierarchical approach now makes event scheduling possible across different process groups, provided a parent group of processes totally overlaps a child group of processes.
YM
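
For orientation, here is a minimal usage sketch of the hierarchy described above. CEventScheduler, splitScheduler, registerEvent and queryEvent_ are the names defined in this file; the MPI driver around them is hypothetical, and the sketch assumes these members are publicly callable (the class header is not shown on this page).

#include "event_scheduler.hpp"   // XIOS header, as included by this file
#include <mpi.h>
#include <cstddef>
#include <memory>

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv) ;
  {
    int rank, size ;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank) ;
    MPI_Comm_size(MPI_COMM_WORLD, &size) ;

    // One scheduler spanning every process (scheduler level 0).
    auto global = std::make_shared<xios::CEventScheduler>(MPI_COMM_WORLD) ;

    // Split the world into two halves: the parent group (all processes)
    // totally overlaps each child group, as required above.
    MPI_Comm half ;
    MPI_Comm_split(MPI_COMM_WORLD, rank < size/2 ? 0 : 1, rank, &half) ;
    std::shared_ptr<xios::CEventScheduler> parent, child ;
    global->splitScheduler(half, parent, child) ;

    // Register an event on the child group and poll until it is scheduled.
    std::size_t timeLine = 1, contextHash = 42 ;
    child->registerEvent(timeLine, contextHash) ;
    while (!child->queryEvent_(timeLine, contextHash)) { /* overlap other work */ }

    MPI_Comm_free(&half) ;
  } // schedulers are destroyed here, before MPI_Finalize, draining pending requests
  MPI_Finalize() ;
  return 0 ;
}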

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 10.0 KB
#include "event_scheduler.hpp"
#include "xios_spl.hpp"
#include "mpi.hpp"
#include "tracer.hpp"

namespace xios
{

  CEventScheduler::CEventScheduler(const MPI_Comm& comm) 
  {
     schedulerLevel_=0 ;
     parentScheduler_.reset();
     childScheduler_.reset();
     initialize(comm) ;
  }
 
  CEventScheduler::CEventScheduler(const MPI_Comm& comm, size_t schedulerLevel) 
  {
     schedulerLevel_=schedulerLevel ;
     parentScheduler_.reset();
     childScheduler_.reset();
     initialize(comm) ;
  }
 
  void CEventScheduler::initialize(const MPI_Comm& comm) 
  {
    MPI_Comm_dup(comm, &communicator_) ;
    MPI_Comm_size(communicator_,&mpiSize_) ;
    MPI_Comm_rank(communicator_,&mpiRank_);

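    // Choose the tree fan-out: the smallest maxChild >= 2 such that
    // maxChild^maxChild >= mpiSize_, which keeps the communication tree shallow.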
    int maxChild=1 ;

    int m ;
    do
    {
      m=1 ;
      maxChild=maxChild+1 ;
      for(int i=0;i<maxChild;i++) m=m*maxChild ;
    } while(m<mpiSize_) ;
   
   
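    // Tree depth: the number of powers of maxChild that do not exceed mpiSize_.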
    int maxLevel=0 ;
    for(int size=1; size<=mpiSize_; size*=maxChild) maxLevel++ ; 

    int begin, end, nb ;
    int pos, n ;
 
    parent_=vector<int>(maxLevel+1) ;
    child_=vector<vector<int> >(maxLevel+1,vector<int>(maxChild)) ;
    nbChild_=vector<int> (maxLevel+1) ;
   
    level_=0 ;
    begin=0 ;
    end=mpiSize_-1 ;     
    nb=end-begin+1 ;
     
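    // Recursively split the rank interval [begin,end] into at most maxChild
    // sub-intervals per level: parent_[l] is the rank this process reports to
    // from level l, child_[l][i] the leader (first rank) of each sub-group.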
    do
    {
      n=0 ;
      pos=begin ;
      nbChild_[level_]=0 ;
      parent_[level_+1]=begin ;
      for(int i=0;i<maxChild && i<nb ;i++)
      {
        if (i<nb%maxChild) n = nb/maxChild + 1 ;
        else n = nb/maxChild ;
     
        if (mpiRank_>=pos && mpiRank_<pos+n)
        {
          begin=pos ;
          end=pos+n-1 ;
        }
        child_[level_][i]=pos ;
        pos=pos+n ;
        nbChild_[level_]++ ;
      } 
      nb=end-begin+1 ;
      level_=level_+1 ;
    } while (nb>1) ;

   
  }

  CEventScheduler::~CEventScheduler()
  {
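    // Drain every outstanding MPI request before destroying the scheduler.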
    while (!pendingSentParentRequest_.empty() || !pendingRecvParentRequest_.empty() || !pendingRecvChildRequest_.empty() ||  !pendingSentChildRequest_.empty())
    {
      checkEvent_() ;
    } 
  } 

  void CEventScheduler::splitScheduler(const MPI_Comm& splittedComm, shared_ptr<CEventScheduler>& parent, shared_ptr<CEventScheduler>& child)
  {
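    // Build a child scheduler on the split communicator; the root of each
    // child scheduler gets color 1, so MPI_Comm_split gathers all child roots
    // into the communicator backing the new parent scheduler.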
    int color ;
    MPI_Comm newComm ;
    child = make_shared<CEventScheduler>(splittedComm, schedulerLevel_+ 1) ;
    if (child->isRoot()) color=1 ;
    else color=0 ;
    MPI_Comm_split(communicator_, color, mpiRank_, &newComm) ;

    parent = make_shared<CEventScheduler>(newComm , schedulerLevel_) ;
    child->setParentScheduler(parent) ;
    parent->setChildScheduler(child) ;
    if (parentScheduler_) 
    {
      parentScheduler_->setChildScheduler(parent) ;
      parent->setParentScheduler(parentScheduler_) ;
    }

  }

  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId)
  {
    getBaseScheduler()->registerEvent(timeLine, contextHashId, schedulerLevel_) ;
    checkEvent_() ;
  }
 
  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId, const size_t schedulerLevel)
  {
    registerEvent(timeLine, contextHashId, schedulerLevel, level_) ;
    checkEvent_() ;
  }

  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId, const size_t schedulerLevel, const size_t lev)
  {
       
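    // Post a non-blocking send of {timeLine, hashId, schedulerLevel, lev-1}
    // to this process's parent in the tree (tag 0 marks child-to-parent traffic).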
    traceOff() ;
    SPendingRequest* sentRequest=new SPendingRequest ;
    sentRequest->buffer[0]=timeLine ;
    sentRequest->buffer[1]=contextHashId ;
    sentRequest->buffer[2]=schedulerLevel ;
    sentRequest->buffer[3]=lev-1 ;

    pendingSentParentRequest_.push(sentRequest) ;
//    info(100)<<"CEventScheduler::registerEvent => send event to parent "<<parent_[lev]<<" of level" <<lev-1<<endl ;
    MPI_Isend(sentRequest->buffer,4, MPI_UNSIGNED_LONG, parent_[lev], 0, communicator_, &sentRequest->request) ;
    traceOn() ;
  } 

 
  bool CEventScheduler::queryEvent_(const size_t timeLine, const size_t contextHashId)
  {
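    // Only the event at the front of eventStack_ can match: events complete
    // in the order in which they were scheduled.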
    checkEvent_() ;

    if (! eventStack_.empty() && eventStack_.front().first==timeLine && eventStack_.front().second==contextHashId)
    {
      return true ;
    }
    else return false ; 
  } 
 
  void CEventScheduler::checkEvent_(void)
  {
   
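    // Progress the parent scheduler first so pending events cascade down
    // through the whole hierarchy in a single call.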
    if (parentScheduler_) parentScheduler_->checkEvent_() ;
    traceOff() ;
    checkChildRequest() ;
    checkParentRequest() ;
    traceOn() ;
   
  }
 
  void CEventScheduler::checkParentRequest(void)
  {
    int completed ;
    MPI_Status status ;
    int received ;
    SPendingRequest* recvRequest ;
    completed=true ;
   
    // check sent requests to parent
    while (! pendingSentParentRequest_.empty() && completed)
    {
      MPI_Test( & pendingSentParentRequest_.front()->request, &completed, &status) ;
      if (completed) 
      {
        delete pendingSentParentRequest_.front() ;
        pendingSentParentRequest_.pop() ;
      }
    }
   
    // probe if a message is coming from parent
    received=true ;
    while(received)
    {
      MPI_Iprobe(MPI_ANY_SOURCE,1,communicator_,&received, &status) ;
      if (received)
      {
        recvRequest=new SPendingRequest ;
        MPI_Irecv(recvRequest->buffer, 4, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 1, communicator_, &(recvRequest->request)) ;
        pendingRecvParentRequest_.push(recvRequest) ;
      }
    }
   
    // check reception of requests from parent
    completed=true ;
    while (! pendingRecvParentRequest_.empty() && completed)
    {
      recvRequest=pendingRecvParentRequest_.front() ;
      MPI_Test( &(recvRequest->request), &completed, &status) ;

      if (completed) 
      {
        size_t timeLine=recvRequest->buffer[0] ;
        size_t hashId=recvRequest->buffer[1] ;
        size_t schedulerLevel=recvRequest->buffer[2] ;
        size_t lev=recvRequest->buffer[3] ;
        delete recvRequest ;
        pendingRecvParentRequest_.pop() ;       
       
//        info(100)<<"CEventScheduler::checkParentRequest => receive event from parent "<< status.MPI_SOURCE<<" at level "<< lev<< endl ;
       
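        // lev==level_ : the event has reached this process's own tree level;
        // hand it to the child scheduler if one exists, otherwise it is ready
        // and goes onto the local event stack.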
        if (lev==level_) 
        {
          if (childScheduler_)
          {
//            info(100)<<"CEventScheduler::checkParentRequest => bcast event to child scheduler "<<endl;
            childScheduler_->bcastEvent(timeLine, hashId, schedulerLevel, 0) ;
          }
          else
          { 
//            info(100)<<"CEventScheduler::checkParentRequest => put event to stack : timeLine : "<<timeLine<<"  hashId : "<<hashId<<endl;
            eventStack_.push(pair<size_t,size_t>(timeLine,hashId)) ;
          }
        }
        else 
        {
//          info(100)<<"CEventScheduler::checkParentRequest => bcast event to child process "<<endl;
          bcastEvent(timeLine, hashId, schedulerLevel, lev) ;
        }
      }
    }   
   
  }

  void CEventScheduler::checkChildRequest(void)
  {
    // called only on parent MPI processes

    MPI_Status status ; 
    int received ;
    received=true ;
    SPendingRequest* recvRequest ;
   
    // check for posted requests and post the corresponding receives
    while(received)
    {
      MPI_Iprobe(MPI_ANY_SOURCE,0,communicator_,&received, &status) ;
      if (received)
      {
        recvRequest=new SPendingRequest ;
        MPI_Irecv(recvRequest->buffer, 4, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 0, communicator_, &recvRequest->request) ;
        pendingRecvChildRequest_.push_back(recvRequest) ;
      }
    }
   
    // check if receive requests have completed
   
    for(list<SPendingRequest*>::iterator it=pendingRecvChildRequest_.begin(); it!=pendingRecvChildRequest_.end() ; )
    {
      MPI_Test(&((*it)->request),&received,&status) ;
      if (received)
      {
        size_t timeLine=(*it)->buffer[0] ;
        size_t hashId=(*it)->buffer[1] ;
        size_t schedulerLevel=(*it)->buffer[2] ;
        size_t lev=(*it)->buffer[3] ;
       
//        info(100)<<"CEventScheduler::checkChildRequest => received event from child "<<status.MPI_SOURCE<<" at level "<<lev<<endl;

        SEvent event={timeLine, hashId, schedulerLevel, lev} ;
        delete *it ; // free mem
        it=pendingRecvChildRequest_.erase(it) ; // get out of the list
       
        map< SEvent,int>::iterator itEvent=recvEvent_.find(event) ;
        if (itEvent==recvEvent_.end()) 
        {
          itEvent=(recvEvent_.insert(pair< SEvent ,int > (event,1))).first ;
 
        }
        else (itEvent->second)++ ;
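        // Once every child of this node has reported the event (counted in
        // recvEvent_), propagate it: at the tree root (lev==0) either
        // broadcast it back down or hand it to the parent scheduler;
        // at intermediate levels forward it one level up the tree.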
        if (itEvent->second==nbChild_[lev])
        {
          if (lev==0)
          {
            if (schedulerLevel==schedulerLevel_) 
            { 
//              info(100)<<"CEventScheduler::checkChildRequest => bcastEvent to child"<<endl ;
              bcastEvent(timeLine, hashId, schedulerLevel, lev) ;
            }
            else 
            { 
//              info(100)<<"CEventScheduler::checkChildRequest => register event to parent scheduler"<<endl ;
              parentScheduler_->registerEvent(timeLine, hashId, schedulerLevel) ;
            }
            recvEvent_.erase(itEvent) ;
          }
          else
          {
//            info(100)<<"CEventScheduler::checkChildRequest => register event to parent process"<<endl ;
            registerEvent( timeLine,hashId, schedulerLevel, lev) ;
            recvEvent_.erase(itEvent) ;
          }
        }
      }
      else ++it ;
    }
   
    // check if bcast requests have completed

    for(list<SPendingRequest*>::iterator it=pendingSentChildRequest_.begin(); it!=pendingSentChildRequest_.end() ; )
    {
      MPI_Test(&(*it)->request,&received,&status) ;
      if (received)
      {
        delete *it ;    // free memory
        it = pendingSentChildRequest_.erase(it) ;          // get out of the list

      }
      else ++it ;
       
    }
  }
 
  void CEventScheduler::bcastEvent(const size_t timeLine, const size_t contextHashId, const size_t schedulerLevel, const size_t lev)
  {
    SPendingRequest* sentRequest ;
     
   
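    // Fan the event out to the leader of each child group
    // (tag 1 marks parent-to-child traffic).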
    for(int i=0; i<nbChild_[lev];i++)
    {
      sentRequest=new SPendingRequest ;
      sentRequest->buffer[0]=timeLine ;
      sentRequest->buffer[1]=contextHashId ;
      sentRequest->buffer[2]=schedulerLevel ;
      sentRequest->buffer[3]=lev+1 ;
      MPI_Isend(sentRequest->buffer,4, MPI_UNSIGNED_LONG, child_[lev][i], 1, communicator_, & sentRequest->request) ;
      pendingSentChildRequest_.push_back(sentRequest) ;
    }
  }
   

}