Ignore:
Timestamp:
10/18/19 15:40:35 (5 years ago)
Author:
ymipsl
Message:

implementing first guess for service functionnalities.

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_SERVICES/src/client.cpp

    r1756 r1761  
    1212#include "buffer_client.hpp" 
    1313#include "string_tools.hpp" 
     14#include "ressources_manager.hpp" 
     15#include "services_manager.hpp" 
     16#include <functional> 
     17#include <cstdio> 
     18 
    1419 
    1520namespace xios 
    1621{ 
    1722 
     23    const double serverPublishDefaultTimeout=10; 
     24 
    1825    MPI_Comm CClient::intraComm ; 
    1926    MPI_Comm CClient::interComm ; 
     27    MPI_Comm CClient::clientsComm_ ; 
     28 
    2029    std::list<MPI_Comm> CClient::contextInterComms; 
    2130    int CClient::serverLeader ; 
     
    2433    StdOFStream CClient::m_infoStream; 
    2534    StdOFStream CClient::m_errorStream; 
     35    CPoolRessource* CClient::poolRessource_=nullptr ; 
     36 
    2637    MPI_Comm& CClient::getInterComm(void)   { return (interComm); } 
    2738      
     
    3546 */ 
    3647 
     48    void CClient::initRessources(void) 
     49    { 
     50 
     51 /*      
     52      int commRank;  
     53      MPI_Comm_rank(CXios::globalComm,&commRank) ; 
     54      if (commRank==0) 
     55      { 
     56        ressources.createPool("ioserver1",ressources.getRessourcesSize()/2) ; 
     57      } 
     58      else if (commRank==1) 
     59      { 
     60        ressources.createPool("ioserver2",ressources.getRessourcesSize()/2) ; 
     61      } 
     62  */ 
     63    } 
     64 
    3765    void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm) 
     66    { 
     67     
     68       MPI_Comm clientComm ; 
     69      // initialize MPI if not initialized 
     70      int initialized ; 
     71      MPI_Initialized(&initialized) ; 
     72      if (initialized) is_MPI_Initialized=true ; 
     73      else is_MPI_Initialized=false ; 
     74       
     75      MPI_Comm globalComm=CXios::getGlobalComm() ; 
     76 
     77      ///////////////////////////////////////// 
     78      ///////////// PART 1 //////////////////// 
     79      ///////////////////////////////////////// 
     80       
     81 
     82      // localComm isn't given 
     83      if (localComm == MPI_COMM_NULL) 
     84      { 
     85           
     86        // don't use OASIS 
     87        if (!CXios::usingOasis) 
     88        { 
     89 
     90          if (!is_MPI_Initialized) 
     91          { 
     92            MPI_Init(NULL, NULL); 
     93          } 
     94          CTimer::get("XIOS").resume() ; 
     95          CTimer::get("XIOS init/finalize",false).resume() ; 
     96           
     97          // split the global communicator 
     98          // get hash from all model to attribute a unique color (int) and then split to get client communicator 
     99          // every mpi process of globalComm (MPI_COMM_WORLD) must participate 
     100 
     101          int commRank, commSize ; 
     102          MPI_Comm_rank(globalComm,&commRank) ; 
     103          MPI_Comm_size(globalComm,&commSize) ; 
     104 
     105          std::hash<string> hashString ; 
     106          size_t hashClient=hashString(codeId) ; 
     107           
     108          size_t* hashAll = new size_t[commSize] ; 
     109          MPI_Allgather(&hashClient,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ; 
     110           
     111          int color=0 ; 
     112          set<size_t> listHash ; 
     113          for(int i=0 ; i<=commRank ; i++)  
     114            if (listHash.count(hashAll[i])==0) 
     115            { 
     116              listHash.insert(hashAll[i]) ; 
     117              color=color+1 ; 
     118            } 
     119          delete[] hashAll ; 
     120 
     121          MPI_Comm_split(globalComm, color, commRank, &clientComm) ; 
     122        } 
     123        else // using oasis to split communicator 
     124        { 
     125          if (!is_MPI_Initialized) oasis_init(codeId) ; 
     126          oasis_get_localcomm(clientComm) ; 
     127        } 
     128      } 
     129      else // localComm is given 
     130      { 
     131        MPI_Comm_dup(localComm,&clientComm) ; 
     132      } 
     133       
     134      
     135      ///////////////////////////////////////// 
     136      ///////////// PART 2 //////////////////// 
     137      ///////////////////////////////////////// 
     138       
     139 
     140      // Create the XIOS communicator for every process which is related 
     141      // to XIOS, as well on client side as on server side 
     142       
     143      MPI_Comm xiosGlobalComm ; 
     144      string strIds=CXios::getin<string>("clients_code_id","") ; 
     145      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 
     146      if (strIds.empty()) 
     147      { 
     148         // no code Ids given, suppose XIOS initialisation is global             
     149         int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ; 
     150         MPI_Comm splitComm,interComm ; 
     151         MPI_Comm_rank(globalComm,&commGlobalRank) ; 
     152         MPI_Comm_split(globalComm, 0, commGlobalRank, &splitComm) ; 
     153         int splitCommSize, globalCommSize ; 
     154         
     155         MPI_Comm_size(splitComm,&splitCommSize) ; 
     156         MPI_Comm_size(globalComm,&globalCommSize) ; 
     157         if (splitCommSize==globalCommSize) // no server 
     158         { 
     159           MPI_Comm_dup(globalComm,&xiosGlobalComm) ; 
     160           CXios::setXiosComm(xiosGlobalComm) ; 
     161         } 
     162         else 
     163         { 
     164           MPI_Comm_rank(splitComm,&commRank) ; 
     165           if (commRank==0) clientLeader=commGlobalRank ; 
     166           else clientLeader=0 ; 
     167           serverLeader=0 ; 
     168           MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 
     169           MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 
     170           MPI_Intercomm_create(splitComm, 0, globalComm, serverRemoteLeader,1341,&interComm) ; 
     171           MPI_Intercomm_merge(interComm,true,&xiosGlobalComm) ; 
     172           CXios::setXiosComm(xiosGlobalComm) ; 
     173         } 
     174      } 
     175      else 
     176      { 
     177 
     178        xiosGlobalCommByFileExchange(clientComm, codeId) ; 
     179       
     180      } 
     181 
     182      int commRank ; 
     183      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ; 
     184      MPI_Comm_split(CXios::getXiosComm(),false,commRank, &clientsComm_) ; 
     185       
     186      // is using server or not ? 
     187      int xiosCommSize, clientsCommSize ;  
     188      MPI_Comm_size(CXios::getXiosComm(), &xiosCommSize) ; 
     189      MPI_Comm_size(clientsComm_, &clientsCommSize) ; 
     190      if (xiosCommSize==clientsCommSize) CXios::setUsingServer() ; 
     191      else CXios::setNotUsingServer() ; 
     192 
     193 
     194      CXios::setGlobalRegistry(new CRegistry(clientsComm_)) ; 
     195      ///////////////////////////////////////// 
     196      ///////////// PART 3 //////////////////// 
     197      ///////////////////////////////////////// 
     198      
     199      CXios::launchDaemonsManager(false) ; 
     200      poolRessource_ = new CPoolRessource(clientComm, codeId) ; 
     201 
     202      ///////////////////////////////////////// 
     203      ///////////// PART 4 //////////////////// 
     204      /////////////////////////////////////////       
     205       
     206      // create the services 
     207/* 
     208      int commRank ; 
     209      MPI_Comm_rank(clientComm,&commRank) ; 
     210      auto contextsManager=CXios::getContextsManager() ; 
     211     
     212      if (commRank==0)  
     213      { 
     214        contextsManager->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, codeId) ; 
     215      } 
     216       
     217      MPI_Comm interComm ; 
     218 
     219      contextsManager->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, codeId, clientComm, interComm) ; 
     220*/   
     221/*      while (true)  
     222      { 
     223 
     224      } 
     225*/       
     226      returnComm = clientComm ; 
     227    } 
     228 
     229 
     230    void CClient::xiosGlobalCommByFileExchange(MPI_Comm clientComm, const string& codeId) 
     231    { 
     232  
     233      MPI_Comm globalComm=CXios::getGlobalComm() ; 
     234      MPI_Comm xiosGlobalComm ; 
     235 
     236      string strIds=CXios::getin<string>("clients_code_id","") ; 
     237      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 
     238 
     239      int commRank, globalRank, clientRank, serverRank ; 
     240      MPI_Comm_rank(clientComm, &commRank) ; 
     241      MPI_Comm_rank(globalComm, &globalRank) ; 
     242      string clientFileName("__xios_publisher::"+codeId+"__to_remove__") ; 
     243            
     244      int error ; 
     245 
     246      if (commRank==0) // if root process publish name 
     247      {   
     248        std::ofstream ofs (clientFileName, std::ofstream::out); 
     249        ofs<<globalRank ; 
     250        ofs.close(); 
     251         
     252  // get server root rank 
     253 
     254        std::ifstream ifs ; 
     255        string fileName=("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ; 
     256       
     257        double timeout = CXios::getin<double>("server_puplish_timeout",serverPublishDefaultTimeout) ; 
     258        double time ; 
     259           
     260        do 
     261        { 
     262          CTimer::get("server_publish_timeout").resume() ;   
     263          ifs.clear() ; 
     264          ifs.open(fileName, std::ifstream::in) ; 
     265          CTimer::get("server_publish_timeout").suspend() ; 
     266        } while (ifs.fail() && CTimer::get("server_publish_timeout").getCumulatedTime()<timeout) ; 
     267         
     268        if (CTimer::get("server_publish_timeout").getCumulatedTime()>=timeout || ifs.fail()) 
     269        { 
     270          ifs.clear() ; 
     271          ifs.close() ; 
     272          ifs.clear() ; 
     273          error=true ;             
     274        } 
     275        else  
     276        { 
     277          ifs>>serverRank ; 
     278          ifs.close() ; 
     279          error=false ; 
     280        }  
     281 
     282      }  
     283       
     284      MPI_Bcast(&error,1,MPI_INT,0,clientComm) ; 
     285       
     286      if (error==false)  // you have a server 
     287      { 
     288        MPI_Comm intraComm ; 
     289        MPI_Comm_dup(clientComm,&intraComm) ; 
     290        MPI_Comm interComm ; 
     291         
     292        int pos=0 ; 
     293        for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ; 
     294 
     295        bool high=true ; 
     296        for(int i=pos ; i<clientsCodeId.size(); i++) 
     297        {   
     298          MPI_Intercomm_create(intraComm, 0, globalComm, serverRank, 3141, &interComm); 
     299          MPI_Comm_free(&intraComm) ; 
     300          MPI_Intercomm_merge(interComm,high, &intraComm ) ; 
     301          high=false ; 
     302        } 
     303        xiosGlobalComm=intraComm ; 
     304      } 
     305      else  // no server detected 
     306      { 
     307        vector<int> clientsRank(clientsCodeId.size()) ; 
     308         
     309        if (commRank==0) 
     310        {   
     311          for(int i=0;i<clientsRank.size();i++) 
     312          { 
     313            std::ifstream ifs ; 
     314            string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ; 
     315            do 
     316            { 
     317              ifs.clear() ; 
     318              ifs.open(fileName, std::ifstream::in) ; 
     319            } while (ifs.fail()) ; 
     320            ifs>>clientsRank[i] ; 
     321            ifs.close() ; 
     322          } 
     323        } 
     324          
     325        int client ; 
     326        MPI_Comm intraComm ; 
     327        MPI_Comm_dup(clientComm,&intraComm) ; 
     328        MPI_Comm interComm ; 
     329         
     330        int pos=0 ; 
     331        for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ; 
     332         
     333        bool high=true ; 
     334        for(int i=pos+1 ; i<clientsCodeId.size(); i++) 
     335        {   
     336          if (codeId==clientsCodeId[0])   // first model play the server rule 
     337          {           
     338            MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm); 
     339            MPI_Intercomm_merge(interComm,false, &intraComm ) ; 
     340          } 
     341          else 
     342          {           
     343            MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[0], 3141, &interComm); 
     344            MPI_Intercomm_merge(interComm,high, &intraComm ) ; 
     345            high=false ; 
     346          } 
     347        } 
     348        xiosGlobalComm=intraComm ; 
     349      } 
     350 
     351      MPI_Barrier(xiosGlobalComm); 
     352      if (commRank==0) std::remove(clientFileName.c_str()) ;          
     353      MPI_Barrier(xiosGlobalComm); 
     354   
     355      CXios::setXiosComm(xiosGlobalComm) ; 
     356 
     357    } 
     358 
     359    void CClient::xiosGlobalCommByPublishing(MPI_Comm clientComm, const string& codeId) 
     360    { 
     361 
     362      // untested. need to be developped an a true MPI compliant library 
     363 
     364/* 
     365        // try to discover other client/server 
     366        // do you have a xios server ? 
     367        char portName[MPI_MAX_PORT_NAME]; 
     368        int ierr ; 
     369        int commRank ; 
     370        MPI_Comm_rank(clientComm,&commRank) ; 
     371 
     372        MPI_Barrier(globalComm) ; 
     373        if (commRank==0) 
     374        { 
     375              
     376          MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN ); 
     377          const char* serviceName=CXios::xiosCodeId.c_str() ; 
     378          ierr=MPI_Lookup_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName); 
     379          MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL ); 
     380        } 
     381        ierr=MPI_SUCCESS ; 
     382        MPI_Bcast(&ierr,1,MPI_INT,0,clientComm) ; 
     383 
     384        if (ierr==MPI_SUCCESS) // you have a server 
     385        {   
     386          MPI_Comm intraComm=clientComm ; 
     387          MPI_Comm interComm ; 
     388          for(int i=0 ; i<clientsCodeId.size(); i++) 
     389          {   
     390            MPI_Comm_connect(portName, MPI_INFO_NULL, 0, intraComm, &interComm); 
     391            MPI_Intercomm_merge(interComm, true, &intraComm ) ; 
     392          } 
     393          xiosGlobalComm=intraComm ; 
     394        } 
     395        else  // you don't have any server 
     396        { 
     397          if (codeId==clientsCodeId[0]) // first code will publish his name 
     398          { 
     399 
     400            if (commRank==0) // if root process publish name 
     401            {   
     402              MPI_Open_port(MPI_INFO_NULL, portName); 
     403              MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName); 
     404            } 
     405 
     406            MPI_Comm intraComm=clientComm ; 
     407            MPI_Comm interComm ; 
     408            for(int i=0 ; i<clientsCodeId.size()-1; i++) 
     409            {   
     410              MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm); 
     411              MPI_Intercomm_merge(interComm,false, &intraComm ) ; 
     412            } 
     413          } 
     414          else  // other clients are connecting to the first one 
     415          { 
     416            if (commRank==0) 
     417            { 
     418 
     419              MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN ); 
     420              ierr=MPI_Lookup_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName); 
     421              MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL ); 
     422             } 
     423 
     424            MPI_Bcast(&ierr,1,MPI_INT,0,clientComm) ; 
     425 
     426            if (ierr==MPI_SUCCESS) // you can connect 
     427            {   
     428              MPI_Comm intraComm=clientComm ; 
     429              MPI_Comm interComm ; 
     430              for(int i=0 ; i<clientsCodeId.size()-1; i++) 
     431              {   
     432                MPI_Comm_connect(portName, MPI_INFO_NULL, 0, intraComm, &interComm); 
     433                MPI_Intercomm_merge(interComm, true, &intraComm ) ; 
     434              } 
     435              xiosGlobalComm=intraComm ; 
     436            } 
     437          } 
     438        }   
     439      */ 
     440    } 
     441 
     442    void CClient::initialize_old(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm) 
    38443    { 
    39444      int initialized ; 
     
    42447      else is_MPI_Initialized=false ; 
    43448      int rank ; 
    44  
     449       
     450      CXios::launchRessourcesManager(false) ; 
     451      CXios::launchServicesManager( false) ; 
     452      CXios::launchContextsManager(false) ; 
     453 
     454      initRessources() ; 
    45455// don't use OASIS 
    46456      if (!CXios::usingOasis) 
     
    160570    } 
    161571 
     572 
     573 
     574    void CClient::registerContext(const string& id, MPI_Comm contextComm) 
     575    { 
     576      int commRank, commSize ; 
     577      MPI_Comm_rank(contextComm,&commRank) ; 
     578      MPI_Comm_size(contextComm,&commSize) ; 
     579 
     580      getPoolRessource()->createService(contextComm, id, 0, CServicesManager::CLIENT, 1) ; 
     581      getPoolRessource()->createService(contextComm, CXios::defaultServerId, 0, CServicesManager::IO_SERVER, 1) ; 
     582 
     583      if (commRank==0) while (!CXios::getServicesManager()->hasService(getPoolRessource()->getId(), id, 0)) { CXios::getDaemonsManager()->eventLoop();} 
     584 
     585      if (commRank==0) CXios::getContextsManager()->createServerContext(getPoolRessource()->getId(), id, 0, id) ; 
     586      int type=CServicesManager::CLIENT ; 
     587      string name = CXios::getContextsManager()->getServerContextName(getPoolRessource()->getId(), id, 0, type, id) ; 
     588      while (!CXios::getContextsManager()->hasContext(name, contextComm) ) 
     589      { 
     590        CXios::getDaemonsManager()->eventLoop() ; 
     591      } 
     592 
     593/*       
     594 
     595      CContext::setCurrent(id) ; 
     596      CContext* context=CContext::create(id); 
     597       
     598      // register the new client side context to the contexts manager 
     599      if (commRank==0) 
     600      { 
     601        MPI_Comm_rank(CXios::getXiosComm(),&commRank) ; 
     602        SRegisterContextInfo contextInfo ; 
     603        contextInfo.serviceType=CServicesManager::CLIENT ; 
     604        contextInfo.partitionId=0 ; 
     605        contextInfo.leader=commRank ; 
     606        contextInfo.size=commSize ; 
     607        CXios::getContextsManager()->registerContext(id, contextInfo) ; 
     608      } 
     609      context->initClient(contextComm) ; 
     610*/   
     611    } 
     612 
     613 
    162614///--------------------------------------------------------------- 
    163615/*! 
     
    168620 * Function is only called by client. 
    169621 */ 
    170     void CClient::registerContext(const string& id, MPI_Comm contextComm) 
     622    void CClient::registerContext_old(const string& id, MPI_Comm contextComm) 
    171623    { 
    172624      CContext::setCurrent(id) ; 
     
    260712    } 
    261713 
    262  
    263714    void CClient::finalize(void) 
     715    { 
     716      
     717      MPI_Barrier(clientsComm_) ; 
     718      int commRank ; 
     719      MPI_Comm_rank(clientsComm_, &commRank) ; 
     720      if (commRank==0) CXios::getRessourcesManager()->finalize() ; 
     721       
     722      auto globalRegistry=CXios::getGlobalRegistry() ; 
     723      globalRegistry->hierarchicalGatherRegistry() ; 
     724 
     725      if (commRank==0) 
     726      { 
     727        info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ; 
     728        globalRegistry->toFile("xios_registry.bin") ; 
     729      } 
     730      delete globalRegistry ; 
     731 
     732      CTimer::get("XIOS init/finalize",false).suspend() ; 
     733      CTimer::get("XIOS").suspend() ; 
     734      if (!is_MPI_Initialized) 
     735      { 
     736        if (CXios::usingOasis) oasis_finalize(); 
     737        else MPI_Finalize() ; 
     738      } 
     739       
     740      info(20) << "Client side context is finalized"<<endl ; 
     741      report(0) <<" Performance report : Whole time from XIOS init and finalize: "<< CTimer::get("XIOS init/finalize").getCumulatedTime()<<" s"<<endl ; 
     742      report(0) <<" Performance report : total time spent for XIOS : "<< CTimer::get("XIOS").getCumulatedTime()<<" s"<<endl ; 
     743      report(0)<< " Performance report : time spent for waiting free buffer : "<< CTimer::get("Blocking time").getCumulatedTime()<<" s"<<endl ; 
     744      report(0)<< " Performance report : Ratio : "<< CTimer::get("Blocking time").getCumulatedTime()/CTimer::get("XIOS init/finalize").getCumulatedTime()*100.<<" %"<<endl ; 
     745      report(0)<< " Performance report : This ratio must be close to zero. Otherwise it may be usefull to increase buffer size or numbers of server"<<endl ; 
     746//      report(0)<< " Memory report : Current buffer_size : "<<CXios::bufferSize<<endl ; 
     747      report(0)<< " Memory report : Minimum buffer size required : " << CClientBuffer::maxRequestSize << " bytes" << endl ; 
     748      report(0)<< " Memory report : increasing it by a factor will increase performance, depending of the volume of data wrote in file at each time step of the file"<<endl ; 
     749      report(100)<<CTimer::getAllCumulatedTime()<<endl ; 
     750    } 
     751     
     752 
     753    void CClient::finalize_old(void) 
    264754    { 
    265755      int rank ; 
     
    325815      int size = 0; 
    326816      int rank; 
    327       MPI_Comm_size(CXios::globalComm, &size); 
     817      MPI_Comm_size(CXios::getGlobalComm(), &size); 
     818      MPI_Comm_rank(CXios::getGlobalComm(),&rank); 
    328819      while (size) 
    329820      { 
     
    332823      } 
    333824 
    334       if (CXios::usingOasis) 
    335       { 
    336         MPI_Comm_rank(CXios::globalComm,&rank); 
    337         fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << rank << ext; 
    338       } 
    339       else 
    340         fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext; 
    341  
     825      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << rank << ext; 
    342826 
    343827      fb->open(fileNameClient.str().c_str(), std::ios::out); 
Note: See TracChangeset for help on using the changeset viewer.