#include "one_sided_server_buffer.hpp" #include "xios_spl.hpp" #include "mpi.hpp" #include "timer.hpp" #include "buffer_in.hpp" namespace xios { CLogType logProtocol("log_protocol") ; COneSidedServerBuffer::COneSidedServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map& pendingEvents, map& completedEvents, vector& buffer) : clientRank_(clientRank), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents) { MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ; CBufferIn bufferIn(buffer.data(),buffer.size()) ; bufferIn >> controlAddr_; createWindow(commSelf, interCommMerged) ; } void COneSidedServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged) { CTimer::get("create Windows").resume() ; MPI_Comm interComm ; MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ; MPI_Intercomm_merge(interComm, true, &winComm_) ; CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ; MPI_Comm_free(&interComm) ; maxWindows_=MAX_WINDOWS ; windows_.resize(maxWindows_) ; for(int i=0;i& buffer) { size_t timeline ; int nbSenders ; CBufferIn bufferIn(buffer.data(),buffer.size()) ; bufferIn >> timeline ; if (timeline==EVENT_BUFFER_RESIZE) { size_t AssociatedTimeline ; size_t newSize ; bufferIn >>AssociatedTimeline>>newSize ; bufferResize_.push_back({AssociatedTimeline,newSize}) ; } else // receive standard event { bufferIn>> nbSenders ; nbSenders_[timeline] = nbSenders ; auto pendingFullEvent=pendingFullEvents_.find(timeline) ; if (pendingFullEvent==pendingFullEvents_.end()) { SPendingEvent pendingEvent = {nbSenders,1,{this}} ; pendingFullEvents_[timeline]=pendingEvent ; } else { pendingFullEvent->second.currentNbSenders++ ; pendingFullEvent->second.buffers.push_back(this) ; } int nbBlocs ; int count ; int window ; bufferIn >> nbBlocs ; MPI_Aint bloc ; auto& blocs = pendingBlocs_[timeline] ; for(int i=0;i> bloc >> count >> window; blocs.push_back({bloc, count, window}) ; } } } void COneSidedServerBuffer::eventLoop(void) { int flag ; if (!pendingRmaRequests_.empty()) testPendingRequests() ; if (pendingRmaRequests_.empty()) transferEvents() ; if (!isLocked_) { if (lastBlocToFree_!=0) { info(logProtocol)<<"Send bloc to free : "<1) if (buffers_.front()->getCount()==0) buffers_.pop_front() ; // if buffer is empty free buffer } void COneSidedServerBuffer::notifyClientFinalize(void) { eventLoop() ; // to free the last bloc MPI_Aint finalize=1 ; MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ; MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ; MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ; MPI_Win_unlock(windowRank_,winControl_) ; } void COneSidedServerBuffer::testPendingRequests(void) { if (!pendingRmaRequests_.empty()) { int flag ; MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ; if (flag==true) { if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked"); for(auto& win : windowsLocked_) { info(logProtocol)<<"unlock window "<second.nbSenders-- ; pendingFullEvent->second.currentNbSenders-- ; auto completedFullEvent=completedFullEvents_.find(timeline) ; if (completedFullEvent==completedFullEvents_.end()) { SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ; completedFullEvents_[timeline]=pendingEvent ; } else { completedFullEvent->second.currentNbSenders++ ; 
            completedFullEvent->second.buffers.push_back(this) ;
          }
          nbSenders_.erase(timeline) ;
        }
        onTransferEvents_.clear() ;
      }
    }
  }

  size_t COneSidedServerBuffer::remainSize(void)
  {
    if (!fixed_) return std::numeric_limits<size_t>::max() ;
    else
    {
      if (currentBuffer_ == nullptr) return fixedSize_ ;
      else return currentBuffer_->remain() ;
    }
  }

  void COneSidedServerBuffer::transferEvents(void)
  {
    if (pendingRmaRequests_.empty() && !pendingBlocs_.empty())
    {
      size_t remain=remainSize() ;
      size_t transferedSize=0 ;

      size_t timeline = pendingBlocs_.begin()->first ;
      auto& blocs = pendingBlocs_.begin()->second ;

      if (!bufferResize_.empty())
      {
        if (bufferResize_.front().first==timeline)
        {
          currentBufferSize_=bufferResize_.front().second ;
          info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<endl ;
          bufferResize_.pop_front() ;
        }
      }

      size_t eventSize=0 ;
      for(auto& bloc : blocs) eventSize+=get<1>(bloc) ; // size of the event = sum of its bloc counts

      // take a shared lock on every window targeted by a bloc of this event
      for(auto& bloc : blocs)
      {
        int win=get<2>(bloc) ;
        if (windowsLocked_.count(win)==0)
        {
          MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
          windowsLocked_.insert(win) ;
        }
      }
      isLocked_=true ;

      do
      {
        transferEvent() ; // ok enough storage for this bloc

        transferedSize += eventSize ;
        pendingBlocs_.erase(pendingBlocs_.begin()) ;

        // break ; // transferring just one event, temporary => to remove

        if (pendingBlocs_.empty()) break ; // no more blocs to transfer => exit loop

        timeline = pendingBlocs_.begin()->first ;
        auto& blocs=pendingBlocs_.begin()->second ;

        if (!bufferResize_.empty())
        {
          if (bufferResize_.front().first==timeline)
          {
            currentBufferSize_=bufferResize_.front().second ;
            info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<endl ;
            bufferResize_.pop_front() ;
          }
        }

        eventSize=0 ;
        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;

        // lock any window needed by the next event that is not yet locked
        for(auto& bloc : blocs)
        {
          int win=get<2>(bloc) ;
          if (windowsLocked_.count(win)==0)
          {
            MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
            windowsLocked_.insert(win) ;
          }
        }
      } while(transferedSize+eventSize<=remain) ; // assumed condition: continue while the next event fits in the remaining buffer space
    }
  }

  // Hand a completed event over to the server event queue and release the blocs it occupied in the
  // local buffers. Only the release loop and the final push are preserved from the original; the
  // signature and the declarations of buffer, size, addr and outStr are assumptions.
  void COneSidedServerBuffer::fillEventServer(size_t timeline, CEventServer& event)
  {
    char* buffer=nullptr ; // assumed: contiguous copy of the event data assembled from the blocs
    size_t size=0 ;        // assumed: total size of the assembled event
    MPI_Aint addr ;        // address of the last released bloc, presumably used to update lastBlocToFree_
    ostringstream outStr ;

    for(auto& bloc : completedBlocs_[timeline]) // assumed container holding the fully transferred blocs
    {
      // ... assembly of buffer from the bloc data ...
      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc
      addr=bloc.addr ;
      if (bloc.buffer->getCount()==0)
        if (buffers_.size() > 1) buffers_.pop_front() ; // if buffer is empty free buffer
    }
    event.push(clientRank_, nullptr, buffer, size) ;
    if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<