#include "xios_spl.hpp"
#include "exception.hpp"
#include "buffer_server.hpp"
#include "timer.hpp"

namespace xios
{
  // Constructs a server buffer.
  //
  // Stores references/copies of the window handles, the window addresses and
  // the rank used for window accesses, then allocates the local storage:
  //   - size    : 3 * buffSize bytes, obtained through MPI_Alloc_mem
  //               (the return code of MPI_Alloc_mem is not checked here);
  //   - first   : starts at 0;
  //   - current : starts at 1;
  //   - end     : starts at size;
  //   - used    : starts at 0.
  // currentWindows starts at 1. hasWindows is initialised to true and reset to
  // false when both windows[0] and windows[1] are MPI_WIN_NULL.
  //
  // NOTE(review): the `vector` parameters carry no template arguments in this
  // text — they look stripped; confirm the element types against the header.
  CServerBuffer::CServerBuffer(vector& windows, vector& winAddress, int windowsRank, StdSize buffSize)
  : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress)
  {
    size = 3 * buffSize;
    first = 0;
    current = 1;
    end = size;
    used=0 ;
    MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer) ;
    currentWindows=1 ;
    // Both window handles null: one-sided access is disabled for this buffer.
    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
  }

  // Releases the storage obtained with MPI_Alloc_mem in the constructor.
  CServerBuffer::~CServerBuffer()
  {
    MPI_Free_mem(buffer) ;
  }

  // Switches currentWindows to the other window index: 0 becomes 1, and any
  // other value becomes 0.
  void CServerBuffer::updateCurrentWindows(void)
  {
    if (currentWindows==0) currentWindows=1 ;
    else currentWindows=0 ;
  }

  // NOTE(review): the text below is kept exactly as found because it looks
  // damaged. Comparisons such as `current+count0` and `countfirst` are missing
  // their operators, the braces do not balance, and the ERROR messages show
  // that pieces of isBufferFree, getBuffer, popBuffer and freeBuffer have run
  // together. Restore this part from the upstream file before changing it.
  // The line also holds the beginning of getBufferFromClient, whose body
  // continues on the following line of the file.
  bool CServerBuffer::isBufferFree(size_t count) { bool ret ; if (count==0) return true ; if (current>first) { if (current+count0) { ret=true ; } else { ret=false ; } } else { if (countfirst) { if (current+count0) { ret=buffer+current ; current=0 ; } else { ERROR("void* CServerBuffer::getBuffer(size_t count)", <<"cannot allocate required size in buffer") ; } } else { end=current ; if (countfirst) { current-=count ; } else { ERROR("void CServerBuffer::popBuffer(size_t count)", <<"cannot pop required size in buffer") ; } } else { if (current-count>=0) { current-=count ; } else { ERROR("void CServerBuffer::freeBuffer(size_t count)", <<"cannot pop required size in buffer") ; } } used-=count ; } bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) { count = -1 ; if (!hasWindows || resizingBuffer_) return false ; double time=MPI_Wtime() ; if (time-bufferFromClientTime_ < bufferFromClientLatency_ ) return false; bufferFromClientTime_ = time ; CTimer::get("getBufferFromClient").resume() ; size_t clientTimeline ; size_t clientCount ; bool ok=false ; MPI_Group group ; int groupSize,groupRank ; MPI_Win_get_group(windows_[currentWindows], &group) ; MPI_Group_size(group, &groupSize) ; MPI_Group_rank(group, &groupRank) ; lockBuffer();
CTimer::get("getBufferFromClient_locked").resume() ; // lock is acquired MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; if (timeLine==clientTimeline) { buffer=(char*)getBuffer(clientCount) ; count=clientCount ; MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ; clientTimeline = 0 ; clientCount = 0 ; MPI_Put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; MPI_Put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; // release lock CTimer::get("getBufferFromClient_locked").suspend() ; unlockBuffer() ; ok=true ; char checksum=0 ; for(size_t i=0;i