source: XIOS3/trunk/src/transport/p2p_server_buffer.hpp

Last change on this file was 2594, checked in by jderouillat, 8 months ago

Update the p2p protocol as a mirror protocol : the servers buffers will strictly mirror (number of buffers, positions of messages in the buffers) the clients buffers. The memory consumption of servers will be capped impplicitly by the clients behavior where the time spent to wait for free buffers could be present again.

  • Property svn:executable set to *
File size: 6.9 KB
Line 
1#ifndef __P2P_SERVER_BUFFER_HPP__
2#define __P2P_SERVER_BUFFER_HPP__
3
4#include "xios_spl.hpp"
5#include "buffer_out.hpp"
6#include "mpi.hpp"
7#include "cxios.hpp"
8#include "event_server.hpp"
9#include "p2p_cs_buffer_base.hpp"
10#include "p2p_server_base.hpp"
11
12namespace xios
13{
14  extern CLogType logProtocol ;
15
16  class CP2pServerBuffer : public CP2pCSBufferBase, public CP2pServerBase
17  {
18    private :
19
20      class CBuffer
21      {
22        char* buffer_ ;
23        size_t start_ ;
24        size_t end_ ;
25        size_t count_ ;
26        size_t size_ ;
27        bool fixed_ ;
28
29        public:
30          CBuffer(size_t size, bool fixed) : start_(size), end_(0), count_(0), size_(size), fixed_(fixed) 
31          { 
32            MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer_) ;
33            info(logProtocol)<<"New buffer of size="<<size<<endl ;
34          }
35          ~CBuffer() 
36          { 
37            if (count_>0) ERROR("COneSidedServerBuffer::~CBuffer()",<<"Try to delete buffer that is not empty"<<std::endl) ;
38            MPI_Free_mem(buffer_) ;
39          }
40       
41          void reserve(size_t& size, size_t& start, size_t& count)
42          {
43            count = 0 ;
44            if (end_ < start_)
45            {
46              if (start_-end_ >= size)
47              {
48                //if (start!=end_) info(logProtocol)<<" CASE 0, recv/computed : " << start << "/" << end_ << endl;
49                count=size ;
50                size = 0 ;
51                start=end_ ;
52                end_  += count ; 
53                count_+= count ;
54              } 
55              else
56              {
57                // Server buffers get blocs from clients, they should not be splitted (mirrors the client buffers)
58                //if (start!=end_) info(logProtocol)<<" CASE 1"<< endl;
59                //info(logProtocol)<<" CASE 1 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl;
60                ERROR("COneSidedServerBuffer::reserve()",<<"This should be the case of the 2nd part of a splitted message"<<std::endl);
61                //count = start_-end_ ;
62                //size -= count ;
63                //start=end_ ;
64                //end_ = start_ ;
65                //count_+=count ;
66                count = 0 ; 
67              }
68            }
69            else if ( end_> start_ )
70            {
71              if (size_-end_ >= size)
72              {
73                //if (start!=end_ ) info(logProtocol)<<" CASE 2, recv/computed : "<< start << "/" << end_ << endl;
74                count = size ;
75                size = 0;
76                start=end_ ;
77                end_   += count ;
78                count_ += count ;
79              }
80              else
81              {
82                // Server buffers get blocs from clients, they should not be splitted (mirrors the client buffers)
83                //     1st part of a splitted message, fill the end of the buffer
84                //     end_ must be set to 0 like on clients
85                //if (start!=end_) info(logProtocol)<<" CASE 3"<< endl;
86                //  info(logProtocol)<<" CASE 3 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl;
87                //count = size_ - end_ ;
88                //size -= count ;
89                //start=end_ ;
90                end_ = 0 ;
91                //count_+= count ;
92                count = 0 ;
93              }
94            }
95            else if (end_==start_)
96            {
97              count = 0 ;
98            }
99          }
100
101          void free(size_t start, size_t count)
102          {
103            start_ = start+count-1 ;
104            count_ -= count ;
105          }
106
107          size_t remain(void) {
108            if (count_==0)
109              return size_;
110            else if (end_<start_)
111              return start_-end_;
112            else
113              return size_-end_;
114          }
115          size_t getSize(void) { return size_ ;}
116          size_t getCount(void) {return count_ ;}
117          size_t isFixed(void) {return fixed_;}
118          char* getBuffer(void) {return buffer_ ;}
119      } ;
120     
121    public:
122   
123      CP2pServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
124                             map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer)  ;
125     
126      ~CP2pServerBuffer()
127      {
128          while (!buffers_.empty()) {
129              delete buffers_.front();
130              buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer
131              countDeletedBuffers_++;
132          }
133      };
134
135      void receivedRequest(vector<char>& buffer) ;
136      void eventLoop(void) ;
137      void fillEventServer(size_t timeline, CEventServer& event) ;
138      void notifyClientFinalize(void);
139
140    private:
141      struct SBloc
142      {
143        CBuffer* buffer ;
144        size_t start ;
145        int count ;
146        MPI_Aint addr ;
147      } ;
148
149      void createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged) ;
150      void newBuffer(size_t size, bool fixed) { buffers_.push_back(new CBuffer(size, fixed)); currentBuffer_=buffers_.back() ;}
151      void testPendingRequests(void) ;
152      void transferEvents(void) ;
153      void transferEvent(void) ;
154      void transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window) ;
155      size_t remainSize(void) ;
156      size_t remainSize(int bufferId) ;
157
158
159
160      bool fixed_=false;
161      size_t fixedSize_ = 0 ;
162      size_t currentBufferSize_=0 ;
163      double growingFactor_ = 1. ;
164      double bufferServerFactor_=1. ;
165     
166      std::vector<CBuffer*> buffers_ ;
167      CBuffer* currentBuffer_=nullptr ;
168
169      map<size_t, SPendingEvent>& pendingFullEvents_ ;
170      map<size_t, SPendingEvent>& completedFullEvents_ ;
171
172      map<size_t, int> nbSenders_ ;
173      map<size_t, list<tuple<MPI_Aint,int,int,size_t>>> pendingBlocs_;
174     
175      vector<MPI_Request> pendingRmaRequests_ ;
176      vector<MPI_Status> pendingRmaStatus_ ;
177      vector<char*> pendingRmaAddr_ ;
178      vector<int> pendingRmaCount_ ;
179
180      map<size_t, list<SBloc>> onTransferEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
181      map<size_t, list<SBloc>> completedEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
182      list<std::pair<size_t,size_t>> bufferResize_ ; // list<size_t AssociatedTimeline, size_t newSize>
183
184      int clientRank_ ;
185      MPI_Aint * control_ ;
186      MPI_Aint controlAddr_ ;
187
188      MPI_Comm interCommMerged_ ;
189      MPI_Comm winComm_ ;
190      vector<MPI_Win> windows_ ;
191      int maxWindows_ ;
192      set<int> windowsLocked_ ;
193
194      MPI_Win winControl_ ;
195      //bool isLocked_=false ;
196      const int windowRank_=0 ;
197      MPI_Aint lastBlocToFree_=0 ;
198      int countDeletedBuffers_;
199
200  } ;
201
202}
203
204
205#endif
Note: See TracBrowser for help on using the repository browser.