Changeset 2547 for XIOS3/trunk/src/buffer_client.cpp
- Timestamp:
- 08/29/23 17:24:04 (10 months ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk/src/buffer_client.cpp
r2458 r2547 14 14 size_t CClientBuffer::maxRequestSize = 0; 15 15 16 CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)16 CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, bool hasWindows) 17 17 : interComm(interComm) 18 18 , clientRank_(0) 19 19 , serverRank(serverRank) 20 20 , bufferSize(bufferSize) 21 , estimatedMaxEventSize(estimatedMaxEventSize)22 21 , maxEventSize(0) 23 22 , current(0) 24 23 , count(0) 25 24 , pending(false) 26 , hasWindows(false) 27 { 28 /* 29 if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 30 else hasWindows=true ; 31 */ 32 33 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 34 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 25 , hasWindows_(hasWindows) 26 { 27 if (hasWindows_) 28 { 29 windows_.resize(2) ; 30 windows_[0] = new CWindowDynamic() ; 31 windows_[0]->allocateBuffer(bufferSize+headerSize_) ; 32 bufferHeader[0] = (char*) windows_[0]->getBufferAddress() ; 33 windows_[1] = new CWindowDynamic() ; 34 windows_[1]->allocateBuffer(bufferSize+headerSize_) ; 35 bufferHeader[1] = (char*) windows_[1]->getBufferAddress() ; 36 } 37 else 38 { 39 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 40 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 41 } 42 35 43 buffer[0] = bufferHeader[0]+headerSize_ ; 36 44 buffer[1] = bufferHeader[1]+headerSize_ ; … … 55 63 winState[1]=false ; 56 64 57 58 if (hasWindows) 59 { 60 61 MPI_Aint buffSize=bufferSize+headerSize_ ; 62 MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ; 63 MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ; 64 65 MPI_Group group ; 66 int groupSize,groupRank ; 67 MPI_Win_get_group(windows_[0], &group) ; 68 MPI_Group_size(group, &groupSize) ; 69 MPI_Group_rank(group, &groupRank) ; 70 if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 71 72 MPI_Win_get_group(windows_[1], &group) ; 73 MPI_Group_size(group, &groupSize) ; 74 MPI_Group_rank(group, &groupRank) ; 75 if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 76 77 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 78 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 79 80 MPI_Win_unlock(clientRank_, windows_[1]) ; 81 MPI_Win_unlock(clientRank_, windows_[0]) ; 82 } 83 retBuffer = new CBufferOut(buffer[current], bufferSize); 84 info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 65 66 retBuffer = new CBufferOut(buffer[current], bufferSize); 67 info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 85 68 } 86 69 … … 91 74 return address ; 92 75 } 93 94 void CClientBuffer::attachWindows(vector<MPI_Win>& windows) 76 77 MPI_Aint CClientBuffer::getWinBufferAddress(int i) 78 { 79 return windows_[i]->getWinBufferAddress() ; 80 } 81 82 void CClientBuffer::attachWindows(MPI_Comm& winComm) 95 83 { 96 84 isAttachedWindows_=true ; 97 windows_=windows ; 98 if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ; 99 else hasWindows=true ; 100 101 if (hasWindows) 85 86 if (hasWindows_) 102 87 { 103 88 MPI_Aint buffSize=bufferSize+headerSize_ ; 104 MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ; 105 MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ; 106 107 MPI_Group group ; 108 int groupSize,groupRank ; 109 MPI_Win_get_group(windows_[0], &group) ; 110 MPI_Group_size(group, &groupSize) ; 111 MPI_Group_rank(group, &groupRank) ; 112 if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 113 114 MPI_Win_get_group(windows_[1], &group) ; 115 MPI_Group_size(group, &groupSize) ; 116 MPI_Group_rank(group, &groupRank) ; 117 if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 118 119 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 120 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 121 122 MPI_Win_unlock(clientRank_, windows_[1]) ; 123 MPI_Win_unlock(clientRank_, windows_[0]) ; 89 windows_[0]->create(winComm) ; 90 windows_[0]->attach() ; 91 windows_[1]->create(winComm) ; 92 windows_[1]->attach() ; 93 94 windows_[0]->lockExclusive(clientRank_) ; 95 windows_[1]->lockExclusive(clientRank_) ; 96 97 windows_[0]->unlockExclusive(clientRank_) ; 98 windows_[1]->unlockExclusive(clientRank_) ; 99 124 100 } 125 101 … … 129 105 CClientBuffer::~CClientBuffer() 130 106 { 131 //freeWindows() ; 132 if (hasWindows) 133 { 134 MPI_Win_detach(windows_[0],bufferHeader[0]); 135 MPI_Win_detach(windows_[1],bufferHeader[1]); 136 MPI_Free_mem(bufferHeader[0]) ; 137 MPI_Free_mem(bufferHeader[1]) ; 138 } 139 delete retBuffer; 107 if (hasWindows_) 108 { 109 windows_[0]->detach() ; 110 windows_[1]->detach() ; 111 delete windows_[0] ; 112 delete windows_[1] ; 113 } 114 else 115 { 116 MPI_Free_mem(bufferHeader[0]) ; 117 MPI_Free_mem(bufferHeader[1]) ; 118 } 119 delete retBuffer; 140 120 } 141 121 … … 143 123 { 144 124 CTimer::get("lock buffer").resume(); 145 if ( hasWindows)125 if (isAttachedWindows_) 146 126 { 147 127 if (winState[current]==true) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo lock client buffer but winState said it is already locked") ; 148 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 128 //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 129 windows_[current]->lockExclusive(clientRank_) ; 149 130 winState[current]=true ; 150 131 } … … 155 136 { 156 137 CTimer::get("unlock buffer").resume(); 157 if ( hasWindows)138 if (isAttachedWindows_) 158 139 { 159 140 if (winState[current]==false) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo unlock client buffer but winState said it is already unlocked") ; 160 MPI_Win_unlock(clientRank_, windows_[current]) ; 141 //MPI_Win_unlock(clientRank_, windows_[current]) ; 142 windows_[current]->unlockExclusive(clientRank_) ; 161 143 winState[current]=false ; 162 144 } … … 189 171 { 190 172 maxEventSize = size; 191 192 if (size > estimatedMaxEventSize)193 error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank194 << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl;195 173 196 174 if (size > maxRequestSize) maxRequestSize = size; … … 319 297 { 320 298 321 if (hasWindows) 299 bufferSize=newSize ; 300 301 if (hasWindows_) 322 302 { 323 MPI_Win_detach(windows_[0], bufferHeader[0]) ; 324 MPI_Win_detach(windows_[1], bufferHeader[1]) ; 325 } 326 MPI_Free_mem(bufferHeader[0]) ; 327 MPI_Free_mem(bufferHeader[1]) ; 328 329 bufferSize=newSize ; 330 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 331 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 303 windows_[0]->detach(); 304 windows_[1]->detach(); 305 306 windows_[0]->attach(bufferSize+headerSize_) ; 307 bufferHeader[0] = (char*) windows_[0] -> getBufferAddress() ; 308 windows_[1]->attach(bufferSize+headerSize_) ; 309 bufferHeader[1] = (char*) windows_[1] -> getBufferAddress() ; 310 } 311 312 332 313 buffer[0] = bufferHeader[0]+headerSize_ ; 333 314 buffer[1] = bufferHeader[1]+headerSize_ ; … … 353 334 current=0 ; 354 335 355 if (hasWindows )336 if (hasWindows_) 356 337 { 357 358 MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ; 359 MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ; 360 361 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 362 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 363 364 MPI_Win_unlock(clientRank_, windows_[1]) ; 365 MPI_Win_unlock(clientRank_, windows_[0]) ; 338 339 windows_[0]->lockExclusive(clientRank_) ; 340 windows_[1]->lockExclusive(clientRank_) ; 341 342 windows_[1]->unlockExclusive(clientRank_) ; 343 windows_[0]->unlockExclusive(clientRank_) ; 366 344 } 367 345 … … 373 351 bufOut->put(timelineEventChangeBufferSize); 374 352 bufOut->put(newBufferSize_); 375 bufOut->put(this->getWinAddress(0)); 376 bufOut->put(this->getWinAddress(1)); 353 354 bufOut->put(this->getWinBufferAddress(0)); 355 bufOut->put(this->getWinBufferAddress(1)); 377 356 378 357 resizingBufferStep_=4; 379 358 unlockBuffer() ; 380 info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWin Address(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl;359 info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinBufferAddress(0)<<" winAdress[1] "<<this->getWinBufferAddress(1)<<endl; 381 360 } 382 361 … … 397 376 lockBuffer() ; 398 377 ret=*notify[current] == notifyResizeBuffer_ ? true : false ; 399 if (ret || !hasWindows )378 if (ret || !hasWindows_) 400 379 { 401 380 *notify[current] = notifyNothing_ ;
Note: See TracChangeset
for help on using the changeset viewer.