/*
 * Implementation of xios::CP2pServerBuffer.
 *
 * From the statements visible below, an instance is bound to one client rank (clientRank_) and to
 * the communicator interCommMerged_. It parses incoming requests into per-timeline bloc lists
 * (pendingBlocs_), receives the bloc payloads with nonblocking MPI_Irecv calls into CBuffer objects,
 * tracks those requests in pendingRmaRequests_, and finally copies completed blocs into a
 * CEventServer. Many commented-out statements refer to MPI windows and one-sided calls.
 *
 * NOTE(review): this text appears to have lost its original line breaks, and several spans of text
 * are missing (for example template arguments, the right-hand side of several stream expressions,
 * some loop headers and at least two function headers). Because the line breaks are gone, each
 * double-slash comment now runs to the end of its physical line. Restore the file from version
 * control before attempting to compile or modify it.
 */
#include "p2p_server_buffer.hpp" #include "xios_spl.hpp" #include "mpi.hpp" #include "timer.hpp" #include "buffer_in.hpp" namespace xios { extern CLogType logProtocol ; /* Constructor: initialises clientRank_, interCommMerged_, pendingFullEvents_ and completedFullEvents_ from the arguments. The buffer argument is only used by commented-out statements. NOTE(review): the trailing createWindow call presumably sat on its own line in the original layout. */ CP2pServerBuffer::CP2pServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map& pendingEvents, map& completedEvents, vector& buffer) : clientRank_(clientRank), interCommMerged_(interCommMerged), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents) { //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ; //CBufferIn bufferIn(buffer.data(),buffer.size()) ; //bufferIn >> controlAddr_; createWindow(commSelf, interCommMerged) ; } /* createWindow: resumes the create Windows timer; every other visible statement is commented-out communicator and window set-up. NOTE(review): the end of this function and the header of the request-parsing function that follows appear to be missing from the text. */ void CP2pServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged) { CTimer::get("create Windows").resume() ; //MPI_Comm interComm ; //MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ; //MPI_Intercomm_merge(interComm, true, &winComm_) ; //CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ; //MPI_Comm_free(&interComm) ; //maxWindows_=MAX_WINDOWS ; //windows_.resize(maxWindows_) ; //for(int i=0;i& buffer) { /* Request parsing (function header missing from the text): reads a timeline from the buffer. When it equals EVENT_BUFFER_RESIZE, an associated timeline and a new size are read and queued in bufferResize_. Otherwise the sender count is stored in nbSenders_ for that timeline, this object is registered in pendingFullEvents_ (new entry, or currentNbSenders incremented and this appended to buffers), and each bloc, count and window value read from the buffer is appended to pendingBlocs_ for that timeline. */ size_t timeline ; int nbSenders ; CBufferIn bufferIn(buffer.data(),buffer.size()) ; bufferIn >> timeline ; if (timeline==EVENT_BUFFER_RESIZE) { size_t AssociatedTimeline ; size_t newSize ; bufferIn >>AssociatedTimeline>>newSize ; bufferResize_.push_back({AssociatedTimeline,newSize}) ; } else // receive standard event { info(logProtocol)<<"received request from rank : "<> nbSenders ; nbSenders_[timeline] = nbSenders ; auto pendingFullEvent=pendingFullEvents_.find(timeline) ; if (pendingFullEvent==pendingFullEvents_.end()) { SPendingEvent pendingEvent = {nbSenders,1,{this}} ; pendingFullEvents_[timeline]=pendingEvent ; } else { pendingFullEvent->second.currentNbSenders++ ; pendingFullEvent->second.buffers.push_back(this) ; } int nbBlocs ; int count ; int window ; bufferIn >> nbBlocs ; MPI_Aint bloc ; auto& blocs = 
pendingBlocs_[timeline] ; for(int i=0;i> bloc >> count >> window; blocs.push_back({bloc, count, window}) ; } } } /* eventLoop: calls testPendingRequests when requests are outstanding, calls transferEvents once none are outstanding, and when isLocked_ is false and lastBlocToFree_ is non-zero logs a Send bloc to free message. NOTE(review): the text after that log statement is missing; the trailing buffers_.pop_front logic may belong to a different function. */ void CP2pServerBuffer::eventLoop(void) { int flag ; if (!pendingRmaRequests_.empty()) testPendingRequests() ; if (pendingRmaRequests_.empty()) transferEvents() ; if (!isLocked_) { if (lastBlocToFree_!=0) { info(logProtocol)<<"Send bloc to free : "<1) if (buffers_.front()->getCount()==0) buffers_.pop_front() ; // if buffer is empty free buffer } /* notifyClientFinalize: runs eventLoop once, then sends a zero-length MPI_CHAR message with tag 22 to clientRank_ over interCommMerged_. The one-sided MPI_Put variant is left commented out. */ void CP2pServerBuffer::notifyClientFinalize(void) { eventLoop() ; // to free the last bloc //MPI_Aint finalize=1 ; //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ; //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ; //MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ; //MPI_Win_unlock(windowRank_,winControl_) ; int dummy ; MPI_Send(&dummy, 0, MPI_CHAR, clientRank_, 22, interCommMerged_) ; } /* testPendingRequests: when requests are outstanding, calls MPI_Testall on pendingRmaRequests_ with pendingRmaStatus_ as status storage; the surrounding timer is only driven when logProtocol info is active. In the completion branch the visible statements decrement the pending sender counters, register this object in completedFullEvents_, erase the timeline from nbSenders_ and clear onTransferEvents_. NOTE(review): part of the completion branch is missing from the text. */ void CP2pServerBuffer::testPendingRequests(void) { if (!pendingRmaRequests_.empty()) { int flag ; if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ; MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ; if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ; if (flag==true) { //if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked"); //for(auto& win : windowsLocked_) //{ // info(logProtocol)<<"unlock window "<second.nbSenders-- ; pendingFullEvent->second.currentNbSenders-- ; auto completedFullEvent=completedFullEvents_.find(timeline) ; if (completedFullEvent==completedFullEvents_.end()) { SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ; completedFullEvents_[timeline]=pendingEvent ; } else { completedFullEvent->second.currentNbSenders++ ; completedFullEvent->second.buffers.push_back(this) ; } nbSenders_.erase(timeline) ; } onTransferEvents_.clear() ; } } } /* remainSize: returns the numeric_limits maximum when fixed_ is false; otherwise returns fixedSize_ when currentBuffer_ is null, else the value of currentBuffer_->remain(). */ size_t 
CP2pServerBuffer::remainSize(void) { if (!fixed_) return std::numeric_limits::max() ; else { if (currentBuffer_ == nullptr) return fixedSize_ ; else return currentBuffer_->remain() ; } } /* transferEvents: only acts when no request is outstanding and pendingBlocs_ is not empty. It starts from the first entry of pendingBlocs_ and, when the front of bufferResize_ carries the same timeline, sets currentBufferSize_ to the requested size multiplied by bufferServerFactor_. NOTE(review): most of the transfer loop is missing from the text. */ void CP2pServerBuffer::transferEvents(void) { if (pendingRmaRequests_.empty() && !pendingBlocs_.empty()) { size_t remain=remainSize() ; size_t transferedSize=0 ; size_t timeline = pendingBlocs_.begin()->first ; auto& blocs = pendingBlocs_.begin()->second ; if (!bufferResize_.empty()) { if (bufferResize_.front().first==timeline) { currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; info(logProtocol)<<"Received new buffer size="<(bloc) ; // if (windowsLocked_.count(win)==0) // { // info(logProtocol)<<"lock window "< to remove if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop timeline = pendingBlocs_.begin()->first ; auto& blocs=pendingBlocs_.begin()->second ; if (!bufferResize_.empty()) { if (bufferResize_.front().first==timeline) { currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; info(logProtocol)<<"Received new buffer size="<second ; size_t timeline = pendingBlocs_.begin() -> first ; /* Per-bloc transfer (function header missing from the text): for each tuple of address, size and window, reserves space in currentBuffer_ and issues transferRmaRequest for the reserved count, advancing offset by that count; while size stays positive a new buffer is created through newBuffer. Afterwards pendingRmaStatus_ is resized to match pendingRmaRequests_. NOTE(review): size is presumably reduced by reserve; confirm against CBuffer. */ for(auto& bloc : blocs) { addr = std::get<0>(bloc) ; size = std::get<1>(bloc) ; window = std::get<2>(bloc) ; offset=0 ; do { if (currentBuffer_!=nullptr) { currentBuffer_->reserve(size, start, count) ; if ( count > 0) { transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ; offset=MPI_Aint_add(offset, count) ; } //currentBuffer_->reserve(size, start, count) ; //if ( count > 0) //{ // transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ; // offset=MPI_Aint_add(offset, count) ; //} } if (size>0) { if (fixed_) newBuffer(std::max(fixedSize_, size),fixed_) ; else { currentBufferSize_ = std::max((size_t)(currentBufferSize_*growingFactor_), size) ; newBuffer(currentBufferSize_,fixed_) ; } } } while (size > 0 ) ; } pendingRmaStatus_.resize(pendingRmaRequests_.size()) ; } /* transferRmaRequest: computes offsetAddr with MPI_Aint_add, posts a nonblocking MPI_Irecv of count chars from clientRank_ (tag 21, interCommMerged_) into buffer->getBuffer()+start, then records the request in pendingRmaRequests_, its count in pendingRmaCount_ and the bloc descriptor in onTransferEvents_ for the timeline. NOTE(review): the logging statement and what looks like an earlier one-sided get call are partly missing from the text. */ void 
CP2pServerBuffer::transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window) { MPI_Request request ; MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ; if (info.isActive(logProtocol)) { info(logProtocol)<<"receive Bloc from client "<getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ; MPI_Irecv(buffer->getBuffer()+start, count, MPI_CHAR, clientRank_, 21, interCommMerged_, &request) ; if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ; pendingRmaRequests_.push_back(request) ; pendingRmaCount_.push_back(count) ; onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ; } /* fillEventServer: sums the count of every completed bloc for the timeline, allocates a char array of that size with new[], frees each consumed bloc in its buffer, pops the front buffer when a bloc buffer becomes empty and more than one buffer exists, and passes the array to event.push together with clientRank_. NOTE(review): the copy loop is partly missing from the text and the definition is cut off at the end of the visible text. Nothing visible here releases the new[] allocation, so ownership presumably passes to the event; confirm. */ void CP2pServerBuffer::fillEventServer(size_t timeline, CEventServer& event) { auto &completedEvent=completedEvents_[timeline] ; size_t size=0 ; for(auto& bloc : completedEvent) size+=bloc.count ; char* buffer = new char[size] ; size=0 ; ostringstream outStr ; if (info.isActive(logProtocol)) outStr<<"Received Event from client "<free(bloc.start, bloc.count) ; // free bloc addr=bloc.addr ; if (bloc.buffer->getCount()==0) if (buffers_.size() > 1) buffers_.pop_front() ; // if buffer is empty free buffer } event.push(clientRank_, nullptr, buffer, size) ; if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<