source: XIOS3/trunk/src/transport/one_sided_server_buffer.cpp @ 2399

Last change on this file since 2399 was 2399, checked in by ymipsl, 22 months ago

-Fix performance issue in one_sided protocol

  • better timer instrumentation of the protocol

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 15.3 KB
Line 
1#include "one_sided_server_buffer.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4#include "timer.hpp"
5#include "buffer_in.hpp"
6
7
8
9namespace xios
10{
11  CLogType logProtocol("log_protocol") ;
12
13  COneSidedServerBuffer::COneSidedServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
14                                               map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer) 
15                        : clientRank_(clientRank), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents)
16  {
17    MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
18    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
19    bufferIn >> controlAddr_;
20    createWindow(commSelf, interCommMerged) ;
21  }
22
23  void COneSidedServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged)
24  {
25    CTimer::get("create Windows").resume() ;
26    MPI_Comm interComm ;
27    MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ;
28    MPI_Intercomm_merge(interComm, true, &winComm_) ;
29    CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
30    MPI_Comm_free(&interComm) ;
31   
32    maxWindows_=MAX_WINDOWS ;
33    windows_.resize(maxWindows_) ;
34   
35    for(int i=0;i<maxWindows_;++i) 
36    {
37      MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
38      CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
39    }
40    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
41    CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
42    CTimer::get("create Windows").suspend() ;
43    MPI_Barrier(winComm_) ;
44    MPI_Barrier(winComm_) ;
45
46  }
47
48  void COneSidedServerBuffer::receivedRequest(vector<char>& buffer)
49  {
50    size_t timeline ;
51    int nbSenders ;
52    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
53    bufferIn >> timeline ;
54    if (timeline==EVENT_BUFFER_RESIZE)
55    {
56      size_t AssociatedTimeline ;
57      size_t newSize ;
58      bufferIn >>AssociatedTimeline>>newSize ;
59      bufferResize_.push_back({AssociatedTimeline,newSize}) ;
60    }
61    else // receive standard event
62    {
63      info(logProtocol)<<"received request from rank : "<<clientRank_<<"  with timeline : "<<timeline
64                                                        <<"   at time : "<<CTimer::get("XIOS server").getTime()<<endl ;
65      bufferIn>> nbSenders ;
66      nbSenders_[timeline] = nbSenders ;
67      auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
68      if (pendingFullEvent==pendingFullEvents_.end()) 
69      {
70        SPendingEvent pendingEvent = {nbSenders,1,{this}} ;
71        pendingFullEvents_[timeline]=pendingEvent ;
72      }
73      else 
74      { 
75        pendingFullEvent->second.currentNbSenders++ ;
76        pendingFullEvent->second.buffers.push_back(this) ;
77      }
78   
79      int nbBlocs ; 
80      int count ;
81      int window ;
82      bufferIn >> nbBlocs ;
83      MPI_Aint bloc ;
84      auto& blocs = pendingBlocs_[timeline] ;
85      for(int i=0;i<nbBlocs;++i) 
86      {
87        bufferIn >> bloc >> count >> window;
88        blocs.push_back({bloc, count, window}) ;
89      }
90    }
91  }
92
93  void COneSidedServerBuffer::eventLoop(void)
94  {
95    int flag ;
96    if (!pendingRmaRequests_.empty()) testPendingRequests() ;
97    if (pendingRmaRequests_.empty()) transferEvents() ;
98
99    if (!isLocked_)
100    {
101      if (lastBlocToFree_!=0)
102      {
103        info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ;
104        if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ;
105        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
106        MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ;
107        MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
108        MPI_Win_unlock(windowRank_,winControl_) ; 
109        if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ;
110        lastBlocToFree_ = 0 ;       
111      }
112    }
113
114    if (buffers_.size()>1) 
115     if (buffers_.front()->getCount()==0) buffers_.pop_front() ; // if buffer is empty free buffer
116  }
117
118  void COneSidedServerBuffer::notifyClientFinalize(void)
119  {
120    eventLoop() ; // to free the last bloc
121    MPI_Aint finalize=1 ;
122    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
123    MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ;
124    MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
125    MPI_Win_unlock(windowRank_,winControl_) ; 
126  }
127  void COneSidedServerBuffer::testPendingRequests(void)
128  {
129    if (!pendingRmaRequests_.empty())
130    {
131      int flag ;   
132
133      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ;
134      MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ;
135      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ;
136     
137      if (flag==true) 
138      {
139        if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked");
140        for(auto& win : windowsLocked_) 
141        {
142          info(logProtocol)<<"unlock window "<<win<<endl ;
143          if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ;
144          MPI_Win_unlock(windowRank_,windows_[win]) ; 
145          if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ;
146        }
147        windowsLocked_.clear() ;
148       
149
150        if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ;
151        if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ;
152       
153        size_t transferedSize = 0 ;
154        for(auto& count : pendingRmaCount_) transferedSize+=count ;
155
156        if (info.isActive(logProtocol))
157        {
158          double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ;
159          info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<"  nbBlocs : "<< pendingRmaStatus_.size()
160                           << "  total count = "<<transferedSize<<"  duration : "<<time<<" s"
161                           << "  Bandwith : "<< transferedSize/time<< "byte/s"<<endl ;
162          CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ;
163         }
164
165        isLocked_=false ;
166        pendingRmaRequests_.clear() ;
167        pendingRmaStatus_.clear() ;
168        pendingRmaCount_.clear() ;
169        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ;
170       
171        for(auto & event : onTransferEvents_) 
172        {
173          size_t timeline = event.first ;
174
175          auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
176          pendingFullEvent->second.nbSenders-- ;
177          pendingFullEvent->second.currentNbSenders-- ;
178         
179
180          auto completedFullEvent=completedFullEvents_.find(timeline) ;
181          if (completedFullEvent==completedFullEvents_.end()) 
182          {
183            SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ;
184            completedFullEvents_[timeline]=pendingEvent ;
185          }
186          else 
187          {
188            completedFullEvent->second.currentNbSenders++ ;
189            completedFullEvent->second.buffers.push_back(this) ;
190          }
191          nbSenders_.erase(timeline) ;
192        } 
193        onTransferEvents_.clear() ;
194      }
195    }
196
197  }
198 
199  size_t COneSidedServerBuffer::remainSize(void)
200  {
201    if (!fixed_) return std::numeric_limits<size_t>::max() ;
202    else
203    {
204      if (currentBuffer_ == nullptr) return fixedSize_ ;
205      else return currentBuffer_->remain() ;
206    }
207  }
208
209  void COneSidedServerBuffer::transferEvents(void)
210  {
211    if (pendingRmaRequests_.empty() && !pendingBlocs_.empty())
212    {
213      size_t remain=remainSize() ;
214      size_t transferedSize=0 ;
215
216      size_t timeline =  pendingBlocs_.begin()->first ;
217      auto& blocs = pendingBlocs_.begin()->second ;
218     
219      if (!bufferResize_.empty()) 
220      {
221        if (bufferResize_.front().first==timeline)
222        {
223          currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
224          info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
225          bufferResize_.pop_front() ;
226          newBuffer(currentBufferSize_,fixed_) ;
227        }
228      }
229
230      size_t eventSize=0 ;
231      for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
232     
233      if (eventSize > remain) 
234      {
235        if ( eventSize <= currentBufferSize_) return ; // wait for free storage ;
236        else 
237        {
238          if (currentBuffer_==nullptr) remain = eventSize ;
239          else remain = currentBuffer_->remain() + fixedSize_ ;
240        }
241      }
242     
243      if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked");
244     
245      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ;
246      if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ;
247      for(auto& bloc : blocs) 
248      {
249        int win=get<2>(bloc) ;
250        if (windowsLocked_.count(win)==0) 
251        {
252          info(logProtocol)<<"lock window "<<win<<endl ;
253          if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
254          MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
255          if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
256          windowsLocked_.insert(win) ;
257        }
258      }
259      isLocked_=true ;
260      do
261      {
262        transferEvent() ; // ok enough storage for this bloc
263       
264        transferedSize += eventSize ;
265        pendingBlocs_.erase(pendingBlocs_.begin()) ;
266       
267        //  break ; // transfering just one event temporary => to remove
268       
269        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop
270
271        timeline =  pendingBlocs_.begin()->first ;
272        auto& blocs=pendingBlocs_.begin()->second ;
273       
274        if (!bufferResize_.empty()) 
275        {
276          if (bufferResize_.front().first==timeline)
277          {
278            currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
279            info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
280            bufferResize_.pop_front() ;
281            newBuffer(currentBufferSize_,fixed_) ;
282          }
283        }
284
285        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
286        if (transferedSize+eventSize<=remain)
287        {
288          for(auto& bloc : blocs) 
289          {
290            int win=get<2>(bloc) ;
291            if (windowsLocked_.count(win)==0) 
292            {
293              info(logProtocol)<<"lock window "<<win<<endl ;
294              if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
295              MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
296              if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
297              windowsLocked_.insert(win) ;
298            }
299          }
300        }
301      }
302      while(transferedSize+eventSize<=remain) ;
303     
304    }
305  }
306 
307  void COneSidedServerBuffer::transferEvent(void)
308  {
309    MPI_Aint addr;
310    MPI_Aint offset ;
311
312    size_t size;
313    size_t start;
314    size_t count;
315    int window ;
316
317    auto& blocs=pendingBlocs_.begin()->second ;
318    size_t timeline = pendingBlocs_.begin() -> first ;
319 
320
321    for(auto& bloc : blocs)
322    {
323      addr = std::get<0>(bloc) ;
324      size = std::get<1>(bloc) ;
325      window = std::get<2>(bloc) ;
326
327      offset=0 ;
328
329      do
330      {
331        if (currentBuffer_!=nullptr)
332        {
333          currentBuffer_->reserve(size, start, count) ;
334     
335          if ( count > 0)
336          {
337            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
338            offset=MPI_Aint_add(offset, count) ;
339          }
340          currentBuffer_->reserve(size, start, count) ;
341     
342          if ( count > 0)
343          {
344            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
345            offset=MPI_Aint_add(offset, count) ;
346          }
347        }
348
349        if (size>0) 
350        {
351          if (fixed_) newBuffer(fixedSize_,fixed_) ;
352          else
353          {
354            currentBufferSize_ = std::max((size_t)(currentBufferSize_*growingFactor_), size) ;
355            newBuffer(currentBufferSize_,fixed_) ;
356          }
357        }
358      } while (size > 0 ) ;
359    }
360
361    pendingRmaStatus_.resize(pendingRmaRequests_.size()) ;
362  }
363
364  void COneSidedServerBuffer::transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window)
365  {
366    MPI_Request request ;
367    MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ;
368    if (info.isActive(logProtocol))
369    {
370      info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<endl ;
371      info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1)
372               <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ;
373    }
374    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ;
375    MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ;
376    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ;
377    pendingRmaRequests_.push_back(request) ;
378    pendingRmaCount_.push_back(count) ;
379    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ;
380  }
381
382  void COneSidedServerBuffer::fillEventServer(size_t timeline, CEventServer& event)
383  {
384    auto &completedEvent=completedEvents_[timeline] ;
385    size_t size=0 ;
386    for(auto& bloc : completedEvent) size+=bloc.count ;
387    char* buffer = new char[size] ;
388    size=0 ;
389   
390    ostringstream outStr ;
391    if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline
392                                          <<"  nbBlocs="<<completedEvent.size()<<endl ;
393    int i=0 ;
394    MPI_Aint addr ;
395    for(auto& bloc : completedEvent) 
396    {
397      memcpy(&buffer[size], bloc.buffer->getBuffer()+bloc.start, bloc.count) ;
398     
399      if (info.isActive(logProtocol))
400      {
401        size_t checksum=0 ;
402        for(size_t j=0;j<bloc.count;j++) checksum += (unsigned char) buffer[size+j] ;
403        outStr<<"bloc "<<i<<"  count="<<bloc.count<<" checksum="<<checksum<<"  ;  " ;
404        i++ ;
405      }
406
407      size+=bloc.count ;
408      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc
409      addr=bloc.addr ;
410      if (bloc.buffer->getCount()==0) if (buffers_.size() > 1) buffers_.pop_front() ; // if buffer is empty free buffer
411    }
412    event.push(clientRank_, nullptr, buffer, size) ;
413    if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<<event.getNbSender() ;
414    info(logProtocol)<<outStr.str()<<endl ;
415   
416    lastBlocToFree_=addr ;
417    /*
418    if (!isLocked_) MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, window_) ;
419    MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR) ;
420    MPI_Put(&addr, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, window_) ;
421    if (!isLocked_) MPI_Win_unlock(windowRank_,window_) ;
422   */
423    completedEvents_.erase(timeline) ;
424    eventLoop() ;
425  }
426
427 
428
429}
Note: See TracBrowser for help on using the repository browser.