source: XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp @ 2260

Last change on this file since 2260 was 2260, checked in by ymipsl, 3 years ago

Improvement of the one-sided protocol

  • removed latency
  • solved a dead-lock

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 16.4 KB
#include "context_server.hpp"
#include "buffer_in.hpp"
#include "type.hpp"
#include "context.hpp"
#include "object_template.hpp"
#include "group_template.hpp"
#include "attribute_template.hpp"
#include "domain.hpp"
#include "field.hpp"
#include "file.hpp"
#include "grid.hpp"
#include "mpi.hpp"
#include "tracer.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "event_scheduler.hpp"
#include "server.hpp"
#include "servers_ressource.hpp"
#include "pool_ressource.hpp"
#include "services.hpp"
#include "contexts_manager.hpp"
#include "timeline_events.hpp"

#include <boost/functional/hash.hpp>
#include <random>
#include <chrono>

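// CContextServer: server-side handler for one context. It listens for client
// requests on the context inter-communicator, stores them in per-client
// buffers (received either with two-sided messages or through one-sided RMA
// windows), reassembles them into timeline-ordered events, and dispatches
// each complete event to the class it targets.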
namespace xios
{
  using namespace std ;

  CContextServer::CContextServer(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_)
    : eventScheduler_(nullptr), isProcessingEvent_(false), associatedClient_(nullptr)
  {
    context=parent;
    intraComm=intraComm_;
    MPI_Comm_size(intraComm,&intraCommSize);
    MPI_Comm_rank(intraComm,&intraCommRank);

    interComm=interComm_;
    int flag;
    MPI_Comm_test_inter(interComm,&flag);

    if (flag) attachedMode=false ; // inter-communicator => clients and servers are distinct processes
    else  attachedMode=true ;      // intra-communicator => attached mode

    if (flag) MPI_Comm_remote_size(interComm,&clientSize_);
    else  MPI_Comm_size(interComm,&clientSize_);


    SRegisterContextInfo contextInfo ;
    CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ;

  //  if (contextInfo.serviceType != CServicesManager::CLIENT) // we must have an event scheduler => to be retrieved from the associated services
  //  {
      //if (!isAttachedModeEnabled()) eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
    eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
    MPI_Comm_dup(intraComm, &processEventBarrier_) ;
  //  }


    currentTimeLine=1;
    scheduled=false;
    finished=false;

    // generate a unique hash for the server
    auto time=chrono::system_clock::now().time_since_epoch().count() ;
    std::default_random_engine rd(time); // not reproducible from one run to another
    std::uniform_int_distribution<size_t> dist;
    hashId=dist(rd) ;
    MPI_Bcast(&hashId,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all servers of the context


    if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for the one-sided windows

    itLastTimeLine=lastTimeLine.begin() ;

    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for test)
    if (isAttachedModeEnabled()) pureOneSided=false ; // no one-sided communication in attached mode
  }

//! Is attached mode used?
//! \return true if attached mode is used, false otherwise
  bool CContextServer::isAttachedModeEnabled() const
  {
    return attachedMode ;
  }

  void CContextServer::setPendingEvent(void)
  {
    pendingEvent=true;
  }

  bool CContextServer::hasPendingEvent(void)
  {
    return pendingEvent;
  }

  bool CContextServer::hasFinished(void)
  {
    return finished;
  }

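  // One iteration of the server loop: probe for new messages, progress the
  // pending receives and queued probes, then (optionally) process complete
  // events. Returns true once the context has been finalized.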
  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
  {
    CTimer::get("listen request").resume();
    listen();
    CTimer::get("listen request").suspend();
    CTimer::get("check pending request").resume();
    checkPendingRequest();
    checkPendingProbe() ;
    CTimer::get("check pending request").suspend();
    CTimer::get("check event process").resume();
    if (enableEventsProcessing)  processEvents();
    CTimer::get("check event process").suspend();
    return finished;
  }

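  // Probe, without receiving, for a message from any client on tag 20 of the
  // inter-communicator; if one is matched, hand it to listenPendingRequest().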
  void CContextServer::listen(void)
  {
    int flag;
    MPI_Status status;
    MPI_Message message ;

    traceOff();
    MPI_Improbe(MPI_ANY_SOURCE, 20, interComm, &flag, &message, &status);
    traceOn();
    if (flag) listenPendingRequest(message, status) ;
  }

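  // First message from a given client rank: receive the handshake (remote hash
  // id, buffer size and the two remote window addresses) and allocate the
  // matching CServerBuffer, creating the RMA windows when not in attached mode.
  // Messages from an already-known rank are queued in pendingProbe so that they
  // are received in arrival order, once buffer space is available.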
  bool CContextServer::listenPendingRequest(MPI_Message &message, MPI_Status& status)
  {
    map<int,CServerBuffer*>::iterator it;
    int rank=status.MPI_SOURCE ;

    it=buffers.find(rank);
    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
    {
      MPI_Aint recvBuff[4] ;
      MPI_Mrecv(recvBuff, 4, MPI_AINT, &message, &status);
      remoteHashId_ = recvBuff[0] ;
      StdSize buffSize = recvBuff[1];
      vector<MPI_Aint> winAdress(2) ;
      winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
      mapBufferSize_.insert(std::make_pair(rank, buffSize));

      // create windows dynamically for one-sided communication
      if (!isAttachedModeEnabled())
      {
        CTimer::get("create Windows").resume() ;
        MPI_Comm interComm ;
        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ;
        MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
        windows_[rank].resize(2) ;
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
        CTimer::get("create Windows").suspend() ;
        MPI_Barrier(winComm_[rank]) ;
      }
      else
      {
        winComm_[rank] = MPI_COMM_NULL ;
        windows_[rank].resize(2) ;
        windows_[rank][0] = MPI_WIN_NULL ;
        windows_[rank][1] = MPI_WIN_NULL ;
      }

      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
      lastTimeLine[rank]=0 ;
      itLastTimeLine=lastTimeLine.begin() ;

      return true;
    }
    else
    {
      std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
      pendingProbe[rank].push_back(mypair) ;
      return false;
    }
  }

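  // For each client rank with queued probed messages and no receive already in
  // flight, start an MPI_Imrecv for the front message if the server buffer has
  // room for it; ranks whose queues become empty are then removed.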
  void CContextServer::checkPendingProbe(void)
  {
    list<int> recvProbe ;
    list<int>::iterator itRecv ;
    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;

    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
    {
      int rank=itProbe->first ;
      if (pendingRequest.count(rank)==0)
      {
        MPI_Message& message = itProbe->second.front().first ;
        MPI_Status& status = itProbe->second.front().second ;
        int count ;
        MPI_Get_count(&status,MPI_CHAR,&count);
        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
        if (it->second->isBufferFree(count))
        {
          char* addr=(char*)it->second->getBuffer(count);
          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
          bufferRequest[rank]=addr;
          recvProbe.push_back(rank) ;
          itProbe->second.pop_front() ;
        }
      }
    }

    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
  }


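  // Test every receive in flight; for each completed one, update the window
  // bookkeeping of the buffer, process the received block, then retire the
  // request and its buffer address.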
  void CContextServer::checkPendingRequest(void)
  {
    map<int,MPI_Request>::iterator it;
    list<int> recvRequest;
    list<int>::iterator itRecv;
    int rank;
    int flag;
    int count;
    MPI_Status status;

    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume();
    else CTimer::get("receiving requests").suspend();

    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
    {
      rank=it->first;
      traceOff();
      MPI_Test(& it->second, &flag, &status);
      traceOn();
      if (flag)
      {
        buffers[rank]->updateCurrentWindows() ;
        recvRequest.push_back(rank);
        MPI_Get_count(&status,MPI_CHAR,&count);
        processRequest(rank,bufferRequest[rank],count);
      }
    }

    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
    {
      pendingRequest.erase(*itRecv);
      bufferRequest.erase(*itRecv);
    }
  }

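  // One-sided path: when the event expected at the given timeline is not yet
  // complete, scan the clients that are behind and try to pull their data
  // directly from the remote buffers through the RMA windows. Disabled in
  // attached mode.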
  void CContextServer::getBufferFromClient(size_t timeLine)
  {
    CTimer::get("CContextServer::getBufferFromClient").resume() ;
    if (!isAttachedModeEnabled()) // one-sided communication is deactivated in attached mode
    {
      int rank ;
      char *buffer ;
      size_t count ;

      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
      {
        rank=itLastTimeLine->first ;
        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
        {
          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
          if (count >= 0) break ; // note: count is a size_t, so this test is always true and the loop stops after the first rank tried
        }
      }
    }
    CTimer::get("CContextServer::getBufferFromClient").suspend() ;
  }


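  // Decode a block received from a client. The block is a sequence of records,
  // each of the form [size][timeLine][payload]. Two reserved timeline values
  // carry the buffer-resizing protocol; anything else is a standard event that
  // is appended to the CEventServer of its timeline.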
  void CContextServer::processRequest(int rank, char* buff, int count)
  {
    CBufferIn buffer(buff,count);
    int size;
    size_t timeLine=0;
    map<size_t,CEventServer*>::iterator it;

    CTimer::get("Process request").resume();
    while(count>0)
    {
      char* startBuffer=(char*)buffer.ptr();
      CBufferIn newBuffer(startBuffer,buffer.remain());
      newBuffer>>size>>timeLine;

      if (timeLine==timelineEventNotifyChangeBufferSize)
      {
        buffers[rank]->notifyBufferResizing() ;
        buffers[rank]->updateCurrentWindows() ;
        info(100)<<"Receive NotifyChangeBufferSize from client rank "<<rank<<endl ;
      }
      else if (timeLine==timelineEventChangeBufferSize)
      {
        size_t newSize ;
        vector<MPI_Aint> winAdress(2) ;
        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ;
        buffers.erase(rank) ; // note: the previous CServerBuffer pointer is dropped here without an explicit delete
        buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, newSize)));
        info(100)<<"Receive ChangeBufferSize from client rank "<<rank<<"  newSize : "<<newSize<<" Address : "<<winAdress[0]<<" & "<<winAdress[1]<<endl ;
      }
      else
      {
        info(100)<<"Receive standard event from client rank "<<rank<<"  with timeLine : "<<timeLine<<endl ;
        it=events.find(timeLine);
        if (it==events.end()) it=events.insert(pair<size_t,CEventServer*>(timeLine,new CEventServer(this))).first;
        it->second->push(rank,buffers[rank],startBuffer,size);
        if (timeLine>0) lastTimeLine[rank]=timeLine ;
      }
      buffer.advance(size);
      count=buffer.remain();
    }

    CTimer::get("Process request").suspend();
  }

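  // Try to process the event expected at currentTimeLine. A full event is
  // first registered with the event scheduler (in server mode) so that all
  // servers process events in the same order; once the scheduler releases it,
  // a non-blocking barrier on processEventBarrier_ synchronises the servers of
  // the context before the event is dispatched.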
  void CContextServer::processEvents(void)
  {
    map<size_t,CEventServer*>::iterator it;
    CEventServer* event;

//    if (context->isProcessingEvent()) return ;
    if (isProcessingEvent_) return ;
    if (isAttachedModeEnabled())
      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;

    it=events.find(currentTimeLine);
    if (it!=events.end())
    {
      event=it->second;

      if (event->isFull())
      {
        if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side
        {
          eventScheduler_->registerEvent(currentTimeLine,hashId);
          scheduled=true;
        }
        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
        {
          if (!eventScheduled_)
          {
            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
            eventScheduled_=true ;
            return ;
          }
          else
          {
            MPI_Status status ;
            int flag ;
            MPI_Test(&processEventRequest_, &flag, &status) ;
            if (!flag) return ;
            eventScheduled_=false ;
          }

          if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ;
          //MPI_Barrier(intraComm) ;
         // In attached mode, synchronise the processes so that different events are not scheduled by different processes.
         // The best way to properly solve this problem would be to use the event scheduler in attached mode as well;
         // for now just set up an MPI barrier.
//ym to be checked later
//         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;

//         context->setProcessingEvent() ;
         isProcessingEvent_=true ;
         CTimer::get("Process events").resume();
         info(100)<<"Received Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
         dispatchEvent(*event);
         CTimer::get("Process events").suspend();
         isProcessingEvent_=false ;
//         context->unsetProcessingEvent() ;
         pendingEvent=false;
         delete event;
         events.erase(it);
         currentTimeLine++;
         scheduled = false;
         if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;
        }
      }
      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
    }
    else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; // if pure one-sided, check the buffers even if no event is recorded at the current timeline
  }

  CContextServer::~CContextServer()
  {
    map<int,CServerBuffer*>::iterator it;
    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
  }

  void CContextServer::releaseBuffers()
  {
    for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
    buffers.clear() ;
    freeWindows() ;
  }

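  // Free, for every client rank, the two RMA windows and the communicator that
  // were created in listenPendingRequest() (nothing to free in attached mode).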
  void CContextServer::freeWindows()
  {
    if (!isAttachedModeEnabled())
    {
      for(auto& it : winComm_)
      {
        int rank = it.first ;
        MPI_Win_free(&windows_[rank][0]);
        MPI_Win_free(&windows_[rank][1]);
        MPI_Comm_free(&winComm_[rank]) ;
      }
    }
  }

  void CContextServer::notifyClientsFinalize(void)
  {
    for(auto it=buffers.begin();it!=buffers.end();++it)
    {
      it->second->notifyClientFinalize() ;
    }
  }

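  // Route a complete event to the class it targets, keyed on event.classId.
  // The context-finalize event is handled here directly: it notifies the
  // clients, finalizes the context, frees the windows and reports the memory
  // used by the buffer of each client connection.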
  void CContextServer::dispatchEvent(CEventServer& event)
  {
    int rank;
    StdString ctxId = context->getId();
    CContext::setCurrent(ctxId);
    StdSize totalBuf = 0;

    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
    {
      finished=true;
      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
      notifyClientsFinalize() ;
      CTimer::get("receiving requests").suspend();
      context->finalize();
      freeWindows() ;

      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                           iteMap = mapBufferSize_.end(), itMap;
      for (itMap = itbMap; itMap != iteMap; ++itMap)
      {
        rank = itMap->first;
        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
        totalBuf += itMap->second;
      }
      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
    }
    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
    else
    {
      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
    }
  }

  bool CContextServer::isCollectiveEvent(CEventServer& event)
  {
    if (event.classId==CField::GetType()) return CField::isCollectiveEvent(event);
    else return true ;
  }
}