#include "p2p_client_buffer.hpp" #include "event_client.hpp" #include "timer.hpp" namespace xios { extern CLogType logProtocol; CP2pClientBuffer::CP2pClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) : interComm_(interComm), serverRank_(serverRank), interCommMerged_(interCommMerged), intraServerRank_(intraServerRank) { //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ; //control_[CONTROL_ADDR] = 0 ; //control_[CONTROL_FINALIZE] = 0 ; sendNewBuffer() ; createWindow(commSelf, interCommMerged, intraServerRank ) ; char dummy ; MPI_Irecv(&dummy, 0, MPI_CHAR, intraServerRank, 22, interCommMerged, &finalizeRequest_) ; } void CP2pClientBuffer::createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank ) { CTimer::get("create Windows").resume() ; //MPI_Comm interComm ; //MPI_Intercomm_create(commSelf, 0, interCommMerged, intraServerRank, 0, &interComm) ; //MPI_Intercomm_merge(interComm, false, &winComm_) ; //int rank ; //MPI_Comm_rank(winComm_,&rank) ; //info(logProtocol)<<"Windows rank="<mpiRequest, &flag, &status) ; if (flag==false) { // ++it ; break ; } else addr = it->addr; } if (addr!=0) sentBlocRequest_.erase(sentBlocRequest_.begin(), it) ; freeBuffer(addr) ; // if (finalize==1) isFinalized_=true ; listenFinalize() ; } void CP2pClientBuffer::listenFinalize(void) { if (!isFinalized_) { int flag ; MPI_Status status ; MPI_Test(&finalizeRequest_,&flag, &status) ; if (flag) isFinalized_=true; } } void CP2pClientBuffer::sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs) { ostringstream outStr ; SRequest request ; request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int))*nbBlocs) ; *(request.buffer)<addr ; MPI_Issend((void*)(it->addr), it->count, MPI_CHAR, intraServerRank_, 21, interCommMerged_, &sentBlocRequest_.back().mpiRequest) ; } if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ; MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ; if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ; info(logProtocol)<start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ; requests_.push_back(request) ; } void CP2pClientBuffer::sendNewBuffer(void) { MPI_Aint controlAddr ; // MPI_Get_address(control_, &controlAddr) ; MPI_Send(&controlAddr, 1, MPI_AINT, intraServerRank_, 20, interCommMerged_) ; } }