Changeset 2246 for XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp
- Timestamp:
- 10/11/21 14:41:56 (3 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp
r2221 r2246 8 8 #include "tracer.hpp" 9 9 #include "timeline_events.hpp" 10 #include "timer.hpp" 10 11 11 12 namespace xios … … 29 30 else hasWindows=true ; 30 31 31 MPI_Alloc_mem(bufferSize+headerSize , MPI_INFO_NULL, &bufferHeader[0]) ;32 MPI_Alloc_mem(bufferSize+headerSize , MPI_INFO_NULL, &bufferHeader[1]) ;33 buffer[0] = bufferHeader[0]+headerSize ;34 buffer[1] = bufferHeader[1]+headerSize ;35 firstTimeLine[0]=(size_t*)bufferHeader[0] ;36 firstTimeLine[1]=(size_t*)bufferHeader[1] ;37 bufferCount[0]=(size_t*)bufferHeader[0] + 1;38 bufferCount[1]=(size_t*)bufferHeader[1] + 1;39 control[0]=(size_t*)bufferHeader[0] + 2;40 control[1]=(size_t*)bufferHeader[1] + 2;41 finalize[0]=(size_t*)bufferHeader[0] +3;42 finalize[1]=(size_t*)bufferHeader[1] +3;32 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 33 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 34 buffer[0] = bufferHeader[0]+headerSize_ ; 35 buffer[1] = bufferHeader[1]+headerSize_ ; 36 firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_ ; 37 firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_ ; 38 bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ; 39 bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ; 40 control[0]=(size_t*)bufferHeader[0] + controlOffset_ ; 41 control[1]=(size_t*)bufferHeader[1] + controlOffset_ ; 42 notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ; 43 notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ; 43 44 44 45 *firstTimeLine[0]=0 ; … … 48 49 *control[0]=0 ; 49 50 *control[1]=0 ; 50 * finalize[0]=0;51 * finalize[1]=0;51 *notify[0]=notifyNothing_ ; 52 *notify[1]=notifyNothing_ ; 52 53 winState[0]=false ; 53 54 winState[1]=false ; … … 57 58 { 58 59 59 MPI_Aint buffSize=bufferSize+headerSize ;60 MPI_Aint buffSize=bufferSize+headerSize_ ; 60 61 MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ; 61 62 MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ; … … 106 107 } 107 108 108 /* void CClientBuffer::createWindows(MPI_Comm oneSidedComm)109 {110 MPI_Barrier(oneSidedComm) ;111 MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;112 MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;113 114 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ;115 *firstTimeLine[0]=0 ;116 *bufferCount[0]=0 ;117 *control[0]=0 ;118 MPI_Win_unlock(0, windows[0]) ;119 120 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ;121 *firstTimeLine[1]=0 ;122 *bufferCount[1]=0 ;123 *control[1]=0 ;124 MPI_Win_unlock(0, windows[1]) ;125 winState[0]=false ;126 winState[1]=false ;127 MPI_Barrier(oneSidedComm) ;128 hasWindows=true ;129 }130 */131 132 /*133 void CClientBuffer::freeWindows()134 {135 if (hasWindows)136 {137 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[0]) ;138 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[1]) ;139 *control[0]=2 ;140 *control[1]=2 ;141 MPI_Win_unlock(0, windows_[1]) ;142 MPI_Win_unlock(0, windows_[0]) ;143 144 MPI_Win_free(&windows_[0]) ;145 MPI_Win_free(&windows_[1]) ;146 hasWindows=false ;147 }148 }149 */150 109 void CClientBuffer::lockBuffer(void) 151 110 { 152 if (hasWindows) 153 { 154 // MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[current]) ; 155 long long int lock=1 ; 156 long long int zero=0, one=1 ; 157 111 CTimer::get("lock buffer").resume(); 112 if (hasWindows) 113 { 158 114 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 159 160 while(lock!=0)161 {162 MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)),163 windows_[current]) ;164 MPI_Win_flush(clientRank_, windows_[current]) ;165 }166 167 // info(100)<<"Buffer locked "<<&windows_<<" "<<current<<endl ;168 115 winState[current]=true ; 169 116 } 117 CTimer::get("lock buffer").suspend(); 170 118 } 171 119 172 120 void CClientBuffer::unlockBuffer(void) 173 121 { 174 if (hasWindows) 175 { 176 long long int lock=1 ; 177 long long int zero=0, one=1 ; 178 179 MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 180 windows_[current]) ; 122 CTimer::get("unlock buffer").resume(); 123 if (hasWindows) 124 { 181 125 MPI_Win_unlock(clientRank_, windows_[current]) ; 182 183 // info(100)<<"Buffer unlocked "<<&windows_<<" "<<current<<endl ;184 126 winState[current]=false ; 185 127 } 128 CTimer::get("unlock buffer").suspend(); 186 129 } 187 130 … … 193 136 bool CClientBuffer::isBufferFree(StdSize size) 194 137 { 195 // bool loop=true ;196 // while (loop)197 // {198 // lockBuffer();199 // if (*control[current]==0) loop=false ; // attemp to read from server ?200 // else unlockBuffer() ;201 // }202 138 203 139 lockBuffer(); … … 208 144 if (size > bufferSize) 209 145 { 210 // ERROR("bool CClientBuffer::isBufferFree(StdSize size)",211 // << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl);212 146 resizingBufferStep_=1 ; 147 *firstTimeLine[current]=0 ; 213 148 newBufferSize_=size ; 214 149 return false ; … … 231 166 { 232 167 resizingBufferStep_ = 1 ; 168 *firstTimeLine[current]=0 ; 233 169 newBufferSize_ = (count+size)*growFactor_ ; 234 170 } … … 247 183 if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 248 184 *bufferCount[current]=count ; 249 /* info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current250 <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;251 if (!winState[current]) info(40)<<"CClientBuffer::getBuffer "<<" Windows Not Locked... "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current252 <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;*/253 185 return retBuffer; 254 186 } … … 284 216 MPI_Status status; 285 217 int flag; 218 219 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 220 MPI_Win_unlock(clientRank_, windows_[0]) ; 221 222 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 223 MPI_Win_unlock(clientRank_, windows_[1]) ; 286 224 287 225 if (pending) … … 299 237 if (count > 0) 300 238 { 301 lockBuffer() ; 302 // if (*control[current]==0 && bufferCount[current] > 0) 303 if (*bufferCount[current] > 0) 239 double time=MPI_Wtime() ; 240 if (time - lastCheckedWithNothing_ > latency_) 304 241 { 305 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 306 if (resizingBufferStep_==3) resizingBufferStep_=0 ; 307 pending = true; 308 // *control[current]=0 ; 309 *firstTimeLine[current]=0 ; 310 *bufferCount[current]=0 ; 311 312 unlockBuffer() ; 313 314 if (current == 1) current = 0; 315 else current = 1; 316 count = 0; 317 } 318 else 319 { 320 unlockBuffer() ; 242 lockBuffer() ; 243 if (*bufferCount[current] > 0) 244 { 245 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 246 if (resizingBufferStep_==4) resizingBufferStep_=0 ; 247 pending = true; 248 *firstTimeLine[current]=0 ; 249 *bufferCount[current]=0 ; 250 251 unlockBuffer() ; 252 253 if (current == 1) current = 0; 254 else current = 1; 255 count = 0; 256 } 257 else 258 { 259 unlockBuffer() ; 260 lastCheckedWithNothing_ = time ; 261 } 321 262 } 322 263 } 323 264 else 324 265 { 325 if (resizingBufferStep_==2) resizeBuffer(newBufferSize_) ;326 266 if (resizingBufferStep_==1) resizeBufferNotify() ; 267 else if (resizingBufferStep_==2) isNotifiedChangeBufferSize() ; 268 else if (resizingBufferStep_==3) resizeBuffer(newBufferSize_) ; 327 269 } 328 270 } … … 345 287 void CClientBuffer::resizeBuffer(size_t newSize) 346 288 { 289 347 290 if (hasWindows) 348 291 { … … 354 297 355 298 bufferSize=newSize ; 356 MPI_Alloc_mem(bufferSize+headerSize , MPI_INFO_NULL, &bufferHeader[0]) ;357 MPI_Alloc_mem(bufferSize+headerSize , MPI_INFO_NULL, &bufferHeader[1]) ;358 buffer[0] = bufferHeader[0]+headerSize ;359 buffer[1] = bufferHeader[1]+headerSize ;360 firstTimeLine[0]=(size_t*)bufferHeader[0] ;361 firstTimeLine[1]=(size_t*)bufferHeader[1] ;362 bufferCount[0]=(size_t*)bufferHeader[0] + 1;363 bufferCount[1]=(size_t*)bufferHeader[1] + 1;364 control[0]=(size_t*)bufferHeader[0] + 2 ;365 control[1]=(size_t*)bufferHeader[1] + 2;366 finalize[0]=(size_t*)bufferHeader[0] +3;367 finalize[1]=(size_t*)bufferHeader[1] +3;299 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 300 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 301 buffer[0] = bufferHeader[0]+headerSize_ ; 302 buffer[1] = bufferHeader[1]+headerSize_ ; 303 firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_; 304 firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_; 305 bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ; 306 bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ; 307 control[0]=(size_t*)bufferHeader[0] + controlOffset_ ; // control=0 => nothing ; control=1 => changeBufferSize 308 control[1]=(size_t*)bufferHeader[1] + controlOffset_ ; 309 notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ; 310 notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ; 368 311 369 312 *firstTimeLine[0]=0 ; … … 373 316 *control[0]=0 ; 374 317 *control[1]=0 ; 375 * finalize[0]=0;376 * finalize[1]=0;318 *notify[0] = notifyNothing_ ; 319 *notify[1] = notifyNothing_ ; 377 320 winState[0]=false ; 378 321 winState[1]=false ; … … 382 325 { 383 326 384 MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize ) ;385 MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize ) ;327 MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ; 328 MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ; 386 329 387 330 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; … … 402 345 bufOut->put(this->getWinAddress(1)); 403 346 404 resizingBufferStep_=3; 405 unlockBuffer() ; 347 resizingBufferStep_=4; 348 unlockBuffer() ; 349 info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinAddress(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl; 406 350 } 407 351 … … 416 360 } 417 361 418 bool CClientBuffer::isNotified Finalized(void)362 bool CClientBuffer::isNotifiedChangeBufferSize(void) 419 363 { 420 364 421 365 bool ret ; 422 366 lockBuffer() ; 423 ret=*finalize[current] == 1 ? true : false ; 367 ret=*notify[current] == notifyResizeBuffer_ ? true : false ; 368 if (ret) 369 { 370 *notify[current] = notifyNothing_ ; 371 resizingBufferStep_=3; 372 } 424 373 unlockBuffer() ; 425 374 … … 427 376 } 428 377 378 bool CClientBuffer::isNotifiedFinalized(void) 379 { 380 381 bool ret ; 382 lockBuffer() ; 383 ret=*notify[current] == notifyFinalize_ ? true : false ; 384 unlockBuffer() ; 385 386 return ret; 387 } 388 429 389 }
Note: See TracChangeset
for help on using the changeset viewer.