source: XIOS3/trunk/src/transport/legacy_context_server.cpp @ 2528

Last change on this file since 2528 was 2528, checked in by jderouillat, 12 months ago

Fix intracommunicator probing for attached mode

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 17.1 KB
Line 
1#include "legacy_context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <boost/functional/hash.hpp>
25#include <random>
26#include <chrono>
27
28
29namespace xios
30{
31  using namespace std ;
32
33  CLegacyContextServer::CLegacyContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 
34    : CContextServer(parent, intraComm_, interComm_),
35      isProcessingEvent_(false)
36  {
37 
38    MPI_Comm_dup(intraComm, &processEventBarrier_) ;
39 
40
41    currentTimeLine=1;
42    scheduled=false;
43    finished=false;
44
45    if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
46    else interCommMerged_ = interComm_; // interComm_ is yet an intracommunicator in attached
47    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows
48   
49    itLastTimeLine=lastTimeLine.begin() ;
50
51    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
52    if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode
53     
54  }
55 
56  void CLegacyContextServer::setPendingEvent(void)
57  {
58    pendingEvent=true;
59  }
60
61  bool CLegacyContextServer::hasPendingEvent(void)
62  {
63    return pendingEvent;
64  }
65
66  bool CLegacyContextServer::hasFinished(void)
67  {
68    return finished;
69  }
70
71  bool CLegacyContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
72  {
73    CTimer::get("listen request").resume();
74    listen();
75    CTimer::get("listen request").suspend();
76    CTimer::get("check pending request").resume();
77    checkPendingRequest();
78    checkPendingProbe() ;
79    CTimer::get("check pending request").suspend();
80    CTimer::get("check event process").resume();
81    processEvents(enableEventsProcessing);
82    CTimer::get("check event process").suspend();
83    return finished;
84  }
85
86 void CLegacyContextServer::listen(void)
87  {
88    int rank;
89    int flag;
90    int count;
91    char * addr;
92    MPI_Status status;
93    MPI_Message message ;
94    map<int,CServerBuffer*>::iterator it;
95    bool okLoop;
96
97    traceOff();
98    MPI_Improbe(MPI_ANY_SOURCE, 20, interCommMerged_,&flag,&message, &status);
99    traceOn();
100    if (flag==true) listenPendingRequest(message, status) ;
101  }
102
103  bool CLegacyContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
104  {
105    int count;
106    char * addr;
107    map<int,CServerBuffer*>::iterator it;
108    int rank=status.MPI_SOURCE ;
109
110    it=buffers.find(rank);
111    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
112    {
113      MPI_Aint recvBuff[4] ;
114      MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
115      remoteHashId_ = recvBuff[0] ;
116      StdSize buffSize = recvBuff[1];
117      vector<MPI_Aint> winAdress(2) ;
118      winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
119      mapBufferSize_.insert(std::make_pair(rank, buffSize));
120
121      // create windows dynamically for one-sided
122      if (!isAttachedModeEnabled())
123      { 
124        CTimer::get("create Windows").resume() ;
125        MPI_Comm interComm ;
126        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ;
127        MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
128        CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
129        MPI_Comm_free(&interComm) ;
130        windows_[rank].resize(2) ;
131        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
132        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;
133        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
134        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;
135        CTimer::get("create Windows").suspend() ;
136        MPI_Barrier(winComm_[rank]) ;
137      }
138      else
139      {
140        winComm_[rank] = MPI_COMM_NULL ;
141        windows_[rank].resize(2) ;
142        windows_[rank][0] = MPI_WIN_NULL ;
143        windows_[rank][1] = MPI_WIN_NULL ;
144      }   
145
146      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
147      lastTimeLine[rank]=0 ;
148      itLastTimeLine=lastTimeLine.begin() ;
149
150      return true;
151    }
152    else
153    {
154        std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
155        pendingProbe[rank].push_back(mypair) ;
156        return false;
157    }
158  }
159
160  void CLegacyContextServer::checkPendingProbe(void)
161  {
162   
163    list<int> recvProbe ;
164    list<int>::iterator itRecv ;
165    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;
166
167    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
168    {
169      int rank=itProbe->first ;
170      if (pendingRequest.count(rank)==0)
171      {
172        MPI_Message& message = itProbe->second.front().first ;
173        MPI_Status& status = itProbe->second.front().second ;
174        int count ;
175        MPI_Get_count(&status,MPI_CHAR,&count);
176        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
177        if ( (it->second->isBufferFree(count) && !it->second->isResizing()) // accept new request if buffer is free
178          || (it->second->isResizing() && it->second->isBufferEmpty()) )    // or if resizing wait for buffer is empty
179        {
180          char * addr;
181          addr=(char*)it->second->getBuffer(count);
182          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
183          bufferRequest[rank]=addr;
184          recvProbe.push_back(rank) ;
185          itProbe->second.pop_front() ;
186        }
187      }
188    }
189
190    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
191  }
192
193
194  void CLegacyContextServer::checkPendingRequest(void)
195  {
196    map<int,MPI_Request>::iterator it;
197    list<int> recvRequest;
198    list<int>::iterator itRecv;
199    int rank;
200    int flag;
201    int count;
202    MPI_Status status;
203   
204    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume();
205    else CTimer::get("receiving requests").suspend();
206
207    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
208    {
209      rank=it->first;
210      traceOff();
211      MPI_Test(& it->second, &flag, &status);
212      traceOn();
213      if (flag==true)
214      {
215        buffers[rank]->updateCurrentWindows() ;
216        recvRequest.push_back(rank);
217        MPI_Get_count(&status,MPI_CHAR,&count);
218        processRequest(rank,bufferRequest[rank],count);
219      }
220    }
221
222    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
223    {
224      pendingRequest.erase(*itRecv);
225      bufferRequest.erase(*itRecv);
226    }
227  }
228
229  void CLegacyContextServer::getBufferFromClient(size_t timeLine)
230  {
231    CTimer::get("CLegacyContextServer::getBufferFromClient").resume() ;
232    if (!isAttachedModeEnabled()) // one sided desactivated in attached mode
233    { 
234      int rank ;
235      char *buffer ;
236      size_t count ; 
237
238      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
239      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
240      {
241        rank=itLastTimeLine->first ;
242        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
243        {
244          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
245          if (count >= 0) ++itLastTimeLine ;
246          break ;
247        }
248      }
249    }
250    CTimer::get("CLegacyContextServer::getBufferFromClient").suspend() ;
251  }
252         
253       
254  void CLegacyContextServer::processRequest(int rank, char* buff,int count)
255  {
256
257    CBufferIn buffer(buff,count);
258    char* startBuffer,endBuffer;
259    int size, offset;
260    size_t timeLine=0;
261    map<size_t,CEventServer*>::iterator it;
262
263   
264    CTimer::get("Process request").resume();
265    while(count>0)
266    {
267      char* startBuffer=(char*)buffer.ptr();
268      CBufferIn newBuffer(startBuffer,buffer.remain());
269      newBuffer>>size>>timeLine;
270
271      if (timeLine==timelineEventNotifyChangeBufferSize)
272      {
273        buffers[rank]->notifyBufferResizing() ;
274        buffers[rank]->updateCurrentWindows() ;
275        buffers[rank]->popBuffer(count) ;
276        info(100)<<"Context id "<<context->getId()<<" : Receive NotifyChangeBufferSize from client rank "<<rank<<endl
277                 <<"isBufferEmpty ? "<<buffers[rank]->isBufferEmpty()<<"  remaining count : "<<buffers[rank]->getUsed()<<endl;
278      } 
279      else if (timeLine==timelineEventChangeBufferSize)
280      {
281        size_t newSize ;
282        vector<MPI_Aint> winAdress(2) ;
283        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ;
284        buffers[rank]->freeBuffer(count) ;
285        delete buffers[rank] ;
286        buffers[rank] = new CServerBuffer(windows_[rank], winAdress, 0, newSize) ;
287        info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank
288                 <<"  newSize : "<<newSize<<" Address : "<<winAdress[0]<<" & "<<winAdress[1]<<endl ;
289      }
290      else
291      {
292        info(100)<<"Context id "<<context->getId()<<" : Receive standard event from client rank "<<rank<<"  with timeLine : "<<timeLine<<endl ;
293        it=events.find(timeLine);
294       
295        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
296        it->second->push(rank,buffers[rank],startBuffer,size);
297        if (timeLine>0) lastTimeLine[rank]=timeLine ;
298      }
299      buffer.advance(size);
300      count=buffer.remain();
301    }
302   
303    CTimer::get("Process request").suspend();
304  }
305
306  void CLegacyContextServer::processEvents(bool enableEventsProcessing)
307  {
308    map<size_t,CEventServer*>::iterator it;
309    CEventServer* event;
310   
311//    if (context->isProcessingEvent()) return ;
312    if (isProcessingEvent_) return ;
313    if (isAttachedModeEnabled())
314      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;
315
316    it=events.find(currentTimeLine);
317    if (it!=events.end())
318    {
319      event=it->second;
320
321      if (event->isFull())
322      {
323        if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side
324        {
325          eventScheduler_->registerEvent(currentTimeLine,hashId);
326          info(100)<<"Context id "<<context->getId()<<"Schedule event : "<< currentTimeLine <<"  "<<hashId<<endl ;
327          scheduled=true;
328        }
329        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
330        {
331          if (!enableEventsProcessing && isCollectiveEvent(*event)) return ;
332
333          if (!eventScheduled_) 
334          {
335            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
336            eventScheduled_=true ;
337            return ;
338          }
339          else 
340          {
341            MPI_Status status ;
342            int flag ;
343            MPI_Test(&processEventRequest_, &flag, &status) ;
344            if (!flag) return ;
345            eventScheduled_=false ;
346          }
347         
348          if (CXios::checkEventSync)
349          {
350            int typeId, classId, typeId_in, classId_in;
351            long long timeLine_out;
352            long long timeLine_in( currentTimeLine );
353            typeId_in=event->type ;
354            classId_in=event->classId ;
355   //        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
356            MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ; 
357            MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
358            MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
359            if (typeId/intraCommSize!=event->type || classId/intraCommSize!=event->classId || timeLine_out/intraCommSize!=currentTimeLine)
360            {
361               ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
362                  << "Event are not coherent between client for timeline = "<<currentTimeLine);
363            }
364          }
365
366          if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ;
367          //MPI_Barrier(intraComm) ;
368         // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes
369         // The best way to properly solve this problem will be to use the event scheduler also in attached mode
370         // for now just set up a MPI barrier
371//ym to be check later
372//         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;
373
374//         context->setProcessingEvent() ;
375         isProcessingEvent_=true ;
376         CTimer::get("Process events").resume();
377         info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
378         dispatchEvent(*event);
379         CTimer::get("Process events").suspend();
380         isProcessingEvent_=false ;
381//         context->unsetProcessingEvent() ;
382         pendingEvent=false;
383         delete event;
384         events.erase(it);
385         currentTimeLine++;
386         scheduled = false;
387         if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;
388        }
389      }
390      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
391    }
392    else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line
393  }
394
395  CLegacyContextServer::~CLegacyContextServer()
396  {
397    map<int,CServerBuffer*>::iterator it;
398    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
399    buffers.clear() ;
400  }
401
402  void CLegacyContextServer::releaseBuffers()
403  {
404    //for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
405    //buffers.clear() ;
406    freeWindows() ;
407  }
408
409  void CLegacyContextServer::freeWindows()
410  {
411    //if (!isAttachedModeEnabled())
412    //{
413    //  for(auto& it : winComm_)
414    //  {
415    //    int rank = it.first ;
416    //    MPI_Win_free(&windows_[rank][0]);
417    //    MPI_Win_free(&windows_[rank][1]);
418    //    MPI_Comm_free(&winComm_[rank]) ;
419    //  }
420    //}
421  }
422
423  void CLegacyContextServer::notifyClientsFinalize(void)
424  {
425    for(auto it=buffers.begin();it!=buffers.end();++it)
426    {
427      it->second->notifyClientFinalize() ;
428    }
429  }
430
431  void CLegacyContextServer::dispatchEvent(CEventServer& event)
432  {
433    string contextName;
434    string buff;
435    int MsgSize;
436    int rank;
437    list<CEventServer::SSubEvent>::iterator it;
438    StdString ctxId = context->getId();
439    CContext::setCurrent(ctxId);
440    StdSize totalBuf = 0;
441
442    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
443    {
444      finished=true;
445      info(20)<<" CLegacyContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
446      notifyClientsFinalize() ;
447      CTimer::get("receiving requests").suspend();
448      context->finalize();
449     
450      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
451                           iteMap = mapBufferSize_.end(), itMap;
452      for (itMap = itbMap; itMap != iteMap; ++itMap)
453      {
454        rank = itMap->first;
455        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
456            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
457        totalBuf += itMap->second;
458      }
459      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
460    }
461    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
462    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
463    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
464    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
465    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
466    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
467    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
468    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
469    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
470    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
471    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
472    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
473    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
474    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
475    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
476    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
477    else
478    {
479      ERROR("void CLegacyContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
480    }
481  }
482
483  bool CLegacyContextServer::isCollectiveEvent(CEventServer& event)
484  {
485    if (event.type>1000) return false ;
486    else return true ;
487  }
488}
Note: See TracBrowser for help on using the repository browser.