source: XIOS3/trunk/src/transport/one_sided_client_buffer.cpp @ 2634

Last change on this file since 2634 was 2628, checked in by jderouillat, 3 months ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 8.9 KB
Line 
1#include "one_sided_client_buffer.hpp"
2#include "event_client.hpp"
3#include "timer.hpp"
4
5namespace xios
6{
7 
8  extern CLogType logProtocol;
9  extern CLogType logTimers ;
10
11  COneSidedClientBuffer::COneSidedClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) : interComm_(interComm), serverRank_(serverRank), interCommMerged_(interCommMerged), intraServerRank_(intraServerRank)
12  {
13   
14    MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
15    control_[CONTROL_ADDR] = 0 ;
16    control_[CONTROL_FINALIZE] = 0 ;
17    sendNewBuffer() ;
18    createWindow(commSelf, interCommMerged, intraServerRank ) ;
19  }
20
21  void COneSidedClientBuffer::createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank )
22  {
23    if (info.isActive(logTimers)) CTimer::get("create Windows").resume() ;
24    MPI_Comm interComm ;
25    xios::MPI_Intercomm_create(commSelf, 0, interCommMerged, intraServerRank, 0, &interComm) ;
26    xios::MPI_Intercomm_merge(interComm, false, &winComm_) ;
27    int rank ;
28    MPI_Comm_rank(winComm_,&rank) ;
29    info(logProtocol)<<"Windows rank="<<rank<<endl ;
30    CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
31    xios::MPI_Comm_free(&interComm) ;
32   
33    maxWindows_=MAX_WINDOWS ;   
34    windows_.resize(maxWindows_) ;
35    usedWindows_.resize(maxWindows_,false) ;
36    for(int i=0;i<maxWindows_;++i) 
37    {
38      MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
39      CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
40    }
41    currentWindow_=-1 ;
42   
43    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
44    CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
45
46    MPI_Barrier(winComm_) ;
47    MPI_Win_attach(winControl_, control_, controlSize_*sizeof(MPI_Aint)) ;
48    MPI_Barrier(winComm_) ;
49    if (info.isActive(logTimers)) CTimer::get("create Windows").suspend() ;
50 
51 //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, winControl_) ;
52 //   MPI_Win_unlock(0,winControl_) ;
53
54  } 
55
56
57
58  void COneSidedClientBuffer::newBuffer(size_t size, bool fixed)
59  { 
60    currentWindow_=(currentWindow_+1)%maxWindows_ ;
61    if (usedWindows_[currentWindow_]) 
62    {
63      ERROR("void COneSidedClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ;
64    }
65    else usedWindows_[currentWindow_]=true ;
66    buffers_.push_back(new CBuffer(windows_[currentWindow_], size, fixed)); 
67    currentBuffer_=buffers_.back() ;
68    info(logProtocol)<<"   Nb attached memory blocs="<<buffers_.size()<<endl ;
69  }
70
71  bool COneSidedClientBuffer::isBufferFree(size_t size)
72  {
73    if (buffers_.size()>maxWindows_-1) return false ; 
74    CBuffer* buffer ;
75    if (buffers_.size()==0) return true ;
76    else if (!fixed_) return true ;
77    else 
78    {
79      buffer = buffers_.back() ;
80      if (buffer->remain()>=size) return true ;
81      else
82      {
83        if (buffer->isFixed()) 
84        {
85          if ( size > buffer->getSize()) return true ;
86          else return false ;
87        }
88        else return true ;
89      }
90    }
91  }
92 
93  int COneSidedClientBuffer::writeBuffer(char* buffer, size_t size)
94  {
95    MPI_Aint addr ;
96    size_t start ;
97    size_t count ;
98    int nbBlocs=0 ;
99
100    if (isBufferFree(size))
101    {
102      while (size > 0)
103      {
104        if (buffers_.empty())
105        {
106          if (fixed_) newBuffer(fixedSize_,fixed_) ;
107          else
108          { 
109            if (currentBufferSize_==0) currentBufferSize_=size ;
110            newBuffer(currentBufferSize_, fixed_) ;
111          }
112        }
113        CBuffer* currentBuffer = buffers_.back() ;
114
115        MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, windows_[currentWindow_]) ;
116        currentBuffer->write(&buffer, size, addr, start, count) ;
117        if (count > 0) 
118        {
119          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ;
120          nbBlocs++ ; 
121        }
122
123        currentBuffer->write(&buffer, size, addr, start, count) ;
124        MPI_Win_unlock(0,windows_[currentWindow_]) ;
125
126        if (count > 0) 
127        {
128          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ;
129          nbBlocs++ ; 
130        }
131
132        if (size>0) 
133        {
134          if (fixed_) 
135          {
136            currentBufferSize_ = fixedSize_ ;
137            newBuffer(currentBufferSize_,fixed_) ;
138          }
139          else
140          {
141            currentBufferSize_ = max((size_t)(currentBufferSize_*growingFactor_), size) ;
142            newBuffer(currentBufferSize_,fixed_) ;
143          }
144        } 
145      }     
146      // send message here ?
147      return nbBlocs ;
148    }
149    else return 0 ;
150  }
151
152  void COneSidedClientBuffer::freeBuffer(MPI_Aint addr)
153  {
154//    if (addr != lastFreedBloc_)
155    if (addr != 0)
156    {
157      while(freeBloc(addr)) ;
158//      lastFreedBloc_ = addr ;
159    }
160   
161    if (isFinalized_ && !buffers_.empty() && buffers_.front()->getCount()==0) 
162    {
163      delete buffers_.front() ;
164      buffers_.pop_front() ;
165    }
166  }
167 
168  bool COneSidedClientBuffer::freeBloc(MPI_Aint addr)
169  {
170    SBloc& bloc = blocs_.front() ;
171    bloc.buffer->free(bloc.start, bloc.count) ;
172    if (bloc.buffer->getCount()==0) 
173      if (buffers_.size()>1) 
174      { 
175        usedWindows_[bloc.window]=false ;
176        delete buffers_.front() ;
177        buffers_.pop_front() ;
178      }
179   
180    if (addr != bloc.addr) 
181    {
182      blocs_.pop_front() ;
183      return true ;
184    }
185    else 
186    {
187      blocs_.pop_front() ;
188      return false ;
189    }
190
191  }
192
193  bool COneSidedClientBuffer::writeEvent(size_t timeline, CEventClient& event)
194  {
195    size_t size = event.getSize() ;
196    if (isBufferFree(size))
197    {
198      CBufferOut buffer(size) ;
199      event.send(timeline, size, &buffer) ;
200      size_t bufferSizeBefore = currentBufferSize_ ; 
201      int nbBlocs = writeBuffer((char*)buffer.start(),buffer.count()) ;
202      if (currentBufferSize_!=bufferSizeBefore) sendResizeBufferEvent(timeline,currentBufferSize_) ;
203      sendTimelineEvent(timeline, event.getNbSender(), nbBlocs) ;
204      return true ;
205    }
206    else return false ;
207  }
208
209  void COneSidedClientBuffer::eventLoop(void)
210  {
211    // check to free requests
212    int flag ;
213    bool out = true;
214    SRequest request ; 
215    while (!requests_.empty() && out) 
216    {
217      request = requests_.front() ;
218      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").resume() ;
219      MPI_Test(&request.mpiRequest, &flag, MPI_STATUS_IGNORE) ;
220      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").suspend() ;
221      if (flag==true)
222      {
223        delete request.buffer ;
224        requests_.pop_front() ;
225      }
226      else out=false; 
227    }
228   
229    // check to free blocs
230    MPI_Aint addr ;
231    MPI_Aint finalize ;
232    MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, winControl_) ;
233    addr = control_[CONTROL_ADDR] ;
234    control_[CONTROL_ADDR] = 0 ;
235    finalize = control_[CONTROL_FINALIZE] ;
236    MPI_Win_unlock(0, winControl_) ;
237    freeBuffer(addr) ;
238    if (finalize==1) isFinalized_=true ;
239
240
241  }
242
243  void COneSidedClientBuffer::sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs)
244  {
245    ostringstream outStr ;
246    SRequest request ;
247    request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int))*nbBlocs) ; 
248    *(request.buffer)<<timeline<<nbSenders<<nbBlocs ;
249    if (info.isActive(logProtocol))  outStr<<"New timeline event sent to server rank "<<serverRank_<<" : timeLine="<<timeline<<"  nbSenders="<<nbSenders<<"  nbBlocs="<<nbBlocs<<endl ;
250    auto it = blocs_.end() ;
251    for(int i=0 ; i<nbBlocs; ++i,--it) ;
252    for(int i=0 ; i<nbBlocs; ++i,++it) 
253    {
254      *(request.buffer) << it->addr << it->count << it->window;
255   
256      if (info.isActive(logProtocol))
257      {
258        size_t checksum=0 ;
259        for(size_t j=0;j<it->count;j++) checksum+=((unsigned char*)(it->addr))[j] ;
260        outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  ;  " ;
261      }
262    }
263    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ;
264    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
265    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ;
266    info(logProtocol)<<outStr.str()<<endl ;
267    requests_.push_back(request) ;
268  }
269
270  void COneSidedClientBuffer::sendResizeBufferEvent(size_t timeline, size_t size)
271  {
272    SRequest request ;
273    request.buffer = new CBufferOut(sizeof(EVENT_BUFFER_RESIZE)+sizeof(timeline)+sizeof(size)) ; 
274    *(request.buffer)<<EVENT_BUFFER_RESIZE<<timeline<<size ;
275    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
276    requests_.push_back(request) ;
277  }
278
279  void COneSidedClientBuffer::sendNewBuffer(void)
280  {
281    MPI_Aint controlAddr ;
282    MPI_Get_address(control_, &controlAddr) ;
283    MPI_Send(&controlAddr, 1, MPI_AINT, intraServerRank_, 20, interCommMerged_) ;
284  }
285
286}
Note: See TracBrowser for help on using the repository browser.