source: XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp @ 2338

Last change on this file since 2338 was 2326, checked in by ymipsl, 2 years ago

Fix deadlock from reading phase.
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 17.3 KB
#include "context_server.hpp"
#include "buffer_in.hpp"
#include "type.hpp"
#include "context.hpp"
#include "object_template.hpp"
#include "group_template.hpp"
#include "attribute_template.hpp"
#include "domain.hpp"
#include "field.hpp"
#include "file.hpp"
#include "grid.hpp"
#include "mpi.hpp"
#include "tracer.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "event_scheduler.hpp"
#include "server.hpp"
#include "servers_ressource.hpp"
#include "pool_ressource.hpp"
#include "services.hpp"
#include "contexts_manager.hpp"
#include "timeline_events.hpp"

#include <boost/functional/hash.hpp>
#include <random>
#include <chrono>


namespace xios
{
  using namespace std ;

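  //! Construct the server-side end of a context connection.
  //! Detects whether attached mode is used, retrieves the event scheduler of
  //! the associated service, and prepares the merged inter-communicator later
  //! used to create the one-sided (RMA) windows.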
  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
    : eventScheduler_(nullptr), isProcessingEvent_(false), associatedClient_(nullptr)
  {
    context=parent;
    intraComm=intraComm_;
    MPI_Comm_size(intraComm,&intraCommSize);
    MPI_Comm_rank(intraComm,&intraCommRank);

    interComm=interComm_;
    int flag;
    MPI_Comm_test_inter(interComm,&flag);

    if (flag) attachedMode=false ;
    else  attachedMode=true ;

    if (flag) MPI_Comm_remote_size(interComm,&clientSize_);
    else  MPI_Comm_size(interComm,&clientSize_);


    SRegisterContextInfo contextInfo ;
    CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ;

  //  if (contextInfo.serviceType != CServicesManager::CLIENT) // we must have an event scheduler => to be retrieved from the associated services
  //  {
      //if (!isAttachedModeEnabled()) eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
    eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
    MPI_Comm_dup(intraComm, &processEventBarrier_) ;
  //  }


    currentTimeLine=1;
    scheduled=false;
    finished=false;

    // generate a unique hash for the server
    auto time=chrono::system_clock::now().time_since_epoch().count() ;
    std::default_random_engine rd(time); // not reproducible from one run to another
    std::uniform_int_distribution<size_t> dist;
    hashId=dist(rd) ;
    MPI_Bcast(&hashId,1,MPI_SIZE_T,0,intraComm) ; // broadcast to all servers of the context


    if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // self communicator, used for the one-sided windows

    itLastTimeLine=lastTimeLine.begin() ;

    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for test)
    if (isAttachedModeEnabled()) pureOneSided=false ; // no one-sided communication in attached mode

  }

  //! Is attached mode in use?
  //! \return true if attached mode is used, false otherwise
  bool CContextServer::isAttachedModeEnabled() const
  {
    return attachedMode ;
  }

  void CContextServer::setPendingEvent(void)
  {
    pendingEvent=true;
  }

  bool CContextServer::hasPendingEvent(void)
  {
    return pendingEvent;
  }

  bool CContextServer::hasFinished(void)
  {
    return finished;
  }

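  //! One iteration of the server event loop: probe for new client messages,
  //! progress pending receives, then try to process complete events.
  //! \return true once the context has been finalized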
  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
  {
    CTimer::get("listen request").resume();
    listen();
    CTimer::get("listen request").suspend();
    CTimer::get("check pending request").resume();
    checkPendingRequest();
    checkPendingProbe() ;
    CTimer::get("check pending request").suspend();
    CTimer::get("check event process").resume();
    processEvents(enableEventsProcessing);
    CTimer::get("check event process").suspend();
    return finished;
  }

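  //! Probe the inter-communicator for a new message from any client
  //! (tag 20) and hand it over to listenPendingRequest().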
  void CContextServer::listen(void)
  {
    int flag;
    MPI_Status status;
    MPI_Message message ;

    traceOff();
    MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status);
    traceOn();
    if (flag==true) listenPendingRequest(message, status) ;
  }

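  //! Handle a probed client message. The first message received from a rank
  //! carries the buffer size and the window addresses: the server buffer and,
  //! when not in attached mode, the two dynamic RMA windows are created here.
  //! Subsequent messages are queued in pendingProbe until the buffer can
  //! accept them.
  //! \return true if this was the first message received from that rank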
  bool CContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
  {
    map<int,CServerBuffer*>::iterator it;
    int rank=status.MPI_SOURCE ;

    it=buffers.find(rank);
    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
    {
      MPI_Aint recvBuff[4] ;
      MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
      remoteHashId_ = recvBuff[0] ;
      StdSize buffSize = recvBuff[1];
      vector<MPI_Aint> winAdress(2) ;
      winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
      mapBufferSize_.insert(std::make_pair(rank, buffSize));

      // create windows dynamically for one-sided communication
      if (!isAttachedModeEnabled())
      {
        CTimer::get("create Windows").resume() ;
        MPI_Comm interComm ;
        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ;
        MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
        CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
        MPI_Comm_free(&interComm) ;
        windows_[rank].resize(2) ;
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;
        CTimer::get("create Windows").suspend() ;
        MPI_Barrier(winComm_[rank]) ;
      }
      else
      {
        winComm_[rank] = MPI_COMM_NULL ;
        windows_[rank].resize(2) ;
        windows_[rank][0] = MPI_WIN_NULL ;
        windows_[rank][1] = MPI_WIN_NULL ;
      }

      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
      lastTimeLine[rank]=0 ;
      itLastTimeLine=lastTimeLine.begin() ;

      return true;
    }
    else
    {
      std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
      pendingProbe[rank].push_back(mypair) ;
      return false;
    }
  }

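  //! Turn queued probes into actual receives. For each client rank with no
  //! receive already in flight, post an MPI_Imrecv for the oldest probed
  //! message, provided the server buffer can accept it.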
  void CContextServer::checkPendingProbe(void)
  {

    list<int> recvProbe ;
    list<int>::iterator itRecv ;
    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;

    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
    {
      int rank=itProbe->first ;
      if (pendingRequest.count(rank)==0)
      {
        MPI_Message& message = itProbe->second.front().first ;
        MPI_Status& status = itProbe->second.front().second ;
        int count ;
        MPI_Get_count(&status,MPI_CHAR,&count);
        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
        if ( (it->second->isBufferFree(count) && !it->second->isResizing()) // accept a new request if the buffer is free
          || (it->second->isResizing() && it->second->isBufferEmpty()) )    // or, when resizing, wait until the buffer is empty
        {
          char * addr;
          addr=(char*)it->second->getBuffer(count);
          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
          bufferRequest[rank]=addr;
          recvProbe.push_back(rank) ;
          itProbe->second.pop_front() ;
        }
      }
    }

    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
  }


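  //! Test the pending MPI_Imrecv requests; for each completed receive,
  //! process the received client request and release the bookkeeping entries.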
  void CContextServer::checkPendingRequest(void)
  {
    map<int,MPI_Request>::iterator it;
    list<int> recvRequest;
    list<int>::iterator itRecv;
    int rank;
    int flag;
    int count;
    MPI_Status status;

    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume();
    else CTimer::get("receiving requests").suspend();

    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
    {
      rank=it->first;
      traceOff();
      MPI_Test(& it->second, &flag, &status);
      traceOn();
      if (flag==true)
      {
        buffers[rank]->updateCurrentWindows() ;
        recvRequest.push_back(rank);
        MPI_Get_count(&status,MPI_CHAR,&count);
        processRequest(rank,bufferRequest[rank],count);
      }
    }

    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
    {
      pendingRequest.erase(*itRecv);
      bufferRequest.erase(*itRecv);
    }
  }

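  //! Actively pull data for the expected time line from the client buffers
  //! through the one-sided windows when it has not been received yet.
  //! Does nothing in attached mode, where one-sided communication is disabled.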
  void CContextServer::getBufferFromClient(size_t timeLine)
  {
    CTimer::get("CContextServer::getBufferFromClient").resume() ;
    if (!isAttachedModeEnabled()) // one-sided communication is deactivated in attached mode
    {
      int rank ;
      char *buffer ;
      size_t count ;

      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
      {
        rank=itLastTimeLine->first ;
        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
        {
          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
          if (count >= 0) ++itLastTimeLine ;
          break ;
        }
      }
    }
    CTimer::get("CContextServer::getBufferFromClient").suspend() ;
  }

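  //! Unpack a received buffer: split it into individual events tagged with
  //! their time line. Buffer-resizing notifications are handled immediately;
  //! standard events are accumulated in the events map until complete.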
  void CContextServer::processRequest(int rank, char* buff,int count)
  {

    CBufferIn buffer(buff,count);
    char *startBuffer, *endBuffer;
    int size, offset;
    size_t timeLine=0;
    map<size_t,CEventServer*>::iterator it;


    CTimer::get("Process request").resume();
    while(count>0)
    {
      char* startBuffer=(char*)buffer.ptr();
      CBufferIn newBuffer(startBuffer,buffer.remain());
      newBuffer>>size>>timeLine;

      if (timeLine==timelineEventNotifyChangeBufferSize)
      {
        buffers[rank]->notifyBufferResizing() ;
        buffers[rank]->updateCurrentWindows() ;
        buffers[rank]->popBuffer(count) ;
        info(100)<<"Context id "<<context->getId()<<" : Receive NotifyChangeBufferSize from client rank "<<rank<<endl
                 <<"isBufferEmpty ? "<<buffers[rank]->isBufferEmpty()<<"  remaining count : "<<buffers[rank]->getUsed()<<endl;
      }
      else if (timeLine==timelineEventChangeBufferSize)
      {
        size_t newSize ;
        vector<MPI_Aint> winAdress(2) ;
        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ;
        buffers[rank]->freeBuffer(count) ;
        delete buffers[rank] ;
        buffers[rank] = new CServerBuffer(windows_[rank], winAdress, 0, 2*newSize) ;
        info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank
                 <<"  newSize : "<<newSize<<" Address : "<<winAdress[0]<<" & "<<winAdress[1]<<endl ;
      }
      else
      {
        info(100)<<"Context id "<<context->getId()<<" : Receive standard event from client rank "<<rank<<"  with timeLine : "<<timeLine<<endl ;
        it=events.find(timeLine);
        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
        it->second->push(rank,buffers[rank],startBuffer,size);
        if (timeLine>0) lastTimeLine[rank]=timeLine ;
      }
      buffer.advance(size);
      count=buffer.remain();
    }

    CTimer::get("Process request").suspend();
  }

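  //! Dispatch the event of the current time line once all participating
  //! clients have contributed to it. Events are synchronised across the
  //! servers of the context through the event scheduler (or the daemons
  //! manager in attached mode) and a non-blocking barrier before dispatch.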
  void CContextServer::processEvents(bool enableEventsProcessing)
  {
    map<size_t,CEventServer*>::iterator it;
    CEventServer* event;

//    if (context->isProcessingEvent()) return ;
    if (isProcessingEvent_) return ;
    if (isAttachedModeEnabled())
      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;

    it=events.find(currentTimeLine);
    if (it!=events.end())
    {
      event=it->second;

      if (event->isFull())
      {
        if (!scheduled && !isAttachedModeEnabled()) // skip event scheduling for attached mode and reception on client side
        {
          eventScheduler_->registerEvent(currentTimeLine,hashId);
          scheduled=true;
        }
        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
        {
          if (!enableEventsProcessing && isCollectiveEvent(*event)) return ;

          if (!eventScheduled_)
          {
            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
            eventScheduled_=true ;
            return ;
          }
          else
          {
            MPI_Status status ;
            int flag ;
            MPI_Test(&processEventRequest_, &flag, &status) ;
            if (!flag) return ;
            eventScheduled_=false ;
          }

          if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ;
          //MPI_Barrier(intraComm) ;
          // When using attached mode, synchronise the processes to avoid different events being scheduled by different processes.
          // The best way to properly solve this problem would be to use the event scheduler also in attached mode;
          // for now just set up an MPI barrier.
//ym to be checked later
//         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;

//          context->setProcessingEvent() ;
          isProcessingEvent_=true ;
          CTimer::get("Process events").resume();
          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
          dispatchEvent(*event);
          CTimer::get("Process events").suspend();
          isProcessingEvent_=false ;
//          context->unsetProcessingEvent() ;
          pendingEvent=false;
          delete event;
          events.erase(it);
          currentTimeLine++;
          scheduled = false;
          if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;
        }
      }
      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
    }
    else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; // in pure one-sided mode, check the buffer even if no event is recorded at the current time line
  }

  CContextServer::~CContextServer()
  {
    map<int,CServerBuffer*>::iterator it;
    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
    buffers.clear() ;
  }

  void CContextServer::releaseBuffers()
  {
    //for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
    //buffers.clear() ;
    freeWindows() ;
  }

  void CContextServer::freeWindows()
  {
    //if (!isAttachedModeEnabled())
    //{
    //  for(auto& it : winComm_)
    //  {
    //    int rank = it.first ;
    //    MPI_Win_free(&windows_[rank][0]);
    //    MPI_Win_free(&windows_[rank][1]);
    //    MPI_Comm_free(&winComm_[rank]) ;
    //  }
    //}
  }

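  //! Notify every connected client, through its server buffer, that the
  //! context is finalized.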
  void CContextServer::notifyClientsFinalize(void)
  {
    for(auto it=buffers.begin();it!=buffers.end();++it)
    {
      it->second->notifyClientFinalize() ;
    }
  }

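  //! Route a complete event to the handler of its target class (context,
  //! domain, axis, grid, field, file, variable...). A context finalize event
  //! also terminates the server loop and prints the memory report of the
  //! connection buffers.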
  void CContextServer::dispatchEvent(CEventServer& event)
  {
    string contextName;
    string buff;
    int MsgSize;
    int rank;
    list<CEventServer::SSubEvent>::iterator it;
    StdString ctxId = context->getId();
    CContext::setCurrent(ctxId);
    StdSize totalBuf = 0;

    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
    {
      finished=true;
      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
      notifyClientsFinalize() ;
      CTimer::get("receiving requests").suspend();
      context->finalize();

      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                           iteMap = mapBufferSize_.end(), itMap;
      for (itMap = itbMap; itMap != iteMap; ++itMap)
      {
        rank = itMap->first;
        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
        totalBuf += itMap->second;
      }
      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
    }
    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
    else
    {
      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
    }
  }

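  //! \return true if the event is collective, i.e. its type is <= 1000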
  bool CContextServer::isCollectiveEvent(CEventServer& event)
  {
    if (event.type>1000) return false ;
    else return true ;
  }
}