source: XIOS3/trunk/src/transport/one_sided_context_server.cpp @ 2407

Last change on this file since 2407 was 2407, checked in by ymipsl, 21 months ago

Implement separate "reader" and "writer" services. By default the reader lives on the same resources as the "writer" or "gatherer" services.

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.8 KB
Line 
1#include "one_sided_context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <boost/functional/hash.hpp>
25#include <random>
26#include <chrono>
27
28
29namespace xios
30{
31  using namespace std ;
32
  // Construct a one-sided transport server for context <parent>.
  // intraComm_ : communicator among the server processes of this service
  // interComm_ : communicator toward the client processes
  COneSidedContextServer::COneSidedContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
                         : CContextServer(parent, intraComm_, interComm_), 
                           isProcessingEvent_(false)
  {
   
    // Private duplicate of the intra-communicator, reserved for the
    // non-blocking barrier used in processEvents().
    MPI_Comm_dup(intraComm, &processEventBarrier_) ;
 
    currentTimeLine=1;
    scheduled=false;
    finished=false;

    // In server (non-attached) mode, merge clients and servers into one
    // communicator, needed for the one-sided (RMA) exchanges.
    if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
    // Split with a distinct color per rank -> a self-only communicator, for windows
    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ;
   
    itLastTimeLine=lastTimeLine.begin() ;

    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
    if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode
     
  }
53
54  void COneSidedContextServer::setPendingEvent(void)
55  {
56    pendingEvent=true;
57  }
58
59  bool COneSidedContextServer::hasPendingEvent(void)
60  {
61    return pendingEvent;
62  }
63
64  bool COneSidedContextServer::hasFinished(void)
65  {
66    return finished;
67  }
68
69  bool COneSidedContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
70  {
71    CTimer::get("listen request").resume();
72    listen();
73    CTimer::get("listen request").suspend();
74
75    CTimer::get("listen pending request").resume();
76    listenPendingRequest() ;
77    CTimer::get("listen pending request").suspend();
78
79    CTimer::get("check server Buffers").resume();
80    checkBuffers() ;
81    CTimer::get("check server Buffers").suspend();
82
83    CTimer::get("check event process").resume();
84    processEvents(enableEventsProcessing);
85    CTimer::get("check event process").suspend();
86    return finished;
87
88  }
89
90 void COneSidedContextServer::listen(void)
91  {
92    int rank;
93    int flag;
94    MPI_Status status;
95    flag=true ;
96
97    while(flag)
98    {
99      traceOff();
100      MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm, &flag, &status);
101      traceOn();
102      if (flag==true)
103      {
104        requests_.push_back(CRequest(interComm, status)) ;
105        if (requests_.back().test()) 
106        {
107          processRequest(requests_.back()) ;
108          requests_.pop_back() ;
109        }
110      }
111    }
112  }
113
114  void COneSidedContextServer::listenPendingRequest(void)
115  {
116    auto it = requests_.begin() ;
117    while (it != requests_.end())
118    {
119      if (it->test())
120      {
121        processRequest(*it) ;
122        auto it2=it ;
123        ++it ;
124        requests_.erase(it2) ;
125      }
126      else ++it ;
127    }
128  }
129
130  void COneSidedContextServer::processRequest(CRequest& request)
131  {
132    int rank = request.getRank() ;
133    auto it=buffers_.find(rank);
134    if (it==buffers_.end())
135    {
136      buffers_[rank] = new COneSidedServerBuffer(rank, commSelf_, interCommMerged_, pendingEvents_, completedEvents_, request.getBuffer()) ;
137    }
138    else it->second->receivedRequest(request.getBuffer()) ;
139  }
140
141  void COneSidedContextServer::checkBuffers(void)
142  {
143    if (!pendingEvents_.empty())
144    {
145/*
146      SPendingEvent& nextEvent = pendingEvents_.begin()->second ;
147      for(auto& buffer : nextEvent.buffers ) buffer->eventLoop() ;
148      if (nextEvent.nbSenders==0) pendingEvents_.erase(pendingEvents_.begin()) ;
149*/
150      for(auto it=pendingEvents_.begin() ;  it!=pendingEvents_.end() ;)
151      {
152        SPendingEvent& nextEvent = it->second ;
153        for(auto& buffer : nextEvent.buffers ) buffer->eventLoop() ;
154        if (nextEvent.nbSenders==0) it=pendingEvents_.erase(it) ;
155        else ++it ;
156      }
157    }
158  }
159
160
  // Try to dispatch the event at currentTimeLine. An event is dispatched only
  // when (1) every sender has contributed, (2) the event scheduler has granted
  // this event its turn (server mode), and (3) a non-blocking barrier across
  // the service has completed. The barrier is two-phase: the first call posts
  // it and returns; subsequent calls test it until it completes.
  void COneSidedContextServer::processEvents(bool enableEventsProcessing)
  {
 
    if (isProcessingEvent_) return ;
    if (isAttachedModeEnabled())
      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;

    auto it=completedEvents_.find(currentTimeLine);

    if (it!=completedEvents_.end())
    {
      // Proceed only when all expected senders have delivered their part.
      if (it->second.nbSenders == it->second.currentNbSenders)
      {
        if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side
        {
          eventScheduler_->registerEvent(currentTimeLine,hashId);
          scheduled=true;
        }
        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
        {
          //if (!enableEventsProcessing && isCollectiveEvent(event)) return ;

          // Phase 1: post the non-blocking barrier and come back later.
          if (!eventScheduled_) 
          {
            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
            eventScheduled_=true ;
            return ;
          }
          else 
          {
            // Phase 2: dispatch only once the barrier has completed.
            MPI_Status status ;
            int flag ;
            MPI_Test(&processEventRequest_, &flag, &status) ;
            if (!flag) return ;
            eventScheduled_=false ;
          }

          if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ;

          // Assemble the event from every contributing buffer, then dispatch.
          isProcessingEvent_=true ;
          CEventServer event(this) ;
          for(auto& buffer : it->second.buffers) buffer->fillEventServer(currentTimeLine, event) ;
//          MPI_Barrier(intraComm) ;
          CTimer::get("Process events").resume();
          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event.classId<<" of type "<<event.type<<endl ;
          dispatchEvent(event);
          CTimer::get("Process events").suspend();
          isProcessingEvent_=false ;
//         context->unsetProcessingEvent() ;
          // Advance to the next timeline step and reset the scheduling state.
          pendingEvent=false;
          completedEvents_.erase(it);
          currentTimeLine++;
          scheduled = false;
          if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;
        }
      }
    }
  }
219
220  COneSidedContextServer::~COneSidedContextServer()
221  {
222    for(auto& buffer : buffers_) delete buffer.second;
223    buffers_.clear() ;
224  }
225
226  void COneSidedContextServer::releaseBuffers()
227  {
228    //for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
229    //buffers.clear() ;
230    freeWindows() ;
231  }
232
233  void COneSidedContextServer::freeWindows()
234  {
235    //if (!isAttachedModeEnabled())
236    //{
237    //  for(auto& it : winComm_)
238    //  {
239    //    int rank = it.first ;
240    //    MPI_Win_free(&windows_[rank][0]);
241    //    MPI_Win_free(&windows_[rank][1]);
242    //    MPI_Comm_free(&winComm_[rank]) ;
243    //  }
244    //}
245  }
246
247  void COneSidedContextServer::notifyClientsFinalize(void)
248  {
249    for(auto it=buffers_.begin();it!=buffers_.end();++it)
250    {
251      it->second->notifyClientFinalize() ;
252    }
253  }
254
255  void COneSidedContextServer::dispatchEvent(CEventServer& event)
256  {
257    string contextName;
258    string buff;
259    int MsgSize;
260    int rank;
261    list<CEventServer::SSubEvent>::iterator it;
262    StdString ctxId = context->getId();
263    CContext::setCurrent(ctxId);
264    StdSize totalBuf = 0;
265
266    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
267    {
268      finished=true;
269      info(20)<<" COneSidedContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
270      notifyClientsFinalize() ;
271      CTimer::get("receiving requests").suspend();
272      context->finalize();
273     
274      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
275                           iteMap = mapBufferSize_.end(), itMap;
276      for (itMap = itbMap; itMap != iteMap; ++itMap)
277      {
278        rank = itMap->first;
279        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
280            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
281        totalBuf += itMap->second;
282      }
283      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
284    }
285    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
286    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
287    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
288    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
289    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
290    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
291    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
292    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
293    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
294    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
295    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
296    else if (event.classId==CField::GetType()) 
297    {
298      if (event.type==CField::EVENT_ID_UPDATE_DATA) CField::dispatchEvent(event);
299      else CField::dispatchEvent(event);
300    }
301    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
302    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
303    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
304    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
305    else
306    {
307      ERROR("void COneSidedContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
308    }
309  }
310
311  bool COneSidedContextServer::isCollectiveEvent(CEventServer& event)
312  {
313    if (event.type>1000) return false ;
314    else return true ;
315  }
316}
Note: See TracBrowser for help on using the repository browser.