Changeset 2594
- Timestamp:
- 10/25/23 11:40:07 (15 months ago)
- Location:
- XIOS3/trunk/src/transport
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk/src/transport/p2p_client_buffer.cpp
r2589 r2594 41 41 //} 42 42 currentWindow_=-1 ; 43 currentMirror_=-1 ; 43 44 44 45 //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_); … … 59 60 void CP2pClientBuffer::newBuffer(size_t size, bool fixed) 60 61 { 62 currentMirror_++; 61 63 currentWindow_=(currentWindow_+1)%maxWindows_ ; 62 if (usedWindows_[currentWindow_])63 {64 ERROR("void CP2pClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ;65 }66 else usedWindows_[currentWindow_]=true ;67 buffers_.push_back(new CBuffer(windows_[current Window_], size, fixed));64 //if (usedWindows_[currentWindow_]) 65 //{ 66 // ERROR("void CP2pClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ; 67 //} 68 //else usedWindows_[currentWindow_]=true ; 69 buffers_.push_back(new CBuffer(windows_[currentMirror_], size, fixed)); 68 70 currentBuffer_=buffers_.back() ; 69 71 info(logProtocol)<<" Nb attached memory blocs="<<buffers_.size()<<endl ; … … 106 108 if (buffers_.empty()) 107 109 { 108 if (fixed_) newBuffer(fixedSize_,fixed_) ; 110 if (fixed_) { 111 currentBufferSize_=fixedSize_; 112 newBuffer(fixedSize_,fixed_) ; 113 } 109 114 else 110 115 { … … 119 124 if (count > 0) 120 125 { 121 blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ; 126 //info(logProtocol) << "Using currentMirror_ 1 : "<<currentMirror_ << endl; 127 blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentMirror_}) ; 122 128 nbBlocs++ ; 123 129 } … … 128 134 if (count > 0) 129 135 { 130 blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ; 136 //info(logProtocol) << "Using currentMirror_ 2 : "<<currentMirror_ << endl; 137 blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentMirror_}) ; 131 138 nbBlocs++ ; 132 139 } … … 181 188 if (buffers_.size()>1) 182 189 { 183 usedWindows_[bloc.window]=false ;190 //usedWindows_[bloc.window]=false ; 184 191 delete buffers_.front() ; 185 192 buffers_.pop_front() ; … … 279 286 ostringstream outStr ; 280 287 SRequest request ; 281 request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int) )*nbBlocs) ;288 request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int)+sizeof(size_t))*nbBlocs) ; 282 289 *(request.buffer)<<timeline<<nbSenders<<nbBlocs ; 283 290 if (info.isActive(logProtocol)) outStr<<"New timeline event sent to server rank "<<serverRank_<<" : timeLine="<<timeline<<" nbSenders="<<nbSenders<<" nbBlocs="<<nbBlocs<<endl ; … … 286 293 for(int i=0 ; i<nbBlocs; ++i,++it) 287 294 { 288 *(request.buffer) << it->addr << it->count << it->window ;295 *(request.buffer) << it->addr << it->count << it->window << it->start; 289 296 290 297 if (info.isActive(logProtocol)) … … 292 299 size_t checksum=0 ; 293 300 for(size_t j=0;j<it->count;j++) checksum+=((unsigned char*)(it->addr))[j] ; 294 outStr<<"Bloc "<<i<<" addr="<<it->addr<<" count="<<it->count<<" checksum="<<checksum<<" ; " ;301 outStr<<"Bloc "<<i<<" addr="<<it->addr<<" count="<<it->count<<" checksum="<<checksum<<" window="<<it->window<<" start="<<it->start<<" ; " ; 295 302 } 296 303 … … 300 307 } 301 308 if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ; 309 //info(logProtocol) << "Send event : " << request.buffer->count() << endl; 302 310 MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ; 303 311 if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ; … … 311 319 request.buffer = new CBufferOut(sizeof(EVENT_BUFFER_RESIZE)+sizeof(timeline)+sizeof(size)) ; 312 320 *(request.buffer)<<EVENT_BUFFER_RESIZE<<timeline<<size ; 321 //info(logProtocol) << "Send resize : " << request.buffer->count() << endl; 313 322 MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ; 314 323 requests_.push_back(request) ; -
XIOS3/trunk/src/transport/p2p_client_buffer.hpp
r2558 r2594 53 53 if (start_-end_ >= size) 54 54 { 55 //info(logProtocol)<<" CASE 0 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 55 56 count=size ; 56 57 size = 0 ; … … 64 65 else 65 66 { 67 //info(logProtocol)<<" CASE 1 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 66 68 count = start_-end_ ; 67 69 size -= count ; … … 78 80 if (size_-end_ >= size) 79 81 { 82 //info(logProtocol)<<" CASE 2 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 80 83 count = size ; 81 84 size = 0; … … 89 92 else 90 93 { 94 //info(logProtocol)<<" CASE 3 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 91 95 count = size_ - end_ ; 92 96 size -= count ; … … 185 189 vector<bool> usedWindows_ ; 186 190 int currentWindow_ ; 191 int currentMirror_ ; 187 192 int maxWindows_ ; 188 193 -
XIOS3/trunk/src/transport/p2p_server_buffer.cpp
r2589 r2594 19 19 //bufferIn >> controlAddr_; 20 20 createWindow(commSelf, interCommMerged) ; 21 countDeletedBuffers_ = 0; 21 22 } 22 23 … … 80 81 int count ; 81 82 int window ; 83 size_t start ; 82 84 bufferIn >> nbBlocs ; 83 85 MPI_Aint bloc ; … … 85 87 for(int i=0;i<nbBlocs;++i) 86 88 { 87 bufferIn >> bloc >> count >> window; 88 blocs.push_back({bloc, count, window}) ; 89 bufferIn >> bloc >> count >> window >> start; 90 //info(logProtocol) << "Receiving window : "<<window << endl; 91 blocs.push_back({bloc, count, window,start}) ; 89 92 } 90 93 } … … 115 118 { 116 119 if (buffers_.front()->getCount()==0) { 120 // If the front buffer is empty and if another buffer become the active one (buffers_.size()>1) 121 // the front buffer can be deleted, no new message will be sent through the front buffer 117 122 delete buffers_.front(); 118 buffers_.pop_front() ; // if buffer is empty free buffer 123 buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer 124 //info(logProtocol) << "Deleting win : " << countDeletedBuffers_ << endl; 125 countDeletedBuffers_++; 119 126 } 120 127 } … … 223 230 } 224 231 } 232 233 size_t CP2pServerBuffer::remainSize(int bufferId) 234 { 235 if (bufferId-countDeletedBuffers_>=buffers_.size()) 236 { 237 //info(logProtocol) << "The buffer " << bufferId << " is not yet allocated" << endl; 238 return 0; 239 } 240 return buffers_[bufferId-countDeletedBuffers_]->remain() ; 241 } 242 225 243 226 244 void CP2pServerBuffer::transferEvents(void) … … 232 250 233 251 size_t timeline = pendingBlocs_.begin()->first ; 234 auto& blocs = pendingBlocs_.begin()->second ; 235 236 if (!bufferResize_.empty()) 237 { 238 if (bufferResize_.front().first==timeline) 239 { 240 currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 241 info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<" at timeline="<<timeline<<endl ; 242 bufferResize_.pop_front() ; 243 newBuffer(currentBufferSize_,fixed_) ; 244 } 245 } 252 auto& blocs = pendingBlocs_.begin()->second ; // map<size_t , list<tuple<MPI_Aint,int ,int,size_t>>> pendingBlocs_; 253 // timeline, addr ,size,win,start 254 // addr = std::get<0>(bloc) ; 255 // size = std::get<1>(bloc) ; 256 // window = std::get<2>(bloc) ; 257 // start = std::get<3>(bloc) ; // start : used to check mirror behavior 246 258 247 259 size_t eventSize=0 ; 248 for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;249 250 if (eventSize > remain)251 {252 if ( eventSize <= currentBufferSize_) return ; // wait for free storage ;253 else254 {255 if (currentBuffer_==nullptr) remain = eventSize ;256 else remain = currentBuffer_->remain() + fixedSize_ ;257 }258 }259 260 260 261 //if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked"); … … 275 276 //} 276 277 //isLocked_=true ; 277 do 278 // do 279 280 bool spaceForAllblocks = true; 281 int lastBufferUsed = -1; 282 if (blocs.size()==0) spaceForAllblocks = false; 283 else 284 { 285 for(auto& bloc : blocs) 286 { 287 //info(logProtocol) << "blocSize = " << get<1>(bloc) 288 // << " - remain in win : " << get<2>(bloc) << " : " << remainSize( get<2>(bloc) ) 289 // << "; bufferResize_ = " << bufferResize_.size() << endl; 290 291 // if the active buffer change, the new buffer must be considered as empty 292 if (lastBufferUsed != get<2>(bloc) ) eventSize = 0; 293 294 // if the targeted buffer does not exist 295 if ( get<2>(bloc)-countDeletedBuffers_>=buffers_.size() ) 296 { 297 if ( bufferResize_.empty() ) // no resize order 298 { 299 spaceForAllblocks = false; 300 break; 301 } 302 } 303 else if ( ( get<1>(bloc) > (remainSize(get<2>(bloc))-eventSize) ) ) // if there is no enough place in the targeted bloc 304 { 305 spaceForAllblocks = false; 306 break; 307 } 308 else 309 { 310 // if there is enough place in the targeted bloc, store the 311 lastBufferUsed = get<2>(bloc); 312 eventSize += get<1>(bloc); 313 } 314 } 315 } 316 317 if (spaceForAllblocks) 278 318 { 279 319 transferEvent() ; // ok enough storage for this bloc … … 284 324 // break ; // transfering just one event temporary => to remove 285 325 286 if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop 287 288 timeline = pendingBlocs_.begin()->first ; 289 auto& blocs=pendingBlocs_.begin()->second ; 290 291 if (!bufferResize_.empty()) 292 { 293 if (bufferResize_.front().first==timeline) 294 { 295 currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 296 info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<" at timeline="<<timeline<<endl ; 297 bufferResize_.pop_front() ; 298 newBuffer(currentBufferSize_,fixed_) ; 299 } 300 } 301 302 for(auto& bloc : blocs) eventSize+=get<1>(bloc) ; 303 if (transferedSize+eventSize<=remain) 326 // if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop 327 // 328 // timeline = pendingBlocs_.begin()->first ; 329 // auto& blocs=pendingBlocs_.begin()->second ; 330 // 331 // 332 // for(auto& bloc : blocs) eventSize+=get<1>(bloc) ; 333 // if (transferedSize+eventSize<=remain) 304 334 { 305 335 //for(auto& bloc : blocs) … … 317 347 } 318 348 } 319 while(transferedSize+eventSize<=remain) ;349 // while(transferedSize+eventSize<=remain) ; 320 350 321 351 } … … 341 371 size = std::get<1>(bloc) ; 342 372 window = std::get<2>(bloc) ; 373 start = std::get<3>(bloc) ; // start : used to check mirror behavior 343 374 344 375 offset=0 ; 345 376 377 // Need to keep loop even if a given bloc will not be split. 378 // To mimic client behavior, especially if (size_==end_) reset end_ = 0 ; 346 379 do 347 380 { 348 if (currentBuffer_!=nullptr) 349 { 350 currentBuffer_->reserve(size, start, count) ; 381 //if ( (currentBuffer_!=nullptr) || (window-countDeletedBuffers_ == buffers_.size() ) ) 382 { 383 if (window-countDeletedBuffers_ >= buffers_.size()) 384 { 385 if (!bufferResize_.empty()) 386 { 387 if (bufferResize_.front().first==timeline) 388 { 389 currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 390 //info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<" at timeline="<<timeline<<endl ; 391 bufferResize_.pop_front() ; 392 newBuffer(currentBufferSize_,fixed_) ; 393 } 394 } 395 } 396 397 buffers_[window-countDeletedBuffers_]->reserve(size, start, count) ; 351 398 352 399 if ( count > 0) 353 400 { 354 transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;401 transferRmaRequest(timeline, addr, offset, buffers_[window-countDeletedBuffers_], start, count, window) ; 355 402 offset=MPI_Aint_add(offset, count) ; 356 } 357 //currentBuffer_->reserve(size, start, count) ; 358 359 //if ( count > 0) 360 //{ 361 // transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ; 362 // offset=MPI_Aint_add(offset, count) ; 363 //} 403 404 } 364 405 } 365 406 366 if (size>0)367 {368 if (fixed_) newBuffer(std::max(fixedSize_, size),fixed_) ;369 else370 {371 currentBufferSize_ = std::max((size_t)(currentBufferSize_*growingFactor_), size) ;372 newBuffer(currentBufferSize_,fixed_) ;373 }374 }375 407 } while (size > 0 ) ; 376 408 } … … 385 417 if (info.isActive(logProtocol)) 386 418 { 387 info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<" addr="<<addr<<" count="<<count<<" buffer="<<buffer<<" start="<<start<< endl ;419 info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<" addr="<<addr<<" count="<<count<<" buffer="<<buffer<<" start="<<start<<" window="<<window<<endl ; 388 420 info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<" end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1) 389 421 <<" start="<<static_cast<void*>(buffer->getBuffer()+start)<<" end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ; … … 425 457 426 458 size+=bloc.count ; 459 //info(logProtocol) << "Free from : " << bloc.start << ", size : " << bloc.count<< endl; 427 460 bloc.buffer->free(bloc.start, bloc.count) ; // free bloc 428 461 addr=bloc.addr ; … … 431 464 if (buffers_.size() > 1) 432 465 { 466 // If the front buffer is empty and if another buffer become the active one (buffers_.size()>1) 467 // the front buffer can be deleted, no new message will be sent through the front buffer 433 468 delete buffers_.front(); 434 buffers_.pop_front() ; // if buffer is empty free buffer 469 buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer 470 //info(logProtocol) << "Deleting win : " << countDeletedBuffers_ << endl; 471 countDeletedBuffers_++; 435 472 } 436 473 } -
XIOS3/trunk/src/transport/p2p_server_buffer.hpp
r2585 r2594 46 46 if (start_-end_ >= size) 47 47 { 48 //if (start!=end_) info(logProtocol)<<" CASE 0, recv/computed : " << start << "/" << end_ << endl; 48 49 count=size ; 49 50 size = 0 ; … … 54 55 else 55 56 { 57 // Server buffers get blocs from clients, they should not be splitted (mirrors the client buffers) 58 //if (start!=end_) info(logProtocol)<<" CASE 1"<< endl; 59 //info(logProtocol)<<" CASE 1 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 60 ERROR("COneSidedServerBuffer::reserve()",<<"This should be the case of the 2nd part of a splitted message"<<std::endl); 56 61 //count = start_-end_ ; 57 62 //size -= count ; … … 66 71 if (size_-end_ >= size) 67 72 { 73 //if (start!=end_ ) info(logProtocol)<<" CASE 2, recv/computed : "<< start << "/" << end_ << endl; 68 74 count = size ; 69 75 size = 0; … … 74 80 else 75 81 { 82 // Server buffers get blocs from clients, they should not be splitted (mirrors the client buffers) 83 // 1st part of a splitted message, fill the end of the buffer 84 // end_ must be set to 0 like on clients 85 //if (start!=end_) info(logProtocol)<<" CASE 3"<< endl; 86 // info(logProtocol)<<" CASE 3 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 76 87 //count = size_ - end_ ; 77 88 //size -= count ; 78 89 //start=end_ ; 79 //end_ = 0 ;90 end_ = 0 ; 80 91 //count_+= count ; 81 92 count = 0 ; … … 94 105 } 95 106 96 size_t remain(void) { return size_-count_; } 107 size_t remain(void) { 108 if (count_==0) 109 return size_; 110 else if (end_<start_) 111 return start_-end_; 112 else 113 return size_-end_; 114 } 97 115 size_t getSize(void) { return size_ ;} 98 116 size_t getCount(void) {return count_ ;} … … 110 128 while (!buffers_.empty()) { 111 129 delete buffers_.front(); 112 buffers_.pop_front() ; // if buffer is empty free buffer 130 buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer 131 countDeletedBuffers_++; 113 132 } 114 133 }; … … 135 154 void transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window) ; 136 155 size_t remainSize(void) ; 156 size_t remainSize(int bufferId) ; 137 157 138 158 … … 144 164 double bufferServerFactor_=1. ; 145 165 146 std:: list<CBuffer*> buffers_ ;166 std::vector<CBuffer*> buffers_ ; 147 167 CBuffer* currentBuffer_=nullptr ; 148 168 … … 151 171 152 172 map<size_t, int> nbSenders_ ; 153 map<size_t, list<tuple<MPI_Aint,int,int >>> pendingBlocs_;173 map<size_t, list<tuple<MPI_Aint,int,int,size_t>>> pendingBlocs_; 154 174 155 175 vector<MPI_Request> pendingRmaRequests_ ; … … 176 196 const int windowRank_=0 ; 177 197 MPI_Aint lastBlocToFree_=0 ; 198 int countDeletedBuffers_; 178 199 179 200 } ;
Note: See TracChangeset
for help on using the changeset viewer.