- Timestamp: 11/17/21 16:56:04 (3 years ago)
- Location: XIOS/dev/dev_ym/XIOS_COUPLING/src
- Files: 6 edited
Legend:
- Unmodified: no prefix
- Added: prefixed with +
- Removed: prefixed with -
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp
{{{#!diff
--- buffer_client.cpp (r2258)
+++ buffer_client.cpp (r2259)
 size_t CClientBuffer::maxRequestSize = 0;

-CClientBuffer::CClientBuffer(MPI_Comm interComm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
+CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
   : interComm(interComm)
   , clientRank_(0)
…
   , pending(false)
   , hasWindows(false)
-  , windows_(windows)
-{
-  if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
-  else hasWindows=true ;
+{
+  /*
+  if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
+  else hasWindows=true ;
+  */

   MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
…
 MPI_Aint CClientBuffer::getWinAddress(int i)
 {
-  MPI_Aint address ;
-
-  if (hasWindows) MPI_Get_address(bufferHeader[i], &address) ;
-  else address=0 ;
-
-  return address ;
-}
+  MPI_Aint address ;
+  MPI_Get_address(bufferHeader[i], &address) ;
+  return address ;
+}
+
+void CClientBuffer::attachWindows(vector<MPI_Win>& windows)
+{
+  windows_=windows ;
+  if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ;
+  else hasWindows=true ;
+
+  if (hasWindows)
+  {
+    MPI_Aint buffSize=bufferSize+headerSize_ ;
+    MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ;
+    MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ;
+
+    MPI_Group group ;
+    int groupSize,groupRank ;
+    MPI_Win_get_group(windows_[0], &group) ;
+    MPI_Group_size(group, &groupSize) ;
+    MPI_Group_rank(group, &groupRank) ;
+    if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
+
+    MPI_Win_get_group(windows_[1], &group) ;
+    MPI_Group_size(group, &groupSize) ;
+    MPI_Group_rank(group, &groupRank) ;
+    if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
+
+    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
+    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
+
+    MPI_Win_unlock(clientRank_, windows_[1]) ;
+    MPI_Win_unlock(clientRank_, windows_[0]) ;
+  }
+}

 CClientBuffer::~CClientBuffer()
…
   int flag;

-  MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
-  MPI_Win_unlock(clientRank_, windows_[0]) ;
-
-  MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
-  MPI_Win_unlock(clientRank_, windows_[1]) ;
-
+  if (hasWindows)
+  {
+    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
+    MPI_Win_unlock(clientRank_, windows_[0]) ;
+
+    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
+    MPI_Win_unlock(clientRank_, windows_[1]) ;
+  }
+
   if (pending)
   {
}}}
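The new attachWindows() method relies on MPI dynamic windows: the buffer is allocated first, the window is created with no memory associated, and the buffer is attached afterwards, which is what lets window creation be deferred until after the CClientBuffer exists. A minimal standalone sketch of that pattern (illustrative only, not XIOS code; the buffer size and variable names are made up):

{{{#!cpp
#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Allocate the buffer independently of any window.
    MPI_Aint size = 1024;
    void* buffer;
    MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer);

    // Create a dynamic window: no memory is associated with it yet.
    MPI_Win win;
    MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    // Attach the buffer later, as attachWindows() does for both buffers.
    MPI_Win_attach(win, buffer, size);

    // Remote peers address the memory by its absolute address,
    // which is what getWinAddress() exposes.
    MPI_Aint address;
    MPI_Get_address(buffer, &address);
    printf("rank %d exposes memory at address %lld\n", rank, (long long)address);

    // A lock/unlock epoch on one's own rank, as in the diff, verifies
    // the window is usable before anyone targets it.
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, win);
    MPI_Win_unlock(rank, win);

    MPI_Win_detach(win, buffer);
    MPI_Win_free(&win);
    MPI_Free_mem(buffer);
    MPI_Finalize();
    return 0;
}
}}}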
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.hpp
{{{#!diff
--- buffer_client.hpp (r2258)
+++ buffer_client.hpp (r2259)
 static size_t maxRequestSize;

-CClientBuffer(MPI_Comm intercomm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize);
+CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize);
 ~CClientBuffer();
 // void createWindows(MPI_Comm oneSidedComm) ;
…
 void fixBufferSize(size_t bufferSize) { newBufferSize_=bufferSize ; isGrowableBuffer_=false ; resizingBufferStep_=1 ;}
 void fixBuffer(void) { isGrowableBuffer_=false ;}
+void attachWindows(vector<MPI_Win>& windows) ;
 private:
 void resizeBuffer(size_t newSize) ;
}}}
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.cpp
{{{#!diff
--- context_client.cpp (r2258)
+++ context_client.cpp (r2259)
 computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);

-if (flag) MPI_Intercomm_merge(interComm_,false, &interCommMerged) ;
-
-if (!isAttachedModeEnabled())
-{
-
-  CTimer::get("create Windows").resume() ;
-
-  // We create dummy pair of intercommunicator between clients and server
-  // Why ? Just because on openMPI, it reduce the creation time of windows otherwhise which increase quadratically
-  // We don't know the reason
-  double time ;
-  MPI_Comm commSelf ;
-  MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ;
-  MPI_Comm interComm ;
-  winComm_.resize(serverSize) ;
-  windows_.resize(serverSize) ;
-  for(int rank=0; rank<serverSize; rank++)
-  {
-    time=MPI_Wtime() ;
-    MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &interComm) ;
-    MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
-    windows_[rank].resize(2) ;
-    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
-    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
-    time=MPI_Wtime()-time ;
-    info(100)<< "MPI_Win_create_dynamic : client to server rank "<<rank<<" => "<<time/1e-6<<" us"<<endl ;
-  }
-  MPI_Comm_free(&commSelf) ;
-  CTimer::get("create Windows").resume() ;
-}
+if (flag) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;
+
+MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows

 auto time=chrono::system_clock::now().time_since_epoch().count() ;
…
 }

-vector<MPI_Win> Wins(2,MPI_WIN_NULL) ;
-if (!isAttachedModeEnabled()) Wins=windows_[rank] ;
-
-CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, Wins, clientRank, rank, mapBufferSize_[rank], maxEventSizes[rank]);
+CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank]);
 if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ;
 else buffer->fixBuffer() ;
…
 bufOut->put(sendBuff, 4);
 buffer->checkBuffer(true);

+// create windows dynamically for one-sided
+if (!isAttachedModeEnabled())
+{
+  CTimer::get("create Windows").resume() ;
+  MPI_Comm interComm ;
+  MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
+  MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
+  MPI_Comm_free(&interComm) ;
+  windows_[rank].resize(2) ;
+  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
+  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
+  CTimer::get("create Windows").suspend() ;
+}
+else
+{
+  winComm_[rank] = MPI_COMM_NULL ;
+  windows_[rank].resize(2) ;
+  windows_[rank][0] = MPI_WIN_NULL ;
+  windows_[rank][1] = MPI_WIN_NULL ;
+}
+buffer->attachWindows(windows_[rank]) ;
 }
…
 if (!isAttachedModeEnabled())
 {
-  for(int rank=0; rank<serverSize; rank++)
-  {
+  for(auto& it : winComm_)
+  {
+    int rank = it.first ;
     MPI_Win_free(&windows_[rank][0]);
     MPI_Win_free(&windows_[rank][1]);
}}}
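The net effect on the client side is that window creation moves out of the constructor: instead of pre-building one window pair for every server rank up front (the removed loop), a pair is created on demand the first time a buffer for a given server rank is set up, and the vector-indexed storage becomes a map keyed by rank so that cleanup only touches ranks actually contacted. A minimal sketch of that lazy, map-based bookkeeping (illustrative only, not XIOS code; ensureWindows and freeAllWindows are made-up names, and comm stands in for the per-rank merged communicator winComm_[rank]):

{{{#!cpp
#include <mpi.h>
#include <map>
#include <vector>

// Two dynamic windows per contacted rank, keyed by rank as in the diff.
std::map<int, std::vector<MPI_Win>> windows;

// Create the window pair only the first time 'rank' is contacted.
// Note: MPI_Win_create_dynamic is collective over 'comm'.
void ensureWindows(int rank, MPI_Comm comm)
{
    if (windows.count(rank)) return;   // already created for this rank

    auto& wins = windows[rank];        // inserts the map entry on demand
    wins.resize(2);
    MPI_Win_create_dynamic(MPI_INFO_NULL, comm, &wins[0]);
    MPI_Win_create_dynamic(MPI_INFO_NULL, comm, &wins[1]);
}

// Free only the windows of ranks that were actually contacted,
// mirroring the new map-based cleanup loop in the diff.
void freeAllWindows()
{
    for (auto& it : windows)
    {
        MPI_Win_free(&it.second[0]);
        MPI_Win_free(&it.second[1]);
    }
    windows.clear();
}

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    ensureWindows(0, MPI_COMM_WORLD);  // windows for rank 0 only, on demand
    freeAllWindows();
    MPI_Finalize();
    return 0;
}
}}}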
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.hpp
{{{#!diff
--- context_client.hpp (r2258)
+++ context_client.hpp (r2259)
 MPI_Comm interComm; //!< Communicator of server group (interCommunicator)

-MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication.
+MPI_Comm interCommMerged_; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication.
+MPI_Comm commSelf_ ; //!< Communicator for proc alone from interCommMerged

 MPI_Comm intraComm; //!< Communicator of client group
…
 std::list<int> ranksServerNotLeader;

-std::vector<MPI_Comm> winComm_ ; //! Window communicators
-std::vector<std::vector<MPI_Win> > windows_ ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2]
+std::map<int, MPI_Comm> winComm_ ; //! Window communicators
+std::map<int, std::vector<MPI_Win> > windows_ ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2]
 bool isAttached_ ;
 CContextServer* associatedServer_ ; //!< The server associated to the pair client/server
}}}
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp
{{{#!diff
--- context_server.cpp (r2258)
+++ context_server.cpp (r2259)
-if (!isAttachedModeEnabled())
-{
-  CTimer::get("create Windows").resume() ;
-
-  MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;
-
-  double time ;
-  windows_.resize(clientSize_) ;
-  MPI_Comm commSelf ;
-  MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf) ;
-  MPI_Comm interComm ;
-  winComm_.resize(clientSize_) ;
-  for(int rank=0; rank<clientSize_ ; rank++)
-  {
-    time=MPI_Wtime() ;
-    MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &interComm) ;
-    MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
-    windows_[rank].resize(2) ;
-    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
-    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
-    time=MPI_Wtime()-time ;
-    info(100)<< "MPI_Win_create_dynamic : server to client rank "<<rank<<" => "<<time/1e-6<<" us"<<endl ;
-  }
-  MPI_Comm_free(&commSelf) ;
-  CTimer::get("create Windows").suspend() ;
-}
-else
-{
-  winComm_.resize(clientSize_) ;
-  windows_.resize(clientSize_) ;
-  for(int rank=0; rank<clientSize_ ; rank++)
-  {
-    winComm_[rank] = MPI_COMM_NULL ;
-    windows_[rank].resize(2) ;
-    windows_[rank][0]=MPI_WIN_NULL ;
-    windows_[rank][1]=MPI_WIN_NULL ;
-  }
-}
+if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
+MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows

 itLastTimeLine=lastTimeLine.begin() ;
…
 if (it==buffers.end()) // Receive the buffer size and allocate the buffer
 {
-  MPI_Aint recvBuff[4] ;
-  MPI_Mrecv(recvBuff, 4, MPI_AINT, &message, &status);
-  remoteHashId_ = recvBuff[0] ;
-  StdSize buffSize = recvBuff[1];
-  vector<MPI_Aint> winAdress(2) ;
-  winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
-  mapBufferSize_.insert(std::make_pair(rank, buffSize));
-  it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
-  lastTimeLine[rank]=0 ;
-  itLastTimeLine=lastTimeLine.begin() ;
-  return true;
+  MPI_Aint recvBuff[4] ;
+  MPI_Mrecv(recvBuff, 4, MPI_AINT, &message, &status);
+  remoteHashId_ = recvBuff[0] ;
+  StdSize buffSize = recvBuff[1];
+  vector<MPI_Aint> winAdress(2) ;
+  winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
+  mapBufferSize_.insert(std::make_pair(rank, buffSize));
+
+  // create windows dynamically for one-sided
+  if (!isAttachedModeEnabled())
+  {
+    CTimer::get("create Windows").resume() ;
+    MPI_Comm interComm ;
+    MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ;
+    MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
+    windows_[rank].resize(2) ;
+    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
+    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
+    CTimer::get("create Windows").suspend() ;
+  }
+  else
+  {
+    winComm_[rank] = MPI_COMM_NULL ;
+    windows_[rank].resize(2) ;
+    windows_[rank][0] = MPI_WIN_NULL ;
+    windows_[rank][1] = MPI_WIN_NULL ;
+  }
+
+  it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
+  lastTimeLine[rank]=0 ;
+  itLastTimeLine=lastTimeLine.begin() ;
+
+  return true;
 }
 else
…
 if (!isAttachedModeEnabled())
 {
-  for(int rank=0; rank<clientSize_; rank++)
-  {
+  for(auto& it : winComm_)
+  {
+    int rank = it.first ;
     MPI_Win_free(&windows_[rank][0]);
     MPI_Win_free(&windows_[rank][1]);
}}}
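Both sides now build the communicator for each window pair the same way: split a single-process communicator out of the intracommunicator, bridge it to one remote peer through the merged intercommunicator, and merge the result into a two-process intracommunicator passed to MPI_Win_create_dynamic. A minimal sketch of that pairing step (illustrative only, not XIOS code; pairCommWith is a made-up name, and since MPI_Intercomm_create is collective, the remote rank must make the matching call, as the two-process demo in main shows):

{{{#!cpp
#include <mpi.h>

// Build a two-process intracommunicator between this process and
// 'remoteRank' of 'mergedComm' (the role of interCommMerged_ in the diff).
MPI_Comm pairCommWith(int remoteRank, MPI_Comm mergedComm)
{
    int myRank;
    MPI_Comm_rank(mergedComm, &myRank);

    // A communicator containing only this process (the commSelf_ of the diff).
    MPI_Comm commSelf;
    MPI_Comm_split(mergedComm, myRank, myRank, &commSelf);

    // Point-to-point intercommunicator between me and the remote rank,
    // using the merged communicator as the bridge.
    MPI_Comm interComm;
    MPI_Intercomm_create(commSelf, 0, mergedComm, remoteRank, 0, &interComm);

    // Merge into a two-process intracommunicator suitable for
    // MPI_Win_create_dynamic.
    MPI_Comm pairComm;
    MPI_Intercomm_merge(interComm, 0, &pairComm);

    MPI_Comm_free(&interComm);
    MPI_Comm_free(&commSelf);
    return pairComm;
}

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (size == 2)  // run with exactly two processes for this demo
    {
        MPI_Comm pair = pairCommWith(1 - rank, MPI_COMM_WORLD);
        MPI_Win win;
        MPI_Win_create_dynamic(MPI_INFO_NULL, pair, &win);  // as in the diff
        MPI_Win_free(&win);
        MPI_Comm_free(&pair);
    }
    MPI_Finalize();
    return 0;
}
}}}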
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.hpp
{{{#!diff
--- context_server.hpp (r2258)
+++ context_server.hpp (r2259)
 int clientSize_ ;

-MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication.
+MPI_Comm interCommMerged_; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication.
+MPI_Comm commSelf_ ; //!< Communicator for proc alone from interCommMerged

 map<int,CServerBuffer*> buffers ;
…
 private:

 std::map<int, StdSize> mapBufferSize_;
-std::vector<MPI_Comm> winComm_ ; //! Window communicators
-std::vector<std::vector<MPI_Win> > windows_ ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side.
+std::map<int,MPI_Comm> winComm_ ; //! Window communicators
+std::map<int,std::vector<MPI_Win> > windows_ ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side.
 CEventScheduler* eventScheduler_ ;
 bool isProcessingEvent_ ;
}}}