source: XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp @ 2223

Last change on this file since 2223 was 2223, checked in by ymipsl, 3 years ago

Warning: the standard release Intel MPI library is buggy when probing an MPI intercommunicator and causes a segfault.
Unfortunately, it only works with the release_mt version.
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.6 KB
Line 
1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <boost/functional/hash.hpp>
25#include <random>
26#include <chrono>
27
28
29namespace xios
30{
31  using namespace std ;
32
33  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 
34    : eventScheduler_(nullptr), isProcessingEvent_(false), associatedClient_(nullptr)
35  {
36    context=parent;
37    intraComm=intraComm_;
38    MPI_Comm_size(intraComm,&intraCommSize);
39    MPI_Comm_rank(intraComm,&intraCommRank);
40
41    interComm=interComm_;
42    int flag;
43    MPI_Comm_test_inter(interComm,&flag);
44
45    if (flag) attachedMode=false ;
46    else  attachedMode=true ;
47   
48    if (flag) MPI_Comm_remote_size(interComm,&commSize);
49    else  MPI_Comm_size(interComm,&commSize);
50
51   
52    SRegisterContextInfo contextInfo ;
53    CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ;
54
55  //  if (contextInfo.serviceType != CServicesManager::CLIENT) // we must have an event scheduler => to be retrieve from the associated services
56  //  {
57      //if (!isAttachedModeEnabled()) eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
58      eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
59
60  //  }
61
62
63    currentTimeLine=1;
64    scheduled=false;
65    finished=false;
66
67    // generate unique hash for server
68    auto time=chrono::system_clock::now().time_since_epoch().count() ;
69    std::default_random_engine rd(time); // not reproducible from a run to another
70    std::uniform_int_distribution<size_t> dist;
71    hashId=dist(rd) ;
72    MPI_Bcast(&hashId,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all server of the context
73
74
75    if (!isAttachedModeEnabled())
76    {
77      MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;
78// create windows for one sided comm
79      int interCommMergedRank;
80      MPI_Comm winComm ;
81      MPI_Comm_rank(intraComm, &interCommMergedRank);
82      windows.resize(2) ;
83      for(int rank=commSize; rank<commSize+intraCommSize; rank++)
84      {
85        if (rank==commSize+interCommMergedRank) 
86        {
87          MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
88          int myRank ;
89          MPI_Comm_rank(winComm,&myRank);
90          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]);
91          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);     
92        }
93        else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
94        MPI_Comm_free(&winComm) ;
95      }
96    }
97    else 
98    {
99      windows.resize(2) ;
100      windows[0]=MPI_WIN_NULL ;
101      windows[1]=MPI_WIN_NULL ;
102    }
103
104
105   
106    MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ;
107    itLastTimeLine=lastTimeLine.begin() ;
108
109    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
110    if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode
111     
112  }
113
114//! Attached mode is used ?
115//! \return true if attached mode is used, false otherwise
116  bool CContextServer::isAttachedModeEnabled() const
117  {
118    return attachedMode ;
119  }
120 
121  void CContextServer::setPendingEvent(void)
122  {
123    pendingEvent=true;
124  }
125
126  bool CContextServer::hasPendingEvent(void)
127  {
128    return pendingEvent;
129  }
130
131  bool CContextServer::hasFinished(void)
132  {
133    return finished;
134  }
135
136  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
137  {
138    listen();
139    checkPendingRequest();
140    if (enableEventsProcessing)  processEvents();
141    return finished;
142  }
143
144  void CContextServer::listen(void)
145  {
146    int rank;
147    int flag;
148    int count;
149    char * addr;
150    MPI_Status status;
151    map<int,CServerBuffer*>::iterator it;
152    bool okLoop;
153
154    traceOff();
155    // WARNING : with intel MPI, probing crash on an intercommunicator with release library but not with release_mt
156    // ==>  source $I_MPI_ROOT/intel64/bin/mpivars.sh release_mt    needed
157    MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status);
158    traceOn();
159
160    if (flag==true)
161    {
162      rank=status.MPI_SOURCE ;
163      okLoop = true;
164      if (pendingRequest.find(rank)==pendingRequest.end())
165        okLoop = !listenPendingRequest(status) ;
166      if (okLoop)
167      {
168        for(rank=0;rank<commSize;rank++)
169        {
170          if (pendingRequest.find(rank)==pendingRequest.end())
171          {
172
173            traceOff();
174            MPI_Iprobe(rank, 20,interComm,&flag,&status);
175            traceOn();
176            if (flag==true) listenPendingRequest(status) ;
177          }
178        }
179      }
180    }
181  }
182
183  bool CContextServer::listenPendingRequest(MPI_Status& status)
184  {
185    int count;
186    char * addr;
187    map<int,CServerBuffer*>::iterator it;
188    int rank=status.MPI_SOURCE ;
189
190    it=buffers.find(rank);
191    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
192    {
193       MPI_Aint recvBuff[4] ;
194       MPI_Recv(recvBuff, 4, MPI_AINT, rank, 20, interComm, &status);
195       remoteHashId_ = recvBuff[0] ;
196       StdSize buffSize = recvBuff[1];
197       vector<MPI_Aint> winAdress(2) ;
198       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
199       mapBufferSize_.insert(std::make_pair(rank, buffSize));
200       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;
201     
202       lastTimeLine[rank]=0 ;
203       itLastTimeLine=lastTimeLine.begin() ;
204
205       return true;
206    }
207    else
208    {
209      MPI_Get_count(&status,MPI_CHAR,&count);
210      if (it->second->isBufferFree(count))
211      {
212         addr=(char*)it->second->getBuffer(count);
213         MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);
214         bufferRequest[rank]=addr;
215         return true;
216       }
217      else
218        return false;
219    }
220  }
221
222
223  void CContextServer::checkPendingRequest(void)
224  {
225    map<int,MPI_Request>::iterator it;
226    list<int> recvRequest;
227    list<int>::iterator itRecv;
228    int rank;
229    int flag;
230    int count;
231    MPI_Status status;
232
233    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
234    {
235      rank=it->first;
236      traceOff();
237      MPI_Test(& it->second, &flag, &status);
238      traceOn();
239      if (flag==true)
240      {
241        buffers[rank]->updateCurrentWindows() ;
242        recvRequest.push_back(rank);
243        MPI_Get_count(&status,MPI_CHAR,&count);
244        processRequest(rank,bufferRequest[rank],count);
245      }
246    }
247
248    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
249    {
250      pendingRequest.erase(*itRecv);
251      bufferRequest.erase(*itRecv);
252    }
253  }
254
255  void CContextServer::getBufferFromClient(size_t timeLine)
256  {
257    if (!isAttachedModeEnabled()) // one sided desactivated in attached mode
258    { 
259      int rank ;
260      char *buffer ;
261      size_t count ; 
262
263      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
264      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
265      {
266        rank=itLastTimeLine->first ;
267        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0)
268        {
269          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count))
270          {
271            processRequest(rank, buffer, count);
272            break ;
273          }
274        }
275      }
276    }
277  }
278         
279       
280  void CContextServer::processRequest(int rank, char* buff,int count)
281  {
282
283    CBufferIn buffer(buff,count);
284    char* startBuffer,endBuffer;
285    int size, offset;
286    size_t timeLine=0;
287    map<size_t,CEventServer*>::iterator it;
288
289   
290    CTimer::get("Process request").resume();
291    while(count>0)
292    {
293      char* startBuffer=(char*)buffer.ptr();
294      CBufferIn newBuffer(startBuffer,buffer.remain());
295      newBuffer>>size>>timeLine;
296
297      if (timeLine==timelineEventNotifyChangeBufferSize)
298      {
299        buffers[rank]->notifyBufferResizing() ;
300        buffers[rank]->updateCurrentWindows() ;
301      } 
302      else if (timeLine==timelineEventChangeBufferSize)
303      {
304        size_t newSize ;
305        vector<MPI_Aint> winAdress(2) ;
306        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ;
307        buffers.erase(rank) ;
308        buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, newSize)));
309      }
310      else
311      {
312        it=events.find(timeLine);
313        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
314        it->second->push(rank,buffers[rank],startBuffer,size);
315        if (timeLine>0) lastTimeLine[rank]=timeLine ;
316      }
317      buffer.advance(size);
318      count=buffer.remain();
319    }
320   
321    CTimer::get("Process request").suspend();
322  }
323
324  void CContextServer::processEvents(void)
325  {
326    map<size_t,CEventServer*>::iterator it;
327    CEventServer* event;
328   
329//    if (context->isProcessingEvent()) return ;
330    if (isProcessingEvent_) return ;
331    if (isAttachedModeEnabled())
332      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;
333
334    it=events.find(currentTimeLine);
335    if (it!=events.end())
336    {
337      event=it->second;
338
339      if (event->isFull())
340      {
341        if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side
342        {
343          eventScheduler_->registerEvent(currentTimeLine,hashId);
344          scheduled=true;
345        }
346        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
347        {
348          MPI_Request req ;
349          MPI_Status status ;
350
351          MPI_Ibarrier(intraComm,&req) ;
352          int flag=false ;
353          do 
354          {
355            eventScheduler_->checkEvent()  ;
356            MPI_Test(&req,&flag,&status) ;
357          } while (!flag) ;
358
359          //MPI_Barrier(intraComm) ;
360         // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes
361         // The best way to properly solve this problem will be to use the event scheduler also in attached mode
362         // for now just set up a MPI barrier
363//ym to be check later
364//         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;
365
366//         context->setProcessingEvent() ;
367         isProcessingEvent_=true ;
368         CTimer::get("Process events").resume();
369         info(100)<<"Received Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
370         dispatchEvent(*event);
371         CTimer::get("Process events").suspend();
372         isProcessingEvent_=false ;
373//         context->unsetProcessingEvent() ;
374         pendingEvent=false;
375         delete event;
376         events.erase(it);
377         currentTimeLine++;
378         scheduled = false;
379         if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;
380        }
381      }
382      else getBufferFromClient(currentTimeLine) ;
383    }
384    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line
385  }
386
387  CContextServer::~CContextServer()
388  {
389    map<int,CServerBuffer*>::iterator it;
390    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
391  }
392
393  void CContextServer::releaseBuffers()
394  {
395    map<int,CServerBuffer*>::iterator it;
396    bool out ;
397    do
398    {
399      out=true ;
400      for(it=buffers.begin();it!=buffers.end();++it)
401      {
402//        out = out && it->second->freeWindows() ;
403
404      }
405    } while (! out) ; 
406  }
407
408  void CContextServer::notifyClientsFinalize(void)
409  {
410    for(auto it=buffers.begin();it!=buffers.end();++it)
411    {
412      it->second->notifyClientFinalize() ;
413    }
414  }
415
416  void CContextServer::dispatchEvent(CEventServer& event)
417  {
418    string contextName;
419    string buff;
420    int MsgSize;
421    int rank;
422    list<CEventServer::SSubEvent>::iterator it;
423    StdString ctxId = context->getId();
424    CContext::setCurrent(ctxId);
425    StdSize totalBuf = 0;
426
427    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
428    {
429      finished=true;
430      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
431//      releaseBuffers() ;
432      notifyClientsFinalize() ;
433      context->finalize();
434
435/* don't know where release windows
436      MPI_Win_free(&windows[0]) ;
437      MPI_Win_free(&windows[1]) ;
438*/     
439      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
440                           iteMap = mapBufferSize_.end(), itMap;
441      for (itMap = itbMap; itMap != iteMap; ++itMap)
442      {
443        rank = itMap->first;
444        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
445            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
446        totalBuf += itMap->second;
447      }
448      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
449    }
450    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
451    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
452    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
453    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
454    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
455    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
456    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
457    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
458    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
459    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
460    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
461    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
462    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
463    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
464    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
465    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
466    else
467    {
468      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
469    }
470  }
471}
Note: See TracBrowser for help on using the repository browser.