Changeset 1761 for XIOS/dev/dev_ym/XIOS_SERVICES/src/client.cpp
Timestamp: 10/18/19 15:40:35
File: 1 edited
Legend: unchanged context lines are unprefixed; lines added in r1761 are prefixed with "+", removed lines with "-"; "…" marks elided unchanged regions.
XIOS/dev/dev_ym/XIOS_SERVICES/src/client.cpp
r1756 -> r1761:

   #include "buffer_client.hpp"
   #include "string_tools.hpp"
+  #include "ressources_manager.hpp"
+  #include "services_manager.hpp"
+  #include <functional>
+  #include <cstdio>
+

   namespace xios
   {

+    const double serverPublishDefaultTimeout=10;
+
     MPI_Comm CClient::intraComm ;
     MPI_Comm CClient::interComm ;
+    MPI_Comm CClient::clientsComm_ ;
+
     std::list<MPI_Comm> CClient::contextInterComms;
     int CClient::serverLeader ;
…
     StdOFStream CClient::m_infoStream;
     StdOFStream CClient::m_errorStream;
+    CPoolRessource* CClient::poolRessource_=nullptr ;
+
     MPI_Comm& CClient::getInterComm(void) { return (interComm); }
…
     */

+    void CClient::initRessources(void)
+    {
+      /*
+      int commRank;
+      MPI_Comm_rank(CXios::globalComm,&commRank) ;
+      if (commRank==0)
+      {
+        ressources.createPool("ioserver1",ressources.getRessourcesSize()/2) ;
+      }
+      else if (commRank==1)
+      {
+        ressources.createPool("ioserver2",ressources.getRessourcesSize()/2) ;
+      }
+      */
+    }
+
     void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
+    {
+      MPI_Comm clientComm ;
+      // initialize MPI if it is not already initialized
+      int initialized ;
+      MPI_Initialized(&initialized) ;
+      if (initialized) is_MPI_Initialized=true ;
+      else is_MPI_Initialized=false ;
+
+      MPI_Comm globalComm=CXios::getGlobalComm() ;
+
+      /////////////////////////////////////////
+      ///////////// PART 1 ////////////////////
+      /////////////////////////////////////////
+
+      // localComm isn't given
+      if (localComm == MPI_COMM_NULL)
+      {
+        // don't use OASIS
+        if (!CXios::usingOasis)
+        {
+          if (!is_MPI_Initialized)
+          {
+            MPI_Init(NULL, NULL);
+          }
+          CTimer::get("XIOS").resume() ;
+          CTimer::get("XIOS init/finalize",false).resume() ;
+
+          // Split the global communicator: hash each model's code id to get a
+          // unique color (int), then split to obtain the client communicator.
+          // Every MPI process of globalComm (MPI_COMM_WORLD) must participate.
+          int commRank, commSize ;
+          MPI_Comm_rank(globalComm,&commRank) ;
+          MPI_Comm_size(globalComm,&commSize) ;
+
+          std::hash<string> hashString ;
+          size_t hashClient=hashString(codeId) ;
+
+          size_t* hashAll = new size_t[commSize] ;
+          MPI_Allgather(&hashClient,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ;
+
+          int color=0 ;
+          set<size_t> listHash ;
+          for(int i=0 ; i<=commRank ; i++)
+            if (listHash.count(hashAll[i])==0)
+            {
+              listHash.insert(hashAll[i]) ;
+              color=color+1 ;
+            }
+          delete[] hashAll ;
+
+          MPI_Comm_split(globalComm, color, commRank, &clientComm) ;
+        }
+        else // use OASIS to split the communicator
+        {
+          if (!is_MPI_Initialized) oasis_init(codeId) ;
+          oasis_get_localcomm(clientComm) ;
+        }
+      }
+      else // localComm is given
+      {
+        MPI_Comm_dup(localComm,&clientComm) ;
+      }
+
+      /////////////////////////////////////////
+      ///////////// PART 2 ////////////////////
+      /////////////////////////////////////////
+
+      // Create the XIOS communicator for every process related to XIOS,
+      // on the client side as well as on the server side.
+      MPI_Comm xiosGlobalComm ;
+      string strIds=CXios::getin<string>("clients_code_id","") ;
+      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
+      if (strIds.empty())
+      {
+        // no code ids are given, so assume XIOS initialisation is global
+        int commRank, commGlobalRank, serverLeader, clientLeader, serverRemoteLeader, clientRemoteLeader ;
+        MPI_Comm splitComm, interComm ;
+        MPI_Comm_rank(globalComm,&commGlobalRank) ;
+        MPI_Comm_split(globalComm, 0, commGlobalRank, &splitComm) ;
+        int splitCommSize, globalCommSize ;
+
+        MPI_Comm_size(splitComm,&splitCommSize) ;
+        MPI_Comm_size(globalComm,&globalCommSize) ;
+        if (splitCommSize==globalCommSize) // no server
+        {
+          MPI_Comm_dup(globalComm,&xiosGlobalComm) ;
+          CXios::setXiosComm(xiosGlobalComm) ;
+        }
+        else
+        {
+          MPI_Comm_rank(splitComm,&commRank) ;
+          if (commRank==0) clientLeader=commGlobalRank ;
+          else clientLeader=0 ;
+          serverLeader=0 ;
+          MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
+          MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
+          MPI_Intercomm_create(splitComm, 0, globalComm, serverRemoteLeader, 1341, &interComm) ;
+          MPI_Intercomm_merge(interComm, true, &xiosGlobalComm) ;
+          CXios::setXiosComm(xiosGlobalComm) ;
+        }
+      }
+      else
+      {
+        xiosGlobalCommByFileExchange(clientComm, codeId) ;
+      }
+
+      int commRank ;
+      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
+      MPI_Comm_split(CXios::getXiosComm(), false, commRank, &clientsComm_) ;
+
+      // are we using servers or not ?
+      int xiosCommSize, clientsCommSize ;
+      MPI_Comm_size(CXios::getXiosComm(), &xiosCommSize) ;
+      MPI_Comm_size(clientsComm_, &clientsCommSize) ;
+      if (xiosCommSize==clientsCommSize) CXios::setUsingServer() ;
+      else CXios::setNotUsingServer() ;
+
+      CXios::setGlobalRegistry(new CRegistry(clientsComm_)) ;
+
+      /////////////////////////////////////////
+      ///////////// PART 3 ////////////////////
+      /////////////////////////////////////////
+
+      CXios::launchDaemonsManager(false) ;
+      poolRessource_ = new CPoolRessource(clientComm, codeId) ;
+
+      /////////////////////////////////////////
+      ///////////// PART 4 ////////////////////
+      /////////////////////////////////////////
+
+      // create the services
+      /*
+      int commRank ;
+      MPI_Comm_rank(clientComm,&commRank) ;
+      auto contextsManager=CXios::getContextsManager() ;
+
+      if (commRank==0)
+      {
+        contextsManager->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, codeId) ;
+      }
+
+      MPI_Comm interComm ;
+
+      contextsManager->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, codeId, clientComm, interComm) ;
+      */
+      /*
+      while (true)
+      {
+      }
+      */
+      returnComm = clientComm ;
+    }
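The hash-and-split logic of PART 1 is self-contained enough to demonstrate outside XIOS. The following minimal sketch (not part of the changeset; the code ids "ocean" and "atm" are invented for illustration) shows the same pattern: each process hashes its code id, the hashes are gathered everywhere, a deterministic color is derived, and MPI_COMM_WORLD is split so that ranks sharing a code id land in the same sub-communicator. Note that the sketch uses matching send/receive datatypes in MPI_Allgather.

#include <mpi.h>
#include <cstdio>
#include <functional>
#include <set>
#include <string>
#include <vector>

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Pretend even ranks run model "ocean" and odd ranks run model "atm".
  std::string codeId = (rank % 2 == 0) ? "ocean" : "atm";
  unsigned long myHash = std::hash<std::string>{}(codeId);

  // Gather every process's hash.
  std::vector<unsigned long> allHash(size);
  MPI_Allgather(&myHash, 1, MPI_UNSIGNED_LONG,
                allHash.data(), 1, MPI_UNSIGNED_LONG, MPI_COMM_WORLD);

  // The color is the number of distinct hashes seen up to and including this
  // rank, so all ranks with the same codeId compute the same color.
  int color = 0;
  std::set<unsigned long> seen;
  for (int i = 0; i <= rank; i++)
    if (seen.insert(allHash[i]).second) color++;

  MPI_Comm clientComm;
  MPI_Comm_split(MPI_COMM_WORLD, color, rank, &clientComm);

  int clientRank;
  MPI_Comm_rank(clientComm, &clientRank);
  std::printf("global rank %d (%s) -> color %d, local rank %d\n",
              rank, codeId.c_str(), color, clientRank);

  MPI_Comm_free(&clientComm);
  MPI_Finalize();
  return 0;
}

Run with, e.g., mpiexec -n 4: the two "models" end up in two disjoint communicators, exactly what initialize() needs when several executables share MPI_COMM_WORLD.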
+
+    void CClient::xiosGlobalCommByFileExchange(MPI_Comm clientComm, const string& codeId)
+    {
+      MPI_Comm globalComm=CXios::getGlobalComm() ;
+      MPI_Comm xiosGlobalComm ;
+
+      string strIds=CXios::getin<string>("clients_code_id","") ;
+      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
+
+      int commRank, globalRank, clientRank, serverRank ;
+      MPI_Comm_rank(clientComm, &commRank) ;
+      MPI_Comm_rank(globalComm, &globalRank) ;
+      string clientFileName("__xios_publisher::"+codeId+"__to_remove__") ;
+
+      int error ;
+
+      if (commRank==0) // root process publishes its name
+      {
+        std::ofstream ofs (clientFileName, std::ofstream::out);
+        ofs<<globalRank ;
+        ofs.close();
+
+        // get the server root rank
+        std::ifstream ifs ;
+        string fileName=("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
+
+        double timeout = CXios::getin<double>("server_puplish_timeout",serverPublishDefaultTimeout) ;
+        double time ;
+
+        do
+        {
+          CTimer::get("server_publish_timeout").resume() ;
+          ifs.clear() ;
+          ifs.open(fileName, std::ifstream::in) ;
+          CTimer::get("server_publish_timeout").suspend() ;
+        } while (ifs.fail() && CTimer::get("server_publish_timeout").getCumulatedTime()<timeout) ;
+
+        if (CTimer::get("server_publish_timeout").getCumulatedTime()>=timeout || ifs.fail())
+        {
+          ifs.clear() ;
+          ifs.close() ;
+          ifs.clear() ;
+          error=true ;
+        }
+        else
+        {
+          ifs>>serverRank ;
+          ifs.close() ;
+          error=false ;
+        }
+      }
+
+      MPI_Bcast(&error,1,MPI_INT,0,clientComm) ;
+
+      if (error==false) // we have a server
+      {
+        MPI_Comm intraComm ;
+        MPI_Comm_dup(clientComm,&intraComm) ;
+        MPI_Comm interComm ;
+
+        int pos=0 ;
+        for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ;
+
+        bool high=true ;
+        for(int i=pos ; i<clientsCodeId.size(); i++)
+        {
+          MPI_Intercomm_create(intraComm, 0, globalComm, serverRank, 3141, &interComm);
+          MPI_Comm_free(&intraComm) ;
+          MPI_Intercomm_merge(interComm, high, &intraComm) ;
+          high=false ;
+        }
+        xiosGlobalComm=intraComm ;
+      }
+      else // no server detected
+      {
+        vector<int> clientsRank(clientsCodeId.size()) ;
+
+        if (commRank==0)
+        {
+          for(int i=0;i<clientsRank.size();i++)
+          {
+            std::ifstream ifs ;
+            string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
+            do
+            {
+              ifs.clear() ;
+              ifs.open(fileName, std::ifstream::in) ;
+            } while (ifs.fail()) ;
+            ifs>>clientsRank[i] ;
+            ifs.close() ;
+          }
+        }
+
+        int client ;
+        MPI_Comm intraComm ;
+        MPI_Comm_dup(clientComm,&intraComm) ;
+        MPI_Comm interComm ;
+
+        int pos=0 ;
+        for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ;
+
+        bool high=true ;
+        for(int i=pos+1 ; i<clientsCodeId.size(); i++)
+        {
+          if (codeId==clientsCodeId[0]) // the first model plays the server role
+          {
+            MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
+            MPI_Intercomm_merge(interComm, false, &intraComm) ;
+          }
+          else
+          {
+            MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[0], 3141, &interComm);
+            MPI_Intercomm_merge(interComm, high, &intraComm) ;
+            high=false ;
+          }
+        }
+        xiosGlobalComm=intraComm ;
+      }
+
+      MPI_Barrier(xiosGlobalComm);
+      if (commRank==0) std::remove(clientFileName.c_str()) ;
+      MPI_Barrier(xiosGlobalComm);
+
+      CXios::setXiosComm(xiosGlobalComm) ;
+    }
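The loops above assemble one global communicator by repeatedly bridging two groups with MPI_Intercomm_create and flattening the bridge with MPI_Intercomm_merge. A minimal two-group sketch of that single bridge-and-merge step (illustration only, not XIOS code; the tag 3141 is reused from the changeset):

#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size); // run with at least 2 processes

  // Two groups: lower half and upper half of MPI_COMM_WORLD.
  int color = (rank < size / 2) ? 0 : 1;
  MPI_Comm intraComm;
  MPI_Comm_split(MPI_COMM_WORLD, color, rank, &intraComm);

  // Global rank of the *other* group's leader (its local rank 0).
  int remoteLeader = (color == 0) ? size / 2 : 0;

  MPI_Comm interComm, mergedComm;
  MPI_Intercomm_create(intraComm, 0, MPI_COMM_WORLD, remoteLeader,
                       3141, &interComm);
  // high=true orders this group's ranks after the remote group's ranks
  // in the merged intra-communicator.
  MPI_Intercomm_merge(interComm, color == 1, &mergedComm);

  int mergedRank;
  MPI_Comm_rank(mergedComm, &mergedRank);
  std::printf("world rank %d (group %d) -> merged rank %d\n",
              rank, color, mergedRank);

  MPI_Comm_free(&mergedComm);
  MPI_Comm_free(&interComm);
  MPI_Comm_free(&intraComm);
  MPI_Finalize();
  return 0;
}

xiosGlobalCommByFileExchange() simply iterates this step, folding one client group into the growing intra-communicator per iteration; the "high" flag only controls where each group lands in the final rank ordering.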
CTimer::get("server_publish_timeout").resume() ; 263 ifs.clear() ; 264 ifs.open(fileName, std::ifstream::in) ; 265 CTimer::get("server_publish_timeout").suspend() ; 266 } while (ifs.fail() && CTimer::get("server_publish_timeout").getCumulatedTime()<timeout) ; 267 268 if (CTimer::get("server_publish_timeout").getCumulatedTime()>=timeout || ifs.fail()) 269 { 270 ifs.clear() ; 271 ifs.close() ; 272 ifs.clear() ; 273 error=true ; 274 } 275 else 276 { 277 ifs>>serverRank ; 278 ifs.close() ; 279 error=false ; 280 } 281 282 } 283 284 MPI_Bcast(&error,1,MPI_INT,0,clientComm) ; 285 286 if (error==false) // you have a server 287 { 288 MPI_Comm intraComm ; 289 MPI_Comm_dup(clientComm,&intraComm) ; 290 MPI_Comm interComm ; 291 292 int pos=0 ; 293 for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ; 294 295 bool high=true ; 296 for(int i=pos ; i<clientsCodeId.size(); i++) 297 { 298 MPI_Intercomm_create(intraComm, 0, globalComm, serverRank, 3141, &interComm); 299 MPI_Comm_free(&intraComm) ; 300 MPI_Intercomm_merge(interComm,high, &intraComm ) ; 301 high=false ; 302 } 303 xiosGlobalComm=intraComm ; 304 } 305 else // no server detected 306 { 307 vector<int> clientsRank(clientsCodeId.size()) ; 308 309 if (commRank==0) 310 { 311 for(int i=0;i<clientsRank.size();i++) 312 { 313 std::ifstream ifs ; 314 string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ; 315 do 316 { 317 ifs.clear() ; 318 ifs.open(fileName, std::ifstream::in) ; 319 } while (ifs.fail()) ; 320 ifs>>clientsRank[i] ; 321 ifs.close() ; 322 } 323 } 324 325 int client ; 326 MPI_Comm intraComm ; 327 MPI_Comm_dup(clientComm,&intraComm) ; 328 MPI_Comm interComm ; 329 330 int pos=0 ; 331 for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ; 332 333 bool high=true ; 334 for(int i=pos+1 ; i<clientsCodeId.size(); i++) 335 { 336 if (codeId==clientsCodeId[0]) // first model play the server rule 337 { 338 MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm); 339 MPI_Intercomm_merge(interComm,false, &intraComm ) ; 340 } 341 else 342 { 343 MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[0], 3141, &interComm); 344 MPI_Intercomm_merge(interComm,high, &intraComm ) ; 345 high=false ; 346 } 347 } 348 xiosGlobalComm=intraComm ; 349 } 350 351 MPI_Barrier(xiosGlobalComm); 352 if (commRank==0) std::remove(clientFileName.c_str()) ; 353 MPI_Barrier(xiosGlobalComm); 354 355 CXios::setXiosComm(xiosGlobalComm) ; 356 357 } 358 359 void CClient::xiosGlobalCommByPublishing(MPI_Comm clientComm, const string& codeId) 360 { 361 362 // untested. need to be developped an a true MPI compliant library 363 364 /* 365 // try to discover other client/server 366 // do you have a xios server ? 
+
+    void CClient::initialize_old(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
     {
       int initialized ;
…
       else is_MPI_Initialized=false ;
       int rank ;

+      CXios::launchRessourcesManager(false) ;
+      CXios::launchServicesManager(false) ;
+      CXios::launchContextsManager(false) ;
+
+      initRessources() ;
       // don't use OASIS
       if (!CXios::usingOasis)
…
     }

+
+    void CClient::registerContext(const string& id, MPI_Comm contextComm)
+    {
+      int commRank, commSize ;
+      MPI_Comm_rank(contextComm,&commRank) ;
+      MPI_Comm_size(contextComm,&commSize) ;
+
+      getPoolRessource()->createService(contextComm, id, 0, CServicesManager::CLIENT, 1) ;
+      getPoolRessource()->createService(contextComm, CXios::defaultServerId, 0, CServicesManager::IO_SERVER, 1) ;
+
+      if (commRank==0) while (!CXios::getServicesManager()->hasService(getPoolRessource()->getId(), id, 0)) { CXios::getDaemonsManager()->eventLoop(); }
+
+      if (commRank==0) CXios::getContextsManager()->createServerContext(getPoolRessource()->getId(), id, 0, id) ;
+      int type=CServicesManager::CLIENT ;
+      string name = CXios::getContextsManager()->getServerContextName(getPoolRessource()->getId(), id, 0, type, id) ;
+      while (!CXios::getContextsManager()->hasContext(name, contextComm))
+      {
+        CXios::getDaemonsManager()->eventLoop() ;
+      }
+
+      /*
+      CContext::setCurrent(id) ;
+      CContext* context=CContext::create(id);
+
+      // register the new client-side context with the contexts manager
+      if (commRank==0)
+      {
+        MPI_Comm_rank(CXios::getXiosComm(),&commRank) ;
+        SRegisterContextInfo contextInfo ;
+        contextInfo.serviceType=CServicesManager::CLIENT ;
+        contextInfo.partitionId=0 ;
+        contextInfo.leader=commRank ;
+        contextInfo.size=commSize ;
+        CXios::getContextsManager()->registerContext(id, contextInfo) ;
+      }
+      context->initClient(contextComm) ;
+      */
+    }
+
     ///---------------------------------------------------------------
     /*!
…
      * Function is only called by client.
      */
-    void CClient::registerContext (const string& id, MPI_Comm contextComm)
+    void CClient::registerContext_old(const string& id, MPI_Comm contextComm)
     {
       CContext::setCurrent(id) ;
…
     }

-
     void CClient::finalize(void)
+    {
+      MPI_Barrier(clientsComm_) ;
+      int commRank ;
+      MPI_Comm_rank(clientsComm_, &commRank) ;
+      if (commRank==0) CXios::getRessourcesManager()->finalize() ;
+
+      auto globalRegistry=CXios::getGlobalRegistry() ;
+      globalRegistry->hierarchicalGatherRegistry() ;
+
+      if (commRank==0)
+      {
+        info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
+        globalRegistry->toFile("xios_registry.bin") ;
+      }
+      delete globalRegistry ;
+
+      CTimer::get("XIOS init/finalize",false).suspend() ;
+      CTimer::get("XIOS").suspend() ;
+      if (!is_MPI_Initialized)
+      {
+        if (CXios::usingOasis) oasis_finalize();
+        else MPI_Finalize() ;
+      }
+
+      info(20) << "Client side context is finalized"<<endl ;
+      report(0) <<" Performance report : Whole time from XIOS init and finalize: "<< CTimer::get("XIOS init/finalize").getCumulatedTime()<<" s"<<endl ;
+      report(0) <<" Performance report : total time spent for XIOS : "<< CTimer::get("XIOS").getCumulatedTime()<<" s"<<endl ;
+      report(0)<< " Performance report : time spent for waiting free buffer : "<< CTimer::get("Blocking time").getCumulatedTime()<<" s"<<endl ;
+      report(0)<< " Performance report : Ratio : "<< CTimer::get("Blocking time").getCumulatedTime()/CTimer::get("XIOS init/finalize").getCumulatedTime()*100.<<" %"<<endl ;
+      report(0)<< " Performance report : This ratio must be close to zero. Otherwise it may be usefull to increase buffer size or numbers of server"<<endl ;
+      // report(0)<< " Memory report : Current buffer_size : "<<CXios::bufferSize<<endl ;
+      report(0)<< " Memory report : Minimum buffer size required : " << CClientBuffer::maxRequestSize << " bytes" << endl ;
+      report(0)<< " Memory report : increasing it by a factor will increase performance, depending of the volume of data wrote in file at each time step of the file"<<endl ;
+      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
+    }
+
+    void CClient::finalize_old(void)
     {
       int rank ;
…
       int size = 0;
       int rank;
-      MPI_Comm_size(CXios::globalComm, &size);
+      MPI_Comm_size(CXios::getGlobalComm(), &size);
+      MPI_Comm_rank(CXios::getGlobalComm(),&rank);
       while (size)
…
       }

-      if (CXios::usingOasis)
-      {
-        MPI_Comm_rank(CXios::globalComm,&rank);
-        fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << rank << ext;
-      }
-      else
-        fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
+      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << rank << ext;

       fb->open(fileNameClient.str().c_str(), std::ios::out);