source: XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp @ 2258

Last change on this file since 2258 was 2258, checked in by ymipsl, 3 years ago

One sided protocol improvment.
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 16.4 KB
Line 
1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <boost/functional/hash.hpp>
25#include <random>
26#include <chrono>
27
28
29namespace xios
30{
31  using namespace std ;
32
33  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 
34    : eventScheduler_(nullptr), isProcessingEvent_(false), associatedClient_(nullptr)
35  {
36    context=parent;
37    intraComm=intraComm_;
38    MPI_Comm_size(intraComm,&intraCommSize);
39    MPI_Comm_rank(intraComm,&intraCommRank);
40
41    interComm=interComm_;
42    int flag;
43    MPI_Comm_test_inter(interComm,&flag);
44
45    if (flag) attachedMode=false ;
46    else  attachedMode=true ;
47   
48    if (flag) MPI_Comm_remote_size(interComm,&clientSize_);
49    else  MPI_Comm_size(interComm,&clientSize_);
50
51   
52    SRegisterContextInfo contextInfo ;
53    CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ;
54
55  //  if (contextInfo.serviceType != CServicesManager::CLIENT) // we must have an event scheduler => to be retrieve from the associated services
56  //  {
57      //if (!isAttachedModeEnabled()) eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
58    eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
59    MPI_Comm_dup(intraComm, &processEventBarrier_) ;
60  //  }
61
62
63    currentTimeLine=1;
64    scheduled=false;
65    finished=false;
66
67    // generate unique hash for server
68    auto time=chrono::system_clock::now().time_since_epoch().count() ;
69    std::default_random_engine rd(time); // not reproducible from a run to another
70    std::uniform_int_distribution<size_t> dist;
71    hashId=dist(rd) ;
72    MPI_Bcast(&hashId,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all server of the context
73
74
75    if (!isAttachedModeEnabled())
76    {
77      CTimer::get("create Windows").resume() ;
78
79      MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;
80
81      double time ;
82      windows_.resize(clientSize_) ;
83      MPI_Comm commSelf ;
84      MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf) ;
85      MPI_Comm interComm ;
86      winComm_.resize(clientSize_) ;
87      for(int rank=0; rank<clientSize_ ; rank++) 
88      {
89        time=MPI_Wtime() ;
90        MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &interComm) ;
91        MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
92        windows_[rank].resize(2) ;
93        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
94        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 
95        time=MPI_Wtime()-time ;
96        info(100)<< "MPI_Win_create_dynamic : server to client rank "<<rank<<" => "<<time/1e-6<<" us"<<endl ;
97      }
98      MPI_Comm_free(&commSelf) ;
99      CTimer::get("create Windows").suspend() ;
100    }
101    else 
102    {
103      winComm_.resize(clientSize_) ;
104      windows_.resize(clientSize_) ;
105      for(int rank=0; rank<clientSize_ ; rank++) 
106      {
107        winComm_[rank] = MPI_COMM_NULL ;
108        windows_[rank].resize(2) ;
109        windows_[rank][0]=MPI_WIN_NULL ;
110        windows_[rank][1]=MPI_WIN_NULL ;
111      }
112    }
113   
114    itLastTimeLine=lastTimeLine.begin() ;
115
116    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
117    if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode
118     
119  }
120
121//! Attached mode is used ?
122//! \return true if attached mode is used, false otherwise
123  bool CContextServer::isAttachedModeEnabled() const
124  {
125    return attachedMode ;
126  }
127 
128  void CContextServer::setPendingEvent(void)
129  {
130    pendingEvent=true;
131  }
132
133  bool CContextServer::hasPendingEvent(void)
134  {
135    return pendingEvent;
136  }
137
138  bool CContextServer::hasFinished(void)
139  {
140    return finished;
141  }
142
143  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
144  {
145    CTimer::get("listen request").resume();
146    listen();
147    CTimer::get("listen request").suspend();
148    CTimer::get("check pending request").resume();
149    checkPendingRequest();
150    checkPendingProbe() ;
151    CTimer::get("check pending request").suspend();
152    CTimer::get("check event process").resume();
153    if (enableEventsProcessing)  processEvents();
154    CTimer::get("check event process").suspend();
155    return finished;
156  }
157
158 void CContextServer::listen(void)
159  {
160    int rank;
161    int flag;
162    int count;
163    char * addr;
164    MPI_Status status;
165    MPI_Message message ;
166    map<int,CServerBuffer*>::iterator it;
167    bool okLoop;
168
169    traceOff();
170    MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status);
171    traceOn();
172    if (flag==true) listenPendingRequest(message, status) ;
173  }
174
175  bool CContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
176  {
177    int count;
178    char * addr;
179    map<int,CServerBuffer*>::iterator it;
180    int rank=status.MPI_SOURCE ;
181
182    it=buffers.find(rank);
183    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
184    {
185       MPI_Aint recvBuff[4] ;
186       MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
187       remoteHashId_ = recvBuff[0] ;
188       StdSize buffSize = recvBuff[1];
189       vector<MPI_Aint> winAdress(2) ;
190       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
191       mapBufferSize_.insert(std::make_pair(rank, buffSize));
192       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
193       lastTimeLine[rank]=0 ;
194       itLastTimeLine=lastTimeLine.begin() ;
195       return true;
196    }
197    else
198    {
199        std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
200        pendingProbe[rank].push_back(mypair) ;
201        return false;
202    }
203  }
204
205  void CContextServer::checkPendingProbe(void)
206  {
207   
208    list<int> recvProbe ;
209    list<int>::iterator itRecv ;
210    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;
211
212    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
213    {
214      int rank=itProbe->first ;
215      if (pendingRequest.count(rank)==0)
216      {
217        MPI_Message& message = itProbe->second.front().first ;
218        MPI_Status& status = itProbe->second.front().second ;
219        int count ;
220        MPI_Get_count(&status,MPI_CHAR,&count);
221        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
222        if (it->second->isBufferFree(count))
223        {
224          char * addr;
225          addr=(char*)it->second->getBuffer(count);
226          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
227          bufferRequest[rank]=addr;
228          recvProbe.push_back(rank) ;
229          itProbe->second.pop_front() ;
230        }
231      }
232    }
233
234    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
235  }
236
237
238  void CContextServer::checkPendingRequest(void)
239  {
240    map<int,MPI_Request>::iterator it;
241    list<int> recvRequest;
242    list<int>::iterator itRecv;
243    int rank;
244    int flag;
245    int count;
246    MPI_Status status;
247   
248    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume();
249    else CTimer::get("receiving requests").suspend();
250
251    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
252    {
253      rank=it->first;
254      traceOff();
255      MPI_Test(& it->second, &flag, &status);
256      traceOn();
257      if (flag==true)
258      {
259        buffers[rank]->updateCurrentWindows() ;
260        recvRequest.push_back(rank);
261        MPI_Get_count(&status,MPI_CHAR,&count);
262        processRequest(rank,bufferRequest[rank],count);
263      }
264    }
265
266    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
267    {
268      pendingRequest.erase(*itRecv);
269      bufferRequest.erase(*itRecv);
270    }
271  }
272
273  void CContextServer::getBufferFromClient(size_t timeLine)
274  {
275    CTimer::get("CContextServer::getBufferFromClient").resume() ;
276    if (!isAttachedModeEnabled()) // one sided desactivated in attached mode
277    { 
278      int rank ;
279      char *buffer ;
280      size_t count ; 
281
282      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
283      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
284      {
285        rank=itLastTimeLine->first ;
286        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
287        {
288          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
289          if (count >= 0) break ;
290        }
291      }
292    }
293    CTimer::get("CContextServer::getBufferFromClient").suspend() ;
294  }
295         
296       
297  void CContextServer::processRequest(int rank, char* buff,int count)
298  {
299
300    CBufferIn buffer(buff,count);
301    char* startBuffer,endBuffer;
302    int size, offset;
303    size_t timeLine=0;
304    map<size_t,CEventServer*>::iterator it;
305
306   
307    CTimer::get("Process request").resume();
308    while(count>0)
309    {
310      char* startBuffer=(char*)buffer.ptr();
311      CBufferIn newBuffer(startBuffer,buffer.remain());
312      newBuffer>>size>>timeLine;
313
314      if (timeLine==timelineEventNotifyChangeBufferSize)
315      {
316        buffers[rank]->notifyBufferResizing() ;
317        buffers[rank]->updateCurrentWindows() ;
318      } 
319      else if (timeLine==timelineEventChangeBufferSize)
320      {
321        size_t newSize ;
322        vector<MPI_Aint> winAdress(2) ;
323        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ;
324        buffers.erase(rank) ;
325        buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, newSize)));
326      }
327      else
328      {
329        it=events.find(timeLine);
330        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
331        it->second->push(rank,buffers[rank],startBuffer,size);
332        if (timeLine>0) lastTimeLine[rank]=timeLine ;
333      }
334      buffer.advance(size);
335      count=buffer.remain();
336    }
337   
338    CTimer::get("Process request").suspend();
339  }
340
341  void CContextServer::processEvents(void)
342  {
343    map<size_t,CEventServer*>::iterator it;
344    CEventServer* event;
345   
346//    if (context->isProcessingEvent()) return ;
347    if (isProcessingEvent_) return ;
348    if (isAttachedModeEnabled())
349      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;
350
351    it=events.find(currentTimeLine);
352    if (it!=events.end())
353    {
354      event=it->second;
355
356      if (event->isFull())
357      {
358        if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side
359        {
360          eventScheduler_->registerEvent(currentTimeLine,hashId);
361          scheduled=true;
362        }
363        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
364        {
365
366          if (!eventScheduled_) 
367          {
368            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
369            eventScheduled_=true ;
370            return ;
371          }
372          else 
373          {
374            MPI_Status status ;
375            int flag ;
376            MPI_Test(&processEventRequest_, &flag, &status) ;
377            if (!flag) return ;
378            eventScheduled_=false ;
379          }
380
381          if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ;
382          //MPI_Barrier(intraComm) ;
383         // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes
384         // The best way to properly solve this problem will be to use the event scheduler also in attached mode
385         // for now just set up a MPI barrier
386//ym to be check later
387//         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;
388
389//         context->setProcessingEvent() ;
390         isProcessingEvent_=true ;
391         CTimer::get("Process events").resume();
392         info(100)<<"Received Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
393         dispatchEvent(*event);
394         CTimer::get("Process events").suspend();
395         isProcessingEvent_=false ;
396//         context->unsetProcessingEvent() ;
397         pendingEvent=false;
398         delete event;
399         events.erase(it);
400         currentTimeLine++;
401         scheduled = false;
402         if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;
403        }
404      }
405      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
406    }
407    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line
408  }
409
410  CContextServer::~CContextServer()
411  {
412    map<int,CServerBuffer*>::iterator it;
413    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
414  }
415
416  void CContextServer::releaseBuffers()
417  {
418    for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
419    buffers.clear() ; 
420    freeWindows() ;
421  }
422
423  void CContextServer::freeWindows()
424  {
425    if (!isAttachedModeEnabled())
426    {
427      for(int rank=0; rank<clientSize_; rank++)
428      {
429        MPI_Win_free(&windows_[rank][0]);
430        MPI_Win_free(&windows_[rank][1]);
431        MPI_Comm_free(&winComm_[rank]) ;
432      }
433    }
434  }
435
436  void CContextServer::notifyClientsFinalize(void)
437  {
438    for(auto it=buffers.begin();it!=buffers.end();++it)
439    {
440      it->second->notifyClientFinalize() ;
441    }
442  }
443
444  void CContextServer::dispatchEvent(CEventServer& event)
445  {
446    string contextName;
447    string buff;
448    int MsgSize;
449    int rank;
450    list<CEventServer::SSubEvent>::iterator it;
451    StdString ctxId = context->getId();
452    CContext::setCurrent(ctxId);
453    StdSize totalBuf = 0;
454
455    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
456    {
457      finished=true;
458      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
459      notifyClientsFinalize() ;
460      CTimer::get("receiving requests").suspend();
461      context->finalize();
462      freeWindows() ;
463
464      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
465                           iteMap = mapBufferSize_.end(), itMap;
466      for (itMap = itbMap; itMap != iteMap; ++itMap)
467      {
468        rank = itMap->first;
469        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
470            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
471        totalBuf += itMap->second;
472      }
473      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
474    }
475    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
476    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
477    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
478    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
479    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
480    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
481    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
482    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
483    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
484    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
485    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
486    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
487    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
488    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
489    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
490    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
491    else
492    {
493      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
494    }
495  }
496
497  bool CContextServer::isCollectiveEvent(CEventServer& event)
498  {
499    if (event.classId==CField::GetType()) return CField::isCollectiveEvent(event);
500    else return true ;
501  }
502}
Note: See TracBrowser for help on using the repository browser.