source: XIOS3/trunk/src/transport/p2p_server_buffer.cpp @ 2556

Last change on this file since 2556 was 2556, checked in by ymipsl, 10 months ago

First version on the point to point transport protocol, activated by the variable : transport_protocol="p2p"

YM

  • Property svn:executable set to *
File size: 15.7 KB
Line 
1#include "p2p_server_buffer.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4#include "timer.hpp"
5#include "buffer_in.hpp"
6
7
8
9namespace xios
10{
11  extern CLogType logProtocol ;
12
13  CP2pServerBuffer::CP2pServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
14                                               map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer) 
15                        : clientRank_(clientRank), interCommMerged_(interCommMerged), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents)
16  {
17    //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
18    //CBufferIn bufferIn(buffer.data(),buffer.size()) ;
19    //bufferIn >> controlAddr_;
20    createWindow(commSelf, interCommMerged) ;
21  }
22
23  void CP2pServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged)
24  {
25    CTimer::get("create Windows").resume() ;
26    //MPI_Comm interComm ;
27    //MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ;
28    //MPI_Intercomm_merge(interComm, true, &winComm_) ;
29    //CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
30    //MPI_Comm_free(&interComm) ;
31   
32    //maxWindows_=MAX_WINDOWS ;
33    //windows_.resize(maxWindows_) ;
34   
35    //for(int i=0;i<maxWindows_;++i)
36    //{
37    //  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
38    //  CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
39    //}
40    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
41    //CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
42    CTimer::get("create Windows").suspend() ;
43    //MPI_Barrier(winComm_) ;
44    //MPI_Barrier(winComm_) ;
45
46  }
47
48  void CP2pServerBuffer::receivedRequest(vector<char>& buffer)
49  {
50    size_t timeline ;
51    int nbSenders ;
52    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
53    bufferIn >> timeline ;
54    if (timeline==EVENT_BUFFER_RESIZE)
55    {
56      size_t AssociatedTimeline ;
57      size_t newSize ;
58      bufferIn >>AssociatedTimeline>>newSize ;
59      bufferResize_.push_back({AssociatedTimeline,newSize}) ;
60    }
61    else // receive standard event
62    {
63      info(logProtocol)<<"received request from rank : "<<clientRank_<<"  with timeline : "<<timeline
64                                                        <<"   at time : "<<CTimer::get("XIOS server").getTime()<<endl ;
65      bufferIn>> nbSenders ;
66      nbSenders_[timeline] = nbSenders ;
67      auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
68      if (pendingFullEvent==pendingFullEvents_.end()) 
69      {
70        SPendingEvent pendingEvent = {nbSenders,1,{this}} ;
71        pendingFullEvents_[timeline]=pendingEvent ;
72      }
73      else 
74      { 
75        pendingFullEvent->second.currentNbSenders++ ;
76        pendingFullEvent->second.buffers.push_back(this) ;
77      }
78   
79      int nbBlocs ; 
80      int count ;
81      int window ;
82      bufferIn >> nbBlocs ;
83      MPI_Aint bloc ;
84      auto& blocs = pendingBlocs_[timeline] ;
85      for(int i=0;i<nbBlocs;++i) 
86      {
87        bufferIn >> bloc >> count >> window;
88        blocs.push_back({bloc, count, window}) ;
89      }
90    }
91  }
92
93  void CP2pServerBuffer::eventLoop(void)
94  {
95    int flag ;
96    if (!pendingRmaRequests_.empty()) testPendingRequests() ;
97    if (pendingRmaRequests_.empty()) transferEvents() ;
98
99    if (!isLocked_)
100    {
101      if (lastBlocToFree_!=0)
102      {
103        info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ;
104        if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ;
105        //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
106        //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ;
107        //MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
108        //MPI_Win_unlock(windowRank_,winControl_) ;
109        if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ;
110        lastBlocToFree_ = 0 ;       
111      }
112    }
113
114    if (buffers_.size()>1) 
115     if (buffers_.front()->getCount()==0) buffers_.pop_front() ; // if buffer is empty free buffer
116  }
117
118  void CP2pServerBuffer::notifyClientFinalize(void)
119  {
120    eventLoop() ; // to free the last bloc
121    //MPI_Aint finalize=1 ;
122    //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
123    //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ;
124    //MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
125    //MPI_Win_unlock(windowRank_,winControl_) ;
126    int dummy ;
127    MPI_Send(&dummy, 0, MPI_CHAR, clientRank_, 22, interCommMerged_) ;
128  }
129 
130  void CP2pServerBuffer::testPendingRequests(void)
131  {
132    if (!pendingRmaRequests_.empty())
133    {
134      int flag ;   
135
136      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ;
137      MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ;
138      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ;
139     
140      if (flag==true) 
141      {
142        //if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked");
143        //for(auto& win : windowsLocked_)
144        //{
145        //  info(logProtocol)<<"unlock window "<<win<<endl ;
146        //  if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ;
147        //  MPI_Win_unlock(windowRank_,windows_[win]) ;
148        //  if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ;
149        //}
150        //windowsLocked_.clear() ;
151       
152
153        if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ;
154        if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ;
155       
156        size_t transferedSize = 0 ;
157        for(auto& count : pendingRmaCount_) transferedSize+=count ;
158
159        if (info.isActive(logProtocol))
160        {
161          double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ;
162          info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<"  nbBlocs : "<< pendingRmaStatus_.size()
163                           << "  total count = "<<transferedSize<<"  duration : "<<time<<" s"
164                           << "  Bandwith : "<< transferedSize/time<< "byte/s"<<endl ;
165          CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ;
166         }
167
168        //isLocked_=false ;
169        pendingRmaRequests_.clear() ;
170        pendingRmaStatus_.clear() ;
171        pendingRmaCount_.clear() ;
172        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ;
173       
174        for(auto & event : onTransferEvents_) 
175        {
176          size_t timeline = event.first ;
177
178          auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
179          pendingFullEvent->second.nbSenders-- ;
180          pendingFullEvent->second.currentNbSenders-- ;
181         
182
183          auto completedFullEvent=completedFullEvents_.find(timeline) ;
184          if (completedFullEvent==completedFullEvents_.end()) 
185          {
186            SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ;
187            completedFullEvents_[timeline]=pendingEvent ;
188          }
189          else 
190          {
191            completedFullEvent->second.currentNbSenders++ ;
192            completedFullEvent->second.buffers.push_back(this) ;
193          }
194          nbSenders_.erase(timeline) ;
195        } 
196        onTransferEvents_.clear() ;
197      }
198    }
199
200  }
201 
202  size_t CP2pServerBuffer::remainSize(void)
203  {
204    if (!fixed_) return std::numeric_limits<size_t>::max() ;
205    else
206    {
207      if (currentBuffer_ == nullptr) return fixedSize_ ;
208      else return currentBuffer_->remain() ;
209    }
210  }
211
212  void CP2pServerBuffer::transferEvents(void)
213  {
214    if (pendingRmaRequests_.empty() && !pendingBlocs_.empty())
215    {
216      size_t remain=remainSize() ;
217      size_t transferedSize=0 ;
218
219      size_t timeline =  pendingBlocs_.begin()->first ;
220      auto& blocs = pendingBlocs_.begin()->second ;
221     
222      if (!bufferResize_.empty()) 
223      {
224        if (bufferResize_.front().first==timeline)
225        {
226          currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
227          info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
228          bufferResize_.pop_front() ;
229          newBuffer(currentBufferSize_,fixed_) ;
230        }
231      }
232
233      size_t eventSize=0 ;
234      for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
235     
236      if (eventSize > remain) 
237      {
238        if ( eventSize <= currentBufferSize_) return ; // wait for free storage ;
239        else 
240        {
241          if (currentBuffer_==nullptr) remain = eventSize ;
242          else remain = currentBuffer_->remain() + fixedSize_ ;
243        }
244      }
245     
246      //if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked");
247     
248      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ;
249      if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ;
250      //for(auto& bloc : blocs)
251      //{
252      //  int win=get<2>(bloc) ;
253      //  if (windowsLocked_.count(win)==0)
254      //  {
255      //    info(logProtocol)<<"lock window "<<win<<endl ;
256      //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
257      //    MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
258      //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
259      //    windowsLocked_.insert(win) ;
260      //  }
261      //}
262      //isLocked_=true ;
263      do
264      {
265        transferEvent() ; // ok enough storage for this bloc
266       
267        transferedSize += eventSize ;
268        pendingBlocs_.erase(pendingBlocs_.begin()) ;
269       
270        //  break ; // transfering just one event temporary => to remove
271       
272        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop
273
274        timeline =  pendingBlocs_.begin()->first ;
275        auto& blocs=pendingBlocs_.begin()->second ;
276       
277        if (!bufferResize_.empty()) 
278        {
279          if (bufferResize_.front().first==timeline)
280          {
281            currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
282            info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
283            bufferResize_.pop_front() ;
284            newBuffer(currentBufferSize_,fixed_) ;
285          }
286        }
287
288        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
289        if (transferedSize+eventSize<=remain)
290        {
291          //for(auto& bloc : blocs)
292          //{
293          //  int win=get<2>(bloc) ;
294          //  if (windowsLocked_.count(win)==0)
295          //  {
296          //    info(logProtocol)<<"lock window "<<win<<endl ;
297          //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
298          //    MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
299          //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
300          //    windowsLocked_.insert(win) ;
301          //  }
302          //}
303        }
304      }
305      while(transferedSize+eventSize<=remain) ;
306     
307    }
308  }
309 
310  void CP2pServerBuffer::transferEvent(void)
311  {
312    MPI_Aint addr;
313    MPI_Aint offset ;
314
315    size_t size;
316    size_t start;
317    size_t count;
318    int window ;
319
320    auto& blocs=pendingBlocs_.begin()->second ;
321    size_t timeline = pendingBlocs_.begin() -> first ;
322 
323
324    for(auto& bloc : blocs)
325    {
326      addr = std::get<0>(bloc) ;
327      size = std::get<1>(bloc) ;
328      window = std::get<2>(bloc) ;
329
330      offset=0 ;
331
332      do
333      {
334        if (currentBuffer_!=nullptr)
335        {
336          currentBuffer_->reserve(size, start, count) ;
337     
338          if ( count > 0)
339          {
340            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
341            offset=MPI_Aint_add(offset, count) ;
342          }
343          //currentBuffer_->reserve(size, start, count) ;
344     
345          //if ( count > 0)
346          //{
347          //  transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
348          //  offset=MPI_Aint_add(offset, count) ;
349          //}
350        }
351
352        if (size>0) 
353        {
354          if (fixed_) newBuffer(std::max(fixedSize_, size),fixed_) ;
355          else
356          {
357            currentBufferSize_ = std::max((size_t)(currentBufferSize_*growingFactor_), size) ;
358            newBuffer(currentBufferSize_,fixed_) ;
359          }
360        }
361      } while (size > 0 ) ;
362    }
363
364    pendingRmaStatus_.resize(pendingRmaRequests_.size()) ;
365  }
366
367  void CP2pServerBuffer::transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window)
368  {
369    MPI_Request request ;
370    MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ;
371    if (info.isActive(logProtocol))
372    {
373      info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<endl ;
374      info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1)
375               <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ;
376    }
377    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ;
378    //MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ;
379    MPI_Irecv(buffer->getBuffer()+start, count, MPI_CHAR, clientRank_, 21, interCommMerged_, &request) ;
380    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ;
381    pendingRmaRequests_.push_back(request) ;
382    pendingRmaCount_.push_back(count) ;
383    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ;
384  }
385
386  void CP2pServerBuffer::fillEventServer(size_t timeline, CEventServer& event)
387  {
388    auto &completedEvent=completedEvents_[timeline] ;
389    size_t size=0 ;
390    for(auto& bloc : completedEvent) size+=bloc.count ;
391    char* buffer = new char[size] ;
392    size=0 ;
393   
394    ostringstream outStr ;
395    if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline
396                                          <<"  nbBlocs="<<completedEvent.size()<<endl ;
397    int i=0 ;
398    MPI_Aint addr ;
399    for(auto& bloc : completedEvent) 
400    {
401      memcpy(&buffer[size], bloc.buffer->getBuffer()+bloc.start, bloc.count) ;
402     
403      if (info.isActive(logProtocol))
404      {
405        size_t checksum=0 ;
406        for(size_t j=0;j<bloc.count;j++) checksum += (unsigned char) buffer[size+j] ;
407        outStr<<"bloc "<<i<<"  count="<<bloc.count<<" checksum="<<checksum<<"  ;  " ;
408        i++ ;
409      }
410
411      size+=bloc.count ;
412      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc
413      addr=bloc.addr ;
414      if (bloc.buffer->getCount()==0) if (buffers_.size() > 1) buffers_.pop_front() ; // if buffer is empty free buffer
415    }
416    event.push(clientRank_, nullptr, buffer, size) ;
417    if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<<event.getNbSender() ;
418    info(logProtocol)<<outStr.str()<<endl ;
419   
420    lastBlocToFree_=addr ;
421
422    completedEvents_.erase(timeline) ;
423    eventLoop() ;
424  }
425
426 
427
428}
Note: See TracBrowser for help on using the repository browser.