#ifndef __ONE_SIDED_SERVER_BUFFER_HPP__ #define __ONE_SIDED_SERVER_BUFFER_HPP__ #include "xios_spl.hpp" #include "buffer_out.hpp" #include "mpi.hpp" #include "cxios.hpp" #include "event_server.hpp" #include "one_sided_cs_buffer_base.hpp" #include "one_sided_server_base.hpp" namespace xios { extern CLogType logProtocol ; class COneSidedServerBuffer : public COneSidedCSBufferBase, public COneSidedServerBase { private : class CBuffer { char* buffer_ ; size_t start_ ; size_t end_ ; size_t count_ ; size_t size_ ; bool fixed_ ; public: CBuffer(size_t size, bool fixed) : start_(size), end_(0), count_(0), size_(size), fixed_(fixed) { MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer_) ; info(logProtocol)<<"New buffer of size="<0) ERROR("COneSidedServerBuffer::~CBuffer()",<<"Try to delete buffer that is not empty"<= size) { count=size ; size = 0 ; start=end_ ; end_ += count ; count_+= count ; } else { count = start_-end_ ; size -= count ; start=end_ ; end_ = start_ ; count_+=count ; } } else if ( end_> start_ ) { if (size_-end_ >= size) { count = size ; size = 0; start=end_ ; end_ += count ; count_ += count ; } else { count = size_ - end_ ; size -= count ; start=end_ ; end_ = 0 ; count_+= count ; } } else if (end_==start_) { count = 0 ; } } void free(size_t start, size_t count) { start_ = start+count-1 ; count_ -= count ; } size_t remain(void) { return size_-count_; } size_t getSize(void) { return size_ ;} size_t getCount(void) {return count_ ;} size_t isFixed(void) {return fixed_;} char* getBuffer(void) {return buffer_ ;} } ; public: COneSidedServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map& pendingEvents, map& completedEvents, vector& buffer) ; void receivedRequest(vector& buffer) ; void eventLoop(void) ; void fillEventServer(size_t timeline, CEventServer& event) ; void notifyClientFinalize(void); private: struct SBloc { CBuffer* buffer ; size_t start ; int count ; MPI_Aint addr ; } ; void createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged) ; void newBuffer(size_t size, bool fixed) { buffers_.push_back(new CBuffer(size, fixed)); currentBuffer_=buffers_.back() ;} void testPendingRequests(void) ; void transferEvents(void) ; void transferEvent(void) ; void transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window) ; size_t remainSize(void) ; bool fixed_=false; size_t fixedSize_ = 0 ; size_t currentBufferSize_=0 ; double growingFactor_ = 2. ; std::list buffers_ ; CBuffer* currentBuffer_=nullptr ; map& pendingFullEvents_ ; map& completedFullEvents_ ; map nbSenders_ ; map>> pendingBlocs_; vector pendingRmaRequests_ ; vector pendingRmaStatus_ ; map> onTransferEvents_ ; // map>> map> completedEvents_ ; // map>> list> bufferResize_ ; // list int clientRank_ ; MPI_Aint * control_ ; MPI_Aint controlAddr_ ; MPI_Comm winComm_ ; vector windows_ ; int maxWindows_ ; set windowsLocked_ ; MPI_Win winControl_ ; bool isLocked_=false ; const int windowRank_=0 ; MPI_Aint lastBlocToFree_=0 ; } ; } #endif