Changeset 2404


Ignore:
Timestamp:
09/19/22 10:38:09 (21 months ago)
Author:
ymipsl
Message:

Add the possibility to launch a service on same ressource than an other.
YM

Location:
XIOS3/trunk/src
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/manager/pool_ressource.cpp

    r2274 r2404  
    2727     
    2828    winNotify_->lockWindow(commRank,0) ; 
    29     winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ;   
     29    winNotify_->updateToWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ; 
    3030    winNotify_->unlockWindow(commRank,0) ;        
    3131    MPI_Barrier(poolComm_) ; 
     
    6060  } 
    6161 
    62    
     62  void CPoolRessource::createServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId) 
     63  { 
     64    // for now suppose nbPartitions=1 
     65     
     66    auto it=occupancy_.begin() ; 
     67    int commSize ; 
     68    MPI_Comm_size(poolComm_, &commSize) ; 
     69     
     70    info(40)<<"CPoolRessource::createService  : notify createServiceOnto to all pool members ; serviceId : "<<serviceId 
     71            <<"  onto service Id  :"<< serviceId<<endl ; 
     72    for(int rank=0; rank<commSize; rank++) createServiceOntoNotify(rank, serviceId, type, onServiceId) ; 
     73  } 
     74 
     75/*   
    6376  void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions,  
    6477                                           bool in) 
     
    7083    winNotify_->unlockWindow(rank,0) ;    
    7184  } 
    72  
    73  
     85*/ 
     86   
     87  void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in) 
     88  { 
     89    notifyType_=NOTIFY_CREATE_SERVICE ; 
     90    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ; 
     91    sendNotification(rank) ; 
     92  } 
     93 
     94 
     95  void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId) 
     96  { 
     97    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ; 
     98    notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ; 
     99    sendNotification(rank) ; 
     100  } 
     101 
     102 
     103  void CPoolRessource::sendNotification(int rank) 
     104  { 
     105    winNotify_->lockWindowExclusive(rank) ; 
     106    winNotify_->pushToLockedWindow(rank, this, &CPoolRessource::notificationsDumpOut) ; 
     107    winNotify_->unlockWindow(rank) ; 
     108  } 
     109 
     110  void CPoolRessource::checkNotifications(void) 
     111  { 
     112    int commRank ; 
     113    MPI_Comm_rank(poolComm_, &commRank) ; 
     114    winNotify_->lockWindowExclusive(commRank) ; 
     115    winNotify_->popFromLockedWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ; 
     116    winNotify_->unlockWindow(commRank) ; 
     117    if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ; 
     118    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ; 
     119  } 
     120 
     121 
     122  void CPoolRessource::notificationsDumpOut(CBufferOut& buffer) 
     123  { 
     124 
     125    buffer.realloc(maxBufferSize_) ; 
     126     
     127    if (notifyType_==NOTIFY_CREATE_SERVICE) 
     128    { 
     129      auto& arg=notifyCreateService_ ; 
     130      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg); 
     131    } 
     132    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 
     133    { 
     134      auto& arg=notifyCreateServiceOnto_ ; 
     135      buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg)  ; 
     136    } 
     137  } 
     138 
     139  void CPoolRessource::notificationsDumpIn(CBufferIn& buffer) 
     140  { 
     141    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ; 
     142    else 
     143    { 
     144      buffer>>notifyType_; 
     145      if (notifyType_==NOTIFY_CREATE_SERVICE) 
     146      { 
     147        auto& arg=notifyCreateService_ ; 
     148        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ; 
     149      } 
     150      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 
     151      { 
     152        auto& arg=notifyCreateServiceOnto_ ; 
     153        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ; 
     154      } 
     155    } 
     156  }   
     157 
     158  void CPoolRessource::createService(void) 
     159  { 
     160    auto& arg = notifyCreateService_ ; 
     161    createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ; 
     162  } 
     163   
     164  void CPoolRessource::createServiceOnto(void) 
     165  { 
     166    auto& arg = notifyCreateServiceOnto_ ; 
     167    createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ; 
     168  } 
     169 
     170/* 
    74171  void CPoolRessource::createServiceDumpOut(CBufferOut& buffer) 
    75172  { 
     
    82179  } 
    83180 
    84  
     181*/ 
     182 
     183/* 
    85184  void CPoolRessource::createServiceDumpIn(CBufferIn& buffer) 
    86185  { 
     
    100199    } 
    101200  } 
     201*/ 
    102202 
    103203  bool CPoolRessource::eventLoop(bool serviceOnly) 
     
    110210    if (time-lastEventLoop_ > eventLoopLatency_)  
    111211    { 
    112       checkCreateServiceNotification() ; 
     212      //checkCreateServiceNotification() ; 
     213      checkNotifications() ; 
    113214      lastEventLoop_=time ; 
    114215    } 
     
    128229    else return false ; 
    129230  } 
    130  
     231/* 
    131232  void CPoolRessource::checkCreateServiceNotification(void) 
    132233  { 
     
    146247 
    147248  } 
     249*/ 
    148250 
    149251  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) 
     
    186288     MPI_Comm_free(&serviceComm) ; 
    187289  } 
     290   
     291  void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId) 
     292  { 
     293      
     294    info(40)<<"CPoolRessource::createNewServiceOnto  : receive createServiceOnto notification ; serviceId : " 
     295            <<serviceId<<"  ontoServiceId : "<<onServiceId<<endl ; 
     296    for(auto& service : services_)  
     297    { 
     298      if (std::get<0>(service.first)==onServiceId) 
     299      { 
     300        const MPI_Comm& serviceComm = service.second->getCommunicator() ; 
     301        MPI_Comm newServiceComm ; 
     302        MPI_Comm_dup(serviceComm, &newServiceComm) ; 
     303        int nbPartitions = service.second->getNbPartitions() ; 
     304        int partitionId = service.second->getPartitionId() ; 
     305        shared_ptr<CEventScheduler>  eventScheduler = service.second->getEventScheduler() ; 
     306        info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "<<onServiceId<<endl  ; 
     307        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, Id_, serviceId, partitionId, type, 
     308                                                                         nbPartitions, eventScheduler) ;        
     309      } 
     310    } 
     311     
     312  } 
    188313 
    189314  void CPoolRessource::createService(MPI_Comm serviceComm, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached 
  • XIOS3/trunk/src/manager/pool_ressource.hpp

    r2274 r2404  
    2121    CWindowManager* winNotify_ ; 
    2222     
    23    public: 
     23    private: 
     24    const int NOTIFY_NOTHING=0 ; 
     25    const int NOTIFY_CREATE_SERVICE=1 ; 
     26    const int NOTIFY_CREATE_SERVICE_ONTO=2 ; 
     27 
     28    public: 
    2429    CPoolRessource(MPI_Comm poolComm, const std::string& Id) ; 
    2530    ~CPoolRessource() ; 
     
    2732    void createService(const std::string& serviceId, int type, int size, int nbPartition) ; 
    2833    void createService(MPI_Comm serviceComm, const std::string& serviceId, int partitionId, int type, int nbPartitions) ;  
    29     void createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions, bool in) ; 
    30     void createServiceDumpOut(CBufferOut& buffer) ; 
    31     void createServiceDumpIn(CBufferIn& buffer) ; 
    32     void checkCreateServiceNotification(void) ; 
    33     void createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) ; 
    34      bool eventLoop(bool serviceOnly=false) ; 
     34    void createServiceOnto(const std::string& serviceId, int type, const std::string& OnServiceId) ; 
     35    bool eventLoop(bool serviceOnly=false) ; 
    3536    CService* getService(const std::string serviceId, int partitionId) { return services_[make_tuple(serviceId,partitionId)]; } 
    3637    void finalizeSignal(void) ; 
    3738    string getId(void) { return Id_; } 
     39 
     40  private: 
     41    void createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in) ; 
     42    void createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId) ; 
     43    void sendNotification(int rank) ; 
     44    void checkNotifications(void) ; 
     45    void notificationsDumpOut(CBufferOut& buffer) ; 
     46    void notificationsDumpIn(CBufferIn& buffer) ; 
     47    void createService(void) ; 
     48    void createServiceOnto(void) ; 
    3849     
    39    private: 
     50//    void createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions, bool in) ; 
     51//    void createServiceDumpOut(CBufferOut& buffer) ; 
     52//    void createServiceDumpIn(CBufferIn& buffer) ; 
     53//    void checkCreateServiceNotification(void) ; 
     54    void createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) ; 
     55    void createNewServiceOnto(const std::string& serviceId, int type, const string& onServiceId) ; 
     56     
     57    private: 
    4058    MPI_Comm poolComm_ ; 
    4159     
    4260    std::multimap<int,int> occupancy_ ; 
    43     std::list<std::tuple<std::string, int, int, int, bool> > notifications_; 
     61 
     62//    std::list<std::tuple<std::string, int, int, int, bool> > notifications_; 
     63     
     64    int notifyType_ ; 
     65    tuple<std::string, int, int, int, bool> notifyCreateService_ ; 
     66    tuple<std::string, int, std::string> notifyCreateServiceOnto_ ; 
     67 
     68 
    4469    std::map< std::tuple<std::string, int>, CService*> services_ ; 
    4570    std::string Id_ ; 
  • XIOS3/trunk/src/manager/services.cpp

    r2287 r2404  
    1010{ 
    1111  CService::CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId,  
    12                      int type, int nbPartitions) : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId), 
    13                                                    partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false) 
     12                     int type, int nbPartitions, shared_ptr<CEventScheduler> eventScheduler)  
     13                         : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId), 
     14                           partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false) 
    1415 
    1516 
     
    3637      CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ; 
    3738    } 
    38     eventScheduler_ = new CEventScheduler(serviceComm_) ; 
     39    if (eventScheduler) eventScheduler_ = eventScheduler ; 
     40    eventScheduler_ = make_shared<CEventScheduler>(serviceComm_) ; 
    3941 
    4042    ostringstream oss; 
     
    4547  CService::~CService() 
    4648  { 
    47     delete eventScheduler_ ; 
    4849    delete winNotify_ ; 
    4950    for(auto& it : contexts_) delete it.second ; 
     
    262263  } 
    263264 
    264   CEventScheduler* CService::getEventScheduler(void) 
     265  shared_ptr<CEventScheduler> CService::getEventScheduler(void) 
    265266  { 
    266267    return eventScheduler_ ; 
  • XIOS3/trunk/src/manager/services.hpp

    r2274 r2404  
    2020 
    2121    CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId,  
    22              int type, int nbPartitions) ; 
     22             int type, int nbPartitions, shared_ptr<CEventScheduler> = nullptr) ; 
    2323    ~CService() ; 
    2424 
     
    3232    CServerContext* getServerContext(const std::string& contextId) { return contexts_[contextId]; } 
    3333    void finalizeSignal(void) ; 
    34     CEventScheduler* getEventScheduler(void) ; 
     34    shared_ptr<CEventScheduler> getEventScheduler(void) ; 
    3535 
    3636    std::string getPoolId(void) {return poolId_;} 
     
    3939    int getType(void) {return type_;} 
    4040    int getNbPartitions(void) {return nbPartitions_;} 
     41    const MPI_Comm& getCommunicator(void) { return serviceComm_ ;} 
    4142     
    4243    private: 
     
    6566    std::map<std::string, CServerContext*> contexts_ ; 
    6667    bool finalizeSignal_ ; 
    67     CEventScheduler* eventScheduler_ ; 
     68    shared_ptr<CEventScheduler> eventScheduler_ ; 
    6869 
    6970    std::string poolId_ ; 
  • XIOS3/trunk/src/manager/services_manager.cpp

    r2403 r2404  
    7676  } 
    7777 
    78   bool CServicesManager::createServicesOnto(const std::string& poolId, const std::string& serviceId, const std::string& OnServiceId, bool wait) 
     78  bool CServicesManager::createServicesOnto(const std::string& poolId, const std::string& serviceId, int type, const std::string& OnServiceId, bool wait) 
    7979  { 
    8080 
     
    9696    { 
    9797      info(40)<<"CServicesManager : create service on other, notification to leader "<<leader<<", serviceId : "<<serviceId<<", service onto : "<<OnServiceId<<endl ; 
    98       createServicesOntoNotify(leader, serviceId, OnServiceId) ; 
     98      createServicesOntoNotify(leader, serviceId, type, OnServiceId) ; 
    9999      return true ; 
    100100    } 
     
    110110 
    111111 
    112   void CServicesManager::createServicesOntoNotify(int rank, const string& serviceId, const string& OnServiceId) 
     112  void CServicesManager::createServicesOntoNotify(int rank, const string& serviceId, int type, const string& OnServiceId) 
    113113  { 
    114114    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ; 
    115     notifyCreateServiceOnto_=make_tuple(serviceId, OnServiceId) ; 
     115    notifyCreateServiceOnto_=make_tuple(serviceId, type, OnServiceId) ; 
    116116    sendNotification(rank) ; 
    117117  } 
     
    160160  void CServicesManager::createServiceOnto(void) 
    161161  { 
    162     auto& arg=notifyCreateService_ ; 
    163     //CServer::getServersRessource()->getPoolRessource()->createService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ; 
     162    auto& arg=notifyCreateServiceOnto_ ; 
     163    CServer::getServersRessource()->getPoolRessource()->createServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ; 
    164164  } 
    165165 
     
    177177    { 
    178178      auto& arg=notifyCreateServiceOnto_ ; 
    179       buffer << notifyType_<< get<0>(arg) << get<1>(arg) ; 
     179      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << get<2>(arg) ; 
    180180    } 
    181181  } 
     
    195195      { 
    196196        auto& arg=notifyCreateServiceOnto_ ; 
    197         buffer >> get<0>(arg) >> get<1>(arg) ; 
     197        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ; 
    198198      } 
    199199    } 
  • XIOS3/trunk/src/manager/services_manager.hpp

    r2403 r2404  
    3838     
    3939    bool createServices(const std::string& poolId, const std::string& serviceId, int type, int size, int nbPartition, bool wait=true) ; 
    40     bool createServicesOnto(const std::string& poolId, const std::string& serviceId, const std::string& onServiceId, bool wait=true) ; 
     40    bool createServicesOnto(const std::string& poolId, const std::string& serviceId, int type, const std::string& onServiceId, bool wait=true) ; 
    4141     
    4242    void eventLoop(void) ; 
     
    5656    void createServiceOnto(void) ;     
    5757    void createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions) ; 
    58     void createServicesOntoNotify(int rank, const string& serviceId, const string& OnServiceId) ; 
     58    void createServicesOntoNotify(int rank, const string& serviceId, int type, const string& OnServiceId) ; 
    5959    void sendNotification(int rank) ; 
    6060    void checkNotifications(void) ; 
     
    7373    int notifyType_ ; 
    7474    tuple<std::string, int, int, int> notifyCreateService_ ; 
    75     tuple<std::string, std::string> notifyCreateServiceOnto_ ; 
     75    tuple<std::string, int, std::string> notifyCreateServiceOnto_ ; 
    7676    
    7777    std::map<tuple<std::string, std::string, int>, std::tuple<int, int, int, int> > services_ ; 
  • XIOS3/trunk/src/node/context.hpp

    r2326 r2404  
    342342         
    343343      private: 
    344          CEventScheduler* eventScheduler_ ; //! The local event scheduler for context 
     344         shared_ptr<CEventScheduler> eventScheduler_ ; //! The local event scheduler for context 
    345345         size_t hashId_ ; //! the local hashId for scheduler 
    346346         size_t timeLine_=0 ; 
  • XIOS3/trunk/src/server.cpp

    r2399 r2404  
    190190          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
    191191          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ; 
     192          servicesManager->createServicesOnto(CXios::defaultPoolId, "default_reader", CServicesManager::READER, CXios::defaultServerId) ; 
    192193        } 
    193194        else 
  • XIOS3/trunk/src/transport/context_server.hpp

    r2343 r2404  
    4848 
    4949      size_t hashId ; 
    50       CEventScheduler* eventScheduler_=nullptr ; 
     50      shared_ptr<CEventScheduler> eventScheduler_=nullptr ; 
    5151  } ; 
    5252   
Note: See TracChangeset for help on using the changeset viewer.