#include "xios_spl.hpp" #include "exception.hpp" #include "log.hpp" #include "buffer_out.hpp" #include "buffer_client.hpp" #include "cxios.hpp" #include "mpi.hpp" #include "tracer.hpp" #include "timeline_events.hpp" namespace xios { size_t CClientBuffer::maxRequestSize = 0; CClientBuffer::CClientBuffer(MPI_Comm interComm, vector& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) : interComm(interComm) , clientRank_(clientRank) , serverRank(serverRank) , bufferSize(bufferSize) , estimatedMaxEventSize(estimatedMaxEventSize) , maxEventSize(0) , current(0) , count(0) , pending(false) , hasWindows(false) , windows_(windows) { if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; else hasWindows=true ; MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; buffer[0] = bufferHeader[0]+headerSize ; buffer[1] = bufferHeader[1]+headerSize ; firstTimeLine[0]=(size_t*)bufferHeader[0] ; firstTimeLine[1]=(size_t*)bufferHeader[1] ; bufferCount[0]=(size_t*)bufferHeader[0] +1 ; bufferCount[1]=(size_t*)bufferHeader[1] +1 ; control[0]=(size_t*)bufferHeader[0] +2 ; control[1]=(size_t*)bufferHeader[1] +2 ; finalize[0]=(size_t*)bufferHeader[0] +3 ; finalize[1]=(size_t*)bufferHeader[1] +3 ; *firstTimeLine[0]=0 ; *firstTimeLine[1]=0 ; *bufferCount[0]=0 ; *bufferCount[1]=0 ; *control[0]=0 ; *control[1]=0 ; *finalize[0]=0 ; *finalize[1]=0 ; winState[0]=false ; winState[1]=false ; if (hasWindows) { MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ; MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ; MPI_Group group ; int groupSize,groupRank ; MPI_Win_get_group(windows_[0], &group) ; MPI_Group_size(group, &groupSize) ; MPI_Group_rank(group, &groupRank) ; if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "< 0 ) return false ; if (size > bufferSize) { // ERROR("bool CClientBuffer::isBufferFree(StdSize size)", // << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl); resizingBufferStep_=1 ; newBufferSize_=size ; return false ; } if (size > maxEventSize) { maxEventSize = size; if (size > estimatedMaxEventSize) error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl; if (size > maxRequestSize) maxRequestSize = size; } if (size > remain()) { if (isGrowableBuffer_) { resizingBufferStep_ = 1 ; newBufferSize_ = (count+size)*growFactor_ ; } return false ; } else return true ; } CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size) { if (size <= remain()) { retBuffer->realloc(buffer[current] + count, size); count += size; if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; *bufferCount[current]=count ; /* info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "< 0) { lockBuffer() ; // if (*control[current]==0 && bufferCount[current] > 0) if (*bufferCount[current] > 0) { MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); if (resizingBufferStep_==3) resizingBufferStep_=0 ; pending = true; // *control[current]=0 ; *firstTimeLine[current]=0 ; *bufferCount[current]=0 ; unlockBuffer() ; if (current == 1) current = 0; else current = 1; count = 0; } else { unlockBuffer() ; } } else { if (resizingBufferStep_==2) resizeBuffer(newBufferSize_) ; if (resizingBufferStep_==1) resizeBufferNotify() ; } } return pending; } void CClientBuffer::resizeBufferNotify(void) { // notify server of changing buffers size lockBuffer() ; int size=sizeof(int)+sizeof(size_t) ; CBufferOut* bufOut = this->getBuffer(timelineEventNotifyChangeBufferSize, size); bufOut->put(size); bufOut->put(timelineEventNotifyChangeBufferSize); resizingBufferStep_ = 2 ; unlockBuffer() ; } void CClientBuffer::resizeBuffer(size_t newSize) { if (hasWindows) { MPI_Win_detach(windows_[0], bufferHeader[0]) ; MPI_Win_detach(windows_[1], bufferHeader[1]) ; } MPI_Free_mem(bufferHeader[0]) ; MPI_Free_mem(bufferHeader[1]) ; bufferSize=newSize ; MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; buffer[0] = bufferHeader[0]+headerSize ; buffer[1] = bufferHeader[1]+headerSize ; firstTimeLine[0]=(size_t*)bufferHeader[0] ; firstTimeLine[1]=(size_t*)bufferHeader[1] ; bufferCount[0]=(size_t*)bufferHeader[0] +1 ; bufferCount[1]=(size_t*)bufferHeader[1] +1 ; control[0]=(size_t*)bufferHeader[0] +2 ; control[1]=(size_t*)bufferHeader[1] +2 ; finalize[0]=(size_t*)bufferHeader[0] +3 ; finalize[1]=(size_t*)bufferHeader[1] +3 ; *firstTimeLine[0]=0 ; *firstTimeLine[1]=0 ; *bufferCount[0]=0 ; *bufferCount[1]=0 ; *control[0]=0 ; *control[1]=0 ; *finalize[0]=0 ; *finalize[1]=0 ; winState[0]=false ; winState[1]=false ; current=0 ; if (hasWindows) { MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ; MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ; MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; MPI_Win_unlock(clientRank_, windows_[1]) ; MPI_Win_unlock(clientRank_, windows_[0]) ; } lockBuffer() ; int size=sizeof(int)+2*sizeof(size_t)+2*sizeof(MPI_AINT) ; CBufferOut* bufOut = this->getBuffer(timelineEventChangeBufferSize, size); bufOut->put(size); bufOut->put(timelineEventChangeBufferSize); bufOut->put(newBufferSize_); bufOut->put(this->getWinAddress(0)); bufOut->put(this->getWinAddress(1)); resizingBufferStep_=3; unlockBuffer() ; } bool CClientBuffer::hasPendingRequest(void) { lockBuffer() ; count=*bufferCount[current] ; unlockBuffer() ; return (pending || count > 0); } bool CClientBuffer::isNotifiedFinalized(void) { bool ret ; lockBuffer() ; ret=*finalize[current] == 1 ? true : false ; unlockBuffer() ; return ret; } }