source: XIOS/dev/dev_ym/XIOS_ASSIM2K/src/context_server.cpp @ 2015

Last change on this file since 2015 was 2015, checked in by ymipsl, 3 years ago

ASSIM2K Branch :
Improve transfer protocol using MPI matching probe / matched receive (MPI_Improbe / MPI_Mrecv)
YM
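
A minimal, self-contained sketch (not part of the XIOS sources; the function name is chosen for illustration) of the matching probe / matched receive pattern this changeset introduces: MPI_Improbe dequeues a pending message and returns an MPI_Message handle, so a later MPI_Mrecv (or MPI_Imrecv) is guaranteed to consume exactly that message. This is what lets the server defer the actual receive until buffer space is available.

  #include <mpi.h>
  #include <vector>

  // Probe for one message with the given tag and, if one is pending,
  // receive it immediately with a matched receive.
  void receiveOneMatchedMessage(MPI_Comm comm, int tag)
  {
    int flag = 0;
    MPI_Message message;
    MPI_Status status;

    // Non-blocking matching probe: on success the message is removed from
    // the matching queue and can only be received through 'message'.
    MPI_Improbe(MPI_ANY_SOURCE, tag, comm, &flag, &message, &status);
    if (flag)
    {
      int count;
      MPI_Get_count(&status, MPI_CHAR, &count);
      std::vector<char> buffer(count);

      // Matched receive: consumes exactly the message dequeued above.
      MPI_Mrecv(buffer.data(), count, MPI_CHAR, &message, &status);
    }
  }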

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 11.2 KB
#include "context_server.hpp"
#include "buffer_in.hpp"
#include "type.hpp"
#include "context.hpp"
#include "object_template.hpp"
#include "group_template.hpp"
#include "attribute_template.hpp"
#include "domain.hpp"
#include "field.hpp"
#include "file.hpp"
#include "grid.hpp"
#include "mpi.hpp"
#include "tracer.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "event_scheduler.hpp"
#include "server.hpp"
#include <boost/functional/hash.hpp>


namespace xios
{

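  // Constructor: store the parent context and the intra- and inter-communicators,
  // query the intra-communicator size and rank, determine the number of connected
  // clients (remote size when interComm is a true inter-communicator), and build
  // the hash id used to register events with the event scheduler.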
  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
  {
    context=parent;
    intraComm=intraComm_;
    MPI_Comm_size(intraComm,&intraCommSize);
    MPI_Comm_rank(intraComm,&intraCommRank);

    interComm=interComm_;
    int flag;
    MPI_Comm_test_inter(interComm,&flag);
    if (flag) MPI_Comm_remote_size(interComm,&commSize);
    else  MPI_Comm_size(interComm,&commSize);

    currentTimeLine=0;
    scheduled=false;
    finished=false;
    boost::hash<string> hashString;
    if (CServer::serverLevel == 1)
      hashId=hashString(context->getId() + boost::lexical_cast<string>(context->clientPrimServer.size()));
    else
      hashId=hashString(context->getId());
  }

  void CContextServer::setPendingEvent(void)
  {
    pendingEvent=true;
  }

  bool CContextServer::hasPendingEvent(void)
  {
    return pendingEvent;
  }

  bool CContextServer::hasFinished(void)
  {
    return finished;
  }

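  // One iteration of the server loop: probe for new messages, post receives for
  // pending probes when buffer space allows, test outstanding receive requests
  // and, unless disabled, process completed events. Returns true once the
  // context has been finalized.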
  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
  {
    listen();
    checkPendingProbe();
    checkPendingRequest();
    if (enableEventsProcessing)
      processEvents();
    return finished;
  }
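
// The previous implementation of listen() / listenPendingRequest(), based on
// MPI_Iprobe / MPI_Irecv, is kept below, commented out; it is superseded by the
// matching probe / matched receive version that follows it.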
/*
  void CContextServer::listen(void)
  {
    int rank;
    int flag;
    int count;
    char * addr;
    MPI_Status status;
    map<int,CServerBuffer*>::iterator it;
    bool okLoop;

    traceOff();
    MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status);
    traceOn();

    if (flag==true)
    {
      rank=status.MPI_SOURCE ;
      okLoop = true;
      if (pendingRequest.find(rank)==pendingRequest.end())
        okLoop = !listenPendingRequest(status) ;
      if (okLoop)
      {
        for(rank=0;rank<commSize;rank++)
        {
          if (pendingRequest.find(rank)==pendingRequest.end())
          {

            traceOff();
            MPI_Iprobe(rank, 20,interComm,&flag,&status);
            traceOn();
            if (flag==true) listenPendingRequest(status) ;
          }
        }
      }
    }
  }

  bool CContextServer::listenPendingRequest(MPI_Status& status)
  {
    int count;
    char * addr;
    map<int,CServerBuffer*>::iterator it;
    int rank=status.MPI_SOURCE ;

    it=buffers.find(rank);
    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
    {
       StdSize buffSize = 0;
       MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status);
       mapBufferSize_.insert(std::make_pair(rank, buffSize));
       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first;
       return true;
    }
    else
    {
      MPI_Get_count(&status,MPI_CHAR,&count);
      if (it->second->isBufferFree(count))
      {
         addr=(char*)it->second->getBuffer(count);
         MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);
         bufferRequest[rank]=addr;
         return true;
       }
      else
        return false;
    }
  }

*/

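  // Non-blocking matching probe for an incoming message with tag 20 on the
  // inter-communicator; if one is found, hand the matched message and its
  // status over to listenPendingRequest().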
  void CContextServer::listen(void)
  {
    int rank;
    int flag;
    int count;
    char * addr;
    MPI_Status status;
    MPI_Message message ;
    map<int,CServerBuffer*>::iterator it;
    bool okLoop;

    traceOff();
    MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status);
    traceOn();
    if (flag==true) listenPendingRequest(message, status) ;
  }

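  // Handle a matched message. The first message from a client rank carries the
  // buffer size and is received immediately with MPI_Mrecv so that the server
  // buffer can be allocated; subsequent messages are queued in pendingProbe and
  // will be received later by checkPendingProbe(), once buffer space is free.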
  bool CContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
  {
    int count;
    char * addr;
    map<int,CServerBuffer*>::iterator it;
    int rank=status.MPI_SOURCE ;

    it=buffers.find(rank);
    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
    {
       StdSize buffSize = 0;
       MPI_Mrecv(&buffSize, 1, MPI_LONG, &message, &status);
       mapBufferSize_.insert(std::make_pair(rank, buffSize));
       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first;
       return true;
    }
    else
    {
//      MPI_Get_count(&status,MPI_CHAR,&count);
//      if (it->second->isBufferFree(count))
//      {
//        addr=(char*)it->second->getBuffer(count);
//         MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
//         bufferRequest[rank]=addr;
//         return true;
//       }
//      else
//      {
        pendingProbe[rank].push_back(make_pair<MPI_Message,MPI_Status>(message,status)) ;
        return false;
//      }
    }
  }

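  // For each client rank that has queued matched messages and no receive already
  // in flight, post an MPI_Imrecv for the oldest message as soon as the server
  // buffer has room for it; ranks whose queue becomes empty are removed from
  // pendingProbe.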
  void CContextServer::checkPendingProbe(void)
  {

    list<int> recvProbe ;
    list<int>::iterator itRecv ;
    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;

    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
    {
      int rank=itProbe->first ;
      if (pendingRequest.count(rank)==0)
      {
        MPI_Message& message = itProbe->second.front().first ;
        MPI_Status& status = itProbe->second.front().second ;
        int count ;
        MPI_Get_count(&status,MPI_CHAR,&count);
        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
        if (it->second->isBufferFree(count))
        {
          char * addr;
          addr=(char*)it->second->getBuffer(count);
          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
          bufferRequest[rank]=addr;
          recvProbe.push_back(rank) ;
          itProbe->second.pop_front() ;
        }
      }
    }

    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
  }

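  // Test every outstanding MPI_Imrecv request; for requests that have completed,
  // process the received data and drop the corresponding entries from
  // pendingRequest and bufferRequest.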
  void CContextServer::checkPendingRequest(void)
  {
    map<int,MPI_Request>::iterator it;
    list<int> recvRequest;
    list<int>::iterator itRecv;
    int rank;
    int flag;
    int count;
    MPI_Status status;

    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
    {
      rank=it->first;
      traceOff();
      MPI_Test(& it->second, &flag, &status);
      traceOn();
      if (flag==true)
      {
        recvRequest.push_back(rank);
        MPI_Get_count(&status,MPI_CHAR,&count);
        processRequest(rank,bufferRequest[rank],count);
      }
    }

    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
    {
      pendingRequest.erase(*itRecv);
      bufferRequest.erase(*itRecv);
    }
  }

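  // Split the received buffer into individual messages (each prefixed by its
  // size and timeline id) and push them into the event associated with that
  // timeline, creating the event if it does not exist yet.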
  void CContextServer::processRequest(int rank, char* buff,int count)
  {

    CBufferIn buffer(buff,count);
    char* startBuffer,endBuffer;
    int size, offset;
    size_t timeLine;
    map<size_t,CEventServer*>::iterator it;

    CTimer::get("Process request").resume();
    while(count>0)
    {
      char* startBuffer=(char*)buffer.ptr();
      CBufferIn newBuffer(startBuffer,buffer.remain());
      newBuffer>>size>>timeLine;

      it=events.find(timeLine);
      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first;
      it->second->push(rank,buffers[rank],startBuffer,size);

      buffer.advance(size);
      count=buffer.remain();
    }
    CTimer::get("Process request").suspend();
  }

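  // If the event for the current timeline is complete (all sub-events received),
  // register it with the event scheduler and dispatch it once the scheduler
  // allows it, then move on to the next timeline.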
  void CContextServer::processEvents(void)
  {
    map<size_t,CEventServer*>::iterator it;
    CEventServer* event;

    it=events.find(currentTimeLine);
    if (it!=events.end())
    {
      event=it->second;

      if (event->isFull())
      {
        if (!scheduled && CServer::eventScheduler) // Skip event scheduling for attached mode and reception on client side
        {
          CServer::eventScheduler->registerEvent(currentTimeLine,hashId);
          scheduled=true;
        }
        else if (!CServer::eventScheduler || CServer::eventScheduler->queryEvent(currentTimeLine,hashId) )
        {
         // When using attached mode, synchronise the processes to avoid that different events get scheduled by different processes.
         // The best way to properly solve this problem would be to use the event scheduler also in attached mode;
         // for now just set up an MPI barrier.
         if (!CServer::eventScheduler && CXios::isServer) MPI_Barrier(intraComm) ;

         CTimer::get("Process events").resume();
         dispatchEvent(*event);
         CTimer::get("Process events").suspend();
         pendingEvent=false;
         delete event;
         events.erase(it);
         currentTimeLine++;
         scheduled = false;
        }
      }
    }
  }

  CContextServer::~CContextServer()
  {
    map<int,CServerBuffer*>::iterator it;
    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
  }

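  // Route the event to the dispatcher of the class it targets. A context
  // finalize event marks the server as finished, finalizes the context and
  // reports the memory used by the buffer of each client connection.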
  void CContextServer::dispatchEvent(CEventServer& event)
  {
    string contextName;
    string buff;
    int MsgSize;
    int rank;
    list<CEventServer::SSubEvent>::iterator it;
    StdString ctxId = context->getId();
    CContext::setCurrent(ctxId);
    StdSize totalBuf = 0;

    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
    {
      finished=true;
      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
      context->finalize();
      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                           iteMap = mapBufferSize_.end(), itMap;
      for (itMap = itbMap; itMap != iteMap; ++itMap)
      {
        rank = itMap->first;
        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
        totalBuf += itMap->second;
      }
      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
    }
    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
    else
    {
      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
    }
  }
}