Changeset 1757 for XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp
- Timestamp:
- 10/18/19 14:55:57 (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp
r1639 r1757 12 12 size_t CClientBuffer::maxRequestSize = 0; 13 13 14 CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize, StdSize maxBufferedEvents)14 CClientBuffer::CClientBuffer(MPI_Comm interComm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) 15 15 : interComm(interComm) 16 , clientRank_(clientRank) 16 17 , serverRank(serverRank) 17 18 , bufferSize(bufferSize) … … 20 21 , current(0) 21 22 , count(0) 22 , bufferedEvents(0)23 , maxBufferedEvents(maxBufferedEvents)24 23 , pending(false) 25 { 26 buffer[0] = new char[bufferSize]; // transform it with MPI_ALLOC_MEM later 27 buffer[1] = new char[bufferSize]; 24 , hasWindows(false) 25 , windows_(windows) 26 { 27 if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 28 else hasWindows=true ; 29 30 MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 31 MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 32 buffer[0] = bufferHeader[0]+headerSize ; 33 buffer[1] = bufferHeader[1]+headerSize ; 34 firstTimeLine[0]=(size_t*)bufferHeader[0] ; 35 firstTimeLine[1]=(size_t*)bufferHeader[1] ; 36 bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 37 bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 38 control[0]=(size_t*)bufferHeader[0] +2 ; 39 control[1]=(size_t*)bufferHeader[1] +2 ; 40 finalize[0]=(size_t*)bufferHeader[0] +3 ; 41 finalize[1]=(size_t*)bufferHeader[1] +3 ; 42 43 *firstTimeLine[0]=0 ; 44 *firstTimeLine[1]=0 ; 45 *bufferCount[0]=0 ; 46 *bufferCount[1]=0 ; 47 *control[0]=0 ; 48 *control[1]=0 ; 49 *finalize[0]=0 ; 50 *finalize[1]=0 ; 51 winState[0]=false ; 52 winState[1]=false ; 53 54 55 if (hasWindows) 56 { 57 58 MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ; 59 MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ; 60 61 MPI_Group group ; 62 int groupSize,groupRank ; 63 MPI_Win_get_group(windows_[0], &group) ; 64 MPI_Group_size(group, &groupSize) ; 65 MPI_Group_rank(group, &groupRank) ; 66 if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 67 68 MPI_Win_get_group(windows_[1], &group) ; 69 MPI_Group_size(group, &groupSize) ; 70 MPI_Group_rank(group, &groupRank) ; 71 if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 72 73 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 74 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 75 76 MPI_Win_unlock(clientRank_, windows_[1]) ; 77 MPI_Win_unlock(clientRank_, windows_[0]) ; 78 } 28 79 retBuffer = new CBufferOut(buffer[current], bufferSize); 29 info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << " with a maximum of " << maxBufferedEvents << " buffered events" << endl; 80 info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 81 } 82 83 MPI_Aint CClientBuffer::getWinAddress(int i) 84 { 85 MPI_Aint address ; 86 87 if (hasWindows) MPI_Get_address(bufferHeader[i], &address) ; 88 else address=0 ; 89 90 return address ; 30 91 } 31 92 32 93 CClientBuffer::~CClientBuffer() 33 94 { 34 delete [] buffer[0]; 35 delete [] buffer[1]; 36 delete retBuffer; 95 //freeWindows() ; 96 if (hasWindows) 97 { 98 MPI_Win_detach(windows_[0],bufferHeader[0]); 99 MPI_Win_detach(windows_[1],bufferHeader[1]); 100 MPI_Free_mem(bufferHeader[0]) ; 101 MPI_Free_mem(bufferHeader[1]) ; 102 } 103 delete retBuffer; 104 } 105 106 /* void CClientBuffer::createWindows(MPI_Comm oneSidedComm) 107 { 108 MPI_Barrier(oneSidedComm) ; 109 MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 110 MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 111 112 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 113 *firstTimeLine[0]=0 ; 114 *bufferCount[0]=0 ; 115 *control[0]=0 ; 116 MPI_Win_unlock(0, windows[0]) ; 117 118 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 119 *firstTimeLine[1]=0 ; 120 *bufferCount[1]=0 ; 121 *control[1]=0 ; 122 MPI_Win_unlock(0, windows[1]) ; 123 winState[0]=false ; 124 winState[1]=false ; 125 MPI_Barrier(oneSidedComm) ; 126 hasWindows=true ; 127 } 128 */ 129 130 /* 131 void CClientBuffer::freeWindows() 132 { 133 if (hasWindows) 134 { 135 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[0]) ; 136 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[1]) ; 137 *control[0]=2 ; 138 *control[1]=2 ; 139 MPI_Win_unlock(0, windows_[1]) ; 140 MPI_Win_unlock(0, windows_[0]) ; 141 142 MPI_Win_free(&windows_[0]) ; 143 MPI_Win_free(&windows_[1]) ; 144 hasWindows=false ; 145 } 146 } 147 */ 148 void CClientBuffer::lockBuffer(void) 149 { 150 if (hasWindows) 151 { 152 // MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[current]) ; 153 long long int lock=1 ; 154 long long int zero=0, one=1 ; 155 156 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 157 158 while(lock!=0) 159 { 160 MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 161 windows_[current]) ; 162 MPI_Win_flush(clientRank_, windows_[current]) ; 163 } 164 165 // info(100)<<"Buffer locked "<<&windows_<<" "<<current<<endl ; 166 winState[current]=true ; 167 } 168 } 169 170 void CClientBuffer::unlockBuffer(void) 171 { 172 if (hasWindows) 173 { 174 long long int lock=1 ; 175 long long int zero=0, one=1 ; 176 177 MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 178 windows_[current]) ; 179 MPI_Win_unlock(clientRank_, windows_[current]) ; 180 181 // info(100)<<"Buffer unlocked "<<&windows_<<" "<<current<<endl ; 182 winState[current]=false ; 183 } 37 184 } 38 185 … … 44 191 bool CClientBuffer::isBufferFree(StdSize size) 45 192 { 193 // bool loop=true ; 194 // while (loop) 195 // { 196 // lockBuffer(); 197 // if (*control[current]==0) loop=false ; // attemp to read from server ? 198 // else unlockBuffer() ; 199 // } 200 201 lockBuffer(); 46 202 if (size > bufferSize) 47 203 ERROR("bool CClientBuffer::isBufferFree(StdSize size)", … … 59 215 } 60 216 61 62 return (size <= remain() && bufferedEvents < maxBufferedEvents);63 } 64 65 66 CBufferOut* CClientBuffer::getBuffer( StdSize size)217 count=*bufferCount[current] ; 218 return (size <= remain()); 219 } 220 221 222 CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size) 67 223 { 68 224 if (size <= remain()) … … 70 226 retBuffer->realloc(buffer[current] + count, size); 71 227 count += size; 72 bufferedEvents++; 228 if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 229 *bufferCount[current]=count ; 230 /* info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current 231 <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ; 232 if (!winState[current]) info(40)<<"CClientBuffer::getBuffer "<<" Windows Not Locked... "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current 233 <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;*/ 73 234 return retBuffer; 74 235 } … … 81 242 } 82 243 83 bool CClientBuffer::checkBuffer(void) 244 void CClientBuffer::infoBuffer(void) 245 { 246 247 char checksum=0 ; 248 for(size_t i=0;i<*bufferCount[current];i++) checksum=checksum+buffer[current][i] ; 249 250 char checksumFirst=0 ; 251 for(size_t i=5; i<10 && i<*bufferCount[current] ;i++) checksumFirst=checksumFirst+buffer[current][i] ; 252 253 char checksumLast=0 ; 254 for(size_t i=(*bufferCount[current]<10)?0:*bufferCount[current]-10; i<*bufferCount[current] ; i++) checksumLast=checksumLast+buffer[current][i] ; 255 256 info(45)<<"CClientBuffer::infoBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current<<" WinState "<<winState[current] 257 <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<" checksum "<<(int)checksum<<" " 258 <<(int)buffer[current][0]<<" "<<(int)buffer[current][1]<<" "<<(int)buffer[current][2]<<" "<<(int)buffer[current][3]<<" "<<(int)buffer[current][4]<<" "<<(int)buffer[current][5]<<" " 259 <<(int)buffer[current][6]<<" "<<(int)buffer[current][7]<<" "<<(int)buffer[current][8]<<" "<<(int)buffer[current][9]<<" "<<(int)buffer[current][10]<<" "<<(int)buffer[current][11]<<endl ; 260 261 } 262 263 bool CClientBuffer::checkBuffer(bool send) 84 264 { 85 265 MPI_Status status; … … 96 276 if (!pending) 97 277 { 278 if (!send) return false ; 98 279 if (count > 0) 99 280 { 100 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 101 pending = true; 102 if (current == 1) current = 0; 103 else current = 1; 104 count = 0; 105 bufferedEvents = 0; 281 lockBuffer() ; 282 // if (*control[current]==0 && bufferCount[current] > 0) 283 if (*bufferCount[current] > 0) 284 { 285 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 286 pending = true; 287 // *control[current]=0 ; 288 *firstTimeLine[current]=0 ; 289 *bufferCount[current]=0 ; 290 291 unlockBuffer() ; 292 293 if (current == 1) current = 0; 294 else current = 1; 295 count = 0; 296 } 297 else unlockBuffer() ; 106 298 } 107 299 } … … 112 304 bool CClientBuffer::hasPendingRequest(void) 113 305 { 306 307 lockBuffer() ; 308 count=*bufferCount[current] ; 309 unlockBuffer() ; 310 114 311 return (pending || count > 0); 115 312 } 313 314 bool CClientBuffer::isNotifiedFinalized(void) 315 { 316 317 bool ret ; 318 lockBuffer() ; 319 ret=*finalize[current] == 1 ? true : false ; 320 unlockBuffer() ; 321 322 return ret; 323 } 324 116 325 }
Note: See TracChangeset
for help on using the changeset viewer.