source: XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp @ 2230

Last change on this file since 2230 was 2230, checked in by ymipsl, 3 years ago

Fix some deadlock issues...
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 15.2 KB
RevLine 
[300]1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
[352]5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
[300]8#include "domain.hpp"
[352]9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
[382]12#include "mpi.hpp"
[347]13#include "tracer.hpp"
14#include "timer.hpp"
[401]15#include "cxios.hpp"
[492]16#include "event_scheduler.hpp"
17#include "server.hpp"
[1761]18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
[2130]22#include "timeline_events.hpp"
[1761]23
[492]24#include <boost/functional/hash.hpp>
[1761]25#include <random>
26#include <chrono>
[300]27
28
[335]29namespace xios
[300]30{
[1761]31  using namespace std ;
[300]32
[1853]33  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 
34    : eventScheduler_(nullptr), isProcessingEvent_(false), associatedClient_(nullptr)
[300]35  {
[549]36    context=parent;
37    intraComm=intraComm_;
[1639]38    MPI_Comm_size(intraComm,&intraCommSize);
39    MPI_Comm_rank(intraComm,&intraCommRank);
[1054]40
[549]41    interComm=interComm_;
42    int flag;
[1639]43    MPI_Comm_test_inter(interComm,&flag);
[1757]44
45    if (flag) attachedMode=false ;
46    else  attachedMode=true ;
47   
[1639]48    if (flag) MPI_Comm_remote_size(interComm,&commSize);
49    else  MPI_Comm_size(interComm,&commSize);
[983]50
[1761]51   
52    SRegisterContextInfo contextInfo ;
53    CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ;
54
[2022]55  //  if (contextInfo.serviceType != CServicesManager::CLIENT) // we must have an event scheduler => to be retrieve from the associated services
56  //  {
[2123]57      //if (!isAttachedModeEnabled()) eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
[2230]58    eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
59    MPI_Comm_dup(intraComm, &processEventBarrier_) ;
[2022]60  //  }
[1761]61
62
[1757]63    currentTimeLine=1;
[549]64    scheduled=false;
65    finished=false;
[1761]66
67    // generate unique hash for server
68    auto time=chrono::system_clock::now().time_since_epoch().count() ;
69    std::default_random_engine rd(time); // not reproducible from a run to another
70    std::uniform_int_distribution<size_t> dist;
71    hashId=dist(rd) ;
72    MPI_Bcast(&hashId,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all server of the context
73
74
[1757]75    if (!isAttachedModeEnabled())
76    {
77      MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;
78// create windows for one sided comm
79      int interCommMergedRank;
80      MPI_Comm winComm ;
81      MPI_Comm_rank(intraComm, &interCommMergedRank);
82      windows.resize(2) ;
83      for(int rank=commSize; rank<commSize+intraCommSize; rank++)
84      {
85        if (rank==commSize+interCommMergedRank) 
86        {
87          MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
88          int myRank ;
89          MPI_Comm_rank(winComm,&myRank);
90          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]);
91          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);     
92        }
93        else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
[2230]94//       ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock
95//            Bug or not ?         
96        // MPI_Comm_free(&winComm) ;
[1757]97      }
98    }
99    else 
100    {
101      windows.resize(2) ;
102      windows[0]=MPI_WIN_NULL ;
103      windows[1]=MPI_WIN_NULL ;
104    }
105
106
107   
108    MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ;
109    itLastTimeLine=lastTimeLine.begin() ;
110
111    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
112    if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode
113     
[300]114  }
[992]115
[1757]116//! Attached mode is used ?
117//! \return true if attached mode is used, false otherwise
118  bool CContextServer::isAttachedModeEnabled() const
119  {
120    return attachedMode ;
121  }
122 
[300]123  void CContextServer::setPendingEvent(void)
124  {
[549]125    pendingEvent=true;
[300]126  }
[489]127
[300]128  bool CContextServer::hasPendingEvent(void)
129  {
[549]130    return pendingEvent;
[300]131  }
[489]132
[597]133  bool CContextServer::hasFinished(void)
134  {
135    return finished;
136  }
137
[1054]138  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
[300]139  {
[549]140    listen();
141    checkPendingRequest();
[1757]142    if (enableEventsProcessing)  processEvents();
[549]143    return finished;
[300]144  }
145
146  void CContextServer::listen(void)
147  {
148    int rank;
[549]149    int flag;
150    int count;
151    char * addr;
[1639]152    MPI_Status status;
[300]153    map<int,CServerBuffer*>::iterator it;
[1230]154    bool okLoop;
[489]155
[1225]156    traceOff();
[2223]157    // WARNING : with intel MPI, probing crash on an intercommunicator with release library but not with release_mt
158    // ==>  source $I_MPI_ROOT/intel64/bin/mpivars.sh release_mt    needed
[1225]159    MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status);
160    traceOn();
161
162    if (flag==true)
[300]163    {
[1225]164      rank=status.MPI_SOURCE ;
[1230]165      okLoop = true;
[1228]166      if (pendingRequest.find(rank)==pendingRequest.end())
167        okLoop = !listenPendingRequest(status) ;
168      if (okLoop)
[300]169      {
[1225]170        for(rank=0;rank<commSize;rank++)
[300]171        {
[1225]172          if (pendingRequest.find(rank)==pendingRequest.end())
[300]173          {
[1225]174
175            traceOff();
[1639]176            MPI_Iprobe(rank, 20,interComm,&flag,&status);
[1225]177            traceOn();
178            if (flag==true) listenPendingRequest(status) ;
[300]179          }
180        }
181      }
182    }
183  }
[489]184
[1639]185  bool CContextServer::listenPendingRequest(MPI_Status& status)
[1225]186  {
187    int count;
188    char * addr;
189    map<int,CServerBuffer*>::iterator it;
190    int rank=status.MPI_SOURCE ;
191
192    it=buffers.find(rank);
193    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
194    {
[2130]195       MPI_Aint recvBuff[4] ;
196       MPI_Recv(recvBuff, 4, MPI_AINT, rank, 20, interComm, &status);
197       remoteHashId_ = recvBuff[0] ;
198       StdSize buffSize = recvBuff[1];
[1757]199       vector<MPI_Aint> winAdress(2) ;
[2130]200       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
[1225]201       mapBufferSize_.insert(std::make_pair(rank, buffSize));
[1757]202       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;
[1765]203     
[1757]204       lastTimeLine[rank]=0 ;
205       itLastTimeLine=lastTimeLine.begin() ;
206
[1228]207       return true;
[1225]208    }
209    else
210    {
[1639]211      MPI_Get_count(&status,MPI_CHAR,&count);
[1225]212      if (it->second->isBufferFree(count))
213      {
214         addr=(char*)it->second->getBuffer(count);
[1639]215         MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);
[1225]216         bufferRequest[rank]=addr;
[1228]217         return true;
[1225]218       }
[1228]219      else
220        return false;
[1225]221    }
222  }
223
224
[300]225  void CContextServer::checkPendingRequest(void)
226  {
[1639]227    map<int,MPI_Request>::iterator it;
[549]228    list<int> recvRequest;
[300]229    list<int>::iterator itRecv;
[549]230    int rank;
231    int flag;
232    int count;
[1639]233    MPI_Status status;
[489]234
[300]235    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
236    {
[549]237      rank=it->first;
238      traceOff();
[1639]239      MPI_Test(& it->second, &flag, &status);
[549]240      traceOn();
[300]241      if (flag==true)
242      {
[1757]243        buffers[rank]->updateCurrentWindows() ;
[549]244        recvRequest.push_back(rank);
[1639]245        MPI_Get_count(&status,MPI_CHAR,&count);
[549]246        processRequest(rank,bufferRequest[rank],count);
[300]247      }
248    }
[489]249
250    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
[300]251    {
[549]252      pendingRequest.erase(*itRecv);
253      bufferRequest.erase(*itRecv);
[300]254    }
255  }
[489]256
[1757]257  void CContextServer::getBufferFromClient(size_t timeLine)
258  {
259    if (!isAttachedModeEnabled()) // one sided desactivated in attached mode
260    { 
261      int rank ;
262      char *buffer ;
263      size_t count ; 
264
265      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
266      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
267      {
268        rank=itLastTimeLine->first ;
269        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0)
270        {
271          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count))
272          {
273            processRequest(rank, buffer, count);
274            break ;
275          }
276        }
277      }
278    }
279  }
280         
281       
[300]282  void CContextServer::processRequest(int rank, char* buff,int count)
283  {
[489]284
[549]285    CBufferIn buffer(buff,count);
286    char* startBuffer,endBuffer;
287    int size, offset;
[1757]288    size_t timeLine=0;
[549]289    map<size_t,CEventServer*>::iterator it;
[489]290
[1757]291   
[1225]292    CTimer::get("Process request").resume();
[300]293    while(count>0)
294    {
[549]295      char* startBuffer=(char*)buffer.ptr();
296      CBufferIn newBuffer(startBuffer,buffer.remain());
297      newBuffer>>size>>timeLine;
[300]298
[2130]299      if (timeLine==timelineEventNotifyChangeBufferSize)
300      {
301        buffers[rank]->notifyBufferResizing() ;
302        buffers[rank]->updateCurrentWindows() ;
303      } 
304      else if (timeLine==timelineEventChangeBufferSize)
305      {
306        size_t newSize ;
307        vector<MPI_Aint> winAdress(2) ;
308        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ;
309        buffers.erase(rank) ;
310        buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, newSize)));
311      }
312      else
313      {
314        it=events.find(timeLine);
315        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
316        it->second->push(rank,buffers[rank],startBuffer,size);
317        if (timeLine>0) lastTimeLine[rank]=timeLine ;
318      }
[549]319      buffer.advance(size);
320      count=buffer.remain();
[489]321    }
[1757]322   
[1225]323    CTimer::get("Process request").suspend();
[300]324  }
[489]325
[300]326  void CContextServer::processEvents(void)
327  {
[549]328    map<size_t,CEventServer*>::iterator it;
329    CEventServer* event;
[1761]330   
[1764]331//    if (context->isProcessingEvent()) return ;
332    if (isProcessingEvent_) return ;
[2130]333    if (isAttachedModeEnabled())
334      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;
[489]335
[549]336    it=events.find(currentTimeLine);
[489]337    if (it!=events.end())
[300]338    {
[549]339      event=it->second;
[509]340
[300]341      if (event->isFull())
342      {
[2123]343        if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side
[492]344        {
[1764]345          eventScheduler_->registerEvent(currentTimeLine,hashId);
[549]346          scheduled=true;
[492]347        }
[2123]348        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
[492]349        {
[2123]350
[2230]351          if (!eventScheduled_) 
[2123]352          {
[2230]353            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
354            eventScheduled_=true ;
355            return ;
356          }
357          else 
358          {
359            MPI_Status status ;
360            int flag ;
361            MPI_Test(&processEventRequest_, &flag, &status) ;
362            if (!flag) return ;
363            eventScheduled_=false ;
364          }
[2123]365
[2230]366          if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ;
[2123]367          //MPI_Barrier(intraComm) ;
[851]368         // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes
369         // The best way to properly solve this problem will be to use the event scheduler also in attached mode
370         // for now just set up a MPI barrier
[1875]371//ym to be check later
372//         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;
[851]373
[1764]374//         context->setProcessingEvent() ;
375         isProcessingEvent_=true ;
[549]376         CTimer::get("Process events").resume();
[2022]377         info(100)<<"Received Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
[549]378         dispatchEvent(*event);
379         CTimer::get("Process events").suspend();
[1764]380         isProcessingEvent_=false ;
381//         context->unsetProcessingEvent() ;
[549]382         pendingEvent=false;
383         delete event;
384         events.erase(it);
385         currentTimeLine++;
386         scheduled = false;
[2130]387         if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;
[492]388        }
389      }
[1757]390      else getBufferFromClient(currentTimeLine) ;
[492]391    }
[1757]392    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line
[492]393  }
[489]394
[300]395  CContextServer::~CContextServer()
396  {
[549]397    map<int,CServerBuffer*>::iterator it;
[1158]398    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
[489]399  }
[300]400
[1757]401  void CContextServer::releaseBuffers()
402  {
403    map<int,CServerBuffer*>::iterator it;
404    bool out ;
405    do
406    {
407      out=true ;
408      for(it=buffers.begin();it!=buffers.end();++it)
409      {
410//        out = out && it->second->freeWindows() ;
411
412      }
413    } while (! out) ; 
414  }
415
416  void CContextServer::notifyClientsFinalize(void)
417  {
418    for(auto it=buffers.begin();it!=buffers.end();++it)
419    {
420      it->second->notifyClientFinalize() ;
421    }
422  }
423
[300]424  void CContextServer::dispatchEvent(CEventServer& event)
425  {
[549]426    string contextName;
427    string buff;
428    int MsgSize;
429    int rank;
430    list<CEventServer::SSubEvent>::iterator it;
[1054]431    StdString ctxId = context->getId();
432    CContext::setCurrent(ctxId);
[1130]433    StdSize totalBuf = 0;
[489]434
[300]435    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
436    {
[597]437      finished=true;
[1194]438      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
[1757]439//      releaseBuffers() ;
440      notifyClientsFinalize() ;
[1194]441      context->finalize();
[1757]442
443/* don't know where release windows
444      MPI_Win_free(&windows[0]) ;
445      MPI_Win_free(&windows[1]) ;
446*/     
[511]447      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
[983]448                           iteMap = mapBufferSize_.end(), itMap;
[511]449      for (itMap = itbMap; itMap != iteMap; ++itMap)
450      {
[1054]451        rank = itMap->first;
[1130]452        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
453            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
454        totalBuf += itMap->second;
[511]455      }
[1130]456      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
[300]457    }
[549]458    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
459    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
460    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
461    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
462    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
463    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
464    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
[887]465    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
466    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
[549]467    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
468    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
469    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
470    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
471    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
472    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
473    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
[300]474    else
475    {
[549]476      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
[300]477    }
478  }
[2230]479
480  bool CContextServer::isCollectiveEvent(CEventServer& event)
481  {
482    if (event.classId==CField::GetType()) return CField::isCollectiveEvent(event);
483    else return true ;
484  }
[300]485}
Note: See TracBrowser for help on using the repository browser.