Changeset 2671
- Timestamp: 10/29/24 15:25:33 (3 months ago)
- Location: XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport
- Files: 16 edited
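
All 16 files apply the same mechanical refactoring: member variables of the transport client and server classes are renamed to carry a trailing underscore (timeLine → timeLine_, pureOneSided → pureOneSided_, and so on), so data members are visually distinct from locals and parameters. As a purely illustrative sketch of the convention (a hypothetical class, not XIOS code):

    // Hypothetical example of the naming style applied by this changeset:
    // data members end in '_' so they stand out from locals and parameters.
    class CExampleClient
    {
      public:
        explicit CExampleClient(size_t firstTimeLine) : timeLine_(firstTimeLine) {}
        void nextEvent() { ++timeLine_; }              // member access is unambiguous
        size_t timeLine() const { return timeLine_; }  // accessor can reuse the plain name
      private:
        size_t timeLine_;              // was "timeLine" before such a rename
        bool   pureOneSided_ = false;  // was "pureOneSided"
    };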
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/legacy_context_client.cpp
r2669 → r2671. Member variables of CLegacyContextClient are renamed to the trailing-underscore convention: maxBufferedEvents → maxBufferedEvents_, pureOneSided → pureOneSided_, timeLine → timeLine_ and maxEventSizes → maxEventSizes_. All uses are updated: the constructor initializer list and body, sendEvent() (the commented-out trace line, the checkEventSync coherence test, both error messages, the getBuffers()/event.send() calls and the final timeLine_++), the CXios::minBufferSize fallback for maxEventSizes_, and the checkBuffer(!pureOneSided_) calls in both checkBuffers() overloads. The change is a pure rename; no logic is modified. Representative hunk from the constructor:

    -   mapBufferSize_(), maxBufferedEvents(4)
    +   mapBufferSize_(), maxBufferedEvents_(4)
    -   pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
    +   pureOneSided_=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
    -   timeLine = 1;
    +   timeLine_ = 1;
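
The checkEventSync block touched above sums the local timeline, type id and class id over all clients with MPI_Allreduce and verifies that every client agrees before the event is sent. A minimal, self-contained sketch of that pattern (generic names, not the XIOS code itself):

    #include <mpi.h>
    #include <cstdio>

    // Verify that 'localValue' is identical on every rank of 'comm':
    // if all ranks hold the same value, the sum equals localValue * commSize.
    static void checkCoherent(long long localValue, MPI_Comm comm)
    {
      int commSize;
      MPI_Comm_size(comm, &commSize);
      long long sum = 0;
      MPI_Allreduce(&localValue, &sum, 1, MPI_LONG_LONG_INT, MPI_SUM, comm);
      if (sum / commSize != localValue)
      {
        fprintf(stderr, "Event not coherent between clients: local value %lld\n", localValue);
        MPI_Abort(comm, 1);
      }
    }

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      long long timeLine = 1;                    // stands in for timeLine_ on each client
      checkCoherent(timeLine, MPI_COMM_WORLD);   // the same check is done for type id and class id
      MPI_Finalize();
      return 0;
    }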
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/legacy_context_client.hpp
r2547 → r2671. The corresponding member declarations are renamed: timeLine → timeLine_, pureOneSided → pureOneSided_, maxEventSizes → maxEventSizes_ and maxBufferedEvents → maxBufferedEvents_. The Doxygen comments and the other members (interCommMerged_, buffers, mapBufferSize_, winComm_) are left untouched.
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/legacy_context_client_v2.cpp
r2669 → r2671. The same client-side renaming is applied to CLegacyContextClientV2: maxBufferedEvents → maxBufferedEvents_, pureOneSided → pureOneSided_, timeLine → timeLine_ and maxEventSizes → maxEventSizes_, with all uses updated in the constructor, sendEvent(), the CXios::minBufferSize fallback and both checkBuffers() overloads.
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/legacy_context_client_v2.hpp
r2667 → r2671. Member declarations renamed as in legacy_context_client.hpp: timeLine_, pureOneSided_, maxEventSizes_ and maxBufferedEvents_.
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/legacy_context_server.cpp
r2670 → r2671. Member variables of CLegacyContextServer gain the trailing underscore: buffers → buffers_, lastTimeLine → lastTimeLine_, itLastTimeLine → itLastTimeLine_, pendingProbe → pendingProbe_, pendingRequest → pendingRequest_, bufferRequest → bufferRequest_, events → events_, currentTimeLine → currentTimeLine_, finished → finished_, pendingEvent → pendingEvent_, scheduled → scheduled_ and pureOneSided → pureOneSided_. All uses are updated: the constructor, setPendingEvent()/hasPendingEvent()/hasFinished(), the receive loop, buffer attachment and window creation, the probe and request bookkeeping, getBufferFromClient(), processRequest() (including the buffer-resize notifications), event scheduling and dispatch (with the checkEventSync test and the currentTimeLine_ increment), buffer release, notifyClientsFinalize() and the finalize handler. Again a pure rename; no logic is modified.
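
The server keeps incoming events in a map keyed by timeline and only dispatches the entry whose key equals currentTimeLine_, then advances the counter, so events are processed strictly in order even if they arrive out of order. A reduced sketch of that ordering logic (hypothetical types, no MPI or event scheduler):

    #include <cstdio>
    #include <map>
    #include <string>
    #include <utility>

    // Events arrive tagged with a timeline number, possibly out of order.
    // They are buffered in a map and dispatched strictly in timeline order,
    // mirroring the events_ / currentTimeLine_ pair of the context servers.
    struct EventQueue
    {
      std::map<size_t, std::string> events;   // stands in for map<size_t,CEventServer*> events_
      size_t currentTimeLine = 1;             // stands in for currentTimeLine_

      void push(size_t timeLine, std::string payload) { events.emplace(timeLine, std::move(payload)); }

      void processEvents()
      {
        for (auto it = events.find(currentTimeLine); it != events.end(); it = events.find(currentTimeLine))
        {
          printf("Process event %zu : %s\n", currentTimeLine, it->second.c_str());
          events.erase(it);
          ++currentTimeLine;                  // only then does the next timeline become eligible
        }
      }
    };

    int main()
    {
      EventQueue q;
      q.push(2, "field update");              // arrives early, waits for timeline 1
      q.push(1, "context definition");
      q.processEvents();                      // dispatches timeline 1, then 2
      return 0;
    }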
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/legacy_context_server.hpp
r2667 → r2671. The matching member declarations are renamed: buffers_, lastTimeLine_, itLastTimeLine_, pendingProbe_, pendingRequest_, bufferRequest_, events_, currentTimeLine_, finished_, pendingEvent_, scheduled_ and pureOneSided_.
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/legacy_context_server_v2.cpp
r2670 → r2671. The same server-side renaming is applied to CLegacyContextServerV2: buffers_, lastTimeLine_, itLastTimeLine_, pendingProbe_, pendingRequest_, bufferRequest_, events_, currentTimeLine_, finished_, pendingEvent_, scheduled_ and pureOneSided_, with all uses updated throughout the file (constructor, probe and request handling, getBufferFromClient(), processRequest(), event processing, buffer release and finalize).
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/legacy_context_server_v2.hpp
r2667 → r2671. Member declarations renamed as in legacy_context_server.hpp.
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/one_sided_context_client.cpp
r2669 → r2671. COneSidedContextClient receives the same client-side renaming: maxBufferedEvents → maxBufferedEvents_, pureOneSided → pureOneSided_, timeLine → timeLine_ and maxEventSizes → maxEventSizes_. Uses are updated in the constructor, in sendEvent() (trace line, checkEventSync test, error messages, writeEvent(timeLine_, event) and the final timeLine_++) and in the CXios::minBufferSize fallback.
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/one_sided_context_client.hpp
r2547 → r2671. Member declarations renamed: timeLine_, pureOneSided_, maxEventSizes_ and maxBufferedEvents_.
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/one_sided_context_server.cpp
r2670 → r2671. COneSidedContextServer member variables are renamed: currentTimeLine → currentTimeLine_, scheduled → scheduled_, finished → finished_, itLastTimeLine → itLastTimeLine_, lastTimeLine → lastTimeLine_, pendingEvent → pendingEvent_ and pureOneSided → pureOneSided_, with uses updated in the constructor, setPendingEvent(), hasFinished(), the event loop, event processing (scheduler registration, fillEventServer(currentTimeLine_, event), dispatch and the currentTimeLine_ increment) and the finalize handler. Members that already followed the convention (buffers_, completedEvents_, eventScheduler_, hashId_) are untouched.
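
In this server an event for a given timeline is dispatched only once every expected sender has contributed (the nbSenders == currentNbSenders test on completedEvents_). A trimmed-down sketch of that completion test, with invented names:

    #include <cstdio>
    #include <map>

    // Per-timeline completion record: an event is ready when as many senders
    // have contributed as were announced (cf. nbSenders / currentNbSenders).
    struct PendingEvent
    {
      int nbSenders = 0;         // how many clients will contribute to this event
      int currentNbSenders = 0;  // how many have contributed so far
    };

    int main()
    {
      std::map<size_t, PendingEvent> completedEvents;  // keyed by timeline
      size_t currentTimeLine = 1;

      completedEvents[1].nbSenders = 2;       // timeline 1 expects two senders
      completedEvents[1].currentNbSenders++;  // first contribution arrives
      completedEvents[1].currentNbSenders++;  // second contribution arrives

      auto it = completedEvents.find(currentTimeLine);
      if (it != completedEvents.end() && it->second.nbSenders == it->second.currentNbSenders)
      {
        printf("Dispatch event for timeline %zu\n", currentTimeLine);
        completedEvents.erase(it);
        ++currentTimeLine;
      }
      return 0;
    }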
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/one_sided_context_server.hpp
r2561 → r2671. The matching member declarations are renamed: lastTimeLine_, itLastTimeLine_, pendingProbe_, pendingRequest_, bufferRequest_, events_, currentTimeLine_, finished_, pendingEvent_, scheduled_ and pureOneSided_ (buffers_ already carried the underscore).
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/p2p_context_client.cpp
r2669 → r2671. CP2pContextClient follows suit: maxBufferedEvents → maxBufferedEvents_, pureOneSided → pureOneSided_, timeLine → timeLine_ and, in addition, the buffer map buffers → buffers_. Uses are updated in the constructor, sendEvent() (coherence test, error messages, buffer lookup, writeEvent(timeLine_, event) and the final timeLine_++), buffer creation, both checkBuffers() overloads, releaseBuffers(), the buffer-size adjustment (setFixed on buffers_), the server-connection counting loop and the finalization check (isNotifiedFinalized on every buffer).
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/p2p_context_client.hpp
r2556 → r2671. Member declarations renamed: timeLine_, buffers_, pureOneSided_ and maxBufferedEvents_; maxEventSizes keeps its old spelling in this header.
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/p2p_context_server.cpp
r2670 → r2671. CP2pContextServer receives the same server-side renaming as COneSidedContextServer: currentTimeLine_, scheduled_, finished_, itLastTimeLine_, lastTimeLine_, pendingEvent_ and pureOneSided_, with uses updated in the constructor, setPendingEvent(), hasFinished(), the event loop, event processing and the finalize handler.
XIOS3/dev/XIOS_NOTIFICATIONS_MANAGER/src/transport/p2p_context_server.hpp
r2563 → r2671. The matching member declarations are renamed: lastTimeLine_, itLastTimeLine_, pendingProbe_, pendingRequest_, bufferRequest_, events_, currentTimeLine_, finished_, pendingEvent_, scheduled_ and pureOneSided_.