source: XIOS3/trunk/src/transport/p2p_server_buffer.cpp @ 2558

Last change on this file since 2558 was 2558, checked in by ymipsl, 10 months ago

p2p transport protocol

  • bug fix
  • more diagnostics
  • set buffer parameters to realistic values

YM

  • Property svn:executable set to *
File size: 16.2 KB
Line 
1#include "p2p_server_buffer.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4#include "timer.hpp"
5#include "buffer_in.hpp"
6
7
8
9namespace xios
10{
11  extern CLogType logProtocol ;
12
13  CP2pServerBuffer::CP2pServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
14                                               map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer) 
15                        : clientRank_(clientRank), interCommMerged_(interCommMerged), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents)
16  {
17    //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
18    //CBufferIn bufferIn(buffer.data(),buffer.size()) ;
19    //bufferIn >> controlAddr_;
20    createWindow(commSelf, interCommMerged) ;
21  }
22
23  void CP2pServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged)
24  {
25    CTimer::get("create Windows").resume() ;
26    //MPI_Comm interComm ;
27    //MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ;
28    //MPI_Intercomm_merge(interComm, true, &winComm_) ;
29    //CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
30    //MPI_Comm_free(&interComm) ;
31   
32    //maxWindows_=MAX_WINDOWS ;
33    //windows_.resize(maxWindows_) ;
34   
35    //for(int i=0;i<maxWindows_;++i)
36    //{
37    //  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
38    //  CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
39    //}
40    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
41    //CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
42    CTimer::get("create Windows").suspend() ;
43    //MPI_Barrier(winComm_) ;
44    //MPI_Barrier(winComm_) ;
45
46  }
47
48  void CP2pServerBuffer::receivedRequest(vector<char>& buffer)
49  {
50    size_t timeline ;
51    int nbSenders ;
52    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
53    bufferIn >> timeline ;
54    if (timeline==EVENT_BUFFER_RESIZE)
55    {
56      size_t AssociatedTimeline ;
57      size_t newSize ;
58      bufferIn >>AssociatedTimeline>>newSize ;
59      bufferResize_.push_back({AssociatedTimeline,newSize}) ;
60    }
61    else // receive standard event
62    {
63      info(logProtocol)<<"received request from rank : "<<clientRank_<<"  with timeline : "<<timeline
64                                                        <<"   at time : "<<CTimer::get("XIOS server").getTime()<<endl ;
65      bufferIn>> nbSenders ;
66      nbSenders_[timeline] = nbSenders ;
67      auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
68      if (pendingFullEvent==pendingFullEvents_.end()) 
69      {
70        SPendingEvent pendingEvent = {nbSenders,1,{this}} ;
71        pendingFullEvents_[timeline]=pendingEvent ;
72      }
73      else 
74      { 
75        pendingFullEvent->second.currentNbSenders++ ;
76        pendingFullEvent->second.buffers.push_back(this) ;
77      }
78   
79      int nbBlocs ; 
80      int count ;
81      int window ;
82      bufferIn >> nbBlocs ;
83      MPI_Aint bloc ;
84      auto& blocs = pendingBlocs_[timeline] ;
85      for(int i=0;i<nbBlocs;++i) 
86      {
87        bufferIn >> bloc >> count >> window;
88        blocs.push_back({bloc, count, window}) ;
89      }
90    }
91  }
92
93  void CP2pServerBuffer::eventLoop(void)
94  {
95    int flag ;
96    if (!pendingRmaRequests_.empty()) testPendingRequests() ;
97    if (pendingRmaRequests_.empty()) transferEvents() ;
98
99    //if (!isLocked_)
100    //{
101      if (lastBlocToFree_!=0)
102      {
103        info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ;
104        //if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ;
105        //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
106        //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ;
107        //MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
108        //MPI_Win_unlock(windowRank_,winControl_) ;
109        //if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ;
110        lastBlocToFree_ = 0 ;       
111      }
112    //}
113
114    if (buffers_.size()>1) 
115     if (buffers_.front()->getCount()==0) buffers_.pop_front() ; // if buffer is empty free buffer
116  }
117
118  void CP2pServerBuffer::notifyClientFinalize(void)
119  {
120    eventLoop() ; // to free the last bloc
121    //MPI_Aint finalize=1 ;
122    //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
123    //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ;
124    //MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
125    //MPI_Win_unlock(windowRank_,winControl_) ;
126    int dummy ;
127    MPI_Send(&dummy, 0, MPI_CHAR, clientRank_, 22, interCommMerged_) ;
128  }
129 
130  void CP2pServerBuffer::testPendingRequests(void)
131  {
132    if (!pendingRmaRequests_.empty())
133    {
134      int flag ;   
135
136      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ;
137      MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ;
138      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ;
139     
140      if (flag==true) 
141      {
142        //if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked");
143        //for(auto& win : windowsLocked_)
144        //{
145        //  info(logProtocol)<<"unlock window "<<win<<endl ;
146        //  if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ;
147        //  MPI_Win_unlock(windowRank_,windows_[win]) ;
148        //  if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ;
149        //}
150        //windowsLocked_.clear() ;
151       
152
153        if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ;
154        if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ;
155       
156        size_t transferedSize = 0 ;
157        for(auto& count : pendingRmaCount_) transferedSize+=count ;
158
159        if (info.isActive(logProtocol))
160        {
161          double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ;
162          info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<"  nbBlocs : "<< pendingRmaStatus_.size()
163                           << "  total count = "<<transferedSize<<"  duration : "<<time<<" s"
164                           << "  Bandwith : "<< transferedSize/time<< "byte/s"<<endl ;
165          CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ;
166          for(int i=0;i<pendingRmaAddr_.size();i++)
167          {
168            size_t checksum=0 ;
169            unsigned char* buffer = (unsigned char*) pendingRmaAddr_[i] ;
170            for(size_t j=0;j<pendingRmaCount_[i];j++) checksum += buffer[j] ;
171            info(logProtocol)<<"Bloc transfered to adrr="<<(void*) buffer<<"  count="<<pendingRmaCount_[i]<<"  checksum="<<checksum<<endl ;
172          }
173
174         }
175
176        //isLocked_=false ;
177        pendingRmaRequests_.clear() ;
178        pendingRmaStatus_.clear() ;
179        pendingRmaCount_.clear() ;
180        pendingRmaAddr_.clear() ;
181        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ;
182       
183        for(auto & event : onTransferEvents_) 
184        {
185          size_t timeline = event.first ;
186
187          auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
188          pendingFullEvent->second.nbSenders-- ;
189          pendingFullEvent->second.currentNbSenders-- ;
190         
191
192          auto completedFullEvent=completedFullEvents_.find(timeline) ;
193          if (completedFullEvent==completedFullEvents_.end()) 
194          {
195            SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ;
196            completedFullEvents_[timeline]=pendingEvent ;
197          }
198          else 
199          {
200            completedFullEvent->second.currentNbSenders++ ;
201            completedFullEvent->second.buffers.push_back(this) ;
202          }
203          nbSenders_.erase(timeline) ;
204        } 
205        onTransferEvents_.clear() ;
206      }
207    }
208
209  }
210 
211  size_t CP2pServerBuffer::remainSize(void)
212  {
213    if (!fixed_) return std::numeric_limits<size_t>::max() ;
214    else
215    {
216      if (currentBuffer_ == nullptr) return fixedSize_ ;
217      else return currentBuffer_->remain() ;
218    }
219  }
220
221  void CP2pServerBuffer::transferEvents(void)
222  {
223    if (pendingRmaRequests_.empty() && !pendingBlocs_.empty())
224    {
225      size_t remain=remainSize() ;
226      size_t transferedSize=0 ;
227
228      size_t timeline =  pendingBlocs_.begin()->first ;
229      auto& blocs = pendingBlocs_.begin()->second ;
230     
231      if (!bufferResize_.empty()) 
232      {
233        if (bufferResize_.front().first==timeline)
234        {
235          currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
236          info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
237          bufferResize_.pop_front() ;
238          newBuffer(currentBufferSize_,fixed_) ;
239        }
240      }
241
242      size_t eventSize=0 ;
243      for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
244     
245      if (eventSize > remain) 
246      {
247        if ( eventSize <= currentBufferSize_) return ; // wait for free storage ;
248        else 
249        {
250          if (currentBuffer_==nullptr) remain = eventSize ;
251          else remain = currentBuffer_->remain() + fixedSize_ ;
252        }
253      }
254     
255      //if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked");
256     
257      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ;
258      if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ;
259      //for(auto& bloc : blocs)
260      //{
261      //  int win=get<2>(bloc) ;
262      //  if (windowsLocked_.count(win)==0)
263      //  {
264      //    info(logProtocol)<<"lock window "<<win<<endl ;
265      //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
266      //    MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
267      //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
268      //    windowsLocked_.insert(win) ;
269      //  }
270      //}
271      //isLocked_=true ;
272      do
273      {
274        transferEvent() ; // ok enough storage for this bloc
275       
276        transferedSize += eventSize ;
277        pendingBlocs_.erase(pendingBlocs_.begin()) ;
278       
279        //  break ; // transfering just one event temporary => to remove
280       
281        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop
282
283        timeline =  pendingBlocs_.begin()->first ;
284        auto& blocs=pendingBlocs_.begin()->second ;
285       
286        if (!bufferResize_.empty()) 
287        {
288          if (bufferResize_.front().first==timeline)
289          {
290            currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
291            info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
292            bufferResize_.pop_front() ;
293            newBuffer(currentBufferSize_,fixed_) ;
294          }
295        }
296
297        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
298        if (transferedSize+eventSize<=remain)
299        {
300          //for(auto& bloc : blocs)
301          //{
302          //  int win=get<2>(bloc) ;
303          //  if (windowsLocked_.count(win)==0)
304          //  {
305          //    info(logProtocol)<<"lock window "<<win<<endl ;
306          //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
307          //    MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
308          //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
309          //    windowsLocked_.insert(win) ;
310          //  }
311          //}
312        }
313      }
314      while(transferedSize+eventSize<=remain) ;
315     
316    }
317  }
318 
319  void CP2pServerBuffer::transferEvent(void)
320  {
321    MPI_Aint addr;
322    MPI_Aint offset ;
323
324    size_t size;
325    size_t start;
326    size_t count;
327    int window ;
328
329    auto& blocs=pendingBlocs_.begin()->second ;
330    size_t timeline = pendingBlocs_.begin() -> first ;
331 
332
333    for(auto& bloc : blocs)
334    {
335      addr = std::get<0>(bloc) ;
336      size = std::get<1>(bloc) ;
337      window = std::get<2>(bloc) ;
338
339      offset=0 ;
340
341      do
342      {
343        if (currentBuffer_!=nullptr)
344        {
345          currentBuffer_->reserve(size, start, count) ;
346     
347          if ( count > 0)
348          {
349            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
350            offset=MPI_Aint_add(offset, count) ;
351          }
352          //currentBuffer_->reserve(size, start, count) ;
353     
354          //if ( count > 0)
355          //{
356          //  transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
357          //  offset=MPI_Aint_add(offset, count) ;
358          //}
359        }
360
361        if (size>0) 
362        {
363          if (fixed_) newBuffer(std::max(fixedSize_, size),fixed_) ;
364          else
365          {
366            currentBufferSize_ = std::max((size_t)(currentBufferSize_*growingFactor_), size) ;
367            newBuffer(currentBufferSize_,fixed_) ;
368          }
369        }
370      } while (size > 0 ) ;
371    }
372
373    pendingRmaStatus_.resize(pendingRmaRequests_.size()) ;
374  }
375
376  void CP2pServerBuffer::transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window)
377  {
378    MPI_Request request ;
379    MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ;
380    if (info.isActive(logProtocol))
381    {
382      info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<endl ;
383      info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1)
384               <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ;
385    }
386    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ;
387    //MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ;
388    MPI_Irecv(buffer->getBuffer()+start, count, MPI_CHAR, clientRank_, 21, interCommMerged_, &request) ;
389    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ;
390    pendingRmaRequests_.push_back(request) ;
391    pendingRmaCount_.push_back(count) ;
392    pendingRmaAddr_.push_back(buffer->getBuffer()+start) ;
393    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ;
394  }
395
396  void CP2pServerBuffer::fillEventServer(size_t timeline, CEventServer& event)
397  {
398    auto &completedEvent=completedEvents_[timeline] ;
399    size_t size=0 ;
400    for(auto& bloc : completedEvent) size+=bloc.count ;
401    char* buffer = new char[size] ;
402    size=0 ;
403   
404    ostringstream outStr ;
405    if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline
406                                          <<"  nbBlocs="<<completedEvent.size()<<endl ;
407    int i=0 ;
408    MPI_Aint addr ;
409    for(auto& bloc : completedEvent) 
410    {
411      memcpy(&buffer[size], bloc.buffer->getBuffer()+bloc.start, bloc.count) ;
412     
413      if (info.isActive(logProtocol))
414      {
415        size_t checksum=0 ;
416        for(size_t j=0;j<bloc.count;j++) checksum += (unsigned char) buffer[size+j] ;
417        outStr<<"bloc "<<i<<"  count="<<bloc.count<<" checksum="<<checksum<<"  ;  " ;
418        i++ ;
419      }
420
421      size+=bloc.count ;
422      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc
423      addr=bloc.addr ;
424      if (bloc.buffer->getCount()==0) if (buffers_.size() > 1) buffers_.pop_front() ; // if buffer is empty free buffer
425    }
426    event.push(clientRank_, nullptr, buffer, size) ;
427    if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<<event.getNbSender() ;
428    info(logProtocol)<<outStr.str()<<endl ;
429   
430    lastBlocToFree_=addr ;
431
432    completedEvents_.erase(timeline) ;
433    eventLoop() ;
434  }
435
436 
437
438}
Note: See TracBrowser for help on using the repository browser.