Changeset 1757 for XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.cpp
- Timestamp:
- 10/18/19 14:55:57 (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.cpp
r885 r1757 7 7 { 8 8 9 CServerBuffer::CServerBuffer(StdSize buffSize) 9 CServerBuffer::CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize) 10 : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress) 10 11 { 11 12 size = 3 * buffSize; … … 13 14 current = 1; 14 15 end = size; 16 used=0 ; 15 17 buffer = new char[size]; // use MPI_ALLOC_MEM later? 18 currentWindows=1 ; 19 if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 16 20 } 17 21 … … 21 25 } 22 26 27 void CServerBuffer::updateCurrentWindows(void) 28 { 29 if (currentWindows==0) currentWindows=1 ; 30 else currentWindows=0 ; 31 } 32 33 /* 34 void CServerBuffer::createWindows(MPI_Comm oneSidedComm) 35 { 36 MPI_Barrier(oneSidedComm) ; 37 MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 38 MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 39 hasWindows=true ; 40 updateCurrentWindows() ; 41 MPI_Barrier(oneSidedComm) ; 42 } 43 */ 44 45 /* 46 bool CServerBuffer::freeWindows() 47 { 48 if (hasWindows) 49 { 50 size_t header[3] ; 51 size_t& control=header[2] ; 52 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows_[0]) ; 53 MPI_Get(&control, 1, MPI_LONG_LONG_INT, windowsRank , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ; 54 MPI_Win_unlock(0,windows[0]) ; 55 if (control==2) // ok for free windows 56 { 57 MPI_Win_free( &(windows[0])) ; 58 MPI_Win_free( &(windows[1])) ; 59 hasWindows=false ; 60 return true ; 61 } 62 else return false ; 63 } 64 else return true ; 65 } 66 */ 23 67 24 68 bool CServerBuffer::isBufferFree(size_t count) … … 72 116 } 73 117 118 bool CServerBuffer::isBufferEmpty(void) 119 { 120 if (used==0) return true ; 121 else return false; 122 } 74 123 75 124 void* CServerBuffer::getBuffer(size_t count) … … 128 177 } 129 178 179 used+=count ; 130 180 return ret ; 131 181 } … … 167 217 } 168 218 } 169 } 170 219 used-=count ; 220 } 221 222 bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 223 { 224 if (!hasWindows) return false ; 225 226 227 size_t header[3] ; 228 size_t& clientTimeline=header[0] ; 229 size_t& clientCount=header[1] ; 230 size_t& control=header[2] ; 231 bool ok=false ; 232 233 MPI_Group group ; 234 int groupSize,groupRank ; 235 MPI_Win_get_group(windows_[currentWindows], &group) ; 236 MPI_Group_size(group, &groupSize) ; 237 MPI_Group_rank(group, &groupRank) ; 238 239 lockBuffer(); 240 241 // lock is acquired 242 243 MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],0), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 244 MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 245 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 246 247 // control=1 ; 248 // MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 249 250 // MPI_Win_unlock(windowsRank_, windows_[currentWindows]) ; 251 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 252 // info(100)<<"getBufferFromClient : windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ; 253 if (timeLine==clientTimeline) 254 { 255 // info(50)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ; 256 257 // MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 258 buffer=(char*)getBuffer(clientCount) ; 259 count=clientCount ; 260 MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ; 261 clientTimeline = 0 ; 262 clientCount = 0 ; 263 // control=0 ; 264 MPI_Put(&header[0], 2, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],0) , 2, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 265 266 // release lock 267 unlockBuffer() ; 268 269 ok=true ; 270 char checksum=0 ; 271 for(size_t i=0;i<count;i++) checksum=checksum+buffer[i] ; 272 char checksumFirst=0 ; 273 for(size_t i=5; i<10 && i<count ;i++) checksumFirst=checksumFirst+buffer[i] ; 274 char checksumLast=0 ; 275 for(size_t i=(count<10)?0:count-10; i<count ; i++) checksumLast=checksumLast+buffer[i] ; 276 277 info(40)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline " 278 <<clientTimeline<<" clientCount "<<count<<" checksum "<<(int)checksum<<" " 279 <<(int)buffer[0]<<" "<<(int)buffer[1]<<" "<<(int)buffer[2]<<" "<<(int)buffer[3]<<" "<<(int)buffer[4]<<" "<<(int)buffer[5]<<" " 280 <<(int)buffer[6]<<" "<<(int)buffer[7]<<" "<<(int)buffer[8]<<" "<<(int)buffer[9]<<" "<<(int)buffer[10]<<" "<<(int)buffer[11]<<endl ; 281 282 } 283 else 284 { 285 //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 286 //control=0 ; 287 //MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 288 289 // release lock 290 unlockBuffer() ; 291 } 292 293 if (ok) return true ; 294 295 return false ; 296 } 297 298 void CServerBuffer::lockBuffer(void) 299 { 300 if (!hasWindows) return ; 301 302 long long int lock=1 ; 303 long long int zero=0, one=1 ; 304 // control=1 ; 305 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 306 while(lock!=0) 307 { 308 MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 309 windows_[currentWindows]) ; 310 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 311 } 312 } 313 314 void CServerBuffer::unlockBuffer(void) 315 { 316 if (!hasWindows) return ; 317 long long int lock=1 ; 318 long long int zero=0, one=1 ; 319 320 MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 321 windows_[currentWindows]) ; 322 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 323 MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ; 324 } 325 326 void CServerBuffer::notifyClientFinalize(void) 327 { 328 if (!hasWindows) return ; 329 size_t finalize=1 ; 330 lockBuffer(); 331 // lock is acquired 332 MPI_Put(&finalize, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],3*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 333 unlockBuffer() ; 334 } 171 335 }
Note: See TracChangeset
for help on using the changeset viewer.