Changeset 1761 for XIOS/dev/dev_ym/XIOS_SERVICES/src/server.cpp
- Timestamp:
- 10/18/19 15:40:35 (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_SERVICES/src/server.cpp
r1639 r1761 15 15 #include "event_scheduler.hpp" 16 16 #include "string_tools.hpp" 17 #include "ressources_manager.hpp" 18 #include "services_manager.hpp" 19 #include "contexts_manager.hpp" 20 #include "servers_ressource.hpp" 21 #include <cstdio> 22 23 17 24 18 25 namespace xios 19 26 { 20 27 MPI_Comm CServer::intraComm ; 28 MPI_Comm CServer::serversComm_ ; 21 29 std::list<MPI_Comm> CServer::interCommLeft ; 22 30 std::list<MPI_Comm> CServer::interCommRight ; … … 34 42 bool CServer::is_MPI_Initialized ; 35 43 CEventScheduler* CServer::eventScheduler = 0; 44 CServersRessource* CServer::serversRessource_=nullptr ; 45 46 void CServer::initRessources(void) 47 { 48 auto ressourcesManager=CXios::getRessourcesManager() ; 49 auto servicesManager=CXios::getServicesManager() ; 50 auto contextsManager=CXios::getContextsManager() ; 51 auto daemonsManager=CXios::getDaemonsManager() ; 52 auto serversRessource=CServer::getServersRessource() ; 53 54 if (serversRessource->isServerLeader()) 55 { 56 // ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()/2) ; 57 // ressourcesManager->createPool("NEMO",ressourcesManager->getRessourcesSize()/2) ; 58 ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()) ; 59 servicesManager->createServices("LMDZ", "ioserver", CServicesManager::IO_SERVER, 8, 5) ; 60 for(int i=0 ; i<5;i++) 61 { 62 contextsManager->createServerContext("LMDZ","ioserver",i,"lmdz") ; 63 } 64 } 65 66 67 68 while (true) 69 { 70 daemonsManager->eventLoop() ; 71 } 72 73 74 } 75 76 void CServer::initialize(void) 77 { 78 79 MPI_Comm serverComm ; 80 int initialized ; 81 MPI_Initialized(&initialized) ; 82 if (initialized) is_MPI_Initialized=true ; 83 else is_MPI_Initialized=false ; 84 MPI_Comm globalComm=CXios::getGlobalComm() ; 85 86 ///////////////////////////////////////// 87 ///////////// PART 1 //////////////////// 88 ///////////////////////////////////////// 89 90 // don't use OASIS 91 if (!CXios::usingOasis) 92 { 93 if (!is_MPI_Initialized) MPI_Init(NULL, NULL); 94 95 // split the global communicator 96 // get hash from all model to attribute a unique color (int) and then split to get client communicator 97 // every mpi process of globalComm (MPI_COMM_WORLD) must participate 98 99 int commRank, commSize ; 100 MPI_Comm_rank(globalComm,&commRank) ; 101 MPI_Comm_size(globalComm,&commSize) ; 102 103 std::hash<string> hashString ; 104 size_t hashServer=hashString(CXios::xiosCodeId) ; 105 106 size_t* hashAll = new size_t[commSize] ; 107 MPI_Allgather(&hashServer,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ; 108 109 int color=0 ; 110 set<size_t> listHash ; 111 for(int i=0 ; i<=commRank ; i++) 112 if (listHash.count(hashAll[i])==1) 113 { 114 listHash.insert(hashAll[i]) ; 115 color=color+1 ; 116 } 117 delete[] hashAll ; 118 119 MPI_Comm_split(globalComm, color, commRank, &serverComm) ; 120 } 121 else // using OASIS 122 { 123 if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId); 124 125 CTimer::get("XIOS").resume() ; 126 oasis_get_localcomm(serverComm); 127 } 128 129 ///////////////////////////////////////// 130 ///////////// PART 2 //////////////////// 131 ///////////////////////////////////////// 132 133 134 // Create the XIOS communicator for every process which is related 135 // to XIOS, as well on client side as on server side 136 MPI_Comm xiosGlobalComm ; 137 string strIds=CXios::getin<string>("clients_code_id","") ; 138 vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 139 if (strIds.empty()) 140 { 141 // no code Ids given, suppose XIOS initialisation is global 142 int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ; 143 MPI_Comm splitComm,interComm ; 144 MPI_Comm_rank(globalComm,&commGlobalRank) ; 145 MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ; 146 MPI_Comm_rank(splitComm,&commRank) ; 147 if (commRank==0) serverLeader=commGlobalRank ; 148 else serverLeader=0 ; 149 clientLeader=0 ; 150 MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 151 MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 152 MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ; 153 MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ; 154 CXios::setXiosComm(xiosGlobalComm) ; 155 } 156 else 157 { 158 159 xiosGlobalCommByFileExchange(serverComm) ; 160 161 } 162 163 ///////////////////////////////////////// 164 ///////////// PART 4 //////////////////// 165 // create servers intra communicator // 166 ///////////////////////////////////////// 167 168 int commRank ; 169 MPI_Comm_rank(CXios::getXiosComm(), &commRank) ; 170 MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ; 171 172 CXios::setUsingServer() ; 173 174 ///////////////////////////////////////// 175 ///////////// PART 5 //////////////////// 176 // redirect files output // 177 ///////////////////////////////////////// 178 179 CServer::openInfoStream(CXios::serverFile); 180 CServer::openErrorStream(CXios::serverFile); 181 182 ///////////////////////////////////////// 183 ///////////// PART 4 //////////////////// 184 ///////////////////////////////////////// 185 186 CXios::launchDaemonsManager(true) ; 187 188 ///////////////////////////////////////// 189 ///////////// PART 5 //////////////////// 190 ///////////////////////////////////////// 191 192 // create the services 193 194 auto ressourcesManager=CXios::getRessourcesManager() ; 195 auto servicesManager=CXios::getServicesManager() ; 196 auto contextsManager=CXios::getContextsManager() ; 197 auto daemonsManager=CXios::getDaemonsManager() ; 198 auto serversRessource=CServer::getServersRessource() ; 199 200 if (serversRessource->isServerLeader()) 201 { 202 int nbRessources = ressourcesManager->getRessourcesSize() ; 203 if (!CXios::usingServer2) 204 { 205 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 206 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ; 207 } 208 else 209 { 210 int nprocsServer = nbRessources*CXios::ratioServer2/100.; 211 int nprocsGatherer = nbRessources - nprocsServer ; 212 213 int nbPoolsServer2 = CXios::nbPoolsServer2 ; 214 if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer; 215 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 216 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 217 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ; 218 } 219 } 220 221 ///////////////////////////////////////// 222 ///////////// PART 5 //////////////////// 223 ///////////////////////////////////////// 224 // loop on event loop 225 226 bool finished=false ; 227 while (!finished) 228 { 229 finished=daemonsManager->eventLoop() ; 230 } 231 232 } 233 234 235 236 237 238 void CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm) 239 { 240 241 MPI_Comm globalComm=CXios::getGlobalComm() ; 242 MPI_Comm xiosGlobalComm ; 243 244 string strIds=CXios::getin<string>("clients_code_id","") ; 245 vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 246 247 int commRank, globalRank ; 248 MPI_Comm_rank(serverComm, &commRank) ; 249 MPI_Comm_rank(globalComm, &globalRank) ; 250 string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ; 251 252 if (commRank==0) // if root process publish name 253 { 254 std::ofstream ofs (serverFileName, std::ofstream::out); 255 ofs<<globalRank ; 256 ofs.close(); 257 } 258 259 vector<int> clientsRank(clientsCodeId.size()) ; 260 for(int i=0;i<clientsRank.size();i++) 261 { 262 std::ifstream ifs ; 263 string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ; 264 do 265 { 266 ifs.clear() ; 267 ifs.open(fileName, std::ifstream::in) ; 268 } while (ifs.fail()) ; 269 ifs>>clientsRank[i] ; 270 ifs.close() ; 271 } 272 273 MPI_Comm intraComm ; 274 MPI_Comm_dup(serverComm,&intraComm) ; 275 MPI_Comm interComm ; 276 for(int i=0 ; i<clientsRank.size(); i++) 277 { 278 MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm); 279 MPI_Comm_free(&intraComm) ; 280 MPI_Intercomm_merge(interComm,false, &intraComm ) ; 281 } 282 xiosGlobalComm=intraComm ; 283 MPI_Barrier(xiosGlobalComm); 284 if (commRank==0) std::remove(serverFileName.c_str()) ; 285 MPI_Barrier(xiosGlobalComm); 286 287 CXios::setXiosComm(xiosGlobalComm) ; 288 289 } 290 291 292 void CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm) 293 { 294 // untested, need to be tested on a true MPI-2 compliant library 295 296 // try to discover other client/server 297 /* 298 // publish server name 299 char portName[MPI_MAX_PORT_NAME]; 300 int ierr ; 301 int commRank ; 302 MPI_Comm_rank(serverComm, &commRank) ; 303 304 if (commRank==0) // if root process publish name 305 { 306 MPI_Open_port(MPI_INFO_NULL, portName); 307 MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName); 308 } 309 310 MPI_Comm intraComm=serverComm ; 311 MPI_Comm interComm ; 312 for(int i=0 ; i<clientsCodeId.size(); i++) 313 { 314 MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm); 315 MPI_Intercomm_merge(interComm,false, &intraComm ) ; 316 } 317 */ 318 } 36 319 37 320 //--------------------------------------------------------------- … … 45 328 * IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead. 46 329 */ 47 void CServer::initialize (void)330 void CServer::initialize_old(void) 48 331 { 49 332 int initialized ; … … 53 336 int rank ; 54 337 338 CXios::launchRessourcesManager(true) ; 339 CXios::launchServicesManager(true) ; 340 CXios::launchContextsManager(true) ; 341 342 initRessources() ; 55 343 // Not using OASIS 56 344 if (!CXios::usingOasis) … … 421 709 MPI_Comm_free(&(*it)); 422 710 423 MPI_Comm_free(&intraComm);711 // MPI_Comm_free(&intraComm); 424 712 425 713 if (!is_MPI_Initialized) … … 854 1142 } 855 1143 else 856 it->second->checkBuffersAndListen(enableEventsProcessing); 1144 it->second->eventLoop(enableEventsProcessing); 1145 //ym it->second->checkBuffersAndListen(enableEventsProcessing); 857 1146 } 858 1147 } … … 881 1170 void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb) 882 1171 { 883 StdStringStream fileName Client;1172 StdStringStream fileNameServer; 884 1173 int numDigit = 0; 885 int size = 0; 1174 int commSize = 0; 1175 int commRank ; 886 1176 int id; 887 MPI_Comm_size(CXios::globalComm, &size); 888 while (size) 1177 1178 MPI_Comm_size(CXios::getGlobalComm(), &commSize); 1179 MPI_Comm_rank(CXios::getGlobalComm(), &commRank); 1180 1181 while (commSize) 889 1182 { 890 size /= 10;1183 commSize /= 10; 891 1184 ++numDigit; 892 1185 } 893 id = rank_; //getRank();894 895 fileName Client<< fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;896 fb->open(fileName Client.str().c_str(), std::ios::out);1186 id = commRank; 1187 1188 fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext; 1189 fb->open(fileNameServer.str().c_str(), std::ios::out); 897 1190 if (!fb->is_open()) 898 1191 ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)", 899 << std::endl << "Can not open <" << fileName Client.str() << "> file to write the server log(s).");1192 << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s)."); 900 1193 } 901 1194 … … 953 1246 if (m_errorStream.is_open()) m_errorStream.close(); 954 1247 } 1248 1249 void CServer::launchServersRessource(MPI_Comm serverComm) 1250 { 1251 serversRessource_ = new CServersRessource(serverComm) ; 1252 } 955 1253 }
Note: See TracChangeset
for help on using the changeset viewer.