Ignore:
Timestamp:
10/18/19 15:40:35 (5 years ago)
Author:
ymipsl
Message:

implementing first guess for service functionnalities.

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_SERVICES/src/server.cpp

    r1639 r1761  
    1515#include "event_scheduler.hpp" 
    1616#include "string_tools.hpp" 
     17#include "ressources_manager.hpp" 
     18#include "services_manager.hpp" 
     19#include "contexts_manager.hpp" 
     20#include "servers_ressource.hpp" 
     21#include <cstdio> 
     22 
     23 
    1724 
    1825namespace xios 
    1926{ 
    2027    MPI_Comm CServer::intraComm ; 
     28    MPI_Comm CServer::serversComm_ ; 
    2129    std::list<MPI_Comm> CServer::interCommLeft ; 
    2230    std::list<MPI_Comm> CServer::interCommRight ; 
     
    3442    bool CServer::is_MPI_Initialized ; 
    3543    CEventScheduler* CServer::eventScheduler = 0; 
     44    CServersRessource* CServer::serversRessource_=nullptr ; 
     45 
     46    void CServer::initRessources(void) 
     47    {  
     48      auto ressourcesManager=CXios::getRessourcesManager() ; 
     49      auto servicesManager=CXios::getServicesManager() ; 
     50      auto contextsManager=CXios::getContextsManager() ; 
     51      auto daemonsManager=CXios::getDaemonsManager() ; 
     52      auto serversRessource=CServer::getServersRessource() ; 
     53 
     54      if (serversRessource->isServerLeader()) 
     55      { 
     56 //        ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()/2) ; 
     57 //        ressourcesManager->createPool("NEMO",ressourcesManager->getRessourcesSize()/2) ; 
     58          ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()) ; 
     59          servicesManager->createServices("LMDZ", "ioserver", CServicesManager::IO_SERVER, 8, 5) ; 
     60          for(int i=0 ; i<5;i++) 
     61          { 
     62            contextsManager->createServerContext("LMDZ","ioserver",i,"lmdz") ; 
     63          } 
     64      } 
     65 
     66 
     67 
     68      while (true) 
     69      { 
     70        daemonsManager->eventLoop() ; 
     71      } 
     72 
     73 
     74    } 
     75     
     76    void CServer::initialize(void) 
     77    { 
     78       
     79      MPI_Comm serverComm ; 
     80      int initialized ; 
     81      MPI_Initialized(&initialized) ; 
     82      if (initialized) is_MPI_Initialized=true ; 
     83      else is_MPI_Initialized=false ; 
     84      MPI_Comm globalComm=CXios::getGlobalComm() ; 
     85 
     86      ///////////////////////////////////////// 
     87      ///////////// PART 1 //////////////////// 
     88      ///////////////////////////////////////// 
     89 
     90      // don't use OASIS 
     91      if (!CXios::usingOasis) 
     92      { 
     93        if (!is_MPI_Initialized) MPI_Init(NULL, NULL); 
     94        
     95        // split the global communicator 
     96        // get hash from all model to attribute a unique color (int) and then split to get client communicator 
     97        // every mpi process of globalComm (MPI_COMM_WORLD) must participate 
     98          
     99        int commRank, commSize ; 
     100        MPI_Comm_rank(globalComm,&commRank) ; 
     101        MPI_Comm_size(globalComm,&commSize) ; 
     102 
     103        std::hash<string> hashString ; 
     104        size_t hashServer=hashString(CXios::xiosCodeId) ; 
     105           
     106        size_t* hashAll = new size_t[commSize] ; 
     107        MPI_Allgather(&hashServer,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ; 
     108           
     109        int color=0 ; 
     110        set<size_t> listHash ; 
     111        for(int i=0 ; i<=commRank ; i++)  
     112          if (listHash.count(hashAll[i])==1) 
     113          { 
     114            listHash.insert(hashAll[i]) ; 
     115            color=color+1 ; 
     116          } 
     117        delete[] hashAll ; 
     118 
     119        MPI_Comm_split(globalComm, color, commRank, &serverComm) ; 
     120      } 
     121      else // using OASIS 
     122      { 
     123        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId); 
     124 
     125        CTimer::get("XIOS").resume() ; 
     126        oasis_get_localcomm(serverComm); 
     127      } 
     128  
     129      ///////////////////////////////////////// 
     130      ///////////// PART 2 //////////////////// 
     131      ///////////////////////////////////////// 
     132       
     133 
     134      // Create the XIOS communicator for every process which is related 
     135      // to XIOS, as well on client side as on server side 
     136      MPI_Comm xiosGlobalComm ; 
     137      string strIds=CXios::getin<string>("clients_code_id","") ; 
     138      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 
     139      if (strIds.empty()) 
     140      { 
     141        // no code Ids given, suppose XIOS initialisation is global             
     142        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ; 
     143        MPI_Comm splitComm,interComm ; 
     144        MPI_Comm_rank(globalComm,&commGlobalRank) ; 
     145        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ; 
     146        MPI_Comm_rank(splitComm,&commRank) ; 
     147        if (commRank==0) serverLeader=commGlobalRank ; 
     148        else serverLeader=0 ; 
     149        clientLeader=0 ; 
     150        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 
     151        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 
     152        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ; 
     153        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ; 
     154        CXios::setXiosComm(xiosGlobalComm) ; 
     155      } 
     156      else 
     157      { 
     158 
     159        xiosGlobalCommByFileExchange(serverComm) ; 
     160 
     161      } 
     162       
     163      ///////////////////////////////////////// 
     164      ///////////// PART 4 //////////////////// 
     165      //  create servers intra communicator  // 
     166      /////////////////////////////////////////  
     167       
     168      int commRank ; 
     169      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ; 
     170      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ; 
     171       
     172      CXios::setUsingServer() ; 
     173 
     174      ///////////////////////////////////////// 
     175      ///////////// PART 5 //////////////////// 
     176      //       redirect files output         // 
     177      /////////////////////////////////////////  
     178       
     179      CServer::openInfoStream(CXios::serverFile); 
     180      CServer::openErrorStream(CXios::serverFile); 
     181 
     182      ///////////////////////////////////////// 
     183      ///////////// PART 4 //////////////////// 
     184      ///////////////////////////////////////// 
     185 
     186      CXios::launchDaemonsManager(true) ; 
     187      
     188      ///////////////////////////////////////// 
     189      ///////////// PART 5 //////////////////// 
     190      ///////////////////////////////////////// 
     191 
     192      // create the services 
     193 
     194      auto ressourcesManager=CXios::getRessourcesManager() ; 
     195      auto servicesManager=CXios::getServicesManager() ; 
     196      auto contextsManager=CXios::getContextsManager() ; 
     197      auto daemonsManager=CXios::getDaemonsManager() ; 
     198      auto serversRessource=CServer::getServersRessource() ; 
     199 
     200      if (serversRessource->isServerLeader()) 
     201      { 
     202        int nbRessources = ressourcesManager->getRessourcesSize() ; 
     203        if (!CXios::usingServer2) 
     204        { 
     205          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
     206          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ; 
     207        } 
     208        else 
     209        { 
     210          int nprocsServer = nbRessources*CXios::ratioServer2/100.; 
     211          int nprocsGatherer = nbRessources - nprocsServer ; 
     212           
     213          int nbPoolsServer2 = CXios::nbPoolsServer2 ; 
     214          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer; 
     215          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
     216          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 
     217          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ; 
     218        } 
     219      } 
     220 
     221      ///////////////////////////////////////// 
     222      ///////////// PART 5 //////////////////// 
     223      ///////////////////////////////////////// 
     224      // loop on event loop 
     225 
     226      bool finished=false ; 
     227      while (!finished) 
     228      { 
     229        finished=daemonsManager->eventLoop() ; 
     230      } 
     231 
     232    } 
     233 
     234 
     235 
     236 
     237 
     238    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm) 
     239    { 
     240         
     241      MPI_Comm globalComm=CXios::getGlobalComm() ; 
     242      MPI_Comm xiosGlobalComm ; 
     243       
     244      string strIds=CXios::getin<string>("clients_code_id","") ; 
     245      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 
     246       
     247      int commRank, globalRank ; 
     248      MPI_Comm_rank(serverComm, &commRank) ; 
     249      MPI_Comm_rank(globalComm, &globalRank) ; 
     250      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ; 
     251 
     252      if (commRank==0) // if root process publish name 
     253      {   
     254        std::ofstream ofs (serverFileName, std::ofstream::out); 
     255        ofs<<globalRank ; 
     256        ofs.close(); 
     257      } 
     258         
     259      vector<int> clientsRank(clientsCodeId.size()) ; 
     260      for(int i=0;i<clientsRank.size();i++) 
     261      { 
     262        std::ifstream ifs ; 
     263        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ; 
     264        do 
     265        { 
     266          ifs.clear() ; 
     267          ifs.open(fileName, std::ifstream::in) ; 
     268        } while (ifs.fail()) ; 
     269        ifs>>clientsRank[i] ; 
     270        ifs.close() ;  
     271      } 
     272 
     273      MPI_Comm intraComm ; 
     274      MPI_Comm_dup(serverComm,&intraComm) ; 
     275      MPI_Comm interComm ; 
     276      for(int i=0 ; i<clientsRank.size(); i++) 
     277      {   
     278        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm); 
     279        MPI_Comm_free(&intraComm) ; 
     280        MPI_Intercomm_merge(interComm,false, &intraComm ) ; 
     281      } 
     282      xiosGlobalComm=intraComm ;   
     283      MPI_Barrier(xiosGlobalComm); 
     284      if (commRank==0) std::remove(serverFileName.c_str()) ; 
     285      MPI_Barrier(xiosGlobalComm); 
     286 
     287      CXios::setXiosComm(xiosGlobalComm) ; 
     288       
     289    } 
     290 
     291 
     292    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm) 
     293    { 
     294        // untested, need to be tested on a true MPI-2 compliant library 
     295 
     296        // try to discover other client/server 
     297/* 
     298        // publish server name 
     299        char portName[MPI_MAX_PORT_NAME]; 
     300        int ierr ; 
     301        int commRank ; 
     302        MPI_Comm_rank(serverComm, &commRank) ; 
     303         
     304        if (commRank==0) // if root process publish name 
     305        {   
     306          MPI_Open_port(MPI_INFO_NULL, portName); 
     307          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName); 
     308        } 
     309 
     310        MPI_Comm intraComm=serverComm ; 
     311        MPI_Comm interComm ; 
     312        for(int i=0 ; i<clientsCodeId.size(); i++) 
     313        {   
     314          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm); 
     315          MPI_Intercomm_merge(interComm,false, &intraComm ) ; 
     316        } 
     317*/       
     318    } 
    36319 
    37320//--------------------------------------------------------------- 
     
    45328 *   IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead. 
    46329 */ 
    47     void CServer::initialize(void) 
     330    void CServer::initialize_old(void) 
    48331    { 
    49332      int initialized ; 
     
    53336      int rank ; 
    54337 
     338      CXios::launchRessourcesManager(true) ; 
     339      CXios::launchServicesManager(true) ; 
     340      CXios::launchContextsManager(true) ; 
     341       
     342      initRessources() ; 
    55343      // Not using OASIS 
    56344      if (!CXios::usingOasis) 
     
    421709          MPI_Comm_free(&(*it)); 
    422710 
    423       MPI_Comm_free(&intraComm); 
     711//      MPI_Comm_free(&intraComm); 
    424712 
    425713      if (!is_MPI_Initialized) 
     
    8541142         } 
    8551143         else 
    856            it->second->checkBuffersAndListen(enableEventsProcessing); 
     1144          it->second->eventLoop(enableEventsProcessing); 
     1145//ym          it->second->checkBuffersAndListen(enableEventsProcessing); 
    8571146       } 
    8581147     } 
     
    8811170    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb) 
    8821171    { 
    883       StdStringStream fileNameClient; 
     1172      StdStringStream fileNameServer; 
    8841173      int numDigit = 0; 
    885       int size = 0; 
     1174      int commSize = 0; 
     1175      int commRank ; 
    8861176      int id; 
    887       MPI_Comm_size(CXios::globalComm, &size); 
    888       while (size) 
     1177       
     1178      MPI_Comm_size(CXios::getGlobalComm(), &commSize); 
     1179      MPI_Comm_rank(CXios::getGlobalComm(), &commRank); 
     1180 
     1181      while (commSize) 
    8891182      { 
    890         size /= 10; 
     1183        commSize /= 10; 
    8911184        ++numDigit; 
    8921185      } 
    893       id = rank_; //getRank(); 
    894  
    895       fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext; 
    896       fb->open(fileNameClient.str().c_str(), std::ios::out); 
     1186      id = commRank; 
     1187 
     1188      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext; 
     1189      fb->open(fileNameServer.str().c_str(), std::ios::out); 
    8971190      if (!fb->is_open()) 
    8981191        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)", 
    899               << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the server log(s)."); 
     1192              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s)."); 
    9001193    } 
    9011194 
     
    9531246      if (m_errorStream.is_open()) m_errorStream.close(); 
    9541247    } 
     1248 
     1249    void CServer::launchServersRessource(MPI_Comm serverComm) 
     1250    { 
     1251      serversRessource_ = new CServersRessource(serverComm) ; 
     1252    } 
    9551253} 
Note: See TracChangeset for help on using the changeset viewer.