Changeset 2258 for XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp
- Timestamp: 11/16/21 17:37:42 (3 years ago)
- File: 1 edited
Legend:
- Unmodified
- Added
- Removed
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp
r2246 r2258 46 46 else attachedMode=true ; 47 47 48 int clientSize ; 49 if (flag) MPI_Comm_remote_size(interComm,&clientSize); 50 else MPI_Comm_size(interComm,&clientSize); 48 if (flag) MPI_Comm_remote_size(interComm,&clientSize_); 49 else MPI_Comm_size(interComm,&clientSize_); 51 50 52 51 … … 80 79 MPI_Intercomm_merge(interComm_,true,&interCommMerged) ; 81 80 82 // We create dummy pair of intercommunicator between clients and server 83 // Why ? Just because on openMPI, it reduce the creation time of windows otherwhise which increase quadratically 84 // We don't know the reason 81 double time ; 82 windows_.resize(clientSize_) ; 85 83 MPI_Comm commSelf ; 86 84 MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf) ; 87 vector<MPI_Comm> dummyComm(clientSize) ; 88 for(int rank=0; rank<clientSize ; rank++) MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &dummyComm[rank]) ; 89 90 // create windows for one sided comm 91 MPI_Comm winComm ; 92 windows.resize(2) ; 93 for(int rank=clientSize; rank<clientSize+intraCommSize; rank++) 94 { 95 if (rank==clientSize+intraCommRank) 96 { 97 MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm); 98 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]); 99 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]); 100 } 101 else MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm); 102 // ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock 103 // Bug or not ? 
104 // MPI_Comm_free(&winComm) ; 105 } 106 107 // free dummy intercommunicator 108 for(int rank=0; rank<clientSize ; rank++) MPI_Comm_free(&dummyComm[rank]) ; 85 MPI_Comm interComm ; 86 winComm_.resize(clientSize_) ; 87 for(int rank=0; rank<clientSize_ ; rank++) 88 { 89 time=MPI_Wtime() ; 90 MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &interComm) ; 91 MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ; 92 windows_[rank].resize(2) ; 93 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 94 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 95 time=MPI_Wtime()-time ; 96 info(100)<< "MPI_Win_create_dynamic : server to client rank "<<rank<<" => "<<time/1e-6<<" us"<<endl ; 97 } 109 98 MPI_Comm_free(&commSelf) ; 110 99 CTimer::get("create Windows").suspend() ; … … 112 101 else 113 102 { 114 windows.resize(2) ; 115 windows[0]=MPI_WIN_NULL ; 116 windows[1]=MPI_WIN_NULL ; 103 winComm_.resize(clientSize_) ; 104 windows_.resize(clientSize_) ; 105 for(int rank=0; rank<clientSize_ ; rank++) 106 { 107 winComm_[rank] = MPI_COMM_NULL ; 108 windows_[rank].resize(2) ; 109 windows_[rank][0]=MPI_WIN_NULL ; 110 windows_[rank][1]=MPI_WIN_NULL ; 111 } 117 112 } 118 113 … … 160 155 return finished; 161 156 } 162 /*163 void CContextServer::listen(void)164 {165 int rank;166 int flag;167 int count;168 char * addr;169 MPI_Status status;170 map<int,CServerBuffer*>::iterator it;171 bool okLoop;172 173 traceOff();174 // WARNING : with intel MPI, probing crash on an intercommunicator with release library but not with release_mt175 // ==> source $I_MPI_ROOT/intel64/bin/mpivars.sh release_mt needed176 MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status);177 traceOn();178 179 if (flag==true)180 {181 rank=status.MPI_SOURCE ;182 okLoop = true;183 if (pendingRequest.find(rank)==pendingRequest.end())184 okLoop = !listenPendingRequest(status) ;185 if (okLoop)186 {187 for(rank=0;rank<commSize;rank++)188 {189 if 
(pendingRequest.find(rank)==pendingRequest.end())190 {191 192 traceOff();193 MPI_Iprobe(rank, 20,interComm,&flag,&status);194 traceOn();195 if (flag==true) listenPendingRequest(status) ;196 }197 }198 }199 }200 }201 202 bool CContextServer::listenPendingRequest(MPI_Status& status)203 {204 int count;205 char * addr;206 map<int,CServerBuffer*>::iterator it;207 int rank=status.MPI_SOURCE ;208 209 it=buffers.find(rank);210 if (it==buffers.end()) // Receive the buffer size and allocate the buffer211 {212 MPI_Aint recvBuff[4] ;213 MPI_Recv(recvBuff, 4, MPI_AINT, rank, 20, interComm, &status);214 remoteHashId_ = recvBuff[0] ;215 StdSize buffSize = recvBuff[1];216 vector<MPI_Aint> winAdress(2) ;217 winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;218 mapBufferSize_.insert(std::make_pair(rank, buffSize));219 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;220 221 lastTimeLine[rank]=0 ;222 itLastTimeLine=lastTimeLine.begin() ;223 224 return true;225 }226 else227 {228 MPI_Get_count(&status,MPI_CHAR,&count);229 if (it->second->isBufferFree(count))230 {231 addr=(char*)it->second->getBuffer(count);232 MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);233 bufferRequest[rank]=addr;234 return true;235 }236 else237 return false;238 }239 }240 */241 157 242 158 void CContextServer::listen(void) … … 274 190 winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ; 275 191 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 276 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows , winAdress, rank, buffSize)))).first;192 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first; 277 193 lastTimeLine[rank]=0 ; 278 194 itLastTimeLine=lastTimeLine.begin() ; … … 407 323 newBuffer>>newSize>>winAdress[0]>>winAdress[1] ; 408 324 buffers.erase(rank) ; 409 buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows , 
winAdress, rank, newSize)));325 buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, newSize))); 410 326 } 411 327 else … … 500 416 void CContextServer::releaseBuffers() 501 417 { 502 map<int,CServerBuffer*>::iterator it; 503 bool out ; 504 do 505 { 506 out=true ; 507 for(it=buffers.begin();it!=buffers.end();++it) 508 { 509 // out = out && it->second->freeWindows() ; 510 511 } 512 } while (! out) ; 513 MPI_Win_free(&windows[0]) ; 514 MPI_Win_free(&windows[1]) ; 418 for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ; 419 buffers.clear() ; 420 freeWindows() ; 421 } 422 423 void CContextServer::freeWindows() 424 { 425 if (!isAttachedModeEnabled()) 426 { 427 for(int rank=0; rank<clientSize_; rank++) 428 { 429 MPI_Win_free(&windows_[rank][0]); 430 MPI_Win_free(&windows_[rank][1]); 431 MPI_Comm_free(&winComm_[rank]) ; 432 } 433 } 515 434 } 516 435 … … 538 457 finished=true; 539 458 info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl; 540 // releaseBuffers() ;541 459 notifyClientsFinalize() ; 542 460 CTimer::get("receiving requests").suspend(); 543 461 context->finalize(); 544 545 // don't know where release windows 546 MPI_Win_free(&windows[0]) ; 547 MPI_Win_free(&windows[1]) ; 462 freeWindows() ; 548 463 549 464 std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
Note: See TracChangeset for help on using the changeset viewer.