#include "xios_spl.hpp" #include "one_sided_context_client.hpp" #include "context_server.hpp" #include "event_client.hpp" #include "buffer_out.hpp" #include "type.hpp" #include "event_client.hpp" #include "context.hpp" #include "mpi.hpp" #include "timer.hpp" #include "cxios.hpp" #include "server.hpp" #include "services.hpp" #include #include #include namespace xios { /*! \param [in] parent Pointer to context on client side \param [in] intraComm_ communicator of group client \param [in] interComm_ communicator of group server \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode). */ COneSidedContextClient::COneSidedContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) : CContextClient(parent, intraComm_, interComm_, cxtSer), mapBufferSize_(), maxBufferedEvents(4) { pureOneSided=CXios::getin("pure_one_sided",false); // pure one sided communication (for test) if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ; MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows timeLine = 1; } /*! In case of attached mode, the current context must be reset to context for client \param [in] event Event sent to server */ void COneSidedContextClient::sendEvent(CEventClient& event) { list ranks = event.getRanks(); // ostringstream str ; // for(auto& rank : ranks) str<getId()<<" for ranks : "<second->eventLoop() ; double time=CTimer::getTime() ; bool succed = itBuffer->second->writeEvent(timeLine, event) ; if (succed) { time=CTimer::getTime()-time ; if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").minus(time) ; } if (succed) event.remove() ; else event.next() ; if (event.isFirst()) { if (CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").resume() ; callGlobalEventLoop() ; } } if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").suspend() ; if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode { while (checkBuffers(ranks)) callGlobalEventLoop() ; CXios::getDaemonsManager()->scheduleContext(hashId_) ; while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ; } timeLine++; } void COneSidedContextClient::eventLoop(void) { if (!locked_) checkBuffers() ; } void COneSidedContextClient::callGlobalEventLoop(void) { locked_=true ; context_->globalEventLoop() ; locked_=false ; } /*! Make a new buffer for a certain connection to server with specific rank \param [in] rank rank of connected server */ void COneSidedContextClient::newBuffer(int rank) { if (!mapBufferSize_.count(rank)) { error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl; mapBufferSize_[rank] = CXios::minBufferSize; maxEventSizes[rank] = CXios::minBufferSize; } COneSidedClientBuffer* buffer = buffers[rank] = new COneSidedClientBuffer(interComm, rank, commSelf_, interCommMerged_, clientSize+rank ); if (isGrowableBuffer_) { buffer->setGrowable(growingFactor_) ; } else buffer->setFixed(mapBufferSize_[rank]) ; } /*! Verify state of buffers. Buffer is under pending state if there is no message on it \return state of buffers, pending(true), ready(false) */ bool COneSidedContextClient::checkBuffers(void) { bool pending = false; for (auto itBuff : buffers) { itBuff.second->eventLoop() ; pending |= itBuff.second->isEmpty(); } return pending; } //! Release all buffers void COneSidedContextClient::releaseBuffers() { for (auto& itBuff : buffers) delete itBuff.second; buffers.clear(); } /*! Verify state of buffers corresponding to a connection \param [in] ranks list rank of server to which client connects to \return state of buffers, pending(true), ready(false) */ bool COneSidedContextClient::checkBuffers(list& ranks) { bool pending = false; for (auto& rank : ranks) { buffers[rank]->eventLoop() ; pending |= buffers[rank]->isEmpty() ; } return pending; } /*! * Set the buffer size for each connection. Warning: This function is collective. * * \param [in] mapSize maps the rank of the connected servers to the size of the correspoinding buffer * \param [in] maxEventSize maps the rank of the connected servers to the size of the biggest event */ void COneSidedContextClient::setBufferSize(const std::map& mapSize) { setFixedBuffer() ; for(auto& it : mapSize) { size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) * 8 ; // double mapBufferSize_[it.first]=size ; if (buffers.count(it.first)>0) buffers[it.first]->setFixed(size); } } /*! * Finalize context client and do some reports. Function is non-blocking. */ void COneSidedContextClient::finalize(void) { bool stop = false; int* nbServerConnectionLocal = new int[serverSize] ; int* nbServerConnectionGlobal = new int[serverSize] ; for(int i=0;ifirst]=1 ; for (auto ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++) nbServerConnectionLocal[*ItServerLeader]=1 ; MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm); CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); CMessage msg; for (int i=0;i::const_iterator itbMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end(), itMap; StdSize totalBuf = 0; for (itMap = itbMap; itMap != iteMap; ++itMap) { report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; totalBuf += itMap->second; } report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; } /*! */ bool COneSidedContextClient::havePendingRequests(void) { return checkBuffers(); } bool COneSidedContextClient::havePendingRequests(list& ranks) { return checkBuffers(ranks) ; } bool COneSidedContextClient::isNotifiedFinalized(void) { if (isAttachedModeEnabled()) return true ; bool finalized = true; for (auto& it : buffers ) finalized &= it.second->isNotifiedFinalized(); return finalized; } }