#include "globalScopeData.hpp"
#include "xios_spl.hpp"
#include "cxios.hpp"
#include "server.hpp"
#include "client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "object_template.hpp"
#include "oasis_cinterface.hpp"
#include <boost/functional/hash.hpp>
#include <boost/algorithm/string.hpp>
#include "mpi.hpp"
#include "tracer.hpp"
#include "timer.hpp"
#include "event_scheduler.hpp"
#include "string_tools.hpp"
#include "ressources_manager.hpp"
#include "services_manager.hpp"
#include "contexts_manager.hpp"
#include "servers_ressource.hpp"
#include <cstdio>


namespace xios
{
  MPI_Comm CServer::intraComm ;
  MPI_Comm CServer::serversComm_ ;
  std::list<MPI_Comm> CServer::interCommLeft ;
  std::list<MPI_Comm> CServer::interCommRight ;
  std::list<MPI_Comm> CServer::contextInterComms;
  std::list<MPI_Comm> CServer::contextIntraComms;
  int CServer::serverLevel = 0 ;
  int CServer::nbContexts = 0;
  bool CServer::isRoot = false ;
  int CServer::rank_ = INVALID_RANK;
  StdOFStream CServer::m_infoStream;
  StdOFStream CServer::m_errorStream;
  map<string,CContext*> CServer::contextList ;
  vector<int> CServer::sndServerGlobalRanks;
  bool CServer::finished=false ;
  bool CServer::is_MPI_Initialized ;
  CEventScheduler* CServer::eventScheduler = 0;
  CServersRessource* CServer::serversRessource_=nullptr ;

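  /*!
   * Apparently a development exercise of the new resource manager: the server leader creates a
   * hard-coded "LMDZ" pool using all available resources, an "ioserver" I/O service and five
   * server contexts, then every process loops forever on the daemons manager event loop.
   */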
  void CServer::initRessources(void)
  {
    auto ressourcesManager=CXios::getRessourcesManager() ;
    auto servicesManager=CXios::getServicesManager() ;
    auto contextsManager=CXios::getContextsManager() ;
    auto daemonsManager=CXios::getDaemonsManager() ;
    auto serversRessource=CServer::getServersRessource() ;

    if (serversRessource->isServerLeader())
    {
      // ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()/2) ;
      // ressourcesManager->createPool("NEMO",ressourcesManager->getRessourcesSize()/2) ;
      ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()) ;
      servicesManager->createServices("LMDZ", "ioserver", CServicesManager::IO_SERVER, 8, 5) ;
      for(int i=0 ; i<5;i++)
      {
        contextsManager->createServerContext("LMDZ","ioserver",i,"lmdz") ;
      }
    }

    while (true)
    {
      daemonsManager->eventLoop() ;
    }
  }

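  /*!
   * New server initialisation:
   *  - initialise MPI (or OASIS) and split the global communicator by code-id hash to obtain serverComm,
   *  - build the global XIOS communicator, either directly when no client code ids are given or
   *    through xiosGlobalCommByFileExchange,
   *  - split off the servers intra-communicator and redirect the info/error streams,
   *  - launch the daemons manager, let the server leader create the default pool and services
   *    (single I/O server, or gatherer + out-server when using the second server level),
   *  - loop on the daemons manager event loop until it reports completion.
   */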
  void CServer::initialize(void)
  {
    MPI_Comm serverComm ;
    int initialized ;
    MPI_Initialized(&initialized) ;
    if (initialized) is_MPI_Initialized=true ;
    else is_MPI_Initialized=false ;
    MPI_Comm globalComm=CXios::getGlobalComm() ;

    /////////////////////////////////////////
    ///////////// PART 1 ////////////////////
    /////////////////////////////////////////

    // not using OASIS
    if (!CXios::usingOasis)
    {
      if (!is_MPI_Initialized) MPI_Init(NULL, NULL);

      // split the global communicator:
      // get the hash from all models to assign a unique color (int), then split to get the server communicator;
      // every MPI process of globalComm (MPI_COMM_WORLD) must participate

      int commRank, commSize ;
      MPI_Comm_rank(globalComm,&commRank) ;
      MPI_Comm_size(globalComm,&commSize) ;

      std::hash<string> hashString ;
      size_t hashServer=hashString(CXios::xiosCodeId) ;

      size_t* hashAll = new size_t[commSize] ;
      MPI_Allgather(&hashServer,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_UNSIGNED_LONG,globalComm) ; // send and receive datatypes must match

      int color=0 ;
      set<size_t> listHash ;
      for(int i=0 ; i<=commRank ; i++)
        if (listHash.count(hashAll[i])==0) // a new color for each distinct code-id hash encountered so far
        {
          listHash.insert(hashAll[i]) ;
          color=color+1 ;
        }
      delete[] hashAll ;

      MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
    }
    else // using OASIS
    {
      if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);

      CTimer::get("XIOS").resume() ;
      oasis_get_localcomm(serverComm);
    }

    /////////////////////////////////////////
    ///////////// PART 2 ////////////////////
    /////////////////////////////////////////

    // Create the XIOS communicator for every process related to XIOS,
    // on the client side as well as on the server side
    MPI_Comm xiosGlobalComm ;
    string strIds=CXios::getin<string>("clients_code_id","") ;
    vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
    if (strIds.empty())
    {
      // no code ids given; assume the XIOS initialisation is global
      int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
      MPI_Comm splitComm,interComm ;
      MPI_Comm_rank(globalComm,&commGlobalRank) ;
      MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
      MPI_Comm_rank(splitComm,&commRank) ;
      if (commRank==0) serverLeader=commGlobalRank ;
      else serverLeader=0 ;
      clientLeader=0 ;
      MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
      MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
      MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
      MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
      CXios::setXiosComm(xiosGlobalComm) ;
    }
    else
    {
      xiosGlobalCommByFileExchange(serverComm) ;
    }

    /////////////////////////////////////////
    ///////////// PART 3 ////////////////////
    //  create servers intra communicator   //
    /////////////////////////////////////////

    int commRank ;
    MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
    MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;

    CXios::setUsingServer() ;

    /////////////////////////////////////////
    ///////////// PART 4 ////////////////////
    //      redirect files output           //
    /////////////////////////////////////////

    CServer::openInfoStream(CXios::serverFile);
    CServer::openErrorStream(CXios::serverFile);

    /////////////////////////////////////////
    ///////////// PART 5 ////////////////////
    /////////////////////////////////////////

    CXios::launchDaemonsManager(true) ;

    /////////////////////////////////////////
    ///////////// PART 6 ////////////////////
    /////////////////////////////////////////

    // create the services

    auto ressourcesManager=CXios::getRessourcesManager() ;
    auto servicesManager=CXios::getServicesManager() ;
    auto contextsManager=CXios::getContextsManager() ;
    auto daemonsManager=CXios::getDaemonsManager() ;
    auto serversRessource=CServer::getServersRessource() ;

    if (serversRessource->isServerLeader())
    {
      int nbRessources = ressourcesManager->getRessourcesSize() ;
      if (!CXios::usingServer2)
      {
        ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
        servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ;
      }
      else
      {
        int nprocsServer = nbRessources*CXios::ratioServer2/100.;
        int nprocsGatherer = nbRessources - nprocsServer ;

        int nbPoolsServer2 = CXios::nbPoolsServer2 ;
        if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
        ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
        servicesManager->createServices(CXios::defaultPoolId, CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
        servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ;
      }
    }

    /////////////////////////////////////////
    ///////////// PART 7 ////////////////////
    /////////////////////////////////////////
    // loop on the daemons event loop

    bool finished=false ;
    while (!finished)
    {
      finished=daemonsManager->eventLoop() ;
    }
  }


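  /*!
   * Build the global XIOS communicator when client code ids are given through the "clients_code_id" hint:
   * the server root publishes its global rank in a temporary file, waits for and reads the rank published
   * by each client code, then chains MPI_Intercomm_create / MPI_Intercomm_merge over all clients.
   */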
  void CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
  {
    MPI_Comm globalComm=CXios::getGlobalComm() ;
    MPI_Comm xiosGlobalComm ;

    string strIds=CXios::getin<string>("clients_code_id","") ;
    vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;

    int commRank, globalRank ;
    MPI_Comm_rank(serverComm, &commRank) ;
    MPI_Comm_rank(globalComm, &globalRank) ;
    string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;

    if (commRank==0) // the root process publishes its global rank
    {
      std::ofstream ofs (serverFileName, std::ofstream::out);
      ofs<<globalRank ;
      ofs.close();
    }

    vector<int> clientsRank(clientsCodeId.size()) ;
    for(int i=0;i<clientsRank.size();i++)
    {
      std::ifstream ifs ;
      string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
      do
      {
        ifs.clear() ;
        ifs.open(fileName, std::ifstream::in) ;
      } while (ifs.fail()) ;
      ifs>>clientsRank[i] ;
      ifs.close() ;
    }

    MPI_Comm intraComm ;
    MPI_Comm_dup(serverComm,&intraComm) ;
    MPI_Comm interComm ;
    for(int i=0 ; i<clientsRank.size(); i++)
    {
      MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
      MPI_Comm_free(&intraComm) ;
      MPI_Intercomm_merge(interComm,false, &intraComm ) ;
    }
    xiosGlobalComm=intraComm ;
    MPI_Barrier(xiosGlobalComm);
    if (commRank==0) std::remove(serverFileName.c_str()) ;
    MPI_Barrier(xiosGlobalComm);

    CXios::setXiosComm(xiosGlobalComm) ;
  }


  void CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
  {
    // untested; needs to be tested on a truly MPI-2 compliant library

    // try to discover the other clients/servers
/*
    // publish the server name
    char portName[MPI_MAX_PORT_NAME];
    int ierr ;
    int commRank ;
    MPI_Comm_rank(serverComm, &commRank) ;

    if (commRank==0) // the root process publishes the name
    {
      MPI_Open_port(MPI_INFO_NULL, portName);
      MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
    }

    MPI_Comm intraComm=serverComm ;
    MPI_Comm interComm ;
    for(int i=0 ; i<clientsCodeId.size(); i++)
    {
      MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
      MPI_Intercomm_merge(interComm,false, &intraComm ) ;
    }
*/
  }

  //---------------------------------------------------------------
  /*!
   * \fn void CServer::initialize_old(void)
   * Creates intraComm for each possible type of server (classical, primary or secondary).
   * Creates interComm and stores them into the following lists:
   *   classical server -- interCommLeft
   *   primary server   -- interCommLeft and interCommRight
   *   secondary server -- interCommLeft for each pool.
   * IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead.
   */
  void CServer::initialize_old(void)
  {
    int initialized ;
    MPI_Initialized(&initialized) ;
    if (initialized) is_MPI_Initialized=true ;
    else is_MPI_Initialized=false ;
    int rank ;

    CXios::launchRessourcesManager(true) ;
    CXios::launchServicesManager(true) ;
    CXios::launchContextsManager(true) ;

    initRessources() ;
    // Not using OASIS
    if (!CXios::usingOasis)
    {
      if (!is_MPI_Initialized)
      {
        MPI_Init(NULL, NULL);
      }
      CTimer::get("XIOS").resume() ;

      boost::hash<string> hashString ;
      unsigned long hashServer = hashString(CXios::xiosCodeId);

      unsigned long* hashAll ;
      unsigned long* srvLevelAll ;

      int size ;
      int myColor ;
      int i,c ;
      MPI_Comm newComm;

      MPI_Comm_size(CXios::globalComm, &size) ;
      MPI_Comm_rank(CXios::globalComm, &rank_);

      hashAll=new unsigned long[size] ;
      MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;

      map<unsigned long, int> colors ;
      map<unsigned long, int> leaders ;
      map<unsigned long, int>::iterator it ;

      // (1) Establish client leaders, distribute processes between two server levels
      std::vector<int> srvRanks;
      for(i=0,c=0;i<size;i++)
      {
        if (colors.find(hashAll[i])==colors.end())
        {
          colors[hashAll[i]]=c ;
          leaders[hashAll[i]]=i ;
          c++ ;
        }
        if (CXios::usingServer2)
          if (hashAll[i] == hashServer)
            srvRanks.push_back(i);
      }

      if (CXios::usingServer2)
      {
        int reqNbProc = srvRanks.size()*CXios::ratioServer2/100.;
        if (reqNbProc<1 || reqNbProc==srvRanks.size())
        {
          error(0)<<"WARNING: void CServer::initialize(void)"<<endl
              << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
              <<" to secondary server. XIOS will run in the classical server mode."<<endl;
        }
        else
        {
          if (CXios::nbPoolsServer2 == 0) CXios::nbPoolsServer2 = reqNbProc;
          int firstSndSrvRank = srvRanks.size()*(100.-CXios::ratioServer2)/100. ;
          int poolLeader = firstSndSrvRank;
          //*********** (1) Comment out the line below to set one process per pool
          sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
          int nbPools = CXios::nbPoolsServer2;
          if ( nbPools > reqNbProc || nbPools < 1)
          {
            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
                << "It is impossible to allocate the requested number of pools = "<<nbPools
                <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
            nbPools = reqNbProc;
          }
          int remainder = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) % nbPools;
          int procsPerPool = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) / nbPools;
          for (i=0; i<srvRanks.size(); i++)
          {
            if (i >= firstSndSrvRank)
            {
              if (rank_ == srvRanks[i])
              {
                serverLevel=2;
              }
              poolLeader += procsPerPool;
              if (remainder != 0)
              {
                ++poolLeader;
                --remainder;
              }
              //*********** (2) Comment out the two lines below to set one process per pool
              if (poolLeader < srvRanks.size())
                sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
              //*********** (3) Uncomment the line below to set one process per pool
              // sndServerGlobalRanks.push_back(srvRanks[i]);
            }
            else
            {
              if (rank_ == srvRanks[i]) serverLevel=1;
            }
          }
          if (serverLevel==2)
          {
            info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
            for (i=0; i<sndServerGlobalRanks.size(); i++)
            {
              if (rank_>= sndServerGlobalRanks[i])
              {
                if ( i == sndServerGlobalRanks.size()-1)
                {
                  myColor = colors.size() + sndServerGlobalRanks[i];
                }
                else if (rank_< sndServerGlobalRanks[i+1])
                {
                  myColor = colors.size() + sndServerGlobalRanks[i];
                  break;
                }
              }
            }
          }
        }
      }

      // (2) Create intraComm
      if (serverLevel != 2) myColor=colors[hashServer];
      MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ;

      // (3) Create interComm
      if (serverLevel == 0)
      {
        int clientLeader;
        for(it=leaders.begin();it!=leaders.end();it++)
        {
          if (it->first!=hashServer)
          {
            clientLeader=it->second ;
            int intraCommSize, intraCommRank ;
            MPI_Comm_size(intraComm,&intraCommSize) ;
            MPI_Comm_rank(intraComm,&intraCommRank) ;
            info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize
                    <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;

            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
            interCommLeft.push_back(newComm) ;
          }
        }
      }
      else if (serverLevel == 1)
      {
        int clientLeader, srvSndLeader;
        int srvPrmLeader ;

        for (it=leaders.begin();it!=leaders.end();it++)
        {
          if (it->first != hashServer)
          {
            clientLeader=it->second ;
            int intraCommSize, intraCommRank ;
            MPI_Comm_size(intraComm, &intraCommSize) ;
            MPI_Comm_rank(intraComm, &intraCommRank) ;
            info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
                    <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;
            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
            interCommLeft.push_back(newComm) ;
          }
        }

        for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
        {
          int intraCommSize, intraCommRank ;
          MPI_Comm_size(intraComm, &intraCommSize) ;
          MPI_Comm_rank(intraComm, &intraCommRank) ;
          info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
                  <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< sndServerGlobalRanks[i]<<endl ;
          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, sndServerGlobalRanks[i], 1, &newComm) ;
          interCommRight.push_back(newComm) ;
        }
      }
      else
      {
        int clientLeader;
        clientLeader = leaders[hashString(CXios::xiosCodeId)];
        int intraCommSize, intraCommRank ;
        MPI_Comm_size(intraComm, &intraCommSize) ;
        MPI_Comm_rank(intraComm, &intraCommRank) ;
        info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize
                <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;

        MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ;
        interCommLeft.push_back(newComm) ;
      }

      delete [] hashAll ;

    }
    // using OASIS
    else
    {
      int size;
      int myColor;
      int* srvGlobalRanks;
      if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);

      CTimer::get("XIOS").resume() ;
      MPI_Comm localComm;
      oasis_get_localcomm(localComm);
      MPI_Comm_rank(localComm,&rank_) ;

      // (1) Create server intraComm
      if (!CXios::usingServer2)
      {
        MPI_Comm_dup(localComm, &intraComm);
      }
      else
      {
        int globalRank;
        MPI_Comm_size(localComm,&size) ;
        MPI_Comm_rank(CXios::globalComm,&globalRank) ;
        srvGlobalRanks = new int[size] ;
        MPI_Allgather(&globalRank, 1, MPI_INT, srvGlobalRanks, 1, MPI_INT, localComm) ;

        int reqNbProc = size*CXios::ratioServer2/100.;
        if (reqNbProc < 1 || reqNbProc == size)
        {
          error(0)<<"WARNING: void CServer::initialize(void)"<<endl
              << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
              <<" to secondary server. XIOS will run in the classical server mode."<<endl;
          MPI_Comm_dup(localComm, &intraComm);
        }
        else
        {
          int firstSndSrvRank = size*(100.-CXios::ratioServer2)/100. ;
          int poolLeader = firstSndSrvRank;
          //*********** (1) Comment out the line below to set one process per pool
          // sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
          int nbPools = CXios::nbPoolsServer2;
          if ( nbPools > reqNbProc || nbPools < 1)
          {
            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
                << "It is impossible to allocate the requested number of pools = "<<nbPools
                <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
            nbPools = reqNbProc;
          }
          int remainder = ((int) (size*CXios::ratioServer2/100.)) % nbPools;
          int procsPerPool = ((int) (size*CXios::ratioServer2/100.)) / nbPools;
          for (int i=0; i<size; i++)
          {
            if (i >= firstSndSrvRank)
            {
              if (globalRank == srvGlobalRanks[i])
              {
                serverLevel=2;
              }
              poolLeader += procsPerPool;
              if (remainder != 0)
              {
                ++poolLeader;
                --remainder;
              }
              //*********** (2) Comment out the two lines below to set one process per pool
              // if (poolLeader < size)
              //   sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
              //*********** (3) Uncomment the line below to set one process per pool
              sndServerGlobalRanks.push_back(srvGlobalRanks[i]);
            }
            else
            {
              if (globalRank == srvGlobalRanks[i]) serverLevel=1;
            }
          }
          if (serverLevel==2)
          {
            info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
            for (int i=0; i<sndServerGlobalRanks.size(); i++)
            {
              if (globalRank>= sndServerGlobalRanks[i])
              {
                if (i == sndServerGlobalRanks.size()-1)
                {
                  myColor = sndServerGlobalRanks[i];
                }
                else if (globalRank< sndServerGlobalRanks[i+1])
                {
                  myColor = sndServerGlobalRanks[i];
                  break;
                }
              }
            }
          }
          if (serverLevel != 2) myColor=0;
          MPI_Comm_split(localComm, myColor, rank_, &intraComm) ;
        }
      }

      string codesId=CXios::getin<string>("oasis_codes_id") ;
      vector<string> oasisCodeId=splitRegex(codesId,"\\s*,\\s*") ;

      vector<string>::iterator it ;

      MPI_Comm newComm ;
      int globalRank ;
      MPI_Comm_rank(CXios::globalComm,&globalRank);

      // (2) Create interComms with models
      for(it=oasisCodeId.begin();it!=oasisCodeId.end();it++)
      {
        oasis_get_intercomm(newComm,*it) ;
        if ( serverLevel == 0 || serverLevel == 1)
        {
          interCommLeft.push_back(newComm) ;
          if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
        }
      }

      // (3) Create interComms between primary and secondary servers
      int intraCommSize, intraCommRank ;
      MPI_Comm_size(intraComm,&intraCommSize) ;
      MPI_Comm_rank(intraComm, &intraCommRank) ;

      if (serverLevel == 1)
      {
        for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
        {
          int srvSndLeader = sndServerGlobalRanks[i];
          info(50)<<"intercommCreate::client (server level 1) "<<globalRank<<" intraCommSize : "<<intraCommSize
                  <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< srvSndLeader<<endl ;
          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ;
          interCommRight.push_back(newComm) ;
        }
      }
      else if (serverLevel == 2)
      {
        info(50)<<"intercommCreate::server (server level 2)"<<globalRank<<" intraCommSize : "<<intraCommSize
                <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< srvGlobalRanks[0] <<endl ;
        MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvGlobalRanks[0], 0, &newComm) ;
        interCommLeft.push_back(newComm) ;
      }
      if (CXios::usingServer2) delete [] srvGlobalRanks ;

      bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ;
      if (!oasisEnddef) oasis_enddef() ;
    }

    MPI_Comm_rank(intraComm, &rank) ;
    if (rank==0) isRoot=true;
    else isRoot=false;

    eventScheduler = new CEventScheduler(intraComm) ;
  }

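  /*!
   * Finalize the server: free the context inter/intra communicators and the right-hand
   * inter-communicators, finalize OASIS or MPI if XIOS did the initialisation itself,
   * and print the performance report.
   */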
  void CServer::finalize(void)
  {
    CTimer::get("XIOS").suspend() ;

    delete eventScheduler ;

    for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
      MPI_Comm_free(&(*it));

    for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
      MPI_Comm_free(&(*it));

    // for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
    //   MPI_Comm_free(&(*it));

    // for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
    //   MPI_Comm_free(&(*it));

    for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
      MPI_Comm_free(&(*it));

    // MPI_Comm_free(&intraComm);

    if (!is_MPI_Initialized)
    {
      if (CXios::usingOasis) oasis_finalize();
      else MPI_Finalize() ;
    }
    report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl ;
    report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl ;
    report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl ;
    report(100)<<CTimer::getAllCumulatedTime()<<endl ;
  }

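  /*!
   * Main server loop: the root process listens for context registration, finalize and
   * oasis_enddef orders coming from the clients, while the other processes listen for the
   * corresponding notifications forwarded by the root; context events are processed until
   * every context has been finalized.
   */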
  void CServer::eventLoop(void)
  {
    bool stop=false ;

    CTimer::get("XIOS server").resume() ;
    while(!stop)
    {
      if (isRoot)
      {
        listenContext();
        listenRootContext();
        listenOasisEnddef() ;
        listenRootOasisEnddef() ;
        if (!finished) listenFinalize() ;
      }
      else
      {
        listenRootContext();
        listenRootOasisEnddef() ;
        if (!finished) listenRootFinalize() ;
      }

      contextEventLoop() ;
      if (finished && contextList.empty()) stop=true ;
      eventScheduler->checkEvent() ;
    }
    CTimer::get("XIOS server").suspend() ;
  }

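  /*!
   * Root server process: probe each client inter-communicator for a finalize message,
   * forward it to the secondary servers (if any), and once every client has finalized,
   * notify the other server processes through the intra-communicator (tag 4).
   */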
  void CServer::listenFinalize(void)
  {
    list<MPI_Comm>::iterator it, itr;
    int msg ;
    int flag ;

    for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
    {
      MPI_Status status ;
      traceOff() ;
      MPI_Iprobe(0,0,*it,&flag,&status) ;
      traceOn() ;
      if (flag==true)
      {
        MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
        info(20)<<" CServer : Receive client finalize"<<endl ;
        // Sending server finalize message to secondary servers (if any)
        for(itr=interCommRight.begin();itr!=interCommRight.end();itr++)
        {
          MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
        }
        MPI_Comm_free(&(*it));
        interCommLeft.erase(it) ;
        break ;
      }
    }

    if (interCommLeft.empty())
    {
      int i,size ;
      MPI_Comm_size(intraComm,&size) ;
      MPI_Request* requests= new MPI_Request[size-1] ;
      MPI_Status* status= new MPI_Status[size-1] ;

      for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
      MPI_Waitall(size-1,requests,status) ;

      finished=true ;
      delete [] requests ;
      delete [] status ;
    }
  }


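  //! Non-root server processes: probe the intra-communicator for the finalize
  //! notification (tag 4) sent by the root process.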
  void CServer::listenRootFinalize()
  {
    int flag ;
    MPI_Status status ;
    int msg ;

    traceOff() ;
    MPI_Iprobe(0,4,intraComm, &flag, &status) ;
    traceOn() ;
    if (flag==true)
    {
      MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
      finished=true ;
    }
  }


| 814 | /*! |
---|
| 815 | * Root process is listening for an order sent by client to call "oasis_enddef". |
---|
| 816 | * The root client of a compound send the order (tag 5). It is probed and received. |
---|
| 817 | * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any). |
---|
| 818 | * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done |
---|
| 819 | */ |
---|
| 820 | |
---|
  void CServer::listenOasisEnddef(void)
  {
    int flag ;
    MPI_Status status ;
    list<MPI_Comm>::iterator it;
    int msg ;
    static int nbCompound=0 ;
    int size ;
    static bool sent=false ;
    static MPI_Request* allRequests ;
    static MPI_Status* allStatus ;

    if (sent)
    {
      MPI_Comm_size(intraComm,&size) ;
      MPI_Testall(size,allRequests, &flag, allStatus) ;
      if (flag==true)
      {
        delete [] allRequests ;
        delete [] allStatus ;
        sent=false ;
      }
    }

    for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
    {
      MPI_Status status ;
      traceOff() ;
      MPI_Iprobe(0,5,*it,&flag,&status) ; // tag oasis_enddef = 5
      traceOn() ;
      if (flag==true)
      {
        MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tag oasis_enddef = 5
        nbCompound++ ;
        if (nbCompound==interCommLeft.size())
        {
          for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
          {
            MPI_Send(&msg,1,MPI_INT,0,5,*it) ; // tag oasis_enddef = 5
          }
          MPI_Comm_size(intraComm,&size) ;
          allRequests= new MPI_Request[size] ;
          allStatus= new MPI_Status[size] ;
          for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm,&allRequests[i]) ; // tag oasis_enddef = 5
          sent=true ;
        }
      }
    }
  }

  /*!
   * Each process probes for the message sent by the root process when the oasis_enddef call must be done.
   * When the order is received, it is scheduled so that it is treated in a synchronised way by all server
   * processes of the communicator.
   */
  void CServer::listenRootOasisEnddef(void)
  {
    int flag ;
    MPI_Status status ;
    const int root=0 ;
    int msg ;
    static bool eventSent=false ;

    if (eventSent)
    {
      boost::hash<string> hashString;
      size_t hashId = hashString("oasis_enddef");
      if (eventScheduler->queryEvent(0,hashId))
      {
        oasis_enddef() ;
        eventSent=false ;
      }
    }

    traceOff() ;
    MPI_Iprobe(root,5,intraComm, &flag, &status) ;
    traceOn() ;
    if (flag==true)
    {
      MPI_Recv(&msg,1,MPI_INT,root,5,intraComm,&status) ; // tag oasis_enddef = 5
      boost::hash<string> hashString;
      size_t hashId = hashString("oasis_enddef");
      eventScheduler->registerEvent(0,hashId);
      eventSent=true ;
    }
  }


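  /*!
   * Root server process: probe the global communicator for a context registration message
   * (tag 1) sent by a client leader, receive it asynchronously, and pass the buffer to
   * recvContextMessage once the reception has completed.
   */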
  void CServer::listenContext(void)
  {
    MPI_Status status ;
    int flag ;
    static char* buffer ;
    static MPI_Request request ;
    static bool recept=false ;
    int rank ;
    int count ;

    if (recept==false)
    {
      traceOff() ;
      MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
      traceOn() ;
      if (flag==true)
      {
        rank=status.MPI_SOURCE ;
        MPI_Get_count(&status,MPI_CHAR,&count) ;
        buffer=new char[count] ;
        MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
        recept=true ;
      }
    }
    else
    {
      traceOff() ;
      MPI_Test(&request,&flag,&status) ;
      traceOn() ;
      if (flag==true)
      {
        rank=status.MPI_SOURCE ;
        MPI_Get_count(&status,MPI_CHAR,&count) ;
        recvContextMessage((void*)buffer,count) ;
        delete [] buffer ;
        recept=false ;
      }
    }
  }

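  /*!
   * Accumulate the registration messages sent by the client leaders of a context; once the
   * expected number of messages has been received, broadcast the context id and the summed
   * leader rank to every server process, root included, with tag 2.
   */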
  void CServer::recvContextMessage(void* buff,int count)
  {
    static map<string,contextMessage> recvContextId;
    map<string,contextMessage>::iterator it ;
    CBufferIn buffer(buff,count) ;
    string id ;
    int clientLeader ;
    int nbMessage ;

    buffer>>id>>nbMessage>>clientLeader ;

    it=recvContextId.find(id) ;
    if (it==recvContextId.end())
    {
      contextMessage msg={0,0} ;
      pair<map<string,contextMessage>::iterator,bool> ret ;
      ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
      it=ret.first ;
    }
    it->second.nbRecv+=1 ;
    it->second.leaderRank+=clientLeader ;

    if (it->second.nbRecv==nbMessage)
    {
      int size ;
      MPI_Comm_size(intraComm,&size) ;
      // MPI_Request* requests= new MPI_Request[size-1] ;
      // MPI_Status* status= new MPI_Status[size-1] ;
      MPI_Request* requests= new MPI_Request[size] ;
      MPI_Status* status= new MPI_Status[size] ;

      CMessage msg ;
      msg<<id<<it->second.leaderRank;
      int messageSize=msg.size() ;
      void * sendBuff = new char[messageSize] ;
      CBufferOut sendBuffer(sendBuff,messageSize) ;
      sendBuffer<<msg ;

      // Include root itself in order not to have a divergence
      for(int i=0; i<size; i++)
      {
        MPI_Isend(sendBuff,sendBuffer.count(),MPI_CHAR,i,2,intraComm,&requests[i]) ;
      }

      recvContextId.erase(it) ;
      delete [] requests ;
      delete [] status ;
    }
  }

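  /*!
   * Every server process: receive the context creation message forwarded by the root (tag 2),
   * register a "RegisterContext" event in the event scheduler, and call registerContext once
   * the event has been scheduled, so that all processes of the intra-communicator treat the
   * registrations in the same order.
   */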
  void CServer::listenRootContext(void)
  {
    MPI_Status status ;
    int flag ;
    static std::vector<void*> buffers;
    static std::vector<MPI_Request> requests ;
    static std::vector<int> counts ;
    static std::vector<bool> isEventRegistered ;
    static std::vector<bool> isEventQueued ;
    MPI_Request request;

    int rank ;
    const int root=0 ;
    boost::hash<string> hashString;
    size_t hashId = hashString("RegisterContext");

    // (1) Receive context id from the root, save it into a buffer
    traceOff() ;
    MPI_Iprobe(root,2,intraComm, &flag, &status) ;
    traceOn() ;
    if (flag==true)
    {
      counts.push_back(0);
      MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ;
      buffers.push_back(new char[counts.back()]) ;
      requests.push_back(request);
      MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&(requests.back())) ;
      isEventRegistered.push_back(false);
      isEventQueued.push_back(false);
      nbContexts++;
    }

    for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ )
    {
      // (2) If context id is received, register an event
      MPI_Test(&requests[ctxNb],&flag,&status) ;
      if (flag==true && !isEventRegistered[ctxNb])
      {
        eventScheduler->registerEvent(ctxNb,hashId);
        isEventRegistered[ctxNb] = true;
      }
      // (3) If event has been scheduled, call register context
      if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb])
      {
        registerContext(buffers[ctxNb],counts[ctxNb]) ;
        isEventQueued[ctxNb] = true;
        delete [] buffers[ctxNb] ;
      }
    }
  }

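  /*!
   * Create the CContext object named contextId on the server side and set up its communication
   * channels: classical and primary servers create an inter-communicator with the client leader,
   * secondary servers duplicate the inter-communicator already shared with their primary server,
   * and a primary server additionally forwards the context creation to its secondary servers.
   */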
  void CServer::registerContext(void* buff, int count, int leaderRank)
  {
    string contextId;
    CBufferIn buffer(buff, count);
    // buffer >> contextId;
    buffer >> contextId>>leaderRank;
    CContext* context;

    info(20) << "CServer : Register new Context : " << contextId << endl;

    if (contextList.find(contextId) != contextList.end())
      ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
            << "Context '" << contextId << "' has already been registered");

    context=CContext::create(contextId);
    contextList[contextId]=context;

    // Primary or classical server: create communication channel with a client
    // (1) create interComm (with a client)
    // (2) initialize client and server (contextClient and contextServer)
    MPI_Comm inter;
    if (serverLevel < 2)
    {
      MPI_Comm contextInterComm;
      MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm);
      MPI_Intercomm_merge(contextInterComm,1,&inter);
      MPI_Barrier(inter);
      MPI_Comm_free(&inter);
      context->initServer(intraComm,contextInterComm);
      contextInterComms.push_back(contextInterComm);
    }
    // Secondary server: create communication channel with a primary server
    // (1) duplicate interComm with a primary server
    // (2) initialize client and server (contextClient and contextServer)
    // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create,
    //         because interComm of CContext is defined on the same processes as the interComm of CServer.
    //         So just duplicate it.
    else if (serverLevel == 2)
    {
      MPI_Comm_dup(interCommLeft.front(), &inter);
      contextInterComms.push_back(inter);
      context->initServer(intraComm, contextInterComms.back());
    }

    // Primary server:
    // (1) send create context message to secondary servers
    // (2) initialize communication channels with secondary servers (create contextClient and contextServer)
    if (serverLevel == 1)
    {
      int i = 0, size;
      MPI_Comm_size(intraComm, &size) ;
      for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i)
      {
        StdString str = contextId +"_server_" + boost::lexical_cast<string>(i);
        CMessage msg;
        int messageSize;
        msg<<str<<size<<rank_ ;
        messageSize = msg.size() ;
        buff = new char[messageSize] ;
        CBufferOut buffer(buff,messageSize) ;
        buffer<<msg ;
        MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGlobalRanks[i], 1, CXios::globalComm) ;
        MPI_Comm_dup(*it, &inter);
        contextInterComms.push_back(inter);
        MPI_Comm_dup(intraComm, &inter);
        contextIntraComms.push_back(inter);
        context->initClient(contextIntraComms.back(), contextInterComms.back()) ;
        delete [] buff ;
      }
    }
  }

  void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/)
  {
    bool isFinalized ;
    map<string,CContext*>::iterator it ;

    for(it=contextList.begin();it!=contextList.end();it++)
    {
      isFinalized=it->second->isFinalized();
      if (isFinalized)
      {
        contextList.erase(it) ;
        break ;
      }
      else
        it->second->eventLoop(enableEventsProcessing);
        //ym it->second->checkBuffersAndListen(enableEventsProcessing);
    }
  }

  //! Get rank of the current process in the intraComm
  int CServer::getRank()
  {
    int rank;
    MPI_Comm_rank(intraComm,&rank);
    return rank;
  }

  vector<int>& CServer::getSecondaryServerGlobalRanks()
  {
    return sndServerGlobalRanks;
  }

  /*!
   * Open a file specified by a suffix and an extension and use it for the given file buffer.
   * The file name will be suffix+rank+extension.
   *
   * \param fileName [in] prototype file name
   * \param ext [in] extension of the file
   * \param fb [in/out] the file buffer
   */
  void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
  {
    StdStringStream fileNameServer;
    int numDigit = 0;
    int commSize = 0;
    int commRank ;
    int id;

    MPI_Comm_size(CXios::getGlobalComm(), &commSize);
    MPI_Comm_rank(CXios::getGlobalComm(), &commRank);

    while (commSize)
    {
      commSize /= 10;
      ++numDigit;
    }
    id = commRank;

    fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
    fb->open(fileNameServer.str().c_str(), std::ios::out);
    if (!fb->is_open())
      ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
            << std::endl << "Cannot open <" << fileNameServer.str() << "> file to write the server log(s).");
  }


  /*!
   * \brief Open a file stream to write the info logs
   * Open a file stream with a specific file name suffix+rank
   * to write the info logs.
   * \param fileName [in] prototype file name
   */
  void CServer::openInfoStream(const StdString& fileName)
  {
    std::filebuf* fb = m_infoStream.rdbuf();
    openStream(fileName, ".out", fb);

    info.write2File(fb);
    report.write2File(fb);
  }

  //! Write the info logs to standard output
  void CServer::openInfoStream()
  {
    info.write2StdOut();
    report.write2StdOut();
  }

  //! Close the info logs file if it is open
  void CServer::closeInfoStream()
  {
    if (m_infoStream.is_open()) m_infoStream.close();
  }


  /*!
   * \brief Open a file stream to write the error log
   * Open a file stream with a specific file name suffix+rank
   * to write the error log.
   * \param fileName [in] prototype file name
   */
  void CServer::openErrorStream(const StdString& fileName)
  {
    std::filebuf* fb = m_errorStream.rdbuf();
    openStream(fileName, ".err", fb);

    error.write2File(fb);
  }

  //! Write the error log to standard error output
  void CServer::openErrorStream()
  {
    error.write2StdErr();
  }

  //! Close the error log file if it is open
  void CServer::closeErrorStream()
  {
    if (m_errorStream.is_open()) m_errorStream.close();
  }


  void CServer::launchServersRessource(MPI_Comm serverComm)
  {
    serversRessource_ = new CServersRessource(serverComm) ;
  }
}