source: XIOS3/trunk/src/transport/p2p_context_server.cpp @ 2559

Last change on this file since 2559 was 2559, checked in by jderouillat, 13 months ago

Forced the request processing, on servers, to start with the buffer creation in P2P

  • Property svn:executable set to *
File size: 10.1 KB
Line 
1#include "p2p_context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <boost/functional/hash.hpp>
25#include <random>
26#include <chrono>
27
28
29namespace xios
30{
31  using namespace std ;
32
33  CP2pContextServer::CP2pContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
34                         : CContextServer(parent, intraComm_, interComm_), 
35                           isProcessingEvent_(false)
36  {
37   
38    MPI_Comm_dup(intraComm, &processEventBarrier_) ;
39 
40    currentTimeLine=1;
41    scheduled=false;
42    finished=false;
43
44    MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
45    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows
46   
47    itLastTimeLine=lastTimeLine.begin() ;
48
49    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
50     
51  }
52
53  void CP2pContextServer::setPendingEvent(void)
54  {
55    pendingEvent=true;
56  }
57
58  bool CP2pContextServer::hasPendingEvent(void)
59  {
60    return pendingEvent;
61  }
62
63  bool CP2pContextServer::hasFinished(void)
64  {
65    return finished;
66  }
67
68  bool CP2pContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
69  {
70    CTimer::get("listen request").resume();
71    listen();
72    CTimer::get("listen request").suspend();
73
74    CTimer::get("listen pending request").resume();
75    listenPendingRequest() ;
76    CTimer::get("listen pending request").suspend();
77
78    CTimer::get("check server Buffers").resume();
79    checkBuffers() ;
80    CTimer::get("check server Buffers").suspend();
81
82    CTimer::get("check event process").resume();
83    processEvents(enableEventsProcessing);
84    CTimer::get("check event process").suspend();
85    return finished;
86
87  }
88
89 void CP2pContextServer::listen(void)
90  {
91    int rank;
92    int flag;
93    MPI_Status status;
94    flag=true ;
95
96    while(flag)
97    {
98      traceOff();
99      MPI_Iprobe(MPI_ANY_SOURCE, 20,interCommMerged_, &flag, &status);
100      traceOn();
101      if (flag==true)
102      {
103        requests_.push_back(new CRequest(interCommMerged_, status)) ;
104        if (requests_.back()->test()) 
105        {
106          if ( processRequest( *(requests_.back()) ) )
107          {
108            delete requests_.back();
109            requests_.pop_back() ;
110          }
111        }
112      }
113    }
114  }
115
116  void CP2pContextServer::listenPendingRequest(void)
117  {
118    auto it = requests_.begin() ;
119    while (it != requests_.end())
120    {
121      if ((*it)->test())
122      {
123        if (processRequest(*(*it)))
124        {
125          delete (*it);
126          auto it2=it ;
127          ++it ;
128          requests_.erase(it2) ;
129        }
130      }
131      else ++it ;
132    }
133  }
134
135  bool CP2pContextServer::processRequest(CRequest& request)
136  {
137    int rank = request.getRank() ;
138    auto it=buffers_.find(rank);
139    // getCount(new CP2pServerBuffer) = sizeof(MPI_AINT)
140    // getCount(RESIZE) : size_t timeline, size_t size + size_t EVENT_BUFFER_RESIZE = 24
141    // getCount(HEADER) : size_t timeline, int nbSenders, int nbBlocs
142    //                     + nbBlocs * (sizeof(MPI_Aint) addr + int count + int window) = 16 + nbBlocs * 16
143    if ((it==buffers_.end())&&(request.getCount() < 3*sizeof(size_t)))
144    {
145      buffers_[rank] = new CP2pServerBuffer(rank, commSelf_, interCommMerged_, pendingEvents_, completedEvents_, request.getBuffer()) ;
146      return true;
147    }
148    else if (it!=buffers_.end()) {
149      it->second->receivedRequest(request.getBuffer()) ;
150      return true;
151    }
152    else
153      return false;
154  }
155
156  void CP2pContextServer::checkBuffers(void)
157  {
158    if (!pendingEvents_.empty())
159    {
160/*
161      SPendingEvent& nextEvent = pendingEvents_.begin()->second ;
162      for(auto& buffer : nextEvent.buffers ) buffer->eventLoop() ;
163      if (nextEvent.nbSenders==0) pendingEvents_.erase(pendingEvents_.begin()) ;
164*/
165      for(auto it=pendingEvents_.begin() ;  it!=pendingEvents_.end() ;)
166      {
167        SPendingEvent& nextEvent = it->second ;
168        for(auto& buffer : nextEvent.buffers ) buffer->eventLoop() ;
169        if (nextEvent.nbSenders==0) it=pendingEvents_.erase(it) ;
170        else ++it ;
171      }
172    }
173  }
174
175
176  void CP2pContextServer::processEvents(bool enableEventsProcessing)
177  {
178 
179    if (isProcessingEvent_) return ;
180
181    auto it=completedEvents_.find(currentTimeLine);
182
183    if (it!=completedEvents_.end())
184    {
185      if (it->second.nbSenders == it->second.currentNbSenders)
186      {
187        if (!scheduled) 
188        {
189          eventScheduler_->registerEvent(currentTimeLine,hashId);
190          scheduled=true;
191        }
192        else if (eventScheduler_->queryEvent(currentTimeLine,hashId) )
193        {
194          //if (!enableEventsProcessing && isCollectiveEvent(event)) return ;
195
196          if (!eventScheduled_) 
197          {
198            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
199            eventScheduled_=true ;
200            return ;
201          }
202          else 
203          {
204            MPI_Status status ;
205            int flag ;
206            MPI_Test(&processEventRequest_, &flag, &status) ;
207            if (!flag) return ;
208            eventScheduled_=false ;
209          }
210
211          eventScheduler_->popEvent() ;
212
213          isProcessingEvent_=true ;
214          CEventServer event(this) ;
215          for(auto& buffer : it->second.buffers) buffer->fillEventServer(currentTimeLine, event) ;
216//          MPI_Barrier(intraComm) ;
217          CTimer::get("Process events").resume();
218          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event.classId<<" of type "<<event.type<<endl ;
219          dispatchEvent(event);
220          CTimer::get("Process events").suspend();
221          isProcessingEvent_=false ;
222//         context->unsetProcessingEvent() ;
223          pendingEvent=false;
224          completedEvents_.erase(it);
225          currentTimeLine++;
226          scheduled = false;
227        }
228      }
229    }
230  }
231
232  CP2pContextServer::~CP2pContextServer()
233  {
234    for(auto& buffer : buffers_) delete buffer.second;
235    buffers_.clear() ;
236  }
237
238  void CP2pContextServer::releaseBuffers()
239  {
240    //for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
241    //buffers.clear() ;
242    freeWindows() ;
243  }
244
245  void CP2pContextServer::freeWindows()
246  {
247    //  for(auto& it : winComm_)
248    //  {
249    //    int rank = it.first ;
250    //    MPI_Win_free(&windows_[rank][0]);
251    //    MPI_Win_free(&windows_[rank][1]);
252    //    MPI_Comm_free(&winComm_[rank]) ;
253    //  }
254  }
255
256  void CP2pContextServer::notifyClientsFinalize(void)
257  {
258    for(auto it=buffers_.begin();it!=buffers_.end();++it)
259    {
260      it->second->notifyClientFinalize() ;
261    }
262  }
263
264  void CP2pContextServer::dispatchEvent(CEventServer& event)
265  {
266    string contextName;
267    string buff;
268    int MsgSize;
269    int rank;
270    list<CEventServer::SSubEvent>::iterator it;
271    StdString ctxId = context->getId();
272    CContext::setCurrent(ctxId);
273    StdSize totalBuf = 0;
274
275    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
276    {
277      finished=true;
278      info(20)<<" CP2pContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
279      notifyClientsFinalize() ;
280      CTimer::get("receiving requests").suspend();
281      context->finalize();
282     
283      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
284                           iteMap = mapBufferSize_.end(), itMap;
285      for (itMap = itbMap; itMap != iteMap; ++itMap)
286      {
287        rank = itMap->first;
288        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
289            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
290        totalBuf += itMap->second;
291      }
292      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
293    }
294    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
295    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
296    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
297    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
298    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
299    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
300    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
301    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
302    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
303    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
304    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
305    else if (event.classId==CField::GetType()) 
306    {
307      if (event.type==CField::EVENT_ID_UPDATE_DATA) CField::dispatchEvent(event);
308      else CField::dispatchEvent(event);
309    }
310    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
311    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
312    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
313    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
314    else
315    {
316      ERROR("void CP2pContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
317    }
318  }
319
320  bool CP2pContextServer::isCollectiveEvent(CEventServer& event)
321  {
322    if (event.type>1000) return false ;
323    else return true ;
324  }
325}
Note: See TracBrowser for help on using the repository browser.