Changeset 2019 for XIOS/dev/dev_trunk_graph/src/context_client.cpp
- Timestamp: 01/22/21 12:00:29 (3 years ago)
- File: XIOS/dev/dev_trunk_graph/src/context_client.cpp (1 edited)
Legend:
- Unmodified lines carry no prefix
- Added lines are prefixed with +
- Removed lines are prefixed with -
- Runs of unmodified lines between hunks are elided as …
XIOS/dev/dev_trunk_graph/src/context_client.cpp (r1634 → r2019)

     */
     CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
-     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4)
+     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4), associatedServer_(nullptr)
     {
+
       context = parent;
       intraComm = intraComm_;
…
       int flag;
       MPI_Comm_test_inter(interComm, &flag);
+      if (flag) isAttached_=false ;
+      else  isAttached_=true ;
+
+      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
+      if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode
+
+
+
       if (flag) MPI_Comm_remote_size(interComm, &serverSize);
       else MPI_Comm_size(interComm, &serverSize);
…
       computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);

-      timeLine = 0;
+      if (flag) MPI_Intercomm_merge(interComm_,false,&interCommMerged) ;
+
+      if (!isAttachedModeEnabled())
+      {
+        windows.resize(serverSize) ;
+        MPI_Comm winComm ;
+        for(int rank=0; rank<serverSize; rank++)
+        {
+          windows[rank].resize(2) ;
+          MPI_Comm_split(interCommMerged, rank, clientRank, &winComm);
+          int myRank ;
+          MPI_Comm_rank(winComm,&myRank);
+          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]);
+          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]);
+          MPI_Comm_free(&winComm) ;
+        }
+      }
+
+      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ;
+
+      timeLine = 1;
     }
…
       list<int> sizes = event.getSizes();

-      // We force the getBuffers call to be non-blocking on classical servers
+      // We force the getBuffers call to be non-blocking on classical servers
       list<CBufferOut*> buffList;
-      bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) ));
-      // bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer );
-
-      if (couldBuffer)
-      {
-        event.send(timeLine, sizes, buffList);
-        info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<" sent"<<endl ;
-
-        checkBuffers(ranks);
-
-        if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
-        {
-          waitEvent(ranks);
-          CContext::setCurrent(context->getId());
-        }
-      }
-      else
-      {
-        tmpBufferedEvent.ranks = ranks;
-        tmpBufferedEvent.sizes = sizes;
-
-        for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++)
-          tmpBufferedEvent.buffers.push_back(new CBufferOut(*it));
-        info(100)<<"DEBUG : temporaly event created : timeline "<<timeLine<<endl ;
-        event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers);
-        info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<" sent"<<endl ;
-      }
-    }
-
+      getBuffers(timeLine, ranks, sizes, buffList) ;
+
+      event.send(timeLine, sizes, buffList);
+
+      //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ;
+
+      unlockBuffers(ranks) ;
+      info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<" sent"<<endl ;
+
+      checkBuffers(ranks);
+    }
+
+    if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
+    {
+      waitEvent(ranks);
+      CContext::setCurrent(context->getId());
+    }
+
     timeLine++;
-  }
-
-  /*!
-   * Send the temporarily buffered event (if any).
-   *
-   * \return true if a temporarily buffered event could be sent, false otherwise
-   */
-  bool CContextClient::sendTemporarilyBufferedEvent()
-  {
-    bool couldSendTmpBufferedEvent = false;
-
-    if (hasTemporarilyBufferedEvent())
-    {
-      list<CBufferOut*> buffList;
-      if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call
-      {
-        list<CBufferOut*>::iterator it, itBuffer;
-
-        for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++)
-          (*itBuffer)->put((char*)(*it)->start(), (*it)->count());
-
-        info(100)<<"DEBUG : temporaly event sent "<<endl ;
-        checkBuffers(tmpBufferedEvent.ranks);
-
-        tmpBufferedEvent.clear();
-
-        couldSendTmpBufferedEvent = true;
-      }
-    }
-
-    return couldSendTmpBufferedEvent;
+  }
   }
…
   void CContextClient::waitEvent(list<int>& ranks)
   {
+    while (checkBuffers(ranks))
+    {
+      CXios::getDaemonsManager()->eventLoop() ;
+    }
+
+    MPI_Request req ;
+    MPI_Status status ;
+
+    MPI_Ibarrier(intraComm,&req) ;
+    int flag=false ;
+
+    do
+    {
+      CXios::getDaemonsManager()->eventLoop() ;
+      MPI_Test(&req,&flag,&status) ;
+    } while (!flag) ;
+
+
+  }
+
+
+  void CContextClient::waitEvent_old(list<int>& ranks)
+  {
     parentServer->server->setPendingEvent();
     while (checkBuffers(ranks))
…
    * it is explicitly requested to be non-blocking.
    *
+   *
+   * \param [in] timeLine time line of the event which will be sent to servers
    * \param [in] serverList list of rank of connected server
    * \param [in] sizeList size of message corresponding to each connection
…
    * \return whether the already allocated buffers could be used
    */
-  bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
+  bool CContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
                                   bool nonBlocking /*= false*/)
   {
…
       areBuffersFree = true;
       for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
+      {
         areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
+      }

       if (!areBuffersFree)
       {
+        for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();
         checkBuffers();
-        if (CServer::serverLevel == 0)
-          context->server->listen();
-
-        else if (CServer::serverLevel == 1)
+
+        context->server->listen();
+
+        if (context->serverPrimServer.size()>0)
         {
-          context->server->listen();
-          for (int i = 0; i < context->serverPrimServer.size(); ++i)
-            context->serverPrimServer[i]->listen();
-          CServer::contextEventLoop(false) ; // avoid dead-lock at finalize...
+          for (int i = 0; i < context->serverPrimServer.size(); ++i) context->serverPrimServer[i]->listen();
+          //ym CServer::contextEventLoop(false) ; // avoid dead-lock at finalize...
+          context->globalEventLoop() ;
         }

-        else if (CServer::serverLevel == 2)
-          context->server->listen();
-
       }
     } while (!areBuffersFree && !nonBlocking);
-
     CTimer::get("Blocking time").suspend();
…
     {
       for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
-        retBuffers.push_back((*itBuffer)->getBuffer(*itSize));
-    }
-
+        retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize));
+    }
     return areBuffersFree;
   }
…
       maxEventSizes[rank] = CXios::minBufferSize;
     }
-    CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank], maxBufferedEvents);
+
+    vector<MPI_Win> Wins(2,MPI_WIN_NULL) ;
+    if (!isAttachedModeEnabled()) Wins=windows[rank] ;
+
+    CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, Wins, clientRank, rank, mapBufferSize_[rank], maxEventSizes[rank]);
     // Notify the server
-    CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize));
-    bufOut->put(mapBufferSize_[rank]); // Stupid C++
-    buffer->checkBuffer();
+    CBufferOut* bufOut = buffer->getBuffer(0, 3*sizeof(MPI_Aint));
+    MPI_Aint sendBuff[3] ;
+    sendBuff[0]=mapBufferSize_[rank]; // Stupid C++
+    sendBuff[1]=buffers[rank]->getWinAddress(0);
+    sendBuff[2]=buffers[rank]->getWinAddress(1);
+    info(100)<<"CContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinAddress(1)<<endl;
+    bufOut->put(sendBuff, 3); // Stupid C++
+    buffer->checkBuffer(true);
+
   }
…
     bool pending = false;
     for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
-      pending |= itBuff->second->checkBuffer();
+      pending |= itBuff->second->checkBuffer(!pureOneSided);
     return pending;
   }
…
     for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
     {
       delete itBuff->second;
     }
     buffers.clear();
-  }
-
+
+/* don't know when release windows
+
+    if (!isAttachedModeEnabled())
+    {
+      for(int rank=0; rank<serverSize; rank++)
+      {
+        MPI_Win_free(&windows[rank][0]);
+        MPI_Win_free(&windows[rank][1]);
+      }
+    }
+*/
+  }
+
+
+  /*!
+  Lock the buffers for one sided communications
+  \param [in] ranks list rank of server to which client connects to
+  */
+  void CContextClient::lockBuffers(list<int>& ranks)
+  {
+    list<int>::iterator it;
+    for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer();
+  }
+
+  /*!
+  Unlock the buffers for one sided communications
+  \param [in] ranks list rank of server to which client connects to
+  */
+  void CContextClient::unlockBuffers(list<int>& ranks)
+  {
+    list<int>::iterator it;
+    for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer();
+  }
+
   /*!
   Verify state of buffers corresponding to a connection
…
     list<int>::iterator it;
     bool pending = false;
-    for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer();
+    for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided);
     return pending;
   }
…
     mapBufferSize_ = mapSize;
     maxEventSizes = maxEventSize;
-
-    // Compute the maximum number of events that can be safely buffered.
-    double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max();
-    for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it)
-    {
-      double ratio = double(it->second) / maxEventSizes[it->first];
-      if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio;
-    }
-    MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm);
-
-    if (minBufferSizeEventSizeRatio < 1.0)
-    {
-      ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)",
-            << "The buffer sizes and the maximum events sizes are incoherent.");
-    }
-    else if (minBufferSizeEventSizeRatio == std::numeric_limits<double>::max())
-      minBufferSizeEventSizeRatio = 1.0; // In this case, maxBufferedEvents will never be used but we want to avoid any floating point exception
-
-    maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server
-                      + size_t(minBufferSizeEventSizeRatio) // one local buffer can always be fully used
-                      + 1; // the other local buffer might contain only one event
   }
…
   }

-  /*!
-   * Check if the attached mode is used.
-   *
-   * \return true if and only if attached mode is used
-   */
-  bool CContextClient::isAttachedModeEnabled() const
-  {
-    return (parentServer != 0);
-  }
-
   /*!
    * Finalize context client and do some reports. Function is non-blocking.
…
   {
     map<int,CClientBuffer*>::iterator itBuff;
+    std::list<int>::iterator ItServerLeader;
+
     bool stop = false;

+    int* nbServerConnectionLocal = new int[serverSize] ;
+    int* nbServerConnectionGlobal = new int[serverSize] ;
+    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
+    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) nbServerConnectionLocal[itBuff->first]=1 ;
+    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++) nbServerConnectionLocal[*ItServerLeader]=1 ;
+
+    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);
+
+    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
+    CMessage msg;
+
+    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
+    sendEvent(event);
+
+    delete[] nbServerConnectionLocal ;
+    delete[] nbServerConnectionGlobal ;
+
+
     CTimer::get("Blocking time").resume();
-    while (hasTemporarilyBufferedEvent())
-    {
-      checkBuffers();
-      sendTemporarilyBufferedEvent();
-    }
-    CTimer::get("Blocking time").suspend();
-
-    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
-    if (isServerLeader())
-    {
-      CMessage msg;
-      const std::list<int>& ranks = getRanksServerLeader();
-      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
-      {
-        info(100)<<"DEBUG : Sent context Finalize event to rank "<<*itRank<<endl ;
-        event.push(*itRank, 1, msg);
-      }
-      sendEvent(event);
-    }
-    else sendEvent(event);
-
-    CTimer::get("Blocking time").resume();
-    // while (!stop)
-    {
-      checkBuffers();
-      if (hasTemporarilyBufferedEvent())
-        sendTemporarilyBufferedEvent();
-
-      stop = true;
-      // for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();
-    }
+    checkBuffers();
     CTimer::get("Blocking time").suspend();
…
     report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

-    //releaseBuffers(); // moved to CContext::finalize()
   }
…
     return pending;
   }

+  bool CContextClient::isNotifiedFinalized(void)
+  {
+    if (isAttachedModeEnabled()) return true ;
+
+    bool finalized = true;
+    map<int,CClientBuffer*>::iterator itBuff;
+    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
+      finalized &= itBuff->second->isNotifiedFinalized();
+    return finalized;
+  }

 }
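The main change in this revision is visible in the constructor: outside attached mode, the client merges the client–server intercommunicator with MPI_Intercomm_merge and then creates, for every server rank, a sub-communicator and a pair of dynamic RMA windows that later carry the one-sided buffer traffic. Below is a minimal standalone sketch of that window-creation pattern, not XIOS code: MPI_COMM_WORLD stands in for interCommMerged and serverSize is an assumed value.

    #include <mpi.h>
    #include <vector>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);

      MPI_Comm merged = MPI_COMM_WORLD;   // stand-in for interCommMerged
      int clientRank;
      MPI_Comm_rank(merged, &clientRank);
      const int serverSize = 2;           // assumed number of server ranks

      // two dynamic windows per server rank, as in the new constructor
      std::vector<std::vector<MPI_Win>> windows(serverSize, std::vector<MPI_Win>(2));
      for (int rank = 0; rank < serverSize; rank++)
      {
        MPI_Comm winComm;
        // every process joins the sub-communicator for this server rank
        MPI_Comm_split(merged, rank, clientRank, &winComm);
        // dynamic windows start empty; memory is attached later with MPI_Win_attach
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]);
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]);
        MPI_Comm_free(&winComm);          // the windows keep their group alive
      }

      // the changeset leaves window release open ("don't know when release windows");
      // a standalone program can simply free them before MPI_Finalize
      for (auto& w : windows) { MPI_Win_free(&w[0]); MPI_Win_free(&w[1]); }

      MPI_Finalize();
      return 0;
    }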
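waitEvent is rebuilt around the same idea: instead of polling the attached server, it drains its buffers through the daemons' event loop and then synchronizes the clients with a non-blocking barrier, servicing events until the barrier completes. A self-contained sketch of that idiom follows; serviceEvents() is a hypothetical stand-in for CXios::getDaemonsManager()->eventLoop().

    #include <mpi.h>

    static void serviceEvents()
    {
      // placeholder for the daemon event loop that must keep running
    }

    static void waitAllClients(MPI_Comm intraComm)
    {
      MPI_Request req;
      MPI_Status status;
      int flag = 0;

      MPI_Ibarrier(intraComm, &req);      // post the barrier and return immediately
      do
      {
        serviceEvents();                  // keep handling events to avoid deadlock
        MPI_Test(&req, &flag, &status);   // completes once every rank reached the barrier
      } while (!flag);
    }

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      waitAllClients(MPI_COMM_WORLD);
      MPI_Finalize();
      return 0;
    }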
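finalize() also changes protocol: every client, not only the server leaders, now sends EVENT_ID_CONTEXT_FINALIZE, and a sum-allreduce over the client intracommunicator first tells each server how many such messages to expect. A sketch of that bookkeeping, under the assumption that the connection set is available as a std::set of server ranks (countConnections is a hypothetical helper, not XIOS API):

    #include <mpi.h>
    #include <set>
    #include <vector>

    // For each server rank, count how many clients are connected to it:
    // each client contributes 0 or 1 and the allreduce sums the flags.
    std::vector<int> countConnections(const std::set<int>& connectedServers,
                                      int serverSize, MPI_Comm intraComm)
    {
      std::vector<int> local(serverSize, 0), global(serverSize, 0);
      for (int s : connectedServers) local[s] = 1;
      MPI_Allreduce(local.data(), global.data(), serverSize,
                    MPI_INT, MPI_SUM, intraComm);
      return global; // global[s] = number of clients that will notify server s
    }

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      std::set<int> connectedServers = {0};   // assumed connection set
      std::vector<int> counts = countConnections(connectedServers, 2, MPI_COMM_WORLD);
      (void)counts; // in XIOS this count is pushed with each finalize event
      MPI_Finalize();
      return 0;
    }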