Ignore:
Timestamp:
10/18/19 15:40:35 (5 years ago)
Author:
ymipsl
Message:

implementing first guess for service functionnalities.

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.cpp

    r1757 r1761  
    2020#include "server.hpp" 
    2121#include "distribute_file_server2.hpp" 
     22#include "services_manager.hpp" 
     23#include "contexts_manager.hpp" 
     24#include "cxios.hpp" 
     25#include "client.hpp" 
    2226 
    2327namespace xios { 
     
    3135      , calendar(), hasClient(false), hasServer(false) 
    3236      , isPostProcessed(false), finalized(false) 
    33       , idServer_(), client(0), server(0) 
    34       , allProcessed(false), countChildCtx_(0) 
     37      , idServer_(), client(nullptr), server(nullptr) 
     38      , allProcessed(false), countChildCtx_(0), isProcessingEvent_(false) 
    3539 
    3640   { /* Ne rien faire de plus */ } 
     
    4044      , calendar(), hasClient(false), hasServer(false) 
    4145      , isPostProcessed(false), finalized(false) 
    42       , idServer_(), client(0), server(0) 
    43       , allProcessed(false), countChildCtx_(0) 
     46      , idServer_(), client(nullptr), server(nullptr) 
     47      , allProcessed(false), countChildCtx_(0), isProcessingEvent_(false) 
    4448   { /* Ne rien faire de plus */ } 
    4549 
     
    264268   ///--------------------------------------------------------------- 
    265269 
    266    //! Initialize client side 
     270 
     271   //! Initialize client side : old interface to be removed 
    267272   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/) 
    268273   TRY 
     
    270275 
    271276     hasClient = true; 
    272      MPI_Comm intraCommServer, interCommServer; 
     277     MPI_Comm intraCommServer, interCommServer;  
    273278      
    274279 
     
    383388   CATCH_DUMP_ATTR 
    384389 
     390 
     391   void CContext::init(CServerContext* parentServerContext, MPI_Comm intraComm, int serviceType) 
     392   TRY 
     393   { 
     394     parentServerContext_ = parentServerContext ; 
     395     if (serviceType==CServicesManager::CLIENT)  
     396       initClient(intraComm, serviceType) ; 
     397     else 
     398       initServer(intraComm, serviceType) ; 
     399    } 
     400    CATCH_DUMP_ATTR 
     401 
     402 
     403 
     404//! Initialize client side 
     405   void CContext::initClient(MPI_Comm intraComm, int serviceType) 
     406   TRY 
     407   { 
     408      intraComm_=intraComm ; 
     409      serviceType_ = CServicesManager::CLIENT ; 
     410      if (serviceType_==CServicesManager::CLIENT) 
     411      { 
     412        hasClient=true ; 
     413        hasServer=false ; 
     414      } 
     415      contextId_ = getId() ; 
     416       
     417      attached_mode=true ; 
     418      if (!CXios::isUsingServer()) attached_mode=false ; 
     419 
     420 
     421      string contextRegistryId=getId() ; 
     422      registryIn=new CRegistry(intraComm); 
     423      registryIn->setPath(contextRegistryId) ; 
     424       
     425      int commRank ; 
     426      MPI_Comm_rank(intraComm_,&commRank) ; 
     427      if (commRank==0) registryIn->fromFile("xios_registry.bin") ; 
     428      registryIn->bcastRegistry() ; 
     429      registryOut=new CRegistry(intraComm_) ; 
     430      registryOut->setPath(contextRegistryId) ; 
     431      
     432   } 
     433   CATCH_DUMP_ATTR 
     434 
     435    
     436   void CContext::initServer(MPI_Comm intraComm, int serviceType) 
     437   TRY 
     438   { 
     439     hasServer=true; 
     440     intraComm_=intraComm ; 
     441     serviceType_=serviceType ; 
     442 
     443     if (serviceType_==CServicesManager::GATHERER) 
     444     { 
     445       hasClient=true ; 
     446       hasServer=true ; 
     447     } 
     448     else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 
     449     { 
     450       hasClient=false ; 
     451       hasServer=true ; 
     452     } 
     453 
     454     CXios::getContextsManager()->getContextId(getId(), contextId_, intraComm) ; 
     455      
     456     registryIn=new CRegistry(intraComm); 
     457     registryIn->setPath(contextId_) ; 
     458      
     459     int commRank ; 
     460     MPI_Comm_rank(intraComm_,&commRank) ; 
     461     if (commRank==0) registryIn->fromFile("xios_registry.bin") ; 
     462     
     463     registryIn->bcastRegistry() ; 
     464     registryOut=new CRegistry(intraComm) ; 
     465     registryOut->setPath(contextId_) ; 
     466 
     467   } 
     468   CATCH_DUMP_ATTR 
     469 
     470 
     471  void CContext::createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) // for servers 
     472  TRY 
     473  { 
     474    MPI_Comm intraCommClient ; 
     475    MPI_Comm_dup(intraComm_, &intraCommClient); 
     476    comms.push_back(intraCommClient); 
     477     
     478    server = new CContextServer(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ? 
     479    client = new CContextClient(this,intraCommClient,interCommClient); 
     480 
     481  } 
     482  CATCH_DUMP_ATTR 
     483 
     484  void CContext::createServerInterComm(void)  
     485  TRY 
     486  { 
     487    
     488    MPI_Comm interCommClient, interCommServer ; 
     489 
     490    if (serviceType_ == CServicesManager::CLIENT) 
     491    { 
     492 
     493      int commRank ; 
     494      MPI_Comm_rank(intraComm_,&commRank) ; 
     495      if (commRank==0) 
     496      { 
     497        if (attached_mode) CXios::getContextsManager()->createServerContext(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, getContextId()) ; 
     498        else if (CXios::usingServer2) CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId()) ; 
     499        else  CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId()) ; 
     500      } 
     501 
     502      MPI_Comm interComm ; 
     503       
     504      if (attached_mode) 
     505      { 
     506        parentServerContext_->createIntercomm(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, getContextId(), intraComm_,  
     507                                              interCommClient, interCommServer) ; 
     508        int type ;  
     509        if (commRank==0) CXios::getServicesManager()->getServiceType(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, type) ; 
     510        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 
     511        setIdServer(CXios::getContextsManager()->getServerContextName(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, type, getContextId())) ; 
     512        setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble 
     513      } 
     514      else if (CXios::usingServer2) 
     515      {  
     516//      CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, interComm) ; 
     517        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, 
     518                                              interCommClient, interCommServer) ; 
     519        int type ;  
     520        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultGathererId, 0, type) ; 
     521        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 
     522        setIdServer(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultGathererId, 0, type, getContextId())) ; 
     523      } 
     524      else 
     525      { 
     526        //CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, interComm) ; 
     527        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, 
     528                                              interCommClient, interCommServer) ; 
     529        int type ;  
     530        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ; 
     531        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 
     532        setIdServer(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultServerId, 0, type, getContextId())) ; 
     533      } 
     534 
     535        // intraComm client is not duplicated. In all the code we use client->intraComm for MPI 
     536        // in future better to replace it by intracommuncator associated to the context 
     537       
     538/*        MPI_Comm intraCommClient, intraCommServer ; 
     539        MPI_Comm interCommClient, interCommServer ; 
     540 
     541        intraCommClient=intraComm_ ; 
     542        MPI_Comm_dup(intraComm_, &intraCommServer) ; 
     543 
     544        interCommClient=interComm ;                
     545        MPI_Comm_dup(interComm, &interCommServer) ; */ 
     546       
     547      MPI_Comm intraCommClient, intraCommServer ; 
     548      intraCommClient=intraComm_ ; 
     549      MPI_Comm_dup(intraComm_, &intraCommServer) ; 
     550      client = new CContextClient(this, intraCommClient, interCommClient); 
     551      server = new CContextServer(this, intraCommServer, interCommServer); 
     552     
     553    } 
     554     
     555    if (serviceType_ == CServicesManager::GATHERER) 
     556    { 
     557      int commRank ; 
     558      MPI_Comm_rank(intraComm_,&commRank) ; 
     559       
     560      int nbPartitions ; 
     561      if (commRank==0)  
     562      {  
     563        CXios::getServicesManager()->getServiceNbPartitions(CXios::defaultPoolId, CXios::defaultServerId, 0, nbPartitions) ; 
     564        for(int i=0 ; i<nbPartitions; i++) 
     565          CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId()) ; 
     566      }       
     567      MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ; 
     568       
     569      MPI_Comm interComm ; 
     570      for(int i=0 ; i<nbPartitions; i++) 
     571      { 
     572//        CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interComm) ; 
     573        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 
     574        int type ;  
     575        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ; 
     576        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 
     577        primServerId_.push_back(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultServerId, i, type, getContextId())) ; 
     578 
     579        // intraComm client is not duplicated. In all the code we use client->intraComm for MPI 
     580        // in future better to replace it by intracommuncator associated to the context 
     581       
     582        MPI_Comm intraCommClient, intraCommServer ; 
     583//        MPI_Comm interCommClient, interCommServer ; 
     584 
     585        intraCommClient=intraComm_ ; 
     586        MPI_Comm_dup(intraComm_, &intraCommServer) ; 
     587 
     588//        interCommClient=interComm ;                
     589//        MPI_Comm_dup(interComm, &interCommServer) ; 
     590 
     591        clientPrimServer.push_back(new CContextClient(this, intraCommClient, interCommClient)); 
     592        serverPrimServer.push_back(new CContextServer(this, intraCommServer, interCommServer));   
     593       
     594      } 
     595    } 
     596  } 
     597  CATCH_DUMP_ATTR 
     598 
     599  
    385600   void CContext::initServer(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtClient /*= 0*/) 
    386601   TRY 
     
    418633   } 
    419634   CATCH_DUMP_ATTR 
     635 
     636  
     637 
     638  bool CContext::eventLoop(bool enableEventsProcessing) 
     639  { 
     640    bool finished=true;  
     641 
     642    if (client!=nullptr && !finalized) client->checkBuffers(); 
     643     
     644    for (int i = 0; i < clientPrimServer.size(); ++i) 
     645    { 
     646      if (!finalized) clientPrimServer[i]->checkBuffers(); 
     647      if (!finalized) finished &= serverPrimServer[i]->eventLoop(enableEventsProcessing); 
     648    } 
     649 
     650    if (server!=nullptr) if (!finalized) finished &= server->eventLoop(enableEventsProcessing); 
     651   
     652    return finalized && finished ; 
     653  } 
    420654 
    421655   //! Try to send the buffers and receive possible answers 
     
    451685    } 
    452686  } 
    453    CATCH_DUMP_ATTR 
    454  
    455    //! Terminate a context 
     687  CATCH_DUMP_ATTR 
     688 
     689 
     690 
     691 
    456692   void CContext::finalize(void) 
    457693   TRY 
     
    462698      } 
    463699     // Send registry upon calling the function the first time 
    464      if (countChildCtx_ == 0) 
    465        if (hasClient) sendRegistry() ; 
     700     if (countChildCtx_ == 0) if (hasClient) sendRegistry() ; 
    466701 
    467702     // Client: 
     
    562797          
    563798         closeAllFile(); // Just move to here to make sure that server-level 1 can close files 
     799         
     800        /*  ym 
    564801         if (hasServer && !hasClient) 
    565802         {            
     
    567804           if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 
    568805         } 
     806        */ 
    569807 
    570808         //! Deallocate client buffers 
     
    589827   CATCH_DUMP_ATTR 
    590828 
     829 
     830 
     831   //! Terminate a context 
     832   void CContext::finalize_old(void) 
     833   TRY 
     834   { 
     835      if (hasClient && !hasServer) // For now we only use server level 1 to read data 
     836      { 
     837        doPreTimestepOperationsForEnabledReadModeFiles(); 
     838      } 
     839     // Send registry upon calling the function the first time 
     840     if (countChildCtx_ == 0) if (hasClient) sendRegistry() ; 
     841 
     842     // Client: 
     843     // (1) blocking send context finalize to its server 
     844     // (2) blocking receive context finalize from its server 
     845     // (3) some memory deallocations 
     846     if (CXios::isClient) 
     847     { 
     848       // Make sure that client (model) enters the loop only once 
     849       if (countChildCtx_ < 1) 
     850       { 
     851         ++countChildCtx_; 
     852 
     853         info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 
     854         client->finalize(); 
     855         info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 
     856         while (client->havePendingRequests()) client->checkBuffers(); 
     857          
     858         info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 
     859         while (!server->hasFinished()) 
     860           server->eventLoop(); 
     861        info(100)<<"DEBUG: context "<<getId()<<" server has finished"<<endl ; 
     862         
     863        bool notifiedFinalized=false ; 
     864        do 
     865        { 
     866          notifiedFinalized=client->isNotifiedFinalized() ; 
     867        } while (!notifiedFinalized) ; 
     868        client->releaseBuffers(); 
     869 
     870         if (hasServer) // Mode attache 
     871         { 
     872           closeAllFile(); 
     873           registryOut->hierarchicalGatherRegistry() ; 
     874           if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 
     875         } 
     876 
     877         //! Deallocate client buffers 
     878//         client->releaseBuffers(); 
     879        info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ; 
     880         //! Free internally allocated communicators 
     881         for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 
     882           MPI_Comm_free(&(*it)); 
     883         comms.clear(); 
     884 
     885         info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 
     886       } 
     887     } 
     888     else if (CXios::isServer) 
     889     { 
     890       // First context finalize message received from a model 
     891       // Send context finalize to its child contexts (if any) 
     892       if (countChildCtx_ == 0) 
     893         for (int i = 0; i < clientPrimServer.size(); ++i) 
     894         { 
     895           clientPrimServer[i]->finalize(); 
     896           bool bufferReleased; 
     897           do 
     898           { 
     899             clientPrimServer[i]->checkBuffers(); 
     900             bufferReleased = !clientPrimServer[i]->havePendingRequests(); 
     901           } while (!bufferReleased); 
     902            
     903           bool notifiedFinalized=false ; 
     904           do 
     905           { 
     906//             clientPrimServer[i]->checkBuffers(); 
     907             notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ; 
     908           } while (!notifiedFinalized) ; 
     909           clientPrimServer[i]->releaseBuffers(); 
     910         } 
     911            
     912 
     913       // (Last) context finalized message received 
     914       if (countChildCtx_ == clientPrimServer.size()) 
     915       { 
     916         // Blocking send of context finalize message to its client (e.g. primary server or model) 
     917         info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 
     918         client->finalize(); 
     919         info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 
     920         bool bufferReleased; 
     921         do 
     922         { 
     923           client->checkBuffers(); 
     924           bufferReleased = !client->havePendingRequests(); 
     925         } while (!bufferReleased); 
     926          
     927         bool notifiedFinalized=false ; 
     928         do 
     929         { 
     930  //         client->checkBuffers(); 
     931           notifiedFinalized=client->isNotifiedFinalized() ; 
     932         } while (!notifiedFinalized) ; 
     933         client->releaseBuffers(); 
     934          
     935         finalized = true; 
     936         info(100)<<"DEBUG: context "<<getId()<<" bufferRelease OK"<<endl ; 
     937          
     938         closeAllFile(); // Just move to here to make sure that server-level 1 can close files 
     939         
     940        /*  ym 
     941         if (hasServer && !hasClient) 
     942         {            
     943           registryOut->hierarchicalGatherRegistry() ; 
     944           if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 
     945         } 
     946        */ 
     947 
     948         //! Deallocate client buffers 
     949//         client->releaseBuffers(); 
     950         info(100)<<"DEBUG: context "<<getId()<<" client release"<<endl ; 
     951 
     952/*          
     953         for (int i = 0; i < clientPrimServer.size(); ++i) 
     954           clientPrimServer[i]->releaseBuffers(); 
     955*/ 
     956         //! Free internally allocated communicators 
     957         for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 
     958           MPI_Comm_free(&(*it)); 
     959         comms.clear(); 
     960 
     961         info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 
     962       } 
     963 
     964       ++countChildCtx_; 
     965     } 
     966   } 
     967   CATCH_DUMP_ATTR 
     968 
    591969   //! Free internally allocated communicators 
    592970   void CContext::freeComms(void) 
     
    614992     if (allProcessed) return;   
    615993      
     994    // create intercommunicator with servers.  
     995    // not sure it is the good place to be called here  
     996    createServerInterComm() ; 
     997 
     998 
    616999     // After xml is parsed, there are some more works with post processing 
    6171000     postProcessing(); 
     
    7341117   TRY 
    7351118   { 
    736     CTimer::get("Context : close definition").resume() ; 
     1119     CTimer::get("Context : close definition").resume() ; 
     1120     
     1121    // 
    7371122    postProcessingGlobalAttributes(); 
    7381123 
     
    15481933   CATCH_DUMP_ATTR 
    15491934 
     1935   void CContext::setIdServer(const StdString& idServer) 
     1936   TRY 
     1937   { 
     1938      idServer_=idServer ; 
     1939   } 
     1940   CATCH_DUMP_ATTR 
     1941 
     1942    
     1943   const StdString& CContext::getIdServer() 
     1944   TRY 
     1945   { 
     1946      return idServer_; 
     1947   } 
     1948   CATCH_DUMP_ATTR 
     1949 
     1950   const StdString& CContext::getIdServer(const int i) 
     1951   TRY 
     1952   { 
     1953//     return idServer_ + std::to_string(static_cast<unsigned long long>(i)); 
     1954      return primServerId_[i] ; 
     1955   } 
     1956   CATCH_DUMP_ATTR 
     1957 
     1958/* 
    15501959   const StdString& CContext::getIdServer() 
    15511960   TRY 
     
    15701979   } 
    15711980   CATCH_DUMP_ATTR 
     1981*/ 
     1982 
    15721983 
    15731984   /*! 
Note: See TracChangeset for help on using the changeset viewer.