source: XIOS/dev/dev_ym/XIOS_COUPLING/src/transport/one_sided_server_buffer.hpp @ 2343

Last change on this file since 2343 was 2343, checked in by ymipsl, 2 years ago
  • Implement new infrastructure for transfert protocol.
  • new purelly one sided protocol is now available, the previous protocol (legacy, mix send/recv and one sided) is still available. Other specific protocol could be implemented more easilly in future.
  • switch can be operate with "transport_protocol" variable in XIOS context :

ex:
<variable id="transport_protocol" type="string">one_sided</variable>

Available protocols are : one_sided, legacy or default. The default protocol is "legacy".

YM

  • Property svn:executable set to *
File size: 5.0 KB
Line 
1#ifndef __ONE_SIDED_SERVER_BUFFER_HPP__
2#define __ONE_SIDED_SERVER_BUFFER_HPP__
3
4#include "xios_spl.hpp"
5#include "buffer_out.hpp"
6#include "mpi.hpp"
7#include "cxios.hpp"
8#include "event_server.hpp"
9#include "one_sided_cs_buffer_base.hpp"
10#include "one_sided_server_base.hpp"
11
12namespace xios
13{
14  extern CLogType logProtocol ;
15
16  class COneSidedServerBuffer : public COneSidedCSBufferBase, public COneSidedServerBase
17  {
18    private :
19
20      class CBuffer
21      {
22        char* buffer_ ;
23        size_t start_ ;
24        size_t end_ ;
25        size_t count_ ;
26        size_t size_ ;
27        bool fixed_ ;
28
29        public:
30          CBuffer(size_t size, bool fixed) : start_(size), end_(0), count_(0), size_(size), fixed_(fixed) 
31          { 
32            MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer_) ;
33            info(logProtocol)<<"New buffer of size="<<size<<endl ;
34          }
35          ~CBuffer() 
36          { 
37            if (count_>0) ERROR("COneSidedServerBuffer::~CBuffer()",<<"Try to delete buffer that is not empty"<<std::endl) ;
38            MPI_Free_mem(&buffer_) ;
39          }
40       
41          void reserve(size_t& size, size_t& start, size_t& count)
42          {
43            count = 0 ;
44            if (end_ < start_)
45            {
46              if (start_-end_ >= size)
47              {
48                count=size ;
49                size = 0 ;
50                start=end_ ;
51                end_  += count ; 
52                count_+= count ;
53              } 
54              else
55              {
56                count = start_-end_ ;
57                size -= count ;
58                start=end_ ;
59                end_ = start_ ;
60                count_+=count ;
61              }
62            }
63            else if ( end_> start_ )
64            {
65              if (size_-end_ >= size)
66              {
67                count = size ;
68                size = 0;
69                start=end_ ;
70                end_   += count ;
71                count_ += count ;
72              }
73              else
74              {
75                count = size_ - end_ ;
76                size -= count ;
77                start=end_ ;
78                end_ = 0 ;
79                count_+= count ;
80              }
81            }
82            else if (end_==start_)
83            {
84              count = 0 ;
85            }
86          }
87
88          void free(size_t start, size_t count)
89          {
90            start_ = start+count-1 ;
91            count_ -= count ;
92          }
93
94          size_t remain(void) { return size_-count_; }
95          size_t getSize(void) { return size_ ;}
96          size_t getCount(void) {return count_ ;}
97          size_t isFixed(void) {return fixed_;}
98          char* getBuffer(void) {return buffer_ ;}
99      } ;
100     
101    public:
102   
103      COneSidedServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
104                             map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer)  ;
105     
106      void receivedRequest(vector<char>& buffer) ;
107      void eventLoop(void) ;
108      void fillEventServer(size_t timeline, CEventServer& event) ;
109      void notifyClientFinalize(void);
110
111    private:
112      struct SBloc
113      {
114        CBuffer* buffer ;
115        size_t start ;
116        int count ;
117        MPI_Aint addr ;
118      } ;
119
120      void createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged) ;
121      void newBuffer(size_t size, bool fixed) { buffers_.push_back(new CBuffer(size, fixed)); currentBuffer_=buffers_.back() ;}
122      void testPendingRequests(void) ;
123      void transferEvents(void) ;
124      void transferEvent(void) ;
125      void transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window) ;
126      size_t remainSize(void) ;
127
128
129
130      bool fixed_=false;
131      size_t fixedSize_ = 0 ;
132      size_t currentBufferSize_=0 ;
133      double growingFactor_ = 2. ;
134     
135      std::list<CBuffer*> buffers_ ;
136      CBuffer* currentBuffer_=nullptr ;
137
138      map<size_t, SPendingEvent>& pendingFullEvents_ ;
139      map<size_t, SPendingEvent>& completedFullEvents_ ;
140
141      map<size_t, int> nbSenders_ ;
142      map<size_t, list<tuple<MPI_Aint,int,int>>> pendingBlocs_;
143     
144      vector<MPI_Request> pendingRmaRequests_ ;
145      vector<MPI_Status> pendingRmaStatus_ ;
146
147      map<size_t, list<SBloc>> onTransferEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
148      map<size_t, list<SBloc>> completedEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
149      list<std::pair<size_t,size_t>> bufferResize_ ; // list<size_t AssociatedTimeline, size_t newSize>
150
151      int clientRank_ ;
152      MPI_Aint * control_ ;
153      MPI_Aint controlAddr_ ;
154
155      MPI_Comm winComm_ ;
156      vector<MPI_Win> windows_ ;
157      int maxWindows_ ;
158      set<int> windowsLocked_ ;
159
160      MPI_Win winControl_ ;
161      bool isLocked_=false ;
162      const int windowRank_=0 ;
163      MPI_Aint lastBlocToFree_=0 ;
164
165  } ;
166
167}
168
169
170#endif
Note: See TracBrowser for help on using the repository browser.