source: XIOS3/trunk/src/transport/p2p_server_buffer.hpp @ 2556

Last change on this file since 2556 was 2556, checked in by ymipsl, 10 months ago

First version on the point to point transport protocol, activated by the variable : transport_protocol="p2p"

YM

  • Property svn:executable set to *
File size: 5.2 KB
Line 
1#ifndef __P2P_SERVER_BUFFER_HPP__
2#define __P2P_SERVER_BUFFER_HPP__
3
4#include "xios_spl.hpp"
5#include "buffer_out.hpp"
6#include "mpi.hpp"
7#include "cxios.hpp"
8#include "event_server.hpp"
9#include "p2p_cs_buffer_base.hpp"
10#include "p2p_server_base.hpp"
11
12namespace xios
13{
14  extern CLogType logProtocol ;
15
16  class CP2pServerBuffer : public CP2pCSBufferBase, public CP2pServerBase
17  {
18    private :
19
20      class CBuffer
21      {
22        char* buffer_ ;
23        size_t start_ ;
24        size_t end_ ;
25        size_t count_ ;
26        size_t size_ ;
27        bool fixed_ ;
28
29        public:
30          CBuffer(size_t size, bool fixed) : start_(size), end_(0), count_(0), size_(size), fixed_(fixed) 
31          { 
32            MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer_) ;
33            info(logProtocol)<<"New buffer of size="<<size<<endl ;
34          }
35          ~CBuffer() 
36          { 
37            if (count_>0) ERROR("COneSidedServerBuffer::~CBuffer()",<<"Try to delete buffer that is not empty"<<std::endl) ;
38            MPI_Free_mem(&buffer_) ;
39          }
40       
41          void reserve(size_t& size, size_t& start, size_t& count)
42          {
43            count = 0 ;
44            if (end_ < start_)
45            {
46              if (start_-end_ >= size)
47              {
48                count=size ;
49                size = 0 ;
50                start=end_ ;
51                end_  += count ; 
52                count_+= count ;
53              } 
54              else
55              {
56                //count = start_-end_ ;
57                //size -= count ;
58                //start=end_ ;
59                //end_ = start_ ;
60                //count_+=count ;
61                count = 0 ; 
62              }
63            }
64            else if ( end_> start_ )
65            {
66              if (size_-end_ >= size)
67              {
68                count = size ;
69                size = 0;
70                start=end_ ;
71                end_   += count ;
72                count_ += count ;
73              }
74              else
75              {
76                //count = size_ - end_ ;
77                //size -= count ;
78                //start=end_ ;
79                //end_ = 0 ;
80                //count_+= count ;
81                count = 0 ;
82              }
83            }
84            else if (end_==start_)
85            {
86              count = 0 ;
87            }
88          }
89
90          void free(size_t start, size_t count)
91          {
92            start_ = start+count-1 ;
93            count_ -= count ;
94          }
95
96          size_t remain(void) { return size_-count_; }
97          size_t getSize(void) { return size_ ;}
98          size_t getCount(void) {return count_ ;}
99          size_t isFixed(void) {return fixed_;}
100          char* getBuffer(void) {return buffer_ ;}
101      } ;
102     
103    public:
104   
105      CP2pServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
106                             map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer)  ;
107     
108      void receivedRequest(vector<char>& buffer) ;
109      void eventLoop(void) ;
110      void fillEventServer(size_t timeline, CEventServer& event) ;
111      void notifyClientFinalize(void);
112
113    private:
114      struct SBloc
115      {
116        CBuffer* buffer ;
117        size_t start ;
118        int count ;
119        MPI_Aint addr ;
120      } ;
121
122      void createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged) ;
123      void newBuffer(size_t size, bool fixed) { buffers_.push_back(new CBuffer(size, fixed)); currentBuffer_=buffers_.back() ;}
124      void testPendingRequests(void) ;
125      void transferEvents(void) ;
126      void transferEvent(void) ;
127      void transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window) ;
128      size_t remainSize(void) ;
129
130
131
132      bool fixed_=false;
133      size_t fixedSize_ = 0 ;
134      size_t currentBufferSize_=0 ;
135      double growingFactor_ = 2. ;
136      double bufferServerFactor_=10. ;
137     
138      std::list<CBuffer*> buffers_ ;
139      CBuffer* currentBuffer_=nullptr ;
140
141      map<size_t, SPendingEvent>& pendingFullEvents_ ;
142      map<size_t, SPendingEvent>& completedFullEvents_ ;
143
144      map<size_t, int> nbSenders_ ;
145      map<size_t, list<tuple<MPI_Aint,int,int>>> pendingBlocs_;
146     
147      vector<MPI_Request> pendingRmaRequests_ ;
148      vector<MPI_Status> pendingRmaStatus_ ;
149      vector<int> pendingRmaCount_ ;
150
151      map<size_t, list<SBloc>> onTransferEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
152      map<size_t, list<SBloc>> completedEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
153      list<std::pair<size_t,size_t>> bufferResize_ ; // list<size_t AssociatedTimeline, size_t newSize>
154
155      int clientRank_ ;
156      MPI_Aint * control_ ;
157      MPI_Aint controlAddr_ ;
158
159      MPI_Comm interCommMerged_ ;
160      MPI_Comm winComm_ ;
161      vector<MPI_Win> windows_ ;
162      int maxWindows_ ;
163      set<int> windowsLocked_ ;
164
165      MPI_Win winControl_ ;
166      bool isLocked_=false ;
167      const int windowRank_=0 ;
168      MPI_Aint lastBlocToFree_=0 ;
169
170  } ;
171
172}
173
174
175#endif
Note: See TracBrowser for help on using the repository browser.