source: XIOS/dev/dev_ym/XIOS_COUPLING/src/transport/one_sided_server_buffer.cpp @ 2346

Last change on this file since 2346 was 2346, checked in by jderouillat, 2 years ago

Fix new info log method for GNU compiler

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 12.6 KB
Line 
1#include "one_sided_server_buffer.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4#include "timer.hpp"
5#include "buffer_in.hpp"
6
7
8
9namespace xios
10{
11  CLogType logProtocol("log_protocol") ;
12
13  COneSidedServerBuffer::COneSidedServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
14                                               map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer) 
15                        : clientRank_(clientRank), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents)
16  {
17    MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
18    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
19    bufferIn >> controlAddr_;
20    createWindow(commSelf, interCommMerged) ;
21  }
22
23  void COneSidedServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged)
24  {
25    CTimer::get("create Windows").resume() ;
26    MPI_Comm interComm ;
27    MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ;
28    MPI_Intercomm_merge(interComm, true, &winComm_) ;
29    CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
30    MPI_Comm_free(&interComm) ;
31   
32    maxWindows_=MAX_WINDOWS ;
33    windows_.resize(maxWindows_) ;
34   
35    for(int i=0;i<maxWindows_;++i) 
36    {
37      MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
38      CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
39    }
40    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
41    CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
42    CTimer::get("create Windows").suspend() ;
43    MPI_Barrier(winComm_) ;
44    MPI_Barrier(winComm_) ;
45
46  }
47
48  void COneSidedServerBuffer::receivedRequest(vector<char>& buffer)
49  {
50    size_t timeline ;
51    int nbSenders ;
52    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
53    bufferIn >> timeline ;
54    if (timeline==EVENT_BUFFER_RESIZE)
55    {
56      size_t AssociatedTimeline ;
57      size_t newSize ;
58      bufferIn >>AssociatedTimeline>>newSize ;
59      bufferResize_.push_back({AssociatedTimeline,newSize}) ;
60    }
61    else // receive standard event
62    {
63
64      bufferIn>> nbSenders ;
65      nbSenders_[timeline] = nbSenders ;
66      auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
67      if (pendingFullEvent==pendingFullEvents_.end()) 
68      {
69        SPendingEvent pendingEvent = {nbSenders,1,{this}} ;
70        pendingFullEvents_[timeline]=pendingEvent ;
71      }
72      else 
73      { 
74        pendingFullEvent->second.currentNbSenders++ ;
75        pendingFullEvent->second.buffers.push_back(this) ;
76      }
77   
78      int nbBlocs ; 
79      int count ;
80      int window ;
81      bufferIn >> nbBlocs ;
82      MPI_Aint bloc ;
83      auto& blocs = pendingBlocs_[timeline] ;
84      for(int i=0;i<nbBlocs;++i) 
85      {
86        bufferIn >> bloc >> count >> window;
87        blocs.push_back({bloc, count, window}) ;
88      }
89    }
90  }
91
92  void COneSidedServerBuffer::eventLoop(void)
93  {
94    int flag ;
95    if (!pendingRmaRequests_.empty()) testPendingRequests() ;
96    if (pendingRmaRequests_.empty()) transferEvents() ;
97
98    if (!isLocked_)
99    {
100      if (lastBlocToFree_!=0)
101      {
102        info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ;
103        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
104        MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ;
105        MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
106        MPI_Win_unlock(windowRank_,winControl_) ; 
107        lastBlocToFree_ = 0 ;       
108      }
109    }
110
111    if (buffers_.size()>1) 
112     if (buffers_.front()->getCount()==0) buffers_.pop_front() ; // if buffer is empty free buffer
113  }
114
115  void COneSidedServerBuffer::notifyClientFinalize(void)
116  {
117    eventLoop() ; // to free the last bloc
118    MPI_Aint finalize=1 ;
119    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
120    MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ;
121    MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
122    MPI_Win_unlock(windowRank_,winControl_) ; 
123  }
124  void COneSidedServerBuffer::testPendingRequests(void)
125  {
126    if (!pendingRmaRequests_.empty())
127    {
128      int flag ;
129      MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ;
130      if (flag==true) 
131      {
132        if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked");
133        for(auto& win : windowsLocked_) 
134        {
135          info(logProtocol)<<"unlock window "<<win<<endl ;
136          MPI_Win_unlock(windowRank_,windows_[win]) ; 
137        }
138        windowsLocked_.clear() ;
139         
140        isLocked_=false ;
141        pendingRmaRequests_.clear() ;
142        pendingRmaStatus_.clear() ;
143        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ;
144       
145        for(auto & event : onTransferEvents_) 
146        {
147          size_t timeline = event.first ;
148
149          auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
150          pendingFullEvent->second.nbSenders-- ;
151          pendingFullEvent->second.currentNbSenders-- ;
152         
153
154          auto completedFullEvent=completedFullEvents_.find(timeline) ;
155          if (completedFullEvent==completedFullEvents_.end()) 
156          {
157            SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ;
158            completedFullEvents_[timeline]=pendingEvent ;
159          }
160          else 
161          {
162            completedFullEvent->second.currentNbSenders++ ;
163            completedFullEvent->second.buffers.push_back(this) ;
164          }
165          nbSenders_.erase(timeline) ;
166        } 
167        onTransferEvents_.clear() ;
168      }
169    }
170
171  }
172 
173  size_t COneSidedServerBuffer::remainSize(void)
174  {
175    if (!fixed_) return std::numeric_limits<size_t>::max() ;
176    else
177    {
178      if (currentBuffer_ == nullptr) return fixedSize_ ;
179      else return currentBuffer_->remain() ;
180    }
181  }
182
183  void COneSidedServerBuffer::transferEvents(void)
184  {
185    if (pendingRmaRequests_.empty() && !pendingBlocs_.empty())
186    {
187      size_t remain=remainSize() ;
188      size_t transferedSize=0 ;
189
190      size_t timeline =  pendingBlocs_.begin()->first ;
191      auto& blocs = pendingBlocs_.begin()->second ;
192     
193      if (!bufferResize_.empty()) 
194      {
195        if (bufferResize_.front().first==timeline)
196        {
197          currentBufferSize_=bufferResize_.front().second ;
198          info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
199          bufferResize_.pop_front() ;
200          newBuffer(currentBufferSize_,fixed_) ;
201        }
202      }
203
204      size_t eventSize=0 ;
205      for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
206     
207      if (eventSize > remain) 
208      {
209        if ( eventSize <= currentBufferSize_) return ; // wait for free storage ;
210        else 
211        {
212          if (currentBuffer_==nullptr) remain = eventSize ;
213          else remain = currentBuffer_->remain() + fixedSize_ ;
214        }
215      }
216     
217      if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked");
218      for(auto& bloc : blocs) 
219      {
220        int win=get<2>(bloc) ;
221        if (windowsLocked_.count(win)==0) 
222        {
223          MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
224          windowsLocked_.insert(win) ;
225        }
226      }
227      isLocked_=true ;
228      do
229      {
230        transferEvent() ; // ok enough storage for this bloc
231       
232        transferedSize += eventSize ;
233        pendingBlocs_.erase(pendingBlocs_.begin()) ;
234       
235//        break ; // transfering just one event temporary => to remove
236       
237        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop
238
239        timeline =  pendingBlocs_.begin()->first ;
240        auto& blocs=pendingBlocs_.begin()->second ;
241       
242        if (!bufferResize_.empty()) 
243        {
244          if (bufferResize_.front().first==timeline)
245          {
246            currentBufferSize_=bufferResize_.front().second ;
247            info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
248            bufferResize_.pop_front() ;
249            newBuffer(currentBufferSize_,fixed_) ;
250          }
251        }
252
253        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
254        if (transferedSize+eventSize<=remain)
255        {
256          for(auto& bloc : blocs) 
257          {
258            int win=get<2>(bloc) ;
259            if (windowsLocked_.count(win)==0) 
260            {
261              MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
262              windowsLocked_.insert(win) ;
263            }
264          }
265        }
266      }
267      while(transferedSize+eventSize<=remain) ;
268     
269    }
270  }
271 
272  void COneSidedServerBuffer::transferEvent(void)
273  {
274    MPI_Aint addr;
275    MPI_Aint offset ;
276
277    size_t size;
278    size_t start;
279    size_t count;
280    int window ;
281
282    auto& blocs=pendingBlocs_.begin()->second ;
283    size_t timeline = pendingBlocs_.begin() -> first ;
284 
285
286    for(auto& bloc : blocs)
287    {
288      addr = std::get<0>(bloc) ;
289      size = std::get<1>(bloc) ;
290      window = std::get<2>(bloc) ;
291
292      offset=0 ;
293
294      do
295      {
296        if (currentBuffer_!=nullptr)
297        {
298          currentBuffer_->reserve(size, start, count) ;
299     
300          if ( count > 0)
301          {
302            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
303            offset=MPI_Aint_add(offset, count) ;
304          }
305          currentBuffer_->reserve(size, start, count) ;
306     
307          if ( count > 0)
308          {
309            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
310            offset=MPI_Aint_add(offset, count) ;
311          }
312        }
313
314        if (size>0) 
315        {
316          if (fixed_) newBuffer(fixedSize_,fixed_) ;
317          else
318          {
319            currentBufferSize_ = std::max((size_t)(currentBufferSize_*growingFactor_), size) ;
320            newBuffer(currentBufferSize_,fixed_) ;
321          }
322        }
323      } while (size > 0 ) ;
324    }
325
326    pendingRmaStatus_.resize(pendingRmaRequests_.size()) ;
327  }
328
329  void COneSidedServerBuffer::transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window)
330  {
331    MPI_Request request ;
332    MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ;
333    info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<endl ;
334    info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1)
335             <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ;
336    MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ;
337    pendingRmaRequests_.push_back(request) ;
338    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ;
339  }
340
341  void COneSidedServerBuffer::fillEventServer(size_t timeline, CEventServer& event)
342  {
343    auto &completedEvent=completedEvents_[timeline] ;
344    size_t size=0 ;
345    for(auto& bloc : completedEvent) size+=bloc.count ;
346    char* buffer = new char[size] ;
347    size=0 ;
348   
349    ostringstream outStr ;
350    outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline<<"  nbBlocs="<<completedEvent.size()<<endl ;
351    int i=0 ;
352    MPI_Aint addr ;
353    for(auto& bloc : completedEvent) 
354    {
355      memcpy(&buffer[size], bloc.buffer->getBuffer()+bloc.start, bloc.count) ;
356     
357      if (info.isActive(logProtocol))
358      {
359        size_t checksum=0 ;
360        for(size_t j=0;j<bloc.count;j++) checksum += (unsigned char) buffer[size+j] ;
361        outStr<<"bloc "<<i<<"  count="<<bloc.count<<" checksum="<<checksum<<"  ;  " ;
362        i++ ;
363      }
364
365      size+=bloc.count ;
366      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc
367      addr=bloc.addr ;
368      if (bloc.buffer->getCount()==0) if (buffers_.size() > 1) buffers_.pop_front() ; // if buffer is empty free buffer
369    }
370    event.push(clientRank_, nullptr, buffer, size) ;
371    if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<<event.getNbSender() ;
372    info(logProtocol)<<outStr.str()<<endl ;
373   
374    lastBlocToFree_=addr ;
375    /*
376    if (!isLocked_) MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, window_) ;
377    MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR) ;
378    MPI_Put(&addr, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, window_) ;
379    if (!isLocked_) MPI_Win_unlock(windowRank_,window_) ;
380   */
381    completedEvents_.erase(timeline) ;
382    eventLoop() ;
383  }
384
385 
386
387}
Note: See TracBrowser for help on using the repository browser.