Changeset 1757 for XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.cpp
- Timestamp:
- 10/18/19 14:55:57 (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.cpp
r1639 r1757 33 33 int flag; 34 34 MPI_Comm_test_inter(interComm,&flag); 35 36 if (flag) attachedMode=false ; 37 else attachedMode=true ; 38 35 39 if (flag) MPI_Comm_remote_size(interComm,&commSize); 36 40 else MPI_Comm_size(interComm,&commSize); 37 41 38 currentTimeLine=0; 42 43 currentTimeLine=1; 39 44 scheduled=false; 40 45 finished=false; … … 44 49 else 45 50 hashId=hashString(context->getId()); 46 } 47 51 52 if (!isAttachedModeEnabled()) 53 { 54 MPI_Intercomm_merge(interComm_,true,&interCommMerged) ; 55 // create windows for one sided comm 56 int interCommMergedRank; 57 MPI_Comm winComm ; 58 MPI_Comm_rank(intraComm, &interCommMergedRank); 59 windows.resize(2) ; 60 for(int rank=commSize; rank<commSize+intraCommSize; rank++) 61 { 62 if (rank==commSize+interCommMergedRank) 63 { 64 MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 65 int myRank ; 66 MPI_Comm_rank(winComm,&myRank); 67 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]); 68 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]); 69 } 70 else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 71 MPI_Comm_free(&winComm) ; 72 } 73 } 74 else 75 { 76 windows.resize(2) ; 77 windows[0]=MPI_WIN_NULL ; 78 windows[1]=MPI_WIN_NULL ; 79 } 80 81 82 83 MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ; 84 itLastTimeLine=lastTimeLine.begin() ; 85 86 pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 87 if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 88 89 } 90 91 //! Attached mode is used ? 92 //! \return true if attached mode is used, false otherwise 93 bool CContextServer::isAttachedModeEnabled() const 94 { 95 return attachedMode ; 96 } 97 48 98 void CContextServer::setPendingEvent(void) 49 99 { … … 65 115 listen(); 66 116 checkPendingRequest(); 67 if (enableEventsProcessing) 68 processEvents(); 117 if (enableEventsProcessing) processEvents(); 69 118 return finished; 70 119 } … … 117 166 if (it==buffers.end()) // Receive the buffer size and allocate the buffer 118 167 { 119 StdSize buffSize = 0; 120 MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); 168 MPI_Aint recvBuff[3] ; 169 MPI_Recv(recvBuff, 3, MPI_AINT, rank, 20, interComm, &status); 170 StdSize buffSize = recvBuff[0]; 171 vector<MPI_Aint> winAdress(2) ; 172 winAdress[0]=recvBuff[1] ; winAdress[1]=recvBuff[2] ; 121 173 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 122 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 174 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first; 175 /* 176 if (!isAttachedModeEnabled()) 177 { 178 MPI_Comm OneSidedInterComm, oneSidedComm ; 179 MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0, &OneSidedInterComm ); 180 MPI_Intercomm_merge(OneSidedInterComm,true,&oneSidedComm); 181 buffers[rank]->createWindows(oneSidedComm) ; 182 } 183 */ 184 lastTimeLine[rank]=0 ; 185 itLastTimeLine=lastTimeLine.begin() ; 186 123 187 return true; 124 188 } … … 157 221 if (flag==true) 158 222 { 223 buffers[rank]->updateCurrentWindows() ; 159 224 recvRequest.push_back(rank); 160 225 MPI_Get_count(&status,MPI_CHAR,&count); … … 170 235 } 171 236 237 void CContextServer::getBufferFromClient(size_t timeLine) 238 { 239 if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 240 { 241 int rank ; 242 char *buffer ; 243 size_t count ; 244 245 if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 246 for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 247 { 248 rank=itLastTimeLine->first ; 249 if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0) 250 { 251 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) 252 { 253 processRequest(rank, buffer, count); 254 break ; 255 } 256 } 257 } 258 } 259 } 260 261 172 262 void CContextServer::processRequest(int rank, char* buff,int count) 173 263 { … … 176 266 char* startBuffer,endBuffer; 177 267 int size, offset; 178 size_t timeLine ;268 size_t timeLine=0; 179 269 map<size_t,CEventServer*>::iterator it; 180 270 271 181 272 CTimer::get("Process request").resume(); 182 273 while(count>0) … … 185 276 CBufferIn newBuffer(startBuffer,buffer.remain()); 186 277 newBuffer>>size>>timeLine; 187 188 278 it=events.find(timeLine); 189 279 if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first; … … 193 283 count=buffer.remain(); 194 284 } 285 286 if (timeLine>0) lastTimeLine[rank]=timeLine ; 287 195 288 CTimer::get("Process request").suspend(); 196 289 } … … 230 323 } 231 324 } 232 } 325 else getBufferFromClient(currentTimeLine) ; 326 } 327 else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line 233 328 } 234 329 … … 237 332 map<int,CServerBuffer*>::iterator it; 238 333 for(it=buffers.begin();it!=buffers.end();++it) delete it->second; 334 } 335 336 void CContextServer::releaseBuffers() 337 { 338 map<int,CServerBuffer*>::iterator it; 339 bool out ; 340 do 341 { 342 out=true ; 343 for(it=buffers.begin();it!=buffers.end();++it) 344 { 345 // out = out && it->second->freeWindows() ; 346 347 } 348 } while (! out) ; 349 } 350 351 void CContextServer::notifyClientsFinalize(void) 352 { 353 for(auto it=buffers.begin();it!=buffers.end();++it) 354 { 355 it->second->notifyClientFinalize() ; 356 } 239 357 } 240 358 … … 254 372 finished=true; 255 373 info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl; 374 // releaseBuffers() ; 375 notifyClientsFinalize() ; 256 376 context->finalize(); 377 378 /* don't know where release windows 379 MPI_Win_free(&windows[0]) ; 380 MPI_Win_free(&windows[1]) ; 381 */ 257 382 std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 258 383 iteMap = mapBufferSize_.end(), itMap;
Note: See TracChangeset
for help on using the changeset viewer.