Ignore:
Timestamp:
10/11/21 14:41:56 (3 years ago)
Author:
ymipsl
Message:
  • Update of the tranfer protocol using one sided communication
  • Introduce MPI_Improb/MPI_mrecv to listen incomming request
  • Introducing latency when looping over managers

YM

Location:
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager
Files:
16 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/contexts_manager.cpp

    r1765 r2246  
    77#include "servers_ressource.hpp" 
    88#include "server.hpp" 
     9#include "timer.hpp" 
    910#include <functional> 
    1011 
     
    4849    int serviceLeader ; 
    4950    auto servicesManager = CXios::getServicesManager() ; 
    50      
     51    
    5152    bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ; 
    5253 
     54    info(40)<<"CContextsManager::createServerContext : waiting for service leader ;  serviceId : "<<serviceId<<endl ; 
    5355    if (wait) 
    5456    { 
     
    6466      notifyType_=NOTIFY_CREATE_CONTEXT ; 
    6567      notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ; 
     68      info(40)<<"CContextsManager::createServerContext : notification create_context to service leader "<<serviceLeader<<", serviceId : "<<serviceId<<", contextId "<<contextId<<endl ; 
    6669      sendNotification(serviceLeader) ; 
    6770      return true ; 
     
    8083     
    8184    int type ; 
     85    info(40)<<"CContextsManager::createServerContextIntercomm : waiting for context leader ;  contextId : "<<contextId<<endl ; 
    8286    ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ; 
    8387    if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ; 
     
    96100      notifyType_=NOTIFY_CREATE_INTERCOMM ; 
    97101      notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ; 
     102      info(40)<<"CContextsManager::createServerContextIntercomm : notification create_intercomm to context leader : "<<contextLeader<<", contextId :"<<contextId<<endl ; 
    98103      sendNotification(contextLeader) ; 
    99104      return true ; 
     
    149154  void CContextsManager::eventLoop(void) 
    150155  { 
    151     checkNotifications() ; 
     156    CTimer::get("CContextsManager::eventLoop").resume(); 
     157    double time=MPI_Wtime() ; 
     158    if (time-lastEventLoop_ > eventLoopLatency_)  
     159    { 
     160      checkNotifications() ; 
     161      lastEventLoop_=time ; 
     162    } 
     163    CTimer::get("CContextsManager::eventLoop").suspend(); 
    152164  } 
    153165   
     
    166178  void CContextsManager::createServerContext(void) 
    167179  { 
     180    info(40)<<"CContextsManager::createServerContext : receive create server context notification"<<endl ; 
    168181    auto arg=notifyCreateContext_ ; 
    169182    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg)) 
     
    174187  void CContextsManager::createServerContextIntercomm(void) 
    175188  { 
     189    info(40)<<"CContextsManager::createServerContext : receive create intercomm context notification"<<endl ; 
    176190    auto arg=notifyCreateIntercomm_ ; 
    177191    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg)) 
     
    194208  void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo) 
    195209  { 
    196     winContexts_->lockWindow(managerGlobalLeader_,0) ; 
    197     winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 
     210    winContexts_->lockWindowExclusive(managerGlobalLeader_) ; 
     211    winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 
     212    winContexts_->flushWindow(managerGlobalLeader_) ; 
    198213    contexts_[fullContextId] = contextInfo ; 
    199     winContexts_->updateToWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ; 
    200     winContexts_->unlockWindow(managerGlobalLeader_,0) ;     
     214    winContexts_->updateToLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ; 
     215    winContexts_->unlockWindow(managerGlobalLeader_) ; 
    201216  } 
    202217 
     
    210225    { 
    211226 
    212       winContexts_->lockWindow(managerGlobalLeader_,0) ; 
    213       winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 
    214       winContexts_->unlockWindow(managerGlobalLeader_,0) ; 
     227      winContexts_->lockWindowShared(managerGlobalLeader_) ; 
     228      winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 
     229      winContexts_->unlockWindow(managerGlobalLeader_) ; 
    215230 
    216231      auto it=contexts_.find(fullContextId) ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/contexts_manager.hpp

    r1765 r2246  
    8282 
    8383    int managerGlobalLeader_ ; 
    84      
     84 
     85    const double eventLoopLatency_=1e-2;  
     86    double lastEventLoop_=0. ; 
     87 
    8588    friend class CWindowManager ; 
    8689  
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/daemons_manager.cpp

    r2209 r2246  
    4141  bool CDaemonsManager::eventLoop(void) 
    4242  { 
     43     
    4344    CXios::getRessourcesManager()->eventLoop() ; 
    4445    CXios::getServicesManager()->eventLoop() ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/pool_ressource.cpp

    r2208 r2246  
    66#include "type.hpp" 
    77#include "cxios.hpp" 
     8#include "timer.hpp" 
    89 
    910namespace xios 
     
    1617    MPI_Comm_rank(poolComm, &commRank) ; 
    1718    MPI_Comm_size(poolComm, &commSize) ; 
    18  
    19  
     19    info(40)<<"CPoolRessource::CPoolRessource  : creating new pool : "<<Id<<endl ; 
    2020    if (commRank==localLeader_) 
    2121    { 
     
    5151    occupancy_.erase(occupancy_.begin(),it) ; 
    5252    occupancy_.insert(procs_update.begin(),procs_update.end()) ; 
    53  
     53     
     54    info(40)<<"CPoolRessource::createService  : notify createService to all pool members ; serviceId : "<<serviceId<<endl ; 
    5455    for(int rank=0; rank<commSize; rank++) 
    5556    { 
     
    102103  bool CPoolRessource::eventLoop(bool serviceOnly) 
    103104  { 
    104     checkCreateServiceNotification() ; 
     105    CTimer::get("CPoolRessource::eventLoop").resume(); 
     106    
     107    double time=MPI_Wtime() ; 
     108    if (time-lastEventLoop_ > eventLoopLatency_)  
     109    { 
     110      checkCreateServiceNotification() ; 
     111      lastEventLoop_=time ; 
     112    } 
     113     
    105114    for (auto it=services_.begin(); it!=services_.end() ; ++it)  
    106115    { 
     
    112121      } 
    113122    } 
    114  
     123    CTimer::get("CPoolRessource::eventLoop").suspend(); 
    115124    if (services_.empty() && finalizeSignal_) return true ; 
    116125    else return false ; 
     
    137146  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) 
    138147  { 
     148      
     149     info(40)<<"CPoolRessource::createNewService  : receive createService notification ; serviceId : "<<serviceId<<endl ; 
    139150     MPI_Comm serviceComm, newServiceComm ; 
    140151     int commRank ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/pool_ressource.hpp

    r1764 r2246  
    4444    std::string Id_ ; 
    4545    bool finalizeSignal_ ; 
    46  
     46     
     47    const double eventLoopLatency_=1e-2;  
     48    double lastEventLoop_=0. ; 
    4749  }; 
    4850 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/ressources_manager.cpp

    r1764 r2246  
    22#include "server.hpp" 
    33#include "servers_ressource.hpp" 
     4#include "timer.hpp" 
    45 
    56 
     
    4344  void CRessourcesManager::createPool(const string& poolId, int size) 
    4445  { 
     46    info(40)<<"CRessourcesManager::createPool : calling createPool : "<<poolId<<"  of size"<<size<<endl ; 
     47    info(40)<<"send notification to leader : "<<serverLeader_<<endl ; 
    4548    winRessources_->lockWindow(managerGlobalLeader_,0) ; 
    4649    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; 
     
    4952    notifyType_=NOTIFY_CREATE_POOL ; 
    5053    notifyCreatePool_=make_tuple(poolId, size) ; 
     54    info(40)<<"CRessourcesManager::createPool : send notification creating pool to server leader "<<serverLeader_<<endl ; 
    5155    sendNotification(serverLeader_) ;  
    5256  } 
     
    6165    { 
    6266      notifyType_=NOTIFY_FINALIZE ; 
     67      info(40)<<"CRessourcesManager::finalize : send notification finalize to server leader "<<serverLeader_<<endl ; 
    6368      sendNotification(serverLeader_) ; 
    6469    }  
     
    107112  void CRessourcesManager::eventLoop(void) 
    108113  { 
    109     checkNotifications() ; 
     114    CTimer::get("CRessourcesManager::eventLoop").resume(); 
     115    double time=MPI_Wtime() ; 
     116    if (time-lastEventLoop_ > eventLoopLatency_)  
     117    { 
     118      checkNotifications() ; 
     119      lastEventLoop_=time ; 
     120    } 
     121 
     122    CTimer::get("CRessourcesManager::eventLoop").suspend(); 
    110123  } 
    111124   
     
    114127    int commRank ; 
    115128    MPI_Comm_rank(xiosComm_, &commRank) ; 
     129    CTimer::get("CRessourcesManager::checkNotifications lock").resume(); 
    116130    winNotify_->lockWindow(commRank,0) ; 
     131    CTimer::get("CRessourcesManager::checkNotifications lock").suspend(); 
     132    CTimer::get("CRessourcesManager::checkNotifications pop").resume(); 
    117133    winNotify_->popFromWindow(commRank, this, &CRessourcesManager::notificationsDumpIn) ; 
     134    CTimer::get("CRessourcesManager::checkNotifications pop").suspend(); 
     135    CTimer::get("CRessourcesManager::checkNotifications unlock").resume(); 
    118136    winNotify_->unlockWindow(commRank,0) ; 
     137    CTimer::get("CRessourcesManager::checkNotifications unlock").suspend(); 
    119138    if (notifyType_==NOTIFY_CREATE_POOL) createPool() ; 
    120139    else if (notifyType_==NOTIFY_FINALIZE) finalizeSignal() ; 
     
    123142  void CRessourcesManager::createPool(void) 
    124143  { 
     144     
    125145    auto& arg=notifyCreatePool_ ; 
    126146    string poolId=get<0>(arg) ; 
    127147    int size=get<1>(arg) ; 
     148    info(40)<<"CRessourcesManager::createPool : receive create pool notification : "<< poolId<<"  of size "<<size<<endl ; 
    128149    CServer::getServersRessource()->createPool(poolId,size) ; 
    129150  }  
     
    131152  void CRessourcesManager::finalizeSignal(void) 
    132153  { 
     154    info(40)<<"CRessourcesManager::createPool : receive finalize notification"<<endl ; 
    133155    CServer::getServersRessource()->finalize() ; 
    134156  } 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/ressources_manager.hpp

    r1764 r2246  
    7272    int freeRessourcesSize_ ; 
    7373 
     74    const double eventLoopLatency_=1e-2;  
     75    double lastEventLoop_=0. ; 
     76 
    7477    friend class CWindowManager ; 
    7578  } ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.cpp

    r2230 r2246  
    66#include "register_context_info.hpp" 
    77#include "services.hpp" 
     8#include "timer.hpp" 
    89 
    910 
     
    1819                                 hasNotification_(false) 
    1920  { 
     21   info(40)<<"CCServerContext::CServerContext  : new context creation ; contextId : "<<contextId<<endl ; 
    2022   int localRank, globalRank, commSize ; 
    2123 
     
    5860                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait) 
    5961  { 
     62    info(40)<<"CServerContext::createIntercomm  : context intercomm creation ; contextId : "<<contextId<<endl ; 
    6063    int intraCommRank ; 
    6164    MPI_Comm_rank(intraComm, &intraCommRank) ; 
     
    145148     int commSize ; 
    146149     MPI_Comm_size(contextComm_,&commSize) ; 
     150     info(40)<<"CServerContext::createIntercomm  : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ; 
     151     
    147152     for(int rank=0; rank<commSize; rank++) 
    148153     { 
     
    191196    if (!hasNotification_) 
    192197    { 
    193       int commRank ; 
    194       MPI_Comm_rank(contextComm_, &commRank) ; 
    195       winNotify_->lockWindow(commRank,0) ; 
    196       winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ; 
    197       winNotify_->unlockWindow(commRank,0) ; 
     198      double time=MPI_Wtime() ; 
     199      if (time-lastEventLoop_ > eventLoopLatency_)  
     200      { 
     201        int commRank ; 
     202        MPI_Comm_rank(contextComm_, &commRank) ; 
     203        winNotify_->lockWindow(commRank,0) ; 
     204        winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ; 
     205        winNotify_->unlockWindow(commRank,0) ; 
    198206       
    199       if (notifyInType_!= NOTIFY_NOTHING) 
    200       { 
    201         hasNotification_=true ; 
    202         auto eventScheduler=parentService_->getEventScheduler() ; 
    203         std::hash<string> hashString ; 
    204         size_t hashId = hashString(name_) ; 
    205         size_t currentTimeLine=0 ; 
    206         eventScheduler->registerEvent(currentTimeLine,hashId);  
     207        if (notifyInType_!= NOTIFY_NOTHING) 
     208        { 
     209          hasNotification_=true ; 
     210          auto eventScheduler=parentService_->getEventScheduler() ; 
     211          std::hash<string> hashString ; 
     212          size_t hashId = hashString(name_) ; 
     213          size_t currentTimeLine=0 ; 
     214          eventScheduler->registerEvent(currentTimeLine,hashId);  
     215        } 
     216        lastEventLoop_=time ; 
    207217      } 
    208218    } 
     
    225235  bool CServerContext::eventLoop(bool serviceOnly) 
    226236  { 
     237    CTimer::get("CServerContext::eventLoop").resume(); 
    227238    bool finished=false ; 
    228     if (winNotify_!=nullptr) checkNotifications() ; 
     239     
     240//    double time=MPI_Wtime() ; 
     241//    if (time-lastEventLoop_ > eventLoopLatency_)  
     242//    { 
     243      if (winNotify_!=nullptr) checkNotifications() ; 
     244//      lastEventLoop_=time ; 
     245//    } 
     246 
     247 
    229248    if (!serviceOnly && context_!=nullptr)   
    230249    { 
     
    235254      } 
    236255    } 
    237  
     256    CTimer::get("CServerContext::eventLoop").suspend(); 
    238257    if (context_==nullptr && finalizeSignal_) finished=true ; 
    239258    return finished ; 
     
    242261  void CServerContext::createIntercomm(void) 
    243262  { 
     263    info(40)<<"CServerContext::createIntercomm  : received createIntercomm notification"<<endl ; 
     264 
    244265     MPI_Comm interCommServer, interCommClient ; 
    245266     auto& arg=notifyInCreateIntercomm_ ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.hpp

    r2130 r2246  
    6161    bool isAttachedMode_ ; 
    6262 
     63    const double eventLoopLatency_=1e-2;  
     64    double lastEventLoop_=0. ; 
     65 
    6366    friend class CWindowManager ; 
    6467  } ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/servers_ressource.cpp

    r2220 r2246  
    55#include "cxios.hpp" 
    66#include "mpi.hpp" 
     7#include "timer.hpp" 
    78#include <vector> 
    89#include <string> 
     
    115116  bool CServersRessource::eventLoop(bool serviceOnly) 
    116117  { 
    117     checkNotifications() ; 
     118    CTimer::get("CServersRessource::eventLoop").resume(); 
     119    double time=MPI_Wtime() ; 
     120    if (time-lastEventLoop_ > eventLoopLatency_)  
     121    { 
     122      checkNotifications() ; 
     123      lastEventLoop_=time ; 
     124    } 
     125 
    118126    if (poolRessource_!=nullptr)  
    119127    { 
     
    124132      }  
    125133    } 
    126  
     134    CTimer::get("CServersRessource::eventLoop").suspend(); 
    127135    if (poolRessource_==nullptr && finalizeSignal_) return true ; 
    128136    else return false ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/servers_ressource.hpp

    r1764 r2246  
    5050    bool finalizeSignal_ ; 
    5151 
     52    const double eventLoopLatency_=1e-2;  
     53    double lastEventLoop_=0. ; 
     54 
    5255    friend class CWindowManager ; 
    5356  } ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.cpp

    r2230 r2246  
    55#include "server_context.hpp" 
    66#include "event_scheduler.hpp" 
     7#include "timer.hpp" 
    78 
    89namespace xios 
     
    1415 
    1516  { 
     17    info(40)<<"CService::CService  : new service created ; serviceId : "<<serviceId<<endl ; 
     18    
    1619    int localRank, globalRank, commSize ; 
    1720 
     
    4447    int commSize ; 
    4548    MPI_Comm_size(serviceComm_, &commSize) ; 
    46      
     49    info(40)<<"CService::createContext  : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ; 
     50 
    4751    for(int rank=0; rank<commSize; rank++)  
    4852    { 
     
    5155      sendNotification(rank) ; 
    5256    } 
     57    info(40)<<"CService::createContext  : notify CreateContext to all services members : DONE "<<endl ; 
    5358  } 
    5459/* 
     
    99104  { 
    100105    //checkCreateContextNotification() ; 
    101     checkNotifications() ; 
     106    CTimer::get("CService::eventLoop").resume(); 
     107     
     108//    double time=MPI_Wtime() ; 
     109//    if (time-lastEventLoop_ > eventLoopLatency_)  
     110//    { 
     111      checkNotifications() ; 
     112//      lastEventLoop_=time ; 
     113//    } 
     114 
    102115 
    103116    eventScheduler_->checkEvent() ; 
     
    111124      } ; 
    112125    } 
    113  
     126    CTimer::get("CService::eventLoop").suspend(); 
    114127    if (contexts_.empty() && finalizeSignal_) return true ; 
    115128    else return false ; 
     
    144157      if (notifyInType_==NOTIFY_CREATE_CONTEXT) 
    145158      { 
    146         info(10)<<"NotifyDumpOut"<<endl ; 
    147159        auto& arg=notifyInCreateContext_ ; 
    148160        buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg); 
     
    158170    if (!hasNotification_) 
    159171    { 
    160       int commRank ; 
    161       MPI_Comm_rank(serviceComm_, &commRank) ; 
    162       winNotify_->lockWindow(commRank,0) ; 
    163       winNotify_->popFromWindow(commRank, this, &CService::notificationsDumpIn) ; 
    164       winNotify_->unlockWindow(commRank,0) ; 
     172      double time=MPI_Wtime() ; 
     173      if (time-lastEventLoop_ > eventLoopLatency_)  
     174      { 
     175        int commRank ; 
     176        MPI_Comm_rank(serviceComm_, &commRank) ; 
     177        winNotify_->lockWindow(commRank,0) ; 
     178        winNotify_->popFromWindow(commRank, this, &CService::notificationsDumpIn) ; 
     179        winNotify_->unlockWindow(commRank,0) ; 
    165180       
    166       if (notifyInType_!= NOTIFY_NOTHING) 
    167       { 
    168         hasNotification_=true ; 
    169         std::hash<string> hashString ; 
    170         size_t hashId = hashString(name_) ; 
    171         size_t currentTimeLine=0 ; 
    172         eventScheduler_->registerEvent(currentTimeLine,hashId);  
     181        if (notifyInType_!= NOTIFY_NOTHING) 
     182        { 
     183          hasNotification_=true ; 
     184          std::hash<string> hashString ; 
     185          size_t hashId = hashString(name_) ; 
     186          size_t currentTimeLine=0 ; 
     187          info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler"<<endl ; 
     188          eventScheduler_->registerEvent(currentTimeLine,hashId);  
     189        } 
     190        lastEventLoop_=time ; 
    173191      } 
    174192    } 
     
    179197      size_t hashId = hashString(name_) ; 
    180198      size_t currentTimeLine=0 ; 
     199      info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ; 
    181200      if (eventScheduler_->queryEvent(currentTimeLine,hashId)) 
    182201      { 
    183202        eventScheduler_->popEvent() ; 
     203        info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ; 
    184204        if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ; 
    185205        hasNotification_=false ; 
     
    190210 
    191211 
    192  
     212//ym not use any more 
    193213  void CService::checkCreateContextNotification(void) 
    194214  { 
     
    210230  void CService::createContext(void) 
    211231   { 
     232     info(40)<<"CService::createContext(void)  : receive createContext notification"<<endl ; 
    212233     auto& arg=notifyInCreateContext_ ; 
    213234     string poolId = get<0>(arg) ; 
     
    218239   } 
    219240 
    220    //to remove 
     241   //to remove, not used anymore 
    221242   void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId) 
    222243   { 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.hpp

    r2130 r2246  
    7171    int nbPartitions_ ; 
    7272 
     73    const double eventLoopLatency_=1e-2;  
     74    double lastEventLoop_=0. ; 
     75 
    7376  }; 
    7477 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services_manager.cpp

    r1764 r2246  
    77#include "server.hpp" 
    88#include "servers_ressource.hpp" 
     9#include "timer.hpp" 
    910 
    1011namespace xios 
     
    5556    int poolSize ; 
    5657     
     58    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ; 
    5759    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ; 
    5860    if (wait) 
     
    6769    if (ok)  
    6870    { 
     71      info(40)<<"CServicesManager : create service notification to leader "<<leader<<", serviceId : "<<serviceId<<", size : "<<size<<endl ; 
    6972      createServicesNotify(leader, serviceId, type, size, nbPartitions) ; 
    7073      return true ; 
     
    9497    { 
    9598      auto info = notifications_.front() ; 
     99      xios::info(40)<<"CServicesManager : receive create service notification : "<<get<0>(info)<<endl ; 
    96100      CServer::getServersRessource()->getPoolRessource()->createService(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ; 
    97101      notifications_.pop_front() ; 
     
    104108  void CServicesManager::eventLoop(void) 
    105109  { 
    106     checkCreateServicesNotification() ; 
     110    CTimer::get("CServicesManager::eventLoop").resume(); 
     111    double time=MPI_Wtime() ; 
     112    if (time-lastEventLoop_ > eventLoopLatency_)  
     113    { 
     114      checkCreateServicesNotification() ; 
     115      lastEventLoop_=time ; 
     116    } 
     117    CTimer::get("CServicesManager::eventLoop").suspend(); 
    107118  } 
    108119 
     
    176187  { 
    177188     
     189    info(40)<<"CServicesManager : registering service, poolId : "<<poolId<<", serviceId : "<<serviceId<<endl ; ; 
     190 
     191    winServices_->lockWindowExclusive(managerGlobalLeader_) ; 
     192    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
     193    winServices_->flushWindow(managerGlobalLeader_) ; 
     194    services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ; 
     195    winServices_->updateToLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ; 
     196    winServices_->unlockWindow(managerGlobalLeader_) ; 
     197 
     198/* 
    178199    winServices_->lockWindow(managerGlobalLeader_,0) ; 
    179200    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
    180201    services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ; 
    181202    winServices_->updateToWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ; 
    182     winServices_->unlockWindow(managerGlobalLeader_,0) ; 
     203    winServices_->unlockWindow(managerGlobalLeader_,0) ;*/ 
    183204  } 
    184205 
     
    186207                                        int& size, int& nbPartitions, int& leader) 
    187208  { 
     209     
     210    winServices_->lockWindowShared(managerGlobalLeader_) ; 
     211    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
     212    winServices_->unlockWindow(managerGlobalLeader_) ; 
     213/* 
    188214    winServices_->lockWindow(managerGlobalLeader_,0) ; 
    189215    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
    190     winServices_->unlockWindow(managerGlobalLeader_,0) ; 
     216    winServices_->unlockWindow(managerGlobalLeader_,0) ;*/ 
    191217 
    192218    auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services_manager.hpp

    r1764 r2246  
    5959 
    6060    int managerGlobalLeader_ ; 
     61 
     62    const double eventLoopLatency_=1e-2;  
     63    double lastEventLoop_=0. ; 
    6164     
    6265    friend class CWindowManager ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/window_manager.hpp

    r1764 r2246  
    2525    MPI_Win window_ ; 
    2626    void * winBuffer_ ; 
     27    map<int,double> lastTimeLock_ ; 
     28    const double latency_=0e-2 ;  
    2729 
    2830    public : 
     
    4648    { 
    4749      int lock=state ; 
    48            
     50      double time ; 
     51      auto it=lastTimeLock_.find(rank) ; 
     52      if (it == lastTimeLock_.end())  
     53      {  
     54        lastTimeLock_[rank] = 0. ;  
     55        it=lastTimeLock_.find(rank) ; 
     56      } 
     57      double& lastTime = it->second ; 
     58 
    4959      do  
    5060      { 
     61        time=MPI_Wtime() ; 
     62        while(time-lastTime < latency_) time=MPI_Wtime() ; 
    5163        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
    5264        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ; 
    5365        MPI_Win_unlock(rank, window_) ; 
     66        lastTime=MPI_Wtime() ; 
    5467      } while (lock!=state) ; 
    55  
    56        
    57     } 
    58  
     68       
     69       
     70    } 
     71 
     72    void lockWindowExclusive(int rank, int state ) 
     73    { 
     74      int lock=state ; 
     75      double time ; 
     76      auto it=lastTimeLock_.find(rank) ; 
     77      if (it == lastTimeLock_.end())  
     78      {  
     79        lastTimeLock_[rank] = 0. ;  
     80        it=lastTimeLock_.find(rank) ; 
     81      } 
     82      double& lastTime = it->second ; 
     83 
     84      do  
     85      { 
     86        time=MPI_Wtime() ; 
     87        while(time-lastTime < latency_) time=MPI_Wtime() ; 
     88        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
     89        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ; 
     90        MPI_Win_unlock(rank, window_) ; 
     91        lastTime=MPI_Wtime() ; 
     92      } while (lock!=state) ; 
     93    } 
     94 
     95    void lockWindowExclusive(int rank) 
     96    { 
     97      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
     98    } 
     99 
     100    void lockWindowShared(int rank) 
     101    { 
     102      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ; 
     103    } 
     104 
     105    void unlockWindow(int rank) 
     106    { 
     107      MPI_Win_unlock(rank, window_) ; 
     108    } 
     109 
     110    void flushWindow(int rank) 
     111    { 
     112      MPI_Win_flush(rank, window_) ; 
     113    } 
    59114 
    60115    void unlockWindow(int rank, int state ) 
     
    77132      MPI_Win_unlock(rank, window_) ; 
    78133    } 
    79      
     134 
     135    template< class T > 
     136    void updateToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) ) 
     137    { 
     138      CBufferOut buffer ; 
     139      (object->*dumpOut)(buffer) ; 
     140      size_t size=buffer.count() ; 
     141//      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
     142      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ; 
     143      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ; 
     144//      MPI_Win_unlock(rank, window_) ; 
     145    } 
     146 
    80147    template< typename T > 
    81148    void updateFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) )  
     
    90157      (object->*dumpIn)(buffer) ; 
    91158    } 
     159 
     160    template< typename T > 
     161    void updateFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) )  
     162    { 
     163      size_t size ; 
     164//      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ; 
     165      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ; 
     166      MPI_Win_flush(rank,window_) ; 
     167      CBufferIn buffer(size) ; 
     168      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ; 
     169//      MPI_Win_unlock(rank, window_) ; 
     170      MPI_Win_flush(rank, window_) ; 
     171      (object->*dumpIn)(buffer) ; 
     172    } 
     173 
    92174 
    93175    template< class T > 
Note: See TracChangeset for help on using the changeset viewer.