Changeset 2019 for XIOS/dev/dev_trunk_graph/src/server.cpp
- Timestamp: 01/22/21 12:00:29 (3 years ago)
- File: XIOS/dev/dev_trunk_graph/src/server.cpp (1 edited)
Legend (diff markers used below):
- ' ' Unmodified
- '+' Added
- '-' Removed
XIOS/dev/dev_trunk_graph/src/server.cpp
--- XIOS/dev/dev_trunk_graph/src/server.cpp (r1590)
+++ XIOS/dev/dev_trunk_graph/src/server.cpp (r2019)

@@ -15 +15 @@
 #include "event_scheduler.hpp"
 #include "string_tools.hpp"
+#include "ressources_manager.hpp"
+#include "services_manager.hpp"
+#include "contexts_manager.hpp"
+#include "servers_ressource.hpp"
+#include <cstdio>
+#include "workflow_graph.hpp"
+

 namespace xios
 {
   MPI_Comm CServer::intraComm ;
+  MPI_Comm CServer::serversComm_ ;
   std::list<MPI_Comm> CServer::interCommLeft ;
   std::list<MPI_Comm> CServer::interCommRight ;

@@ -34 +42 @@
   bool CServer::is_MPI_Initialized ;
   CEventScheduler* CServer::eventScheduler = 0;
-
-  //---------------------------------------------------------------
-  /*!
-   * \fn void CServer::initialize(void)
-   * Creates intraComm for each possible type of servers (classical, primary or secondary).
-   * Creates interComm and stores them into the following lists:
-   *   classical server -- interCommLeft
-   *   primary server -- interCommLeft and interCommRight
-   *   secondary server -- interCommLeft for each pool.
-   *   IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead.
-   */
+  CServersRessource* CServer::serversRessource_=nullptr ;
+
+
   void CServer::initialize(void)
   {
+
+    MPI_Comm serverComm ;
     int initialized ;
     MPI_Initialized(&initialized) ;
     if (initialized) is_MPI_Initialized=true ;
     else is_MPI_Initialized=false ;
-    int rank ;
-
-    // Not using OASIS
+    MPI_Comm globalComm=CXios::getGlobalComm() ;
+
+    /////////////////////////////////////////
+    ///////////// PART 1 ////////////////////
+    /////////////////////////////////////////
+
+    // don't use OASIS
     if (!CXios::usingOasis)
     {
-
-      if (!is_MPI_Initialized)
+      if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
+
+      // split the global communicator
+      // get hash from all model to attribute a unique color (int) and then split to get client communicator
+      // every mpi process of globalComm (MPI_COMM_WORLD) must participate
+
+      int commRank, commSize ;
+      MPI_Comm_rank(globalComm,&commRank) ;
+      MPI_Comm_size(globalComm,&commSize) ;
+
+      std::hash<string> hashString ;
+      size_t hashServer=hashString(CXios::xiosCodeId) ;
+
+      size_t* hashAll = new size_t[commSize] ;
+      MPI_Allgather(&hashServer,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ;
+
+      int color=0 ;
+      set<size_t> listHash ;
+      for(int i=0 ; i<=commRank ; i++)
+        if (listHash.count(hashAll[i])==1)
+        {
+          listHash.insert(hashAll[i]) ;
+          color=color+1 ;
+        }
+      delete[] hashAll ;
+
+      MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
+    }
+    else // using OASIS
+    {
+      if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
+
+      CTimer::get("XIOS").resume() ;
+      oasis_get_localcomm(serverComm);
+    }
+
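Note: PART 1 above assigns every process a color by scanning the gathered hashes of the code identifiers and splitting the global communicator. Two details are worth flagging: the MPI_Allgather send and receive datatypes disagree (MPI_UNSIGNED_LONG vs MPI_LONG; same width on common LP64 systems, so it works there), and the listHash.count(...)==1 test reads inverted for a first-occurrence scan, since an unseen hash has count 0 (as written, color never advances). A minimal standalone sketch of the apparent intent; splitByCodeId and codeId are hypothetical names, and it assumes each program's ranks form one contiguous block of the global communicator:

    #include <mpi.h>
    #include <functional>
    #include <set>
    #include <string>
    #include <vector>

    // Group all ranks that passed the same codeId into one sub-communicator.
    // color = number of distinct hashes seen in ranks [0..myRank], so the
    // ranks of one contiguous program block all compute the same color.
    MPI_Comm splitByCodeId(MPI_Comm global, const std::string& codeId)
    {
      int rank, size;
      MPI_Comm_rank(global, &rank);
      MPI_Comm_size(global, &size);

      unsigned long myHash = static_cast<unsigned long>(std::hash<std::string>{}(codeId));
      std::vector<unsigned long> all(size);
      MPI_Allgather(&myHash, 1, MPI_UNSIGNED_LONG,
                    all.data(), 1, MPI_UNSIGNED_LONG, global);

      std::set<unsigned long> seen;
      int color = 0;
      for (int i = 0; i <= rank; ++i)
        if (seen.count(all[i]) == 0)      // first occurrence of this hash
        {
          seen.insert(all[i]);
          ++color;
        }

      MPI_Comm sub;
      MPI_Comm_split(global, color, rank, &sub);
      return sub;
    }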
+    /////////////////////////////////////////
+    ///////////// PART 2 ////////////////////
+    /////////////////////////////////////////
+
+
+    // Create the XIOS communicator for every process which is related
+    // to XIOS, as well on client side as on server side
+    MPI_Comm xiosGlobalComm ;
+    string strIds=CXios::getin<string>("clients_code_id","") ;
+    vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
+    if (strIds.empty())
+    {
+      // no code Ids given, suppose XIOS initialisation is global
+      int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
+      MPI_Comm splitComm,interComm ;
+      MPI_Comm_rank(globalComm,&commGlobalRank) ;
+      MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
+      MPI_Comm_rank(splitComm,&commRank) ;
+      if (commRank==0) serverLeader=commGlobalRank ;
+      else serverLeader=0 ;
+      clientLeader=0 ;
+      MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
+      MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
+      MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
+      MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
+      CXios::setXiosComm(xiosGlobalComm) ;
+    }
+    else
+    {
+
+      xiosGlobalCommByFileExchange(serverComm) ;
+
+    }
+
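Note: when clients_code_id is empty, the block above discovers the two group leaders without any point-to-point traffic: rank 0 of each group contributes its global rank to an MPI_SUM reduction, everyone else contributes 0. A compact sketch of the same rendezvous callable from either side; joinXiosGlobal and isServer are illustrative names, and tag 1341 mirrors the code above:

    #include <mpi.h>

    // Returns the merged client+server communicator. 'isServer' selects the
    // split color; rank 0 of each group acts as its leader.
    MPI_Comm joinXiosGlobal(MPI_Comm world, bool isServer)
    {
      int globalRank, localRank;
      MPI_Comm local, inter, merged;
      MPI_Comm_rank(world, &globalRank);
      MPI_Comm_split(world, isServer ? 1 : 0, globalRank, &local);
      MPI_Comm_rank(local, &localRank);

      // Each leader publishes its global rank through a SUM reduction:
      // the contribution is 0 everywhere except at the two leaders.
      int mine[2] = {0, 0};    // [0] = client leader, [1] = server leader
      if (localRank == 0) mine[isServer ? 1 : 0] = globalRank;
      int leaders[2];
      MPI_Allreduce(mine, leaders, 2, MPI_INT, MPI_SUM, world);

      MPI_Intercomm_create(local, 0, world, leaders[isServer ? 0 : 1], 1341, &inter);
      // Servers merge with high=0 as in the code above; clients take high=1,
      // so server ranks come first in the merged communicator.
      MPI_Intercomm_merge(inter, isServer ? 0 : 1, &merged);
      MPI_Comm_free(&inter);
      return merged;
    }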
+    /////////////////////////////////////////
+    ///////////// PART 4 ////////////////////
+    //  create servers intra communicator  //
+    /////////////////////////////////////////
+
+    int commRank ;
+    MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
+    MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
+
+    CXios::setUsingServer() ;
+
+    /////////////////////////////////////////
+    ///////////// PART 5 ////////////////////
+    //       redirect files output         //
+    /////////////////////////////////////////
+
+    CServer::openInfoStream(CXios::serverFile);
+    CServer::openErrorStream(CXios::serverFile);
+
+    /////////////////////////////////////////
+    ///////////// PART 4 ////////////////////
+    /////////////////////////////////////////
+
+    CXios::launchDaemonsManager(true) ;
+
+    /////////////////////////////////////////
+    ///////////// PART 5 ////////////////////
+    /////////////////////////////////////////
+
+    // create the services
+
+    auto ressourcesManager=CXios::getRessourcesManager() ;
+    auto servicesManager=CXios::getServicesManager() ;
+    auto contextsManager=CXios::getContextsManager() ;
+    auto daemonsManager=CXios::getDaemonsManager() ;
+    auto serversRessource=CServer::getServersRessource() ;
+
+    if (serversRessource->isServerLeader())
+    {
+      int nbRessources = ressourcesManager->getRessourcesSize() ;
+      if (!CXios::usingServer2)
       {
-        MPI_Init(NULL, NULL);
-      }
-      CTimer::get("XIOS").resume() ;
-
-      boost::hash<string> hashString ;
-      unsigned long hashServer = hashString(CXios::xiosCodeId);
-
-      unsigned long* hashAll ;
-      unsigned long* srvLevelAll ;
-
-      int size ;
-      int myColor ;
-      int i,c ;
-      MPI_Comm newComm;
-
-      MPI_Comm_size(CXios::globalComm, &size) ;
-      MPI_Comm_rank(CXios::globalComm, &rank_);
-
-      hashAll=new unsigned long[size] ;
-      MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;
-
-      map<unsigned long, int> colors ;
-      map<unsigned long, int> leaders ;
-      map<unsigned long, int>::iterator it ;
-
-      // (1) Establish client leaders, distribute processes between two server levels
-      std::vector<int> srvRanks;
-      for(i=0,c=0;i<size;i++)
-      {
-        if (colors.find(hashAll[i])==colors.end())
-        {
-          colors[hashAll[i]]=c ;
-          leaders[hashAll[i]]=i ;
-          c++ ;
-        }
-        if (CXios::usingServer2)
-          if (hashAll[i] == hashServer)
-            srvRanks.push_back(i);
-      }
-
-      if (CXios::usingServer2)
-      {
-        int reqNbProc = srvRanks.size()*CXios::ratioServer2/100.;
-        if (reqNbProc<1 || reqNbProc==srvRanks.size())
-        {
-          error(0)<<"WARNING: void CServer::initialize(void)"<<endl
-                  << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
-                  <<" to secondary server. XIOS will run in the classical server mode."<<endl;
-        }
-        else
-        {
-          if (CXios::nbPoolsServer2 == 0) CXios::nbPoolsServer2 = reqNbProc;
-          int firstSndSrvRank = srvRanks.size()*(100.-CXios::ratioServer2)/100. ;
-          int poolLeader = firstSndSrvRank;
-//*********** (1) Comment out the line below to set one process per pool
-          sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
-          int nbPools = CXios::nbPoolsServer2;
-          if ( nbPools > reqNbProc || nbPools < 1)
-          {
-            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
-                    << "It is impossible to allocate the requested number of pools = "<<nbPools
-                    <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
-            nbPools = reqNbProc;
-          }
-          int remainder = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) % nbPools;
-          int procsPerPool = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) / nbPools;
-          for (i=0; i<srvRanks.size(); i++)
-          {
-            if (i >= firstSndSrvRank)
-            {
-              if (rank_ == srvRanks[i])
-              {
-                serverLevel=2;
-              }
-              poolLeader += procsPerPool;
-              if (remainder != 0)
-              {
-                ++poolLeader;
-                --remainder;
-              }
-//*********** (2) Comment out the two lines below to set one process per pool
-              if (poolLeader < srvRanks.size())
-                sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
-//*********** (3) Uncomment the line below to set one process per pool
-//              sndServerGlobalRanks.push_back(srvRanks[i]);
-            }
-            else
-            {
-              if (rank_ == srvRanks[i]) serverLevel=1;
-            }
-          }
-          if (serverLevel==2)
-          {
-            info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
-            for (i=0; i<sndServerGlobalRanks.size(); i++)
-            {
-              if (rank_>= sndServerGlobalRanks[i])
-              {
-                if ( i == sndServerGlobalRanks.size()-1)
-                {
-                  myColor = colors.size() + sndServerGlobalRanks[i];
-                }
-                else if (rank_< sndServerGlobalRanks[i+1])
-                {
-                  myColor = colors.size() + sndServerGlobalRanks[i];
-                  break;
-                }
-              }
-            }
-          }
-        }
-      }
-
-      // (2) Create intraComm
-      if (serverLevel != 2) myColor=colors[hashServer];
-      MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ;
-
-      // (3) Create interComm
-      if (serverLevel == 0)
-      {
-        int clientLeader;
-        for(it=leaders.begin();it!=leaders.end();it++)
-        {
-          if (it->first!=hashServer)
-          {
-            clientLeader=it->second ;
-            int intraCommSize, intraCommRank ;
-            MPI_Comm_size(intraComm,&intraCommSize) ;
-            MPI_Comm_rank(intraComm,&intraCommRank) ;
-            info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize
-                    <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;
-
-            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
-            interCommLeft.push_back(newComm) ;
-          }
-        }
-      }
-      else if (serverLevel == 1)
-      {
-        int clientLeader, srvSndLeader;
-        int srvPrmLeader ;
-
-        for (it=leaders.begin();it!=leaders.end();it++)
-        {
-          if (it->first != hashServer)
-          {
-            clientLeader=it->second ;
-            int intraCommSize, intraCommRank ;
-            MPI_Comm_size(intraComm, &intraCommSize) ;
-            MPI_Comm_rank(intraComm, &intraCommRank) ;
-            info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
-                    <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;
-            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
-            interCommLeft.push_back(newComm) ;
-          }
-        }
-
-        for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
-        {
-          int intraCommSize, intraCommRank ;
-          MPI_Comm_size(intraComm, &intraCommSize) ;
-          MPI_Comm_rank(intraComm, &intraCommRank) ;
-          info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
-                  <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< sndServerGlobalRanks[i]<<endl ;
-          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, sndServerGlobalRanks[i], 1, &newComm) ;
-          interCommRight.push_back(newComm) ;
-        }
+        ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
+        servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ;
       }
       else
       {
-        int clientLeader;
-        clientLeader = leaders[hashString(CXios::xiosCodeId)];
-        int intraCommSize, intraCommRank ;
-        MPI_Comm_size(intraComm, &intraCommSize) ;
-        MPI_Comm_rank(intraComm, &intraCommRank) ;
-        info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize
-                <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;
-
-        MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ;
-        interCommLeft.push_back(newComm) ;
+        int nprocsServer = nbRessources*CXios::ratioServer2/100.;
+        int nprocsGatherer = nbRessources - nprocsServer ;
+
+        int nbPoolsServer2 = CXios::nbPoolsServer2 ;
+        if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
+        ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
+        servicesManager->createServices(CXios::defaultPoolId, CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
+        servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ;
       }
-
-      delete [] hashAll ;
-
-    }
-    // using OASIS
-    else
-    {
-      int size;
-      int myColor;
-      int* srvGlobalRanks;
-      if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
-
-      CTimer::get("XIOS").resume() ;
-      MPI_Comm localComm;
-      oasis_get_localcomm(localComm);
-      MPI_Comm_rank(localComm,&rank_) ;
-
-      // (1) Create server intraComm
-      if (!CXios::usingServer2)
+    }
+
+    /////////////////////////////////////////
+    ///////////// PART 5 ////////////////////
+    /////////////////////////////////////////
+    // loop on event loop
+
+    bool finished=false ;
+    while (!finished)
+    {
+      finished=daemonsManager->eventLoop() ;
+    }
+
+  }
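Note: with the new managers, the usingServer2 case above reduces to sizing a GATHERER service and an OUT_SERVER service from ratioServer2 instead of hand-partitioning ranks into levels and pools. A worked example of the arithmetic; the values are illustrative, not defaults:

    #include <cstdio>

    int main()
    {
      int nbRessources = 16, ratioServer2 = 25;                  // illustrative
      int nprocsServer   = nbRessources * ratioServer2 / 100.0;  // 4 writer procs
      int nprocsGatherer = nbRessources - nprocsServer;          // 12 gatherer procs
      int nbPoolsServer2 = 0;                                    // 0 = use the default
      if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;    // 4 pools, one per writer
      std::printf("%d GATHERER + %d OUT_SERVER in %d pools\n",
                  nprocsGatherer, nprocsServer, nbPoolsServer2);
      return 0;
    }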
+
+
+
+
+
+  void CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
+  {
+
+    MPI_Comm globalComm=CXios::getGlobalComm() ;
+    MPI_Comm xiosGlobalComm ;
+
+    string strIds=CXios::getin<string>("clients_code_id","") ;
+    vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
+
+    int commRank, globalRank ;
+    MPI_Comm_rank(serverComm, &commRank) ;
+    MPI_Comm_rank(globalComm, &globalRank) ;
+    string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
+
+    if (commRank==0) // if root process publish name
+    {
+      std::ofstream ofs (serverFileName, std::ofstream::out);
+      ofs<<globalRank ;
+      ofs.close();
+    }
+
+    vector<int> clientsRank(clientsCodeId.size()) ;
+    for(int i=0;i<clientsRank.size();i++)
+    {
+      std::ifstream ifs ;
+      string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
+      do
       {
-        MPI_Comm_dup(localComm, &intraComm);
+        ifs.clear() ;
+        ifs.open(fileName, std::ifstream::in) ;
+      } while (ifs.fail()) ;
+      ifs>>clientsRank[i] ;
+      ifs.close() ;
+    }
+
+    MPI_Comm intraComm ;
+    MPI_Comm_dup(serverComm,&intraComm) ;
+    MPI_Comm interComm ;
+    for(int i=0 ; i<clientsRank.size(); i++)
+    {
+      MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
+      MPI_Comm_free(&intraComm) ;
+      MPI_Intercomm_merge(interComm,false, &intraComm ) ;
+    }
+    xiosGlobalComm=intraComm ;
+    MPI_Barrier(xiosGlobalComm);
+    if (commRank==0) std::remove(serverFileName.c_str()) ;
+    MPI_Barrier(xiosGlobalComm);
+
+    CXios::setXiosComm(xiosGlobalComm) ;
+
+  }
+
+
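Note: xiosGlobalCommByFileExchange above rendezvouses through the filesystem: each group's root publishes its global rank under a well-known file name, peers spin-open that file until it exists, and the clients are then folded one by one into a growing intra-communicator (note the free-and-merge inside the loop, and the two barriers around std::remove so a late reader cannot miss the file). The publish/poll idiom in isolation; function and file names are illustrative, and a shared working directory is assumed:

    #include <fstream>
    #include <string>

    // Publish this process's global rank under a well-known name...
    void publishRank(const std::string& myId, int globalRank)
    {
      std::ofstream ofs("__publisher::" + myId, std::ofstream::out);
      ofs << globalRank;
    }

    // ...and spin-open the peer's file until it exists, then read the rank.
    int waitPublishedRank(const std::string& peerId)
    {
      std::ifstream ifs;
      do
      {
        ifs.clear();                                 // reset failbit before retrying
        ifs.open("__publisher::" + peerId, std::ifstream::in);
      } while (ifs.fail());
      int rank = -1;
      ifs >> rank;
      return rank;
    }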
+  void CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
+  {
+    // untested, need to be tested on a true MPI-2 compliant library
+
+    // try to discover other client/server
+/*
+    // publish server name
+    char portName[MPI_MAX_PORT_NAME];
+    int ierr ;
+    int commRank ;
+    MPI_Comm_rank(serverComm, &commRank) ;
+
+    if (commRank==0) // if root process publish name
+    {
+      MPI_Open_port(MPI_INFO_NULL, portName);
+      MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
     }
-    else
-    {
-      int globalRank;
-      MPI_Comm_size(localComm,&size) ;
-      MPI_Comm_rank(CXios::globalComm,&globalRank) ;
-      srvGlobalRanks = new int[size] ;
-      MPI_Allgather(&globalRank, 1, MPI_INT, srvGlobalRanks, 1, MPI_INT, localComm) ;
-
-      int reqNbProc = size*CXios::ratioServer2/100.;
-      if (reqNbProc < 1 || reqNbProc == size)
-      {
-        error(0)<<"WARNING: void CServer::initialize(void)"<<endl
-                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
-                <<" to secondary server. XIOS will run in the classical server mode."<<endl;
-        MPI_Comm_dup(localComm, &intraComm);
-      }
-      else
-      {
-        int firstSndSrvRank = size*(100.-CXios::ratioServer2)/100. ;
-        int poolLeader = firstSndSrvRank;
-//*********** (1) Comment out the line below to set one process per pool
-//        sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
-        int nbPools = CXios::nbPoolsServer2;
-        if ( nbPools > reqNbProc || nbPools < 1)
-        {
-          error(0)<<"WARNING: void CServer::initialize(void)"<<endl
-                  << "It is impossible to allocate the requested number of pools = "<<nbPools
-                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
-          nbPools = reqNbProc;
-        }
-        int remainder = ((int) (size*CXios::ratioServer2/100.)) % nbPools;
-        int procsPerPool = ((int) (size*CXios::ratioServer2/100.)) / nbPools;
-        for (int i=0; i<size; i++)
-        {
-          if (i >= firstSndSrvRank)
-          {
-            if (globalRank == srvGlobalRanks[i])
-            {
-              serverLevel=2;
-            }
-            poolLeader += procsPerPool;
-            if (remainder != 0)
-            {
-              ++poolLeader;
-              --remainder;
-            }
-//*********** (2) Comment out the two lines below to set one process per pool
-//            if (poolLeader < size)
-//              sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
-//*********** (3) Uncomment the line below to set one process per pool
-            sndServerGlobalRanks.push_back(srvGlobalRanks[i]);
-          }
-          else
-          {
-            if (globalRank == srvGlobalRanks[i]) serverLevel=1;
-          }
-        }
-        if (serverLevel==2)
-        {
-          info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
-          for (int i=0; i<sndServerGlobalRanks.size(); i++)
-          {
-            if (globalRank>= sndServerGlobalRanks[i])
-            {
-              if (i == sndServerGlobalRanks.size()-1)
-              {
-                myColor = sndServerGlobalRanks[i];
-              }
-              else if (globalRank< sndServerGlobalRanks[i+1])
-              {
-                myColor = sndServerGlobalRanks[i];
-                break;
-              }
-            }
-          }
-        }
-        if (serverLevel != 2) myColor=0;
-        MPI_Comm_split(localComm, myColor, rank_, &intraComm) ;
-      }
+
+    MPI_Comm intraComm=serverComm ;
+    MPI_Comm interComm ;
+    for(int i=0 ; i<clientsCodeId.size(); i++)
+    {
+      MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
+      MPI_Intercomm_merge(interComm,false, &intraComm ) ;
     }
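Note: the commented-out body above is left inactive pending "a true MPI-2 compliant library", as its own comment says; it would pair with an MPI_Comm_connect on the client side, which the changeset does not show. A sketch of both halves using only standard dynamic-process calls; the function names and the reuse of the code id as service name are assumptions, and the port name only has to be valid at each root:

    #include <mpi.h>

    // Server side: open a port, publish it under a service name, accept one
    // client group as an inter-communicator.
    MPI_Comm acceptClients(const char* serviceName, MPI_Comm serverComm)
    {
      char portName[MPI_MAX_PORT_NAME];
      int rank;
      MPI_Comm_rank(serverComm, &rank);
      if (rank == 0)
      {
        MPI_Open_port(MPI_INFO_NULL, portName);
        MPI_Publish_name(serviceName, MPI_INFO_NULL, portName);
      }
      MPI_Comm inter;
      MPI_Comm_accept(portName, MPI_INFO_NULL, 0, serverComm, &inter);
      return inter;
    }

    // Client side: resolve the service name to a port and connect.
    MPI_Comm connectToServer(const char* serviceName, MPI_Comm clientComm)
    {
      char portName[MPI_MAX_PORT_NAME];
      int rank;
      MPI_Comm_rank(clientComm, &rank);
      if (rank == 0) MPI_Lookup_name(serviceName, MPI_INFO_NULL, portName);
      MPI_Comm inter;
      MPI_Comm_connect(portName, MPI_INFO_NULL, 0, clientComm, &inter);
      return inter;
    }

Both halves need a runtime with a working name service (for example an ompi-server or hydra nameserver), which is presumably why the block stayed commented out.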
-
-    string codesId=CXios::getin<string>("oasis_codes_id") ;
-    vector<string> oasisCodeId=splitRegex(codesId,"\\s*,\\s*") ;
-
-    vector<string>::iterator it ;
-
-    MPI_Comm newComm ;
-    int globalRank ;
-    MPI_Comm_rank(CXios::globalComm,&globalRank);
-
-    // (2) Create interComms with models
-    for(it=oasisCodeId.begin();it!=oasisCodeId.end();it++)
-    {
-      oasis_get_intercomm(newComm,*it) ;
-      if ( serverLevel == 0 || serverLevel == 1)
-      {
-        interCommLeft.push_back(newComm) ;
-        if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
-      }
-    }
-
-    // (3) Create interComms between primary and secondary servers
-    int intraCommSize, intraCommRank ;
-    MPI_Comm_size(intraComm,&intraCommSize) ;
-    MPI_Comm_rank(intraComm, &intraCommRank) ;
-
-    if (serverLevel == 1)
-    {
-      for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
-      {
-        int srvSndLeader = sndServerGlobalRanks[i];
-        info(50)<<"intercommCreate::client (server level 1) "<<globalRank<<" intraCommSize : "<<intraCommSize
-                <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< srvSndLeader<<endl ;
-        MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ;
-        interCommRight.push_back(newComm) ;
-      }
-    }
-    else if (serverLevel == 2)
-    {
-      info(50)<<"intercommCreate::server (server level 2)"<<globalRank<<" intraCommSize : "<<intraCommSize
-              <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< srvGlobalRanks[0] <<endl ;
-      MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvGlobalRanks[0], 0, &newComm) ;
-      interCommLeft.push_back(newComm) ;
-    }
-    if (CXios::usingServer2) delete [] srvGlobalRanks ;
-
-    bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ;
-    if (!oasisEnddef) oasis_enddef() ;
-  }
-
-
-    MPI_Comm_rank(intraComm, &rank) ;
-    if (rank==0) isRoot=true;
-    else isRoot=false;
-
-    eventScheduler = new CEventScheduler(intraComm) ;
-  }
+*/
+  }
+

   void CServer::finalize(void)

@@ -412 +302 @@
       MPI_Comm_free(&(*it));

-    //  for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
-    //    MPI_Comm_free(&(*it));
-
-    //  for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
-    //    MPI_Comm_free(&(*it));
-
     for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
       MPI_Comm_free(&(*it));

-    MPI_Comm_free(&intraComm);
-
+    // MPI_Comm_free(&intraComm);
+
+    CXios::finalizeDaemonsManager();
+
     if (!is_MPI_Initialized)
     {

@@ -432 +318 @@
     report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl ;
     report(100)<<CTimer::getAllCumulatedTime()<<endl ;
-  }
-
-  void CServer::eventLoop(void)
-  {
-    bool stop=false ;
-
-    CTimer::get("XIOS server").resume() ;
-    while(!stop)
-    {
-      if (isRoot)
-      {
-        listenContext();
-        listenRootContext();
-        listenOasisEnddef() ;
-        listenRootOasisEnddef() ;
-        if (!finished) listenFinalize() ;
-      }
-      else
-      {
-        listenRootContext();
-        listenRootOasisEnddef() ;
-        if (!finished) listenRootFinalize() ;
-      }
-
-      contextEventLoop() ;
-      if (finished && contextList.empty()) stop=true ;
-      eventScheduler->checkEvent() ;
-    }
-    CTimer::get("XIOS server").suspend() ;
-  }
-
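Note: the removed eventLoop above was the old single-threaded heart of the server (r2019 delegates this role to the daemons manager launched in initialize). Every listen* helper below it is built from the same non-blocking probe-then-receive step, so one pass over all communicators never blocks. The step in isolation (illustrative):

    #include <mpi.h>

    // Poll one communicator for a 1-int control message on a fixed tag.
    // Returns true (and fills *msg) only if a message was already pending.
    bool pollControlTag(MPI_Comm comm, int tag, int* msg)
    {
      int flag = 0;
      MPI_Status status;
      MPI_Iprobe(0, tag, comm, &flag, &status);      // source 0 = remote root
      if (flag) MPI_Recv(msg, 1, MPI_INT, 0, tag, comm, &status);
      return flag != 0;
    }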
-  void CServer::listenFinalize(void)
-  {
-    list<MPI_Comm>::iterator it, itr;
-    int msg ;
-    int flag ;
-
-    for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
-    {
-      MPI_Status status ;
-      traceOff() ;
-      MPI_Iprobe(0,0,*it,&flag,&status) ;
-      traceOn() ;
-      if (flag==true)
-      {
-        MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
-        info(20)<<" CServer : Receive client finalize"<<endl ;
-        // Sending server finalize message to secondary servers (if any)
-        for(itr=interCommRight.begin();itr!=interCommRight.end();itr++)
-        {
-          MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
-        }
-        MPI_Comm_free(&(*it));
-        interCommLeft.erase(it) ;
-        break ;
-      }
-    }
-
-    if (interCommLeft.empty())
-    {
-      int i,size ;
-      MPI_Comm_size(intraComm,&size) ;
-      MPI_Request* requests= new MPI_Request[size-1] ;
-      MPI_Status* status= new MPI_Status[size-1] ;
-
-      for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
-      MPI_Waitall(size-1,requests,status) ;
-
-      finished=true ;
-      delete [] requests ;
-      delete [] status ;
-    }
-  }
-
-
-  void CServer::listenRootFinalize()
-  {
-    int flag ;
-    MPI_Status status ;
-    int msg ;
-
-    traceOff() ;
-    MPI_Iprobe(0,4,intraComm, &flag, &status) ;
-    traceOn() ;
-    if (flag==true)
-    {
-      MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
-      finished=true ;
-    }
-  }
-
-
-  /*!
-   * Root process is listening for an order sent by client to call "oasis_enddef".
-   * The root client of a compound send the order (tag 5). It is probed and received.
-   * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
-   * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
-   */
-
-  void CServer::listenOasisEnddef(void)
-  {
-    int flag ;
-    MPI_Status status ;
-    list<MPI_Comm>::iterator it;
-    int msg ;
-    static int nbCompound=0 ;
-    int size ;
-    static bool sent=false ;
-    static MPI_Request* allRequests ;
-    static MPI_Status* allStatus ;
-
-
-    if (sent)
-    {
-      MPI_Comm_size(intraComm,&size) ;
-      MPI_Testall(size,allRequests, &flag, allStatus) ;
-      if (flag==true)
-      {
-        delete [] allRequests ;
-        delete [] allStatus ;
-        sent=false ;
-      }
-    }
-
-
-    for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
-    {
-      MPI_Status status ;
-      traceOff() ;
-      MPI_Iprobe(0,5,*it,&flag,&status) ; // tags oasis_endded = 5
-      traceOn() ;
-      if (flag==true)
-      {
-        MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
-        nbCompound++ ;
-        if (nbCompound==interCommLeft.size())
-        {
-          for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
-          {
-            MPI_Send(&msg,1,MPI_INT,0,5,*it) ; // tags oasis_endded = 5
-          }
-          MPI_Comm_size(intraComm,&size) ;
-          allRequests= new MPI_Request[size] ;
-          allStatus= new MPI_Status[size] ;
-          for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm,&allRequests[i]) ; // tags oasis_endded = 5
-          sent=true ;
-        }
-      }
-    }
-  }
-
-  /*!
-   * Processes probes message from root process if oasis_enddef call must be done.
-   * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
-   */
-  void CServer::listenRootOasisEnddef(void)
-  {
-    int flag ;
-    MPI_Status status ;
-    const int root=0 ;
-    int msg ;
-    static bool eventSent=false ;
-
-    if (eventSent)
-    {
-      boost::hash<string> hashString;
-      size_t hashId = hashString("oasis_enddef");
-      if (eventScheduler->queryEvent(0,hashId))
-      {
-        oasis_enddef() ;
-        eventSent=false ;
-      }
-    }
-
-    traceOff() ;
-    MPI_Iprobe(root,5,intraComm, &flag, &status) ;
-    traceOn() ;
-    if (flag==true)
-    {
-      MPI_Recv(&msg,1,MPI_INT,root,5,intraComm,&status) ; // tags oasis_endded = 5
-      boost::hash<string> hashString;
-      size_t hashId = hashString("oasis_enddef");
-      eventScheduler->registerEvent(0,hashId);
-      eventSent=true ;
-    }
-  }
-
-
-
-
-
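Note: the removed listeners above end with the root posting an MPI_Isend to every rank of intraComm, itself included, so the root afterwards takes the same probe/receive path as everyone else ("in order not to have a divergence", as recvContextMessage below puts it). A simplified sketch; the original keeps the requests in statics and completes them later with MPI_Testall instead of blocking, and the self-send here relies on eager buffering of a tiny message:

    #include <mpi.h>
    #include <vector>

    // Root fan-out of a small control message, self-send included.
    void fanOutFromRoot(MPI_Comm intraComm, int msg, int tag)
    {
      int size;
      MPI_Comm_size(intraComm, &size);
      std::vector<MPI_Request> req(size);
      for (int i = 0; i < size; ++i)
        MPI_Isend(&msg, 1, MPI_INT, i, tag, intraComm, &req[i]);
      // Simplified completion; fine for a 4-byte eager self-send, but the
      // original deliberately never blocks inside its polling loop.
      MPI_Waitall(size, req.data(), MPI_STATUSES_IGNORE);
    }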
-  void CServer::listenContext(void)
-  {
-
-    MPI_Status status ;
-    int flag ;
-    static char* buffer ;
-    static MPI_Request request ;
-    static bool recept=false ;
-    int rank ;
-    int count ;
-
-    if (recept==false)
-    {
-      traceOff() ;
-      MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
-      traceOn() ;
-      if (flag==true)
-      {
-        rank=status.MPI_SOURCE ;
-        MPI_Get_count(&status,MPI_CHAR,&count) ;
-        buffer=new char[count] ;
-        MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
-        recept=true ;
-      }
-    }
-    else
-    {
-      traceOff() ;
-      MPI_Test(&request,&flag,&status) ;
-      traceOn() ;
-      if (flag==true)
-      {
-        rank=status.MPI_SOURCE ;
-        MPI_Get_count(&status,MPI_CHAR,&count) ;
-        recvContextMessage((void*)buffer,count) ;
-        delete [] buffer ;
-        recept=false ;
-      }
-    }
-  }
-
-  void CServer::recvContextMessage(void* buff,int count)
-  {
-    static map<string,contextMessage> recvContextId;
-    map<string,contextMessage>::iterator it ;
-    CBufferIn buffer(buff,count) ;
-    string id ;
-    int clientLeader ;
-    int nbMessage ;
-
-    buffer>>id>>nbMessage>>clientLeader ;
-
-    it=recvContextId.find(id) ;
-    if (it==recvContextId.end())
-    {
-      contextMessage msg={0,0} ;
-      pair<map<string,contextMessage>::iterator,bool> ret ;
-      ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
-      it=ret.first ;
-    }
-    it->second.nbRecv+=1 ;
-    it->second.leaderRank+=clientLeader ;
-
-    if (it->second.nbRecv==nbMessage)
-    {
-      int size ;
-      MPI_Comm_size(intraComm,&size) ;
-      // MPI_Request* requests= new MPI_Request[size-1] ;
-      // MPI_Status* status= new MPI_Status[size-1] ;
-      MPI_Request* requests= new MPI_Request[size] ;
-      MPI_Status* status= new MPI_Status[size] ;
-
-      CMessage msg ;
-      msg<<id<<it->second.leaderRank;
-      int messageSize=msg.size() ;
-      void * sendBuff = new char[messageSize] ;
-      CBufferOut sendBuffer(sendBuff,messageSize) ;
-      sendBuffer<<msg ;
-
-      // Include root itself in order not to have a divergence
-      for(int i=0; i<size; i++)
-      {
-        MPI_Isend(sendBuff,sendBuffer.count(),MPI_CHAR,i,2,intraComm,&requests[i]) ;
-      }
-
-      recvContextId.erase(it) ;
-      delete [] requests ;
-      delete [] status ;
-
-    }
-  }
-
-  void CServer::listenRootContext(void)
-  {
-    MPI_Status status ;
-    int flag ;
-    static std::vector<void*> buffers;
-    static std::vector<MPI_Request> requests ;
-    static std::vector<int> counts ;
-    static std::vector<bool> isEventRegistered ;
-    static std::vector<bool> isEventQueued ;
-    MPI_Request request;
-
-    int rank ;
-    const int root=0 ;
-    boost::hash<string> hashString;
-    size_t hashId = hashString("RegisterContext");
-
-    // (1) Receive context id from the root, save it into a buffer
-    traceOff() ;
-    MPI_Iprobe(root,2,intraComm, &flag, &status) ;
-    traceOn() ;
-    if (flag==true)
-    {
-      counts.push_back(0);
-      MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ;
-      buffers.push_back(new char[counts.back()]) ;
-      requests.push_back(request);
-      MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&(requests.back())) ;
-      isEventRegistered.push_back(false);
-      isEventQueued.push_back(false);
-      nbContexts++;
-    }
-
-    for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ )
-    {
-      // (2) If context id is received, register an event
-      MPI_Test(&requests[ctxNb],&flag,&status) ;
-      if (flag==true && !isEventRegistered[ctxNb])
-      {
-        eventScheduler->registerEvent(ctxNb,hashId);
-        isEventRegistered[ctxNb] = true;
-      }
-      // (3) If event has been scheduled, call register context
-      if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb])
-      {
-        registerContext(buffers[ctxNb],counts[ctxNb]) ;
-        isEventQueued[ctxNb] = true;
-        delete [] buffers[ctxNb] ;
-      }
-    }
-  }
-
-  void CServer::registerContext(void* buff, int count, int leaderRank)
-  {
-    string contextId;
-    CBufferIn buffer(buff, count);
-    // buffer >> contextId;
-    buffer >> contextId>>leaderRank;
-    CContext* context;
-
-    info(20) << "CServer : Register new Context : " << contextId << endl;
-
-    if (contextList.find(contextId) != contextList.end())
-      ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
-            << "Context '" << contextId << "' has already been registred");
-
-    context=CContext::create(contextId);
-    contextList[contextId]=context;
-
-    // Primary or classical server: create communication channel with a client
-    // (1) create interComm (with a client)
-    // (2) initialize client and server (contextClient and contextServer)
-    MPI_Comm inter;
-    if (serverLevel < 2)
-    {
-      MPI_Comm contextInterComm;
-      MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm);
-      MPI_Intercomm_merge(contextInterComm,1,&inter);
-      MPI_Barrier(inter);
-      MPI_Comm_free(&inter);
-      context->initServer(intraComm,contextInterComm);
-      contextInterComms.push_back(contextInterComm);
-
-    }
-    // Secondary server: create communication channel with a primary server
-    // (1) duplicate interComm with a primary server
-    // (2) initialize client and server (contextClient and contextServer)
-    // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create,
-    //         because interComm of CContext is defined on the same processes as the interComm of CServer.
-    //         So just duplicate it.
-    else if (serverLevel == 2)
-    {
-      MPI_Comm_dup(interCommLeft.front(), &inter);
-      contextInterComms.push_back(inter);
-      context->initServer(intraComm, contextInterComms.back());
-    }
-
-    // Primary server:
-    // (1) send create context message to secondary servers
-    // (2) initialize communication channels with secondary servers (create contextClient and contextServer)
-    if (serverLevel == 1)
-    {
-      int i = 0, size;
-      MPI_Comm_size(intraComm, &size) ;
-      for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i)
-      {
-        StdString str = contextId +"_server_" + boost::lexical_cast<string>(i);
-        CMessage msg;
-        int messageSize;
-        msg<<str<<size<<rank_ ;
-        messageSize = msg.size() ;
-        buff = new char[messageSize] ;
-        CBufferOut buffer(buff,messageSize) ;
-        buffer<<msg ;
-        MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGlobalRanks[i], 1, CXios::globalComm) ;
-        MPI_Comm_dup(*it, &inter);
-        contextInterComms.push_back(inter);
-        MPI_Comm_dup(intraComm, &inter);
-        contextIntraComms.push_back(inter);
-        context->initClient(contextIntraComms.back(), contextInterComms.back()) ;
-        delete [] buff ;
-      }
-    }
-  }
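Note: registerContext above creates each per-context channel with MPI_Intercomm_create and then merges it with high=1 only to run one MPI_Barrier, a cheap way to make the client and server groups synchronize on channel creation, before freeing the merged communicator and keeping the inter-communicator. The skeleton of that handshake, server side; names are illustrative:

    #include <mpi.h>

    // Create a context channel to the client leader and synchronize on it.
    // high=1 places this (server) group after the client group in the merge.
    MPI_Comm createContextChannel(MPI_Comm intraComm, MPI_Comm globalComm,
                                  int clientLeader, int tag)
    {
      MPI_Comm inter, merged;
      MPI_Intercomm_create(intraComm, 0, globalComm, clientLeader, tag, &inter);
      MPI_Intercomm_merge(inter, /*high=*/1, &merged);
      MPI_Barrier(merged);          // both groups leave together
      MPI_Comm_free(&merged);       // only the inter-communicator is kept
      return inter;                 // the channel handed to the context
    }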
-
-  void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/)
-  {
-    bool isFinalized ;
-    map<string,CContext*>::iterator it ;
-
-    for(it=contextList.begin();it!=contextList.end();it++)
-    {
-      isFinalized=it->second->isFinalized();
-      if (isFinalized)
-      {
-        contextList.erase(it) ;
-        break ;
-      }
-      else
-        it->second->checkBuffersAndListen(enableEventsProcessing);
-    }
-  }
-
-  //! Get rank of the current process in the intraComm
-  int CServer::getRank()
-  {
-    int rank;
-    MPI_Comm_rank(intraComm,&rank);
-    return rank;
-  }
-
-  vector<int>& CServer::getSecondaryServerGlobalRanks()
-  {
-    return sndServerGlobalRanks;
-  }
+
+    CWorkflowGraph::drawWorkFlowGraph_server();
+  }

   /*!
@@ -881 +332 @@
   void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
   {
-    StdStringStream fileNameClient;
+    StdStringStream fileNameServer;
     int numDigit = 0;
-    int size = 0;
+    int commSize = 0;
+    int commRank ;
     int id;
-    MPI_Comm_size(CXios::globalComm, &size);
-    while (size)
-    {
-      size /= 10;
+
+    MPI_Comm_size(CXios::getGlobalComm(), &commSize);
+    MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
+
+    while (commSize)
+    {
+      commSize /= 10;
       ++numDigit;
     }
-    id = rank_; //getRank();
-
-    fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
-    fb->open(fileNameClient.str().c_str(), std::ios::out);
+    id = commRank;
+
+    fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
+    fb->open(fileNameServer.str().c_str(), std::ios::out);
     if (!fb->is_open())
       ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
-            << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the server log(s).");
+            << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
   }

@@ -953 +408 @@
     if (m_errorStream.is_open()) m_errorStream.close();
   }
+
+  void CServer::launchServersRessource(MPI_Comm serverComm)
+  {
+    serversRessource_ = new CServersRessource(serverComm) ;
+  }
 }
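Note: openStream above derives the zero-pad width from the size of the global communicator, one digit per decimal order, so per-process log names line up and sort lexicographically. The computation in isolation; the file prefix is illustrative:

    #include <cstdio>

    // Decimal digits needed to print any rank of a communicator of this size.
    int zeroPadWidth(int commSize)
    {
      int numDigit = 0;
      while (commSize) { commSize /= 10; ++numDigit; }
      return numDigit;            // e.g. 128 ranks -> width 3
    }

    int main()
    {
      char name[64];
      std::snprintf(name, sizeof name, "xios_server_%0*d.out",
                    zeroPadWidth(128), 7);
      std::printf("%s\n", name);  // prints: xios_server_007.out
      return 0;
    }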