source: XIOS3/trunk/src/transport/p2p_server_buffer.cpp @ 2564

Last change on this file since 2564 was 2564, checked in by jderouillat, 10 months ago

Clean memory leaks

  • Property svn:executable set to *
File size: 16.4 KB
Line 
1#include "p2p_server_buffer.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4#include "timer.hpp"
5#include "buffer_in.hpp"
6
7
8
9namespace xios
10{
11  extern CLogType logProtocol ;
12
13  CP2pServerBuffer::CP2pServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
14                                               map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer) 
15                        : clientRank_(clientRank), interCommMerged_(interCommMerged), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents)
16  {
17    //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
18    //CBufferIn bufferIn(buffer.data(),buffer.size()) ;
19    //bufferIn >> controlAddr_;
20    createWindow(commSelf, interCommMerged) ;
21  }
22
23  void CP2pServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged)
24  {
25    CTimer::get("create Windows").resume() ;
26    //MPI_Comm interComm ;
27    //MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ;
28    //MPI_Intercomm_merge(interComm, true, &winComm_) ;
29    //CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
30    //MPI_Comm_free(&interComm) ;
31   
32    //maxWindows_=MAX_WINDOWS ;
33    //windows_.resize(maxWindows_) ;
34   
35    //for(int i=0;i<maxWindows_;++i)
36    //{
37    //  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
38    //  CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
39    //}
40    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
41    //CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
42    CTimer::get("create Windows").suspend() ;
43    //MPI_Barrier(winComm_) ;
44    //MPI_Barrier(winComm_) ;
45
46  }
47
48  void CP2pServerBuffer::receivedRequest(vector<char>& buffer)
49  {
50    size_t timeline ;
51    int nbSenders ;
52    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
53    bufferIn >> timeline ;
54    if (timeline==EVENT_BUFFER_RESIZE)
55    {
56      size_t AssociatedTimeline ;
57      size_t newSize ;
58      bufferIn >>AssociatedTimeline>>newSize ;
59      bufferResize_.push_back({AssociatedTimeline,newSize}) ;
60    }
61    else // receive standard event
62    {
63      info(logProtocol)<<"received request from rank : "<<clientRank_<<"  with timeline : "<<timeline
64                                                        <<"   at time : "<<CTimer::get("XIOS server").getTime()<<endl ;
65      bufferIn>> nbSenders ;
66      nbSenders_[timeline] = nbSenders ;
67      auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
68      if (pendingFullEvent==pendingFullEvents_.end()) 
69      {
70        SPendingEvent pendingEvent = {nbSenders,1,{this}} ;
71        pendingFullEvents_[timeline]=pendingEvent ;
72      }
73      else 
74      { 
75        pendingFullEvent->second.currentNbSenders++ ;
76        pendingFullEvent->second.buffers.push_back(this) ;
77      }
78   
79      int nbBlocs ; 
80      int count ;
81      int window ;
82      bufferIn >> nbBlocs ;
83      MPI_Aint bloc ;
84      auto& blocs = pendingBlocs_[timeline] ;
85      for(int i=0;i<nbBlocs;++i) 
86      {
87        bufferIn >> bloc >> count >> window;
88        blocs.push_back({bloc, count, window}) ;
89      }
90    }
91  }
92
93  void CP2pServerBuffer::eventLoop(void)
94  {
95    int flag ;
96    if (!pendingRmaRequests_.empty()) testPendingRequests() ;
97    if (pendingRmaRequests_.empty()) transferEvents() ;
98
99    //if (!isLocked_)
100    //{
101      if (lastBlocToFree_!=0)
102      {
103        info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ;
104        //if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ;
105        //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
106        //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ;
107        //MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
108        //MPI_Win_unlock(windowRank_,winControl_) ;
109        //if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ;
110        lastBlocToFree_ = 0 ;       
111      }
112    //}
113
114    if (buffers_.size()>1) 
115    {
116      if (buffers_.front()->getCount()==0) {
117        delete buffers_.front();
118        buffers_.pop_front() ; // if buffer is empty free buffer
119      }
120    }
121  }
122
123  void CP2pServerBuffer::notifyClientFinalize(void)
124  {
125    eventLoop() ; // to free the last bloc
126    //MPI_Aint finalize=1 ;
127    //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
128    //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ;
129    //MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
130    //MPI_Win_unlock(windowRank_,winControl_) ;
131    int dummy ;
132    MPI_Send(&dummy, 0, MPI_CHAR, clientRank_, 22, interCommMerged_) ;
133  }
134 
135  void CP2pServerBuffer::testPendingRequests(void)
136  {
137    if (!pendingRmaRequests_.empty())
138    {
139      int flag ;   
140
141      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ;
142      MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ;
143      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ;
144     
145      if (flag==true) 
146      {
147        //if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked");
148        //for(auto& win : windowsLocked_)
149        //{
150        //  info(logProtocol)<<"unlock window "<<win<<endl ;
151        //  if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ;
152        //  MPI_Win_unlock(windowRank_,windows_[win]) ;
153        //  if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ;
154        //}
155        //windowsLocked_.clear() ;
156       
157
158        if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ;
159        if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ;
160       
161        size_t transferedSize = 0 ;
162        for(auto& count : pendingRmaCount_) transferedSize+=count ;
163
164        if (info.isActive(logProtocol))
165        {
166          double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ;
167          info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<"  nbBlocs : "<< pendingRmaStatus_.size()
168                           << "  total count = "<<transferedSize<<"  duration : "<<time<<" s"
169                           << "  Bandwith : "<< transferedSize/time<< "byte/s"<<endl ;
170          CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ;
171          for(int i=0;i<pendingRmaAddr_.size();i++)
172          {
173            size_t checksum=0 ;
174            unsigned char* buffer = (unsigned char*) pendingRmaAddr_[i] ;
175            for(size_t j=0;j<pendingRmaCount_[i];j++) checksum += buffer[j] ;
176            info(logProtocol)<<"Bloc transfered to adrr="<<(void*) buffer<<"  count="<<pendingRmaCount_[i]<<"  checksum="<<checksum<<endl ;
177          }
178
179         }
180
181        //isLocked_=false ;
182        pendingRmaRequests_.clear() ;
183        pendingRmaStatus_.clear() ;
184        pendingRmaCount_.clear() ;
185        pendingRmaAddr_.clear() ;
186        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ;
187       
188        for(auto & event : onTransferEvents_) 
189        {
190          size_t timeline = event.first ;
191
192          auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
193          pendingFullEvent->second.nbSenders-- ;
194          pendingFullEvent->second.currentNbSenders-- ;
195         
196
197          auto completedFullEvent=completedFullEvents_.find(timeline) ;
198          if (completedFullEvent==completedFullEvents_.end()) 
199          {
200            SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ;
201            completedFullEvents_[timeline]=pendingEvent ;
202          }
203          else 
204          {
205            completedFullEvent->second.currentNbSenders++ ;
206            completedFullEvent->second.buffers.push_back(this) ;
207          }
208          nbSenders_.erase(timeline) ;
209        } 
210        onTransferEvents_.clear() ;
211      }
212    }
213
214  }
215 
216  size_t CP2pServerBuffer::remainSize(void)
217  {
218    if (!fixed_) return std::numeric_limits<size_t>::max() ;
219    else
220    {
221      if (currentBuffer_ == nullptr) return fixedSize_ ;
222      else return currentBuffer_->remain() ;
223    }
224  }
225
226  void CP2pServerBuffer::transferEvents(void)
227  {
228    if (pendingRmaRequests_.empty() && !pendingBlocs_.empty())
229    {
230      size_t remain=remainSize() ;
231      size_t transferedSize=0 ;
232
233      size_t timeline =  pendingBlocs_.begin()->first ;
234      auto& blocs = pendingBlocs_.begin()->second ;
235     
236      if (!bufferResize_.empty()) 
237      {
238        if (bufferResize_.front().first==timeline)
239        {
240          currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
241          info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
242          bufferResize_.pop_front() ;
243          newBuffer(currentBufferSize_,fixed_) ;
244        }
245      }
246
247      size_t eventSize=0 ;
248      for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
249     
250      if (eventSize > remain) 
251      {
252        if ( eventSize <= currentBufferSize_) return ; // wait for free storage ;
253        else 
254        {
255          if (currentBuffer_==nullptr) remain = eventSize ;
256          else remain = currentBuffer_->remain() + fixedSize_ ;
257        }
258      }
259     
260      //if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked");
261     
262      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ;
263      if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ;
264      //for(auto& bloc : blocs)
265      //{
266      //  int win=get<2>(bloc) ;
267      //  if (windowsLocked_.count(win)==0)
268      //  {
269      //    info(logProtocol)<<"lock window "<<win<<endl ;
270      //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
271      //    MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
272      //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
273      //    windowsLocked_.insert(win) ;
274      //  }
275      //}
276      //isLocked_=true ;
277      do
278      {
279        transferEvent() ; // ok enough storage for this bloc
280       
281        transferedSize += eventSize ;
282        pendingBlocs_.erase(pendingBlocs_.begin()) ;
283       
284        //  break ; // transfering just one event temporary => to remove
285       
286        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop
287
288        timeline =  pendingBlocs_.begin()->first ;
289        auto& blocs=pendingBlocs_.begin()->second ;
290       
291        if (!bufferResize_.empty()) 
292        {
293          if (bufferResize_.front().first==timeline)
294          {
295            currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
296            info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
297            bufferResize_.pop_front() ;
298            newBuffer(currentBufferSize_,fixed_) ;
299          }
300        }
301
302        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
303        if (transferedSize+eventSize<=remain)
304        {
305          //for(auto& bloc : blocs)
306          //{
307          //  int win=get<2>(bloc) ;
308          //  if (windowsLocked_.count(win)==0)
309          //  {
310          //    info(logProtocol)<<"lock window "<<win<<endl ;
311          //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
312          //    MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
313          //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
314          //    windowsLocked_.insert(win) ;
315          //  }
316          //}
317        }
318      }
319      while(transferedSize+eventSize<=remain) ;
320     
321    }
322  }
323 
324  void CP2pServerBuffer::transferEvent(void)
325  {
326    MPI_Aint addr;
327    MPI_Aint offset ;
328
329    size_t size;
330    size_t start;
331    size_t count;
332    int window ;
333
334    auto& blocs=pendingBlocs_.begin()->second ;
335    size_t timeline = pendingBlocs_.begin() -> first ;
336 
337
338    for(auto& bloc : blocs)
339    {
340      addr = std::get<0>(bloc) ;
341      size = std::get<1>(bloc) ;
342      window = std::get<2>(bloc) ;
343
344      offset=0 ;
345
346      do
347      {
348        if (currentBuffer_!=nullptr)
349        {
350          currentBuffer_->reserve(size, start, count) ;
351     
352          if ( count > 0)
353          {
354            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
355            offset=MPI_Aint_add(offset, count) ;
356          }
357          //currentBuffer_->reserve(size, start, count) ;
358     
359          //if ( count > 0)
360          //{
361          //  transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
362          //  offset=MPI_Aint_add(offset, count) ;
363          //}
364        }
365
366        if (size>0) 
367        {
368          if (fixed_) newBuffer(std::max(fixedSize_, size),fixed_) ;
369          else
370          {
371            currentBufferSize_ = std::max((size_t)(currentBufferSize_*growingFactor_), size) ;
372            newBuffer(currentBufferSize_,fixed_) ;
373          }
374        }
375      } while (size > 0 ) ;
376    }
377
378    pendingRmaStatus_.resize(pendingRmaRequests_.size()) ;
379  }
380
381  void CP2pServerBuffer::transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window)
382  {
383    MPI_Request request ;
384    MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ;
385    if (info.isActive(logProtocol))
386    {
387      info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<endl ;
388      info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1)
389               <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ;
390    }
391    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ;
392    //MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ;
393    MPI_Irecv(buffer->getBuffer()+start, count, MPI_CHAR, clientRank_, 21, interCommMerged_, &request) ;
394    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ;
395    pendingRmaRequests_.push_back(request) ;
396    pendingRmaCount_.push_back(count) ;
397    pendingRmaAddr_.push_back(buffer->getBuffer()+start) ;
398    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ;
399  }
400
401  void CP2pServerBuffer::fillEventServer(size_t timeline, CEventServer& event)
402  {
403    auto &completedEvent=completedEvents_[timeline] ;
404    size_t size=0 ;
405    for(auto& bloc : completedEvent) size+=bloc.count ;
406    char* buffer = new char[size] ;
407    size=0 ;
408   
409    ostringstream outStr ;
410    if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline
411                                          <<"  nbBlocs="<<completedEvent.size()<<endl ;
412    int i=0 ;
413    MPI_Aint addr ;
414    for(auto& bloc : completedEvent) 
415    {
416      memcpy(&buffer[size], bloc.buffer->getBuffer()+bloc.start, bloc.count) ;
417     
418      if (info.isActive(logProtocol))
419      {
420        size_t checksum=0 ;
421        for(size_t j=0;j<bloc.count;j++) checksum += (unsigned char) buffer[size+j] ;
422        outStr<<"bloc "<<i<<"  count="<<bloc.count<<" checksum="<<checksum<<"  ;  " ;
423        i++ ;
424      }
425
426      size+=bloc.count ;
427      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc
428      addr=bloc.addr ;
429      if (bloc.buffer->getCount()==0)
430      {
431        if (buffers_.size() > 1)
432        {
433          delete buffers_.front();
434          buffers_.pop_front() ; // if buffer is empty free buffer
435        }
436      }
437    }
438    event.push(clientRank_, nullptr, buffer, size) ;
439    if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<<event.getNbSender() ;
440    info(logProtocol)<<outStr.str()<<endl ;
441   
442    lastBlocToFree_=addr ;
443
444    completedEvents_.erase(timeline) ;
445    eventLoop() ;
446  }
447
448 
449
450}
Note: See TracBrowser for help on using the repository browser.