Changeset 2407


Ignore:
Timestamp:
09/21/22 15:53:51 (21 months ago)
Author:
ymipsl
Message:

Implement separate "reader" and "writer" service. Default reader live on same ressources that "writer" or "gatherer" services.

YM

Location:
XIOS3/trunk/src
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/client.cpp

    r2335 r2407  
    434434 
    435435      getPoolRessource()->createService(contextComm, id, 0, CServicesManager::CLIENT, 1) ; 
    436       getPoolRessource()->createService(contextComm, id+"_"+CXios::defaultServerId, 0, CServicesManager::IO_SERVER, 1) ; 
     436      getPoolRessource()->createService(contextComm, id+"_"+CXios::defaultWriterId, 0, CServicesManager::WRITER, 1) ; 
     437      getPoolRessource()->createService(contextComm, id+"_"+CXios::defaultReaderId, 0, CServicesManager::READER, 1) ; 
    437438 
    438439      if (commRank==0) while (!CXios::getServicesManager()->hasService(getPoolRessource()->getId(), id, 0)) { CXios::getDaemonsManager()->eventLoop();} 
  • XIOS3/trunk/src/cxios.cpp

    r2335 r2407  
    2626  const string CXios::defaultPoolId="default_pool_id" ; 
    2727  const string CXios::defaultServerId="default_server_id" ; 
     28  const string CXios::defaultWriterId="default_writer_id" ; 
     29  const string CXios::defaultReaderId="default_reader_id" ; 
    2830  const string CXios::defaultGathererId="default_gatherer_id" ; 
    2931  const string CXios::defaultServicesId="default_services_id" ; 
  • XIOS3/trunk/src/cxios.hpp

    r2335 r2407  
    7171     static const string defaultServerId ; 
    7272     static const string defaultGathererId ; 
     73     static const string defaultWriterId ; 
     74     static const string defaultReaderId ; 
    7375     static const string defaultServicesId ; 
    7476 
  • XIOS3/trunk/src/node/context.cpp

    r2406 r2407  
    515515       hasServer=true ; 
    516516     } 
    517      else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 
     517     else if (serviceType_==CServicesManager::WRITER || serviceType_==CServicesManager::READER) 
    518518     { 
    519519       hasClient=false ; 
     
    549549    client->setAssociatedServer(server) ;   
    550550    server->setAssociatedClient(client) ; 
    551  
    552     writerServerIn_.push_back(server) ;  
    553     writerClientIn_.push_back(client) ;  
     551     
     552    if (serviceType_ == CServicesManager::GATHERER || serviceType_ == CServicesManager::WRITER) 
     553    {   
     554      writerServerIn_.push_back(server) ;  
     555      writerClientIn_.push_back(client) ;  
     556    } 
     557    else if (serviceType_ == CServicesManager::READER) 
     558    { 
     559      readerServerIn_.push_back(server) ;  
     560      readerClientIn_.push_back(client) ;  
     561    } 
    554562  } 
    555563  CATCH_DUMP_ATTR 
     
    592600       
    593601      clientServers.push_back({fullServerId,{client,server}}) ; 
     602      clientsId_[client] = fullServerId ; 
     603      serversId_[server] = fullServerId ; 
    594604    } 
    595605  } 
     
    603613    if (serviceType_ == CServicesManager::CLIENT) 
    604614    { 
    605       if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultServerId, clientServers) ; 
     615      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultWriterId, clientServers) ; 
    606616      else if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ; 
    607       else createServerInterComm(CXios::defaultPoolId, CXios::defaultServerId, clientServers) ; 
     617      else createServerInterComm(CXios::defaultPoolId, CXios::defaultWriterId, clientServers) ; 
    608618       
    609619      writerClientOut_.push_back(clientServers[0].second.first) ;  
    610620      writerServerOut_.push_back(clientServers[0].second.second) ; 
     621 
     622      clientServers.clear() ; 
     623    
     624      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultReaderId, clientServers) ; 
     625      else createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ; 
     626      readerClientOut_.push_back(clientServers[0].second.first) ;  
     627      readerServerOut_.push_back(clientServers[0].second.second) ; 
     628 
     629 
    611630    } 
    612631    else if (serviceType_ == CServicesManager::GATHERER) 
    613632    { 
    614       createServerInterComm(CXios::defaultPoolId, CXios::defaultServerId, clientServers) ; 
     633      createServerInterComm(CXios::defaultPoolId, CXios::defaultWriterId, clientServers) ; 
    615634      for(auto& clientServer : clientServers) 
    616635      { 
    617         primServerId_.push_back(clientServer.first) ; 
    618         clientPrimServer.push_back(clientServer.second.first); 
    619         serverPrimServer.push_back(clientServer.second.second); 
    620636        writerClientOut_.push_back(clientServer.second.first) ;  
    621637        writerServerOut_.push_back(clientServer.second.second) ; 
     
    672688      for(auto client : writerClientIn_) client->eventLoop(); 
    673689      for(auto server : writerServerIn_) finished &= server->eventLoop(enableEventsProcessing); 
     690      for(auto client : readerClientOut_) client->eventLoop(); 
     691      for(auto server : readerServerOut_) finished &= server->eventLoop(enableEventsProcessing); 
     692      for(auto client : readerClientIn_) client->eventLoop(); 
     693      for(auto server : readerServerIn_) finished &= server->eventLoop(enableEventsProcessing); 
    674694      for(auto couplerOut : couplerOutClient_) couplerOut.second->eventLoop(); 
    675695      for(auto couplerIn : couplerInClient_) couplerIn.second->eventLoop(); 
     
    756776        } while (!couplersInFinalized) ; 
    757777 
    758  
    759         auto& client=writerClientOut_[0] ; 
    760         auto& server=writerServerOut_[0] ; 
    761  
    762         info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 
    763         client->finalize(); 
    764         info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 
    765         while (client->havePendingRequests()) client->eventLoop(); 
    766         info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 
    767         bool notifiedFinalized=false ; 
    768         do 
     778        CContextClient* client ; 
     779        CContextServer* server ; 
     780 
     781        if (writerClientOut_.size()!=0) 
    769782        { 
    770           notifiedFinalized = client->isNotifiedFinalized() ; 
    771         } while (!notifiedFinalized) ; 
    772  
    773         server->releaseBuffers(); 
    774         client->releaseBuffers(); 
    775         info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ; 
     783          client=writerClientOut_[0] ; 
     784          server=writerServerOut_[0] ; 
     785 
     786          info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to writer"<<endl ; 
     787          client->finalize(); 
     788          info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to writer"<<endl ; 
     789          while (client->havePendingRequests()) client->eventLoop(); 
     790          info(100)<<"DEBUG: context "<<getId()<<" no pending request on writer ok"<<endl ; 
     791          bool notifiedFinalized=false ; 
     792          do 
     793          { 
     794            notifiedFinalized = client->isNotifiedFinalized() ; 
     795          } while (!notifiedFinalized) ; 
     796 
     797          server->releaseBuffers(); 
     798          client->releaseBuffers(); 
     799          info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 
     800        } 
     801 
     802        if (readerClientOut_.size()!=0) 
     803        { 
     804          client=readerClientOut_[0] ; 
     805          server=readerServerOut_[0] ; 
     806 
     807          info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to reader"<<endl ; 
     808          client->finalize(); 
     809          info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to reader"<<endl ; 
     810          while (client->havePendingRequests()) client->eventLoop(); 
     811          info(100)<<"DEBUG: context "<<getId()<<" no pending request on reader ok"<<endl ; 
     812          bool notifiedFinalized=false ; 
     813         do 
     814         { 
     815            notifiedFinalized = client->isNotifiedFinalized() ; 
     816         }   while (!notifiedFinalized) ; 
     817 
     818          server->releaseBuffers(); 
     819          client->releaseBuffers(); 
     820          info(100)<<"DEBUG: context "<<getId()<<" release client reader ok"<<endl ; 
     821        } 
    776822      } 
    777823      else if (serviceType_==CServicesManager::GATHERER) 
     
    797843         //ym writerClientIn & writerServerIn not released here ==> to check !! 
    798844      } 
    799       else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 
     845      else if (serviceType_==CServicesManager::WRITER) 
    800846      { 
    801847        closeAllFile(); 
    802848        writerClientIn_[0]->releaseBuffers(); 
    803849        writerServerIn_[0]->releaseBuffers(); 
     850      } 
     851      else if (serviceType_==CServicesManager::READER) 
     852      { 
     853        closeAllFile(); 
     854        readerClientIn_[0]->releaseBuffers(); 
     855        readerServerIn_[0]->releaseBuffers(); 
    804856      } 
    805857 
     
    9691021 
    9701022    // client side, assign context for file reading 
    971     if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(writerClientOut_[0]) ; 
     1023    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 
    9721024     
    9731025    // server side, assign context where to send file data read 
    974     if (serviceType_==CServicesManager::CServicesManager::GATHERER || serviceType_==CServicesManager::IO_SERVER)  
    975       for(auto file : this->enabledReadModeFiles) file->setContextClient(writerClientIn_[0]) ; 
     1026    if (serviceType_==CServicesManager::READER) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientIn_[0]) ; 
    9761027    
    9771028 
     
    10001051 
    10011052    // workflow endpoint => write to file 
    1002     if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 
     1053    if (serviceType_==CServicesManager::WRITER) 
    10031054    { 
    10041055      for(auto field : fileOutField)  
     
    10091060     
    10101061    // workflow endpoint => Send data from server to client 
    1011     if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER) 
     1062    if (serviceType_==CServicesManager::READER || serviceType_==CServicesManager::GATHERER) 
    10121063    { 
    10131064      for(auto field : fileInField)  
     
    10351086     
    10361087    // workflow startpoint => data from client on server side 
    1037     if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER || serviceType_==CServicesManager::OUT_SERVER) 
     1088    if (serviceType_==CServicesManager::WRITER || serviceType_==CServicesManager::GATHERER) 
    10381089    { 
    10391090      for(auto field : fieldModelIn)  
     
    10561107 
    10571108    // workflow startpoint => data read from file on server side 
    1058     if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER) 
     1109    if (serviceType_==CServicesManager::READER) 
    10591110    { 
    10601111      for(auto field : fileInField)  
     
    10651116     
    10661117    // construct slave server list 
     1118    map<string, CContextClient*> slaves ; // need an ordered list ;  
    10671119    if (serviceType_==CServicesManager::CLIENT)  
    10681120    { 
    1069       for(auto field : fileOutField) slaveServers_.insert(field->getContextClient()) ;  
    1070       for(auto field : fileInField) slaveServers_.insert(field->getContextClient()) ;  
     1121      for(auto field : fileOutField) slaves[clientsId_[field->getContextClient()]] = field->getContextClient() ;  
     1122      for(auto field : fileInField) slaves[clientsId_[field->getContextClient()]] = field->getContextClient() ;  
    10711123    } 
    10721124    else if (serviceType_==CServicesManager::GATHERER)  
    1073       for(auto field : fileOutField) slaveServers_.insert(field->getContextClient()) ;  
     1125      for(auto field : fileOutField) slaves[clientsId_[field->getContextClient()]] = field->getContextClient() ; 
     1126    for(auto& slave : slaves) slaveServers_.push_back(slave.second) ;    
     1127 
    10741128 
    10751129    for(auto& slaveServer : slaveServers_) sendCloseDefinition(slaveServer) ; 
    10761130 
    1077     if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)   
     1131    if (serviceType_==CServicesManager::WRITER)   
    10781132    { 
    10791133      createFileHeader(); 
  • XIOS3/trunk/src/node/context.hpp

    r2406 r2407  
    308308         bool hasServer; 
    309309 
    310       private: 
    311          std::vector<CContextServer*> serverPrimServer; 
    312          std::vector<CContextClient*> clientPrimServer; 
    313  
    314          // list of slave servers (IO server or others) 
    315          set<CContextClient*> slaveServers_ ; 
    316310 
    317311      private: 
     
    333327        std::map<std::string, CContextClient*> clients_ ; 
    334328        std::map<std::string, CContextClient*> servers_ ; 
    335  
    336       private: 
    337          // the map containing context client associated to it string id for coupling out ; 
    338          std::map<std::string, CContextClient*> couplerOutClient_ ; 
    339          // the map containing context server associated to it string id for coupling out ; 
    340          std::map<std::string, CContextServer*> couplerOutServer_ ; 
    341          // the map containing context client associated to it string id for coupling in ; 
    342          std::map<std::string, CContextClient*> couplerInClient_ ; 
    343          // the map containing context server associated to it string id for coupling in ; 
    344          std::map<std::string, CContextServer*> couplerInServer_ ; 
     329        std::map<CContextClient*, std::string> clientsId_ ; 
     330        std::map<CContextServer*, std::string> serversId_ ; 
     331 
     332        // list of slave servers (IO server or others) 
     333        std::vector<CContextClient*> slaveServers_ ; 
     334 
     335        // the map containing context client associated to it string id for coupling out ; 
     336        std::map<std::string, CContextClient*> couplerOutClient_ ; 
     337        // the map containing context server associated to it string id for coupling out ; 
     338        std::map<std::string, CContextServer*> couplerOutServer_ ; 
     339        // the map containing context client associated to it string id for coupling in ; 
     340        std::map<std::string, CContextClient*> couplerInClient_ ; 
     341        // the map containing context server associated to it string id for coupling in ; 
     342        std::map<std::string, CContextServer*> couplerInServer_ ; 
    345343      public: 
    346344         CContextClient* getCouplerInClient(const string& contextId) { return couplerInClient_[contextId] ;} 
     
    348346         CContextClient* getCouplerOutClient(const string& contextId) { return couplerOutClient_[contextId] ;} 
    349347         CContextServer* getCouplerOutServer(const string& contextId) { return couplerOutServer_[contextId] ;} 
    350        
    351    
    352          std::vector<std::string> primServerId_; 
    353348 
    354349         CRegistry* registryIn=nullptr ;    //!< input registry which is read from file 
  • XIOS3/trunk/src/server.cpp

    r2404 r2407  
    189189        { 
    190190          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
    191           servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ; 
    192           servicesManager->createServicesOnto(CXios::defaultPoolId, "default_reader", CServicesManager::READER, CXios::defaultServerId) ; 
     191          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ; 
     192          servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ; 
    193193        } 
    194194        else 
     
    201201          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
    202202          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 
    203           servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ; 
     203          servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ; 
     204          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ; 
    204205 
    205206 
    206207        } 
    207         servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ; 
     208//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ; 
    208209      } 
    209210      CTimer::get("XIOS initialize").suspend() ; 
  • XIOS3/trunk/src/transport/one_sided_context_server.cpp

    r2399 r2407  
    201201          CEventServer event(this) ; 
    202202          for(auto& buffer : it->second.buffers) buffer->fillEventServer(currentTimeLine, event) ; 
    203           MPI_Barrier(intraComm) ; 
     203//          MPI_Barrier(intraComm) ; 
    204204          CTimer::get("Process events").resume(); 
    205205          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event.classId<<" of type "<<event.type<<endl ; 
Note: See TracChangeset for help on using the changeset viewer.