source: XIOS/dev/dev_ym/XIOS_COUPLING/src/transport/one_sided_client_buffer.cpp @ 2347

Last change on this file since 2347 was 2347, checked in by jderouillat, 2 years ago

Fix new transfer protocol for LLVM compiler

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 8.4 KB
Line 
1#include "one_sided_client_buffer.hpp"
2#include "event_client.hpp"
3#include "timer.hpp"
4
5namespace xios
6{
7 
8  extern CLogType logProtocol;
9
10  COneSidedClientBuffer::COneSidedClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) : interComm_(interComm), serverRank_(serverRank)
11  {
12   
13    MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
14    control_[CONTROL_ADDR] = 0 ;
15    control_[CONTROL_FINALIZE] = 0 ;
16    sendNewBuffer() ;
17    createWindow(commSelf, interCommMerged, intraServerRank ) ;
18  }
19
20  void COneSidedClientBuffer::createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank )
21  {
22    CTimer::get("create Windows").resume() ;
23    MPI_Comm interComm ;
24    MPI_Intercomm_create(commSelf, 0, interCommMerged, intraServerRank, 0, &interComm) ;
25    MPI_Intercomm_merge(interComm, false, &winComm_) ;
26    int rank ;
27    MPI_Comm_rank(winComm_,&rank) ;
28    info(logProtocol)<<"Windows rank="<<rank<<endl ;
29    CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
30    MPI_Comm_free(&interComm) ;
31   
32    maxWindows_=MAX_WINDOWS ;   
33    windows_.resize(maxWindows_) ;
34    usedWindows_.resize(maxWindows_,false) ;
35    for(int i=0;i<maxWindows_;++i) 
36    {
37      MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
38      CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
39    }
40    currentWindow_=-1 ;
41   
42    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
43    CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
44
45   
46
47    CTimer::get("create Windows").suspend() ;
48
49    MPI_Barrier(winComm_) ;
50    MPI_Win_attach(winControl_, control_, controlSize_*sizeof(MPI_Aint)) ;
51    MPI_Barrier(winComm_) ;
52 //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, winControl_) ;
53 //   MPI_Win_unlock(0,winControl_) ;
54
55  } 
56
57
58
59  void COneSidedClientBuffer::newBuffer(size_t size, bool fixed)
60  { 
61    currentWindow_=(currentWindow_+1)%maxWindows_ ;
62    if (usedWindows_[currentWindow_]) 
63    {
64      ERROR("void COneSidedClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ;
65    }
66    else usedWindows_[currentWindow_]=true ;
67    buffers_.push_back(new CBuffer(windows_[currentWindow_], size, fixed)); 
68    currentBuffer_=buffers_.back() ;
69    info(logProtocol)<<"   Nb attached memory blocs="<<buffers_.size()<<endl ;
70  }
71
72  bool COneSidedClientBuffer::isBufferFree(size_t size)
73  {
74    if (buffers_.size()>maxWindows_-1) return false ; 
75    CBuffer* buffer ;
76    if (buffers_.size()==0) return true ;
77    else if (!fixed) return true ;
78    else 
79    {
80      buffer = buffers_.back() ;
81      if (buffer->remain()>=size) return true ;
82      else
83      {
84        if (buffer->isFixed()) 
85        {
86          if ( size > buffer->getSize()) return true ;
87          else return false ;
88        }
89        else return true ;
90      }
91    }
92  }
93 
94  int COneSidedClientBuffer::writeBuffer(char* buffer, size_t size)
95  {
96    MPI_Aint addr ;
97    size_t start ;
98    size_t count ;
99    int nbBlocs=0 ;
100
101    if (isBufferFree(size))
102    {
103      while (size > 0)
104      {
105        if (buffers_.empty())
106        {
107          if (fixed_) newBuffer(fixedSize_,fixed_) ;
108          else
109          { 
110            if (currentBufferSize_==0) currentBufferSize_=size ;
111            newBuffer(currentBufferSize_, fixed_) ;
112          }
113        }
114        CBuffer* currentBuffer = buffers_.back() ;
115
116        MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, windows_[currentWindow_]) ;
117        currentBuffer->write(&buffer, size, addr, start, count) ;
118        if (count > 0) 
119        {
120          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ;
121          nbBlocs++ ; 
122        }
123
124        currentBuffer->write(&buffer, size, addr, start, count) ;
125        MPI_Win_unlock(0,windows_[currentWindow_]) ;
126
127        if (count > 0) 
128        {
129          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ;
130          nbBlocs++ ; 
131        }
132
133        if (size>0) 
134        {
135          if (fixed_) 
136          {
137            currentBufferSize_ = fixedSize_ ;
138            newBuffer(currentBufferSize_,fixed_) ;
139          }
140          else
141          {
142            currentBufferSize_ = max((size_t)(currentBufferSize_*growingFactor_), size) ;
143            newBuffer(currentBufferSize_,fixed_) ;
144          }
145        } 
146      }     
147      // send message here ?
148      return nbBlocs ;
149    }
150    else return 0 ;
151  }
152
153  void COneSidedClientBuffer::freeBuffer(MPI_Aint addr)
154  {
155//    if (addr != lastFreedBloc_)
156    if (addr != 0)
157    {
158      while(freeBloc(addr)) ;
159//      lastFreedBloc_ = addr ;
160    }
161   
162    if (isFinalized_ && !buffers_.empty() && buffers_.front()->getCount()==0) 
163    {
164      delete buffers_.front() ;
165      buffers_.pop_front() ;
166    }
167  }
168 
169  bool COneSidedClientBuffer::freeBloc(MPI_Aint addr)
170  {
171    SBloc& bloc = blocs_.front() ;
172    bloc.buffer->free(bloc.start, bloc.count) ;
173    if (bloc.buffer->getCount()==0) 
174      if (buffers_.size()>1) 
175      { 
176        usedWindows_[bloc.window]=false ;
177        delete buffers_.front() ;
178        buffers_.pop_front() ;
179      }
180   
181    if (addr != bloc.addr) 
182    {
183      blocs_.pop_front() ;
184      return true ;
185    }
186    else 
187    {
188      blocs_.pop_front() ;
189      return false ;
190    }
191
192  }
193
194  bool COneSidedClientBuffer::writeEvent(size_t timeline, CEventClient& event)
195  {
196    size_t size = event.getSize() ;
197    if (isBufferFree(size))
198    {
199      CBufferOut buffer(size) ;
200      event.send(timeline, size, &buffer) ;
201      size_t bufferSizeBefore = currentBufferSize_ ; 
202      int nbBlocs = writeBuffer((char*)buffer.start(),buffer.count()) ;
203      if (currentBufferSize_!=bufferSizeBefore) sendResizeBufferEvent(timeline,currentBufferSize_) ;
204      sendTimelineEvent(timeline, event.getNbSender(), nbBlocs) ;
205      return true ;
206    }
207    else return false ;
208  }
209
210  void COneSidedClientBuffer::eventLoop(void)
211  {
212    // check to free requests
213    int flag ;
214    bool out = true;
215    SRequest request ; 
216    while (!requests_.empty() && out) 
217    {
218      request = requests_.front() ;
219      MPI_Test(&request.mpiRequest, &flag, MPI_STATUS_IGNORE) ;
220      if (flag==true)
221      {
222        delete request.buffer ;
223        requests_.pop_front() ;
224      }
225      else out=false; 
226    }
227   
228    // check to free blocs
229    MPI_Aint addr ;
230    MPI_Aint finalize ;
231    MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, winControl_) ;
232    addr = control_[CONTROL_ADDR] ;
233    control_[CONTROL_ADDR] = 0 ;
234    finalize = control_[CONTROL_FINALIZE] ;
235    MPI_Win_unlock(0, winControl_) ;
236    freeBuffer(addr) ;
237    if (finalize==1) isFinalized_=true ;
238
239
240  }
241
242  void COneSidedClientBuffer::sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs)
243  {
244    ostringstream outStr ;
245    SRequest request ;
246    request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int))*nbBlocs) ; 
247    *(request.buffer)<<timeline<<nbSenders<<nbBlocs ;
248    if (info.isActive(logProtocol))  outStr<<"New timeline event sent to server rank "<<serverRank_<<" : timeLine="<<timeline<<"  nbSenders="<<nbSenders<<"  nbBlocs="<<nbBlocs<<endl ;
249    auto it = blocs_.end() ;
250    for(int i=0 ; i<nbBlocs; ++i,--it) ;
251    for(int i=0 ; i<nbBlocs; ++i,++it) 
252    {
253      *(request.buffer) << it->addr << it->count << it->window;
254   
255      if (info.isActive(logProtocol))
256      {
257        size_t checksum=0 ;
258        for(size_t j=0;j<it->count;j++) checksum+=((unsigned char*)(it->addr))[j] ;
259        outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  ;  " ;
260      }
261    }
262    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, serverRank_, 20, interComm_, &request.mpiRequest ) ;
263    info(logProtocol)<<outStr.str()<<endl ;
264    requests_.push_back(request) ;
265  }
266
267  void COneSidedClientBuffer::sendResizeBufferEvent(size_t timeline, size_t size)
268  {
269    SRequest request ;
270    request.buffer = new CBufferOut(sizeof(EVENT_BUFFER_RESIZE)+sizeof(timeline)+sizeof(size)) ; 
271    *(request.buffer)<<EVENT_BUFFER_RESIZE<<timeline<<size ;
272    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, serverRank_, 20, interComm_, &request.mpiRequest ) ;
273    requests_.push_back(request) ;
274  }
275
276  void COneSidedClientBuffer::sendNewBuffer(void)
277  {
278    MPI_Aint controlAddr ;
279    MPI_Get_address(control_, &controlAddr) ;
280    MPI_Send(&controlAddr, 1, MPI_AINT, serverRank_, 20, interComm_) ;
281  }
282
283}
Note: See TracBrowser for help on using the repository browser.