Changeset 2246 for XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.cpp
- Timestamp:
- 10/11/21 14:41:56 (3 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.cpp
r2130 r2246 2 2 #include "exception.hpp" 3 3 #include "buffer_server.hpp" 4 #include "timer.hpp" 4 5 5 6 … … 31 32 } 32 33 33 /*34 void CServerBuffer::createWindows(MPI_Comm oneSidedComm)35 {36 MPI_Barrier(oneSidedComm) ;37 MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;38 MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;39 hasWindows=true ;40 updateCurrentWindows() ;41 MPI_Barrier(oneSidedComm) ;42 }43 */44 45 /*46 bool CServerBuffer::freeWindows()47 {48 if (hasWindows)49 {50 size_t header[3] ;51 size_t& control=header[2] ;52 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows_[0]) ;53 MPI_Get(&control, 1, MPI_LONG_LONG_INT, windowsRank , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ;54 MPI_Win_unlock(0,windows[0]) ;55 if (control==2) // ok for free windows56 {57 MPI_Win_free( &(windows[0])) ;58 MPI_Win_free( &(windows[1])) ;59 hasWindows=false ;60 return true ;61 }62 else return false ;63 }64 else return true ;65 }66 */67 34 68 35 bool CServerBuffer::isBufferFree(size_t count) … … 222 189 bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 223 190 { 191 count = -1 ; 224 192 if (!hasWindows || resizingBuffer_) return false ; 225 226 227 size_t header[3];228 size_t& clientTimeline=header[0] ;229 size_t & clientCount=header[1];230 size_t & control=header[2];193 double time=MPI_Wtime() ; 194 if (time-bufferFromClientTime_ < bufferFromClientLatency_ ) return false; 195 bufferFromClientTime_ = time ; 196 CTimer::get("getBufferFromClient").resume() ; 197 size_t clientTimeline ; 198 size_t clientCount ; 231 199 bool ok=false ; 232 200 … … 238 206 239 207 lockBuffer(); 240 208 CTimer::get("getBufferFromClient_locked").resume() ; 241 209 // lock is acquired 242 210 243 MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], 0), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;244 MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;211 MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 212 MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 245 213 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 246 214 247 // control=1 ;248 // MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;249 250 // MPI_Win_unlock(windowsRank_, windows_[currentWindows]) ;251 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;252 // info(100)<<"getBufferFromClient : windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ;253 215 if (timeLine==clientTimeline) 254 216 { 255 // info(50)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ;256 257 // MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;258 217 buffer=(char*)getBuffer(clientCount) ; 259 218 count=clientCount ; … … 261 220 clientTimeline = 0 ; 262 221 clientCount = 0 ; 263 // control=0;264 MPI_Put(& header[0], 2, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],0) , 2, MPI_LONG_LONG_INT,windows_[currentWindows]) ;222 MPI_Put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 223 MPI_Put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 265 224 266 225 // release lock 267 unlockBuffer() ; 226 CTimer::get("getBufferFromClient_locked").suspend() ; 227 unlockBuffer() ; 268 228 269 229 ok=true ; … … 283 243 else 284 244 { 285 //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 286 //control=0 ; 287 //MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 245 count=0 ; 288 246 289 247 // release lock 248 CTimer::get("getBufferFromClient_locked").suspend() ; 290 249 unlockBuffer() ; 291 250 } 292 251 CTimer::get("getBufferFromClient").suspend() ; 293 252 if (ok) return true ; 294 253 … … 299 258 { 300 259 if (!hasWindows) return ; 301 302 long long int lock=1 ;303 long long int zero=0, one=1 ;304 // control=1 ;305 260 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 306 while(lock!=0)307 {308 MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)),309 windows_[currentWindows]) ;310 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;311 }312 261 } 313 262 … … 315 264 { 316 265 if (!hasWindows) return ; 317 long long int lock=1 ;318 long long int zero=0, one=1 ;319 320 MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)),321 windows_[currentWindows]) ;322 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;323 266 MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ; 324 267 } … … 327 270 { 328 271 if (!hasWindows) return ; 329 size_t finalize=1;272 size_t notify=notifyFinalize_ ; 330 273 lockBuffer(); 331 274 // lock is acquired 332 MPI_Put(& finalize, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],3*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;275 MPI_Put(¬ify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 333 276 unlockBuffer() ; 334 277 } 278 279 void CServerBuffer::notifyBufferResizing(void) 280 { 281 resizingBuffer_=true ; 282 if (!hasWindows) return ; 283 size_t notify=notifyResizeBuffer_ ; 284 lockBuffer(); 285 // lock is acquired 286 MPI_Put(¬ify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 287 unlockBuffer() ; 288 } 335 289 }
Note: See TracChangeset
for help on using the changeset viewer.