source: XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp @ 2123

Last change on this file since 2123 was 2123, checked in by ymipsl, 3 years ago

Possibly fix deadlock in attached mode
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.6 KB
#include "context_server.hpp"
#include "buffer_in.hpp"
#include "type.hpp"
#include "context.hpp"
#include "object_template.hpp"
#include "group_template.hpp"
#include "attribute_template.hpp"
#include "domain.hpp"
#include "field.hpp"
#include "file.hpp"
#include "grid.hpp"
#include "mpi.hpp"
#include "tracer.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "event_scheduler.hpp"
#include "server.hpp"
#include "servers_ressource.hpp"
#include "pool_ressource.hpp"
#include "services.hpp"
#include "contexts_manager.hpp"

#include <boost/functional/hash.hpp>
#include <random>
#include <chrono>


namespace xios
{
  using namespace std ;

  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
    : eventScheduler_(nullptr), isProcessingEvent_(false), associatedClient_(nullptr)
  {
    context=parent;
    intraComm=intraComm_;
    MPI_Comm_size(intraComm,&intraCommSize);
    MPI_Comm_rank(intraComm,&intraCommRank);

    interComm=interComm_;
    int flag;
    MPI_Comm_test_inter(interComm,&flag);

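    // flag is true when interComm is a real intercommunicator, i.e. clients and
    // servers run on distinct processes (server mode); otherwise they share the
    // same processes (attached mode)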
    if (flag) attachedMode=false ;
    else  attachedMode=true ;

    if (flag) MPI_Comm_remote_size(interComm,&commSize);
    else  MPI_Comm_size(interComm,&commSize);


    SRegisterContextInfo contextInfo ;
    CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ;

  //  if (contextInfo.serviceType != CServicesManager::CLIENT) // we must have an event scheduler => to be retrieved from the associated services
  //  {
      //if (!isAttachedModeEnabled()) eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
      eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;

  //  }


    currentTimeLine=1;
    scheduled=false;
    finished=false;

    // generate a unique hash for the server
    auto time=chrono::system_clock::now().time_since_epoch().count() ;
    std::default_random_engine rd(time); // not reproducible from one run to another
    std::uniform_int_distribution<size_t> dist;
    hashId=dist(rd) ;
    MPI_Bcast(&hashId,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all servers of the context
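    // after the Bcast, every server of the context shares rank 0's hashId, which
    // is used below as this context's key for the event scheduler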


    if (!isAttachedModeEnabled())
    {
      MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;
// create windows for one-sided comm
      int interCommMergedRank;
      MPI_Comm winComm ;
      MPI_Comm_rank(intraComm, &interCommMergedRank);
      windows.resize(2) ;
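      // MPI_Comm_split is collective over interCommMerged, so the loop below makes
      // one split per server rank (the client side performs the matching calls);
      // each server keeps only the communicator of its own iteration
      // (rank==commSize+interCommMergedRank) to create the two dynamic RMA
      // windows, one per buffer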
      for(int rank=commSize; rank<commSize+intraCommSize; rank++)
      {
        if (rank==commSize+interCommMergedRank)
        {
          MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
          int myRank ;
          MPI_Comm_rank(winComm,&myRank);
          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]);
          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);
        }
        else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
        MPI_Comm_free(&winComm) ;
      }
    }
    else
    {
      windows.resize(2) ;
      windows[0]=MPI_WIN_NULL ;
      windows[1]=MPI_WIN_NULL ;
    }


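    // commSelf: a single-process communicator (one colour per rank), presumably
    // for communications the server addresses to itself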
    MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ;
    itLastTimeLine=lastTimeLine.begin() ;

    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for testing)
    if (isAttachedModeEnabled()) pureOneSided=false ; // no one-sided in attached mode

  }

//! Is attached mode used?
//! \return true if attached mode is used, false otherwise
  bool CContextServer::isAttachedModeEnabled() const
  {
    return attachedMode ;
  }

  void CContextServer::setPendingEvent(void)
  {
    pendingEvent=true;
  }

  bool CContextServer::hasPendingEvent(void)
  {
    return pendingEvent;
  }

  bool CContextServer::hasFinished(void)
  {
    return finished;
  }

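  //! One iteration of the server event loop: probe for incoming messages, test
  //! pending receives, then optionally process complete events.
  //! \return true once the context has been finalized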
  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
  {
    listen();
    checkPendingRequest();
    if (enableEventsProcessing)  processEvents();
    return finished;
  }

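  //! Probe interComm (tag 20) for a message from any client. If the probed
  //! message cannot be handled at once (its source already has a pending request,
  //! or its buffer is currently full), probe all the other client ranks so a
  //! receivable message is not left waiting.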
  void CContextServer::listen(void)
  {
    int rank;
    int flag;
    MPI_Status status;
    bool okLoop;

    traceOff();
    MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status);
    traceOn();

    if (flag)
    {
      rank=status.MPI_SOURCE ;
      okLoop = true;
      if (pendingRequest.find(rank)==pendingRequest.end())
        okLoop = !listenPendingRequest(status) ;
      if (okLoop)
      {
        for(rank=0;rank<commSize;rank++)
        {
          if (pendingRequest.find(rank)==pendingRequest.end())
          {
            traceOff();
            MPI_Iprobe(rank, 20,interComm,&flag,&status);
            traceOn();
            if (flag) listenPendingRequest(status) ;
          }
        }
      }
    }
  }

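  //! Handle a probed message from a client. The first message of a rank is a
  //! handshake of three MPI_Aint values (buffer size plus the addresses in the
  //! two one-sided windows) used to allocate its CServerBuffer; subsequent
  //! messages carry event data and are received asynchronously into that buffer.
  //! \return false when the buffer has no room for the incoming message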
  bool CContextServer::listenPendingRequest(MPI_Status& status)
  {
    int count;
    char * addr;
    map<int,CServerBuffer*>::iterator it;
    int rank=status.MPI_SOURCE ;

    it=buffers.find(rank);
    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
    {
       MPI_Aint recvBuff[3] ;
       MPI_Recv(recvBuff, 3, MPI_AINT, rank, 20, interComm, &status);
       StdSize buffSize = recvBuff[0];
       vector<MPI_Aint> winAdress(2) ;
       winAdress[0]=recvBuff[1] ; winAdress[1]=recvBuff[2] ;
       mapBufferSize_.insert(std::make_pair(rank, buffSize));
       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;

       lastTimeLine[rank]=0 ;
       itLastTimeLine=lastTimeLine.begin() ;

       return true;
    }
    else
    {
      MPI_Get_count(&status,MPI_CHAR,&count);
      if (it->second->isBufferFree(count))
      {
         addr=(char*)it->second->getBuffer(count);
         MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);
         bufferRequest[rank]=addr;
         return true;
      }
      else
        return false;
    }
  }


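  //! Test every pending receive; for each completed one, update the window state
  //! of the client's buffer, process the received data, then drop the request
  //! from the pending maps.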
  void CContextServer::checkPendingRequest(void)
  {
    map<int,MPI_Request>::iterator it;
    list<int> recvRequest;
    list<int>::iterator itRecv;
    int rank;
    int flag;
    int count;
    MPI_Status status;

    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
    {
      rank=it->first;
      traceOff();
      MPI_Test(& it->second, &flag, &status);
      traceOn();
      if (flag)
      {
        buffers[rank]->updateCurrentWindows() ;
        recvRequest.push_back(rank);
        MPI_Get_count(&status,MPI_CHAR,&count);
        processRequest(rank,bufferRequest[rank],count);
      }
    }

    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
    {
      pendingRequest.erase(*itRecv);
      bufferRequest.erase(*itRecv);
    }
  }

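  //! In server mode, try to pull data directly from clients (one-sided) whose
  //! last known timeline lags behind the requested one. The iterator
  //! itLastTimeLine is kept across calls so successive calls poll the clients
  //! fairly.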
  void CContextServer::getBufferFromClient(size_t timeLine)
  {
    if (!isAttachedModeEnabled()) // one-sided deactivated in attached mode
    {
      int rank ;
      char *buffer ;
      size_t count ;

      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
      {
        rank=itLastTimeLine->first ;
        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0)
        {
          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count))
          {
            processRequest(rank, buffer, count);
            break ;
          }
        }
      }
    }
  }


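  //! Unpack a received block: it contains one or more messages, each framed as
  //! [size][timeLine][payload]. Every message is appended to the CEventServer of
  //! its timeline (created on first use) and lastTimeLine[rank] records the last
  //! timeline seen for this client.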
  void CContextServer::processRequest(int rank, char* buff,int count)
  {
    CBufferIn buffer(buff,count);
    int size;
    size_t timeLine=0;
    map<size_t,CEventServer*>::iterator it;

    CTimer::get("Process request").resume();
    while(count>0)
    {
      char* startBuffer=(char*)buffer.ptr();
      CBufferIn newBuffer(startBuffer,buffer.remain());
      newBuffer>>size>>timeLine;
      it=events.find(timeLine);
      if (it==events.end()) it=events.insert(pair<size_t,CEventServer*>(timeLine,new CEventServer(this))).first;
      it->second->push(rank,buffers[rank],startBuffer,size);

      buffer.advance(size);
      count=buffer.remain();
    }

    if (timeLine>0) lastTimeLine[rank]=timeLine ;

    CTimer::get("Process request").suspend();
  }

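  //! Process the event of the current timeline if it is complete. In server mode
  //! the event is first registered with the event scheduler under hashId; once
  //! the scheduler grants it (immediately in attached mode), the servers
  //! synchronise on a non-blocking barrier, still servicing the scheduler while
  //! waiting, then the event is dispatched and the timeline advances.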
  void CContextServer::processEvents(void)
  {
    map<size_t,CEventServer*>::iterator it;
    CEventServer* event;

//    if (context->isProcessingEvent()) return ;
    if (isProcessingEvent_) return ;

    it=events.find(currentTimeLine);
    if (it!=events.end())
    {
      event=it->second;

      if (event->isFull())
      {
        if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side
        {
          eventScheduler_->registerEvent(currentTimeLine,hashId);
          scheduled=true;
        }
        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
        {
          MPI_Request req ;
          MPI_Status status ;

          MPI_Ibarrier(intraComm,&req) ;
          int flag=false ;
          do
          {
            eventScheduler_->checkEvent()  ;
            MPI_Test(&req,&flag,&status) ;
          } while (!flag) ;

          //MPI_Barrier(intraComm) ;
         // When using attached mode, synchronise the processes to prevent different events from being scheduled by different processes
         // The best way to properly solve this problem would be to use the event scheduler in attached mode as well
         // for now, just set up an MPI barrier
//ym to be checked later
//         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;

//         context->setProcessingEvent() ;
         isProcessingEvent_=true ;
         CTimer::get("Process events").resume();
         info(100)<<"Received Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
         dispatchEvent(*event);
         CTimer::get("Process events").suspend();
         isProcessingEvent_=false ;
//         context->unsetProcessingEvent() ;
         pendingEvent=false;
         delete event;
         events.erase(it);
         currentTimeLine++;
         scheduled = false;
        }
      }
      else getBufferFromClient(currentTimeLine) ;
    }
    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // in pure one-sided mode, check buffers even if no event is recorded at the current timeline
  }

  CContextServer::~CContextServer()
  {
    map<int,CServerBuffer*>::iterator it;
    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
  }

  void CContextServer::releaseBuffers()
  {
    map<int,CServerBuffer*>::iterator it;
    bool out ;
    do
    {
      out=true ;
      for(it=buffers.begin();it!=buffers.end();++it)
      {
//        out = out && it->second->freeWindows() ;

      }
    } while (! out) ;
  }

  void CContextServer::notifyClientsFinalize(void)
  {
    for(auto it=buffers.begin();it!=buffers.end();++it)
    {
      it->second->notifyClientFinalize() ;
    }
  }

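  //! Route a complete event to the static dispatchEvent() of the class identified
  //! by event.classId. A context finalize event additionally marks the server as
  //! finished, notifies the clients, and reports the buffer memory used per client.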
  void CContextServer::dispatchEvent(CEventServer& event)
  {
    int rank;
    StdString ctxId = context->getId();
    CContext::setCurrent(ctxId);
    StdSize totalBuf = 0;

    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
    {
      finished=true;
      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
//      releaseBuffers() ;
      notifyClientsFinalize() ;
      context->finalize();

/* don't know where to release windows
      MPI_Win_free(&windows[0]) ;
      MPI_Win_free(&windows[1]) ;
*/
      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                           iteMap = mapBufferSize_.end(), itMap;
      for (itMap = itbMap; itMap != iteMap; ++itMap)
      {
        rank = itMap->first;
        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
        totalBuf += itMap->second;
      }
      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
    }
    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
    else
    {
      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
    }
  }
}