source: XIOS3/trunk/src/transport/p2p_client_buffer.hpp

Last change on this file was 2597, checked in by jderouillat, 8 months ago

Bug fix in p2p mirror protocol, the client's buffer remain method need to be implemented as the server's one

  • Property svn:executable set to *
File size: 7.6 KB
Line 
1#ifndef __P2P_CLIENT_BUFFER_HPP__
2#define __P2P_CLIENT_BUFFER_HPP__
3
4#include "xios_spl.hpp"
5#include "buffer_out.hpp"
6#include "mpi.hpp"
7#include "cxios.hpp"
8#include "event_client.hpp"
9#include "p2p_cs_buffer_base.hpp"
10
11namespace xios
12{
13  extern CLogType logProtocol ;
14
15  class CP2pClientBuffer : public CP2pCSBufferBase
16  {
17    class CBuffer
18    {
19      char* buffer_ ;
20      size_t start_ ;
21      size_t end_ ;
22      size_t count_ ;
23      size_t size_ ;
24      bool fixed_ ;
25      char* window_ ;
26
27      public:
28        CBuffer(char* window, size_t size, bool fixed) : start_(size), end_(0), count_(0), size_(size), fixed_(fixed), window_(window) 
29        { 
30          size_t trueSize = (size/8 + 1)*8 + 8 ;  // seems to have some problem with OpenMPi/UCX when trying to tranfer 1 byte
31                                                  // at the end of the buffer. Alignment problem ? Or simple bug ? Seems that allocate
32                                                  // greater buffer solves the problem
33          MPI_Alloc_mem(trueSize, MPI_INFO_NULL, &buffer_) ;
34          info(logProtocol)<<"Attach memory to windows : addr="<<(MPI_Aint)buffer_<<"   count="<<size<<endl ;
35
36          //MPI_Win_attach(window_, buffer_, trueSize) ;
37         
38        }
39        ~CBuffer() 
40        { 
41          if (count_>0) ERROR("COneSidedClientBuffer::~CBuffer()",<<"Try to delete buffer that is not empty"<<std::endl) ;
42          //MPI_Win_detach(window_, buffer_) ;
43          MPI_Free_mem(buffer_) ;
44          info(logProtocol)<<"Detach memory from windows : addr="<<(MPI_Aint)buffer_<<"   count="<<size_<<endl ;
45        }
46       
47        void write(char** buffer, size_t& size, MPI_Aint& addr, size_t& start, size_t& count)
48        {
49          addr = 0 ;
50          count = 0 ;
51          if (end_ < start_)
52          {
53            if (start_-end_ >= size)
54            {
55              //info(logProtocol)<<" CASE 0 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl;
56              count=size ;
57              size = 0 ;
58              start=end_ ;
59              MPI_Get_address(&buffer_[start], &addr) ;
60              end_  += count ; 
61              count_+= count ;
62              memcpy(&buffer_[start], *buffer, count) ;
63              *buffer+=count ;
64            } 
65            else
66            {
67              //info(logProtocol)<<" CASE 1 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl;
68              count = start_-end_ ;
69              size -= count ;
70              start=end_ ;
71              MPI_Get_address(&buffer_[start], &addr) ;
72              end_ = start_ ;
73              count_+=count ;
74              memcpy(&buffer_[start], *buffer, count) ;
75              *buffer+=count ;
76            }
77          }
78          else if ( end_> start_ )
79          {
80            if (size_-end_ >= size)
81            {
82              //info(logProtocol)<<" CASE 2 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl;
83              count = size ;
84              size = 0;
85              start=end_ ;
86              MPI_Get_address(&buffer_[start], &addr) ;
87              end_   += count ;
88              count_ += count ;
89              memcpy(&buffer_[start], *buffer, count) ;
90              *buffer+=count ;
91            }
92            else
93            {
94              //info(logProtocol)<<" CASE 3 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl;
95              count = size_ - end_ ;
96              size -= count ;
97              start=end_ ;
98              MPI_Get_address(&buffer_[start], &addr) ;
99              end_ = 0 ;
100              count_+= count ;
101              memcpy(&buffer_[start], *buffer, count) ;
102              *buffer+=count ;
103            }
104          }
105          else if (end_==start_)
106          {
107            count = 0 ;
108          }
109
110          // check
111
112          if (count!=0)
113          {
114            MPI_Aint startBufferAddr,endBufferAddr ;
115            MPI_Get_address(&buffer_[0], &startBufferAddr) ;
116            MPI_Get_address(&buffer_[size_-1], &endBufferAddr) ;
117
118            if (addr<startBufferAddr || MPI_Aint_add(addr,count-1)>endBufferAddr)
119            {
120              ERROR("CP2pClientBuffer::CBuffer::write(char** buffer, size_t& size, MPI_Aint& addr, size_t& start, size_t& count)",<<" out of bounds"<<std::endl) ;
121            }
122
123          }
124        }
125
126        void free(size_t start, size_t count)
127        {
128          start_ = start+count-1 ;
129          count_ -= count ;
130        }
131
132        size_t remain(void) {
133            if (count_==0)
134              return size_;
135            else if (end_<start_)
136              return start_-end_;
137            else
138              return size_-end_;
139          }
140        size_t getSize(void) { return size_ ;}
141        size_t getCount(void) {return count_ ;}
142        size_t isFixed(void) {return fixed_;}
143    } ;
144 
145    public:
146      CP2pClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) ;
147      void newBuffer(size_t size, bool fixed) ;
148      bool isBufferFree(size_t size) ;
149      int writeBuffer(char* buff, size_t size) ;
150      void freeBuffer(MPI_Aint addr) ;
151      bool freeBloc(MPI_Aint addr) ;
152      bool writeEvent(size_t timeLine, CEventClient& event) ;
153      bool isEmpty(void) { return blocs_.empty() ;}
154      bool isNotifiedFinalized(void) { eventLoop() ; return buffers_.empty() && isFinalized_ ;}
155      void setFixed(size_t size) { fixed_=true ; fixedSize_=size ;}
156      void setGrowable(double growingFactor) { fixed_= false ; growingFactor_=growingFactor;}
157      void setGrowingFactor(double growingFactor) {growingFactor_=growingFactor;}
158      void eventLoop(void) ;
159      void sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs) ;
160      void sendResizeBufferEvent(size_t timeline, size_t currentBufferSize_) ;
161      void sendNewBuffer(void) ;
162      void createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank ) ;
163      void listenFinalize(void) ;
164
165    private :
166     
167      struct SBloc
168      {
169        MPI_Aint addr ;
170        CBuffer* buffer ;
171        size_t start ;
172        int    count ;
173        int    window ;
174      } ;
175
176      struct SRequest
177      {
178        CBufferOut* buffer ;
179        MPI_Request mpiRequest ;
180      } ;
181
182      struct SBlocRequest
183      {
184        MPI_Aint addr ;
185        MPI_Request mpiRequest ;
186      } ;
187     
188      MPI_Aint* control_ ;
189
190      MPI_Comm interComm_; 
191      MPI_Comm winComm_ ;
192
193      //MPI_Win window_ ;
194      //vector<MPI_Win> windows_ ;
195      vector<char*> windows_ ;
196      vector<bool> usedWindows_ ;
197      int currentWindow_ ;
198      int currentMirror_ ;
199      int maxWindows_ ;
200
201      //MPI_Win winControl_ ;
202      int serverRank_ ;
203
204      MPI_Comm interCommMerged_; 
205      int intraServerRank_ ; 
206
207      std::list<CBuffer*> buffers_ ;
208      std::list<SBloc> blocs_ ;
209      std::list<SBloc>::iterator lastBlocEvent_ ;
210      CBuffer* currentBuffer_=nullptr ;
211      std::list<SRequest> requests_ ;
212
213      std::list<SBlocRequest> sentBlocRequest_ ;
214      MPI_Request finalizeRequest_ ;
215
216      bool fixed_=false;
217      size_t fixedSize_ = 0 ;
218      size_t currentBufferSize_= 0  ;
219      double growingFactor_ = 1.2 ; 
220      MPI_Aint lastFreedBloc_=0 ;
221      bool isFinalized_ = false ;
222      int maxSentBlocRequests_ = 10000 ;
223
224  } ;
225
226}
227
228
229#endif
Note: See TracBrowser for help on using the repository browser.