Changeset 2547


Timestamp:
08/29/23 17:24:04 (9 months ago)
Author:
ymipsl
Message:

Major update:

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead (see the first sketch after this list)
  • Introduced multithreading on the server side to handle deadlocks more efficiently (similar to the coroutines planned for a future C++ standard), based on C++ threads (see the second sketch below)
  • Suppression of the old "attached mode", which is replaced by online writer and reader filters
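The CWindowDynamic calls that appear in the diff below (allocateBuffer, attach, lockExclusive, unlockExclusive) wrap MPI dynamic windows. A minimal standalone sketch of that pattern, using plain MPI calls rather than the XIOS wrapper (buffer size and variable names are illustrative, not taken from the code):

    // Sketch of the dynamic-window pattern: allocate a buffer, attach it to a
    // dynamic window, then protect accesses with an exclusive lock/unlock pair.
    #include <mpi.h>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);

      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      MPI_Win win;
      MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);

      const MPI_Aint bufSize = 1024;            // illustrative size
      char* buffer = nullptr;
      MPI_Alloc_mem(bufSize, MPI_INFO_NULL, &buffer);
      MPI_Win_attach(win, buffer, bufSize);     // attach after creation, as in attachWindows()

      // Exclusive lock on our own rank, mirroring lockExclusive()/unlockExclusive()
      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, win);
      buffer[0] = 42;                           // local update while holding the lock
      MPI_Win_unlock(rank, win);

      MPI_Win_detach(win, buffer);
      MPI_Free_mem(buffer);
      MPI_Win_free(&win);
      MPI_Finalize();
      return 0;
    }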

YM
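The new server-side event loops in the diff (threadEventLoop in CPoolRessource, CServerContext, CService and CServersRessource) all share the same shape: run on a spawned thread, poll for work, and yield cooperatively until finished. A minimal sketch of that shape using plain std::thread instead of the new CThreadManager (names and the finalize flag are illustrative):

    // Sketch of a cooperative, coroutine-like event loop on its own thread.
    #include <atomic>
    #include <iostream>
    #include <thread>

    std::atomic<bool> finalizeSignal{false};

    void threadEventLoop()
    {
      bool finished = false;
      do
      {
        // ... check notifications and process pending events here ...
        if (finalizeSignal.load()) finished = true;  // exit condition, as in the loops below
        if (!finished) std::this_thread::yield();    // cooperative hand-over, like CThreadManager::yield()
      } while (!finished);
      std::cout << "event loop finished\n";
    }

    int main()
    {
      std::thread worker(threadEventLoop);  // stands in for CThreadManager::spawnThread(...)
      finalizeSignal = true;                // signal finalization
      worker.join();
      return 0;
    }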

Location:
XIOS3/trunk/src
Files:
3 added
35 edited

  • XIOS3/trunk/src/buffer_client.cpp

    r2458 r2547  
    1414  size_t CClientBuffer::maxRequestSize = 0; 
    1515 
    16   CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) 
     16  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, bool hasWindows) 
    1717    : interComm(interComm) 
    1818    , clientRank_(0) 
    1919    , serverRank(serverRank) 
    2020    , bufferSize(bufferSize) 
    21     , estimatedMaxEventSize(estimatedMaxEventSize) 
    2221    , maxEventSize(0) 
    2322    , current(0) 
    2423    , count(0) 
    2524    , pending(false) 
    26     , hasWindows(false)  
    27   { 
    28      /* 
    29       if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 
    30       else hasWindows=true ; 
    31      */ 
    32  
    33       MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 
    34       MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 
     25    , hasWindows_(hasWindows)  
     26  { 
     27      if (hasWindows_) 
     28      {  
     29        windows_.resize(2) ; 
     30        windows_[0] = new CWindowDynamic() ; 
     31        windows_[0]->allocateBuffer(bufferSize+headerSize_) ; 
     32        bufferHeader[0] = (char*) windows_[0]->getBufferAddress() ; 
     33        windows_[1] = new CWindowDynamic() ; 
     34        windows_[1]->allocateBuffer(bufferSize+headerSize_) ; 
     35        bufferHeader[1] = (char*) windows_[1]->getBufferAddress() ; 
     36      } 
     37      else 
     38      { 
     39        MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 
     40        MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 
     41      } 
     42 
    3543      buffer[0] = bufferHeader[0]+headerSize_ ; 
    3644      buffer[1] = bufferHeader[1]+headerSize_ ; 
     
    5563      winState[1]=false ; 
    5664 
    57  
    58     if (hasWindows) 
    59     {   
    60      
    61       MPI_Aint buffSize=bufferSize+headerSize_ ; 
    62       MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ; 
    63       MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ; 
    64      
    65       MPI_Group group ; 
    66       int groupSize,groupRank ; 
    67       MPI_Win_get_group(windows_[0], &group) ; 
    68       MPI_Group_size(group, &groupSize) ; 
    69       MPI_Group_rank(group, &groupRank) ; 
    70       if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
    71  
    72       MPI_Win_get_group(windows_[1], &group) ; 
    73       MPI_Group_size(group, &groupSize) ; 
    74       MPI_Group_rank(group, &groupRank) ; 
    75       if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
    76  
    77       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
    78       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
    79  
    80       MPI_Win_unlock(clientRank_, windows_[1]) ; 
    81       MPI_Win_unlock(clientRank_, windows_[0]) ; 
    82     }  
    83     retBuffer = new CBufferOut(buffer[current], bufferSize); 
    84     info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 
     65     
     66      retBuffer = new CBufferOut(buffer[current], bufferSize); 
     67      info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 
    8568  } 
    8669 
     
    9174    return address ; 
    9275  } 
    93  
    94   void CClientBuffer::attachWindows(vector<MPI_Win>& windows) 
     76   
     77  MPI_Aint CClientBuffer::getWinBufferAddress(int i) 
     78  { 
     79    return windows_[i]->getWinBufferAddress() ; 
     80  } 
     81 
     82  void CClientBuffer::attachWindows(MPI_Comm& winComm) 
    9583  { 
    9684    isAttachedWindows_=true ; 
    97     windows_=windows ; 
    98     if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ; 
    99     else hasWindows=true ; 
    100  
    101     if (hasWindows) 
     85 
     86    if (hasWindows_) 
    10287    {   
    10388      MPI_Aint buffSize=bufferSize+headerSize_ ; 
    104       MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ; 
    105       MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ; 
    106      
    107       MPI_Group group ; 
    108       int groupSize,groupRank ; 
    109       MPI_Win_get_group(windows_[0], &group) ; 
    110       MPI_Group_size(group, &groupSize) ; 
    111       MPI_Group_rank(group, &groupRank) ; 
    112       if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
    113  
    114       MPI_Win_get_group(windows_[1], &group) ; 
    115       MPI_Group_size(group, &groupSize) ; 
    116       MPI_Group_rank(group, &groupRank) ; 
    117       if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
    118  
    119       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
    120       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
    121  
    122       MPI_Win_unlock(clientRank_, windows_[1]) ; 
    123       MPI_Win_unlock(clientRank_, windows_[0]) ; 
     89      windows_[0]->create(winComm) ; 
     90      windows_[0]->attach() ; 
     91      windows_[1]->create(winComm) ; 
     92      windows_[1]->attach() ; 
     93 
     94      windows_[0]->lockExclusive(clientRank_) ; 
     95      windows_[1]->lockExclusive(clientRank_) ; 
     96 
     97      windows_[0]->unlockExclusive(clientRank_) ; 
     98      windows_[1]->unlockExclusive(clientRank_) ; 
     99 
    124100    }  
    125101 
     
    129105  CClientBuffer::~CClientBuffer() 
    130106  { 
    131      //freeWindows() ; 
    132      if (hasWindows) 
    133      { 
    134        MPI_Win_detach(windows_[0],bufferHeader[0]); 
    135        MPI_Win_detach(windows_[1],bufferHeader[1]); 
    136        MPI_Free_mem(bufferHeader[0]) ; 
    137        MPI_Free_mem(bufferHeader[1]) ; 
    138      } 
    139      delete retBuffer; 
     107    if (hasWindows_) 
     108    { 
     109      windows_[0]->detach() ; 
     110      windows_[1]->detach() ; 
     111      delete windows_[0] ; 
     112      delete windows_[1] ; 
     113    } 
     114    else  
     115    { 
     116      MPI_Free_mem(bufferHeader[0]) ; 
     117      MPI_Free_mem(bufferHeader[1]) ; 
     118    } 
     119    delete retBuffer; 
    140120  } 
    141121 
     
    143123  { 
    144124    CTimer::get("lock buffer").resume(); 
    145     if (hasWindows) 
     125    if (isAttachedWindows_)     
    146126    { 
    147127      if (winState[current]==true) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo lock client buffer but winState said it is already locked") ; 
    148       MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 
     128      //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 
     129      windows_[current]->lockExclusive(clientRank_) ; 
    149130      winState[current]=true ; 
    150131    } 
     
    155136  { 
    156137    CTimer::get("unlock buffer").resume(); 
    157     if (hasWindows) 
     138    if (isAttachedWindows_) 
    158139    { 
    159140      if (winState[current]==false) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo unlock client buffer but winState said it is already unlocked") ; 
    160       MPI_Win_unlock(clientRank_, windows_[current]) ; 
     141      //MPI_Win_unlock(clientRank_, windows_[current]) ; 
     142      windows_[current]->unlockExclusive(clientRank_) ; 
    161143      winState[current]=false ; 
    162144    } 
     
    189171    { 
    190172      maxEventSize = size; 
    191  
    192       if (size > estimatedMaxEventSize) 
    193         error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank 
    194                  << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl; 
    195173 
    196174      if (size > maxRequestSize) maxRequestSize = size; 
     
    319297  { 
    320298 
    321     if (hasWindows) 
     299    bufferSize=newSize ; 
     300 
     301    if (hasWindows_) 
    322302    {  
    323       MPI_Win_detach(windows_[0], bufferHeader[0]) ; 
    324       MPI_Win_detach(windows_[1], bufferHeader[1]) ; 
    325     } 
    326     MPI_Free_mem(bufferHeader[0]) ; 
    327     MPI_Free_mem(bufferHeader[1]) ; 
    328  
    329     bufferSize=newSize ; 
    330     MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 
    331     MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 
     303      windows_[0]->detach(); 
     304      windows_[1]->detach(); 
     305       
     306      windows_[0]->attach(bufferSize+headerSize_) ; 
     307      bufferHeader[0] = (char*) windows_[0] -> getBufferAddress() ; 
     308      windows_[1]->attach(bufferSize+headerSize_) ; 
     309      bufferHeader[1] = (char*) windows_[1] -> getBufferAddress() ; 
     310    } 
     311     
     312     
    332313    buffer[0] = bufferHeader[0]+headerSize_ ; 
    333314    buffer[1] = bufferHeader[1]+headerSize_ ; 
     
    353334    current=0 ; 
    354335     
    355     if (hasWindows) 
     336    if (hasWindows_) 
    356337    {   
    357      
    358       MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ; 
    359       MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ; 
    360            
    361       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
    362       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
    363  
    364       MPI_Win_unlock(clientRank_, windows_[1]) ; 
    365       MPI_Win_unlock(clientRank_, windows_[0]) ; 
     338 
     339      windows_[0]->lockExclusive(clientRank_) ; 
     340      windows_[1]->lockExclusive(clientRank_) ; 
     341       
     342      windows_[1]->unlockExclusive(clientRank_) ; 
     343      windows_[0]->unlockExclusive(clientRank_) ; 
    366344    }  
    367345 
     
    373351    bufOut->put(timelineEventChangeBufferSize); 
    374352    bufOut->put(newBufferSize_); 
    375     bufOut->put(this->getWinAddress(0)); 
    376     bufOut->put(this->getWinAddress(1)); 
     353     
     354    bufOut->put(this->getWinBufferAddress(0)); 
     355    bufOut->put(this->getWinBufferAddress(1)); 
    377356 
    378357    resizingBufferStep_=4; 
    379358    unlockBuffer() ; 
    380     info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinAddress(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl; 
     359    info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinBufferAddress(0)<<" winAdress[1] "<<this->getWinBufferAddress(1)<<endl; 
    381360  } 
    382361 
     
    397376    lockBuffer() ; 
    398377    ret=*notify[current] == notifyResizeBuffer_ ? true : false ; 
    399     if (ret || !hasWindows)  
     378    if (ret || !hasWindows_)  
    400379    { 
    401380      *notify[current] = notifyNothing_ ; 
  • XIOS3/trunk/src/buffer_client.hpp

    r2458 r2547  
    77#include "mpi.hpp" 
    88#include "cxios.hpp" 
     9#include "window_dynamic.hpp" 
    910 
    1011namespace xios 
     
    1516      static size_t maxRequestSize; 
    1617 
    17       CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize); 
     18      CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, bool hasWindows); 
    1819      ~CClientBuffer(); 
    1920//      void createWindows(MPI_Comm oneSidedComm) ; 
     
    2829      StdSize remain(void); 
    2930      MPI_Aint getWinAddress(int numWindows) ; 
     31      MPI_Aint getWinBufferAddress(int numWindows) ; 
    3032      void infoBuffer(void) ; 
    3133      bool isNotifiedFinalized(void) ; 
     
    3335      void fixBufferSize(size_t bufferSize) { newBufferSize_=bufferSize ; isGrowableBuffer_=false ; resizingBufferStep_=1 ;} 
    3436      void fixBuffer(void) { isGrowableBuffer_=false ;} 
    35       void attachWindows(vector<MPI_Win>& windows) ; 
     37      void attachWindows(MPI_Comm& winComm) ; 
    3638      bool isAttachedWindows(void) { return isAttachedWindows_ ;} 
    3739    private: 
     
    5860      StdSize maxEventSize; 
    5961      StdSize bufferSize; 
    60       const StdSize estimatedMaxEventSize; 
    6162      bool isFinalized_=false ; 
    6263 
     
    6970      CBufferOut* retBuffer; 
    7071      const MPI_Comm interComm; 
    71       std::vector<MPI_Win> windows_ ; 
    72       bool hasWindows=false ; 
     72      std::vector<CWindowDynamic*> windows_ ; 
     73      bool hasWindows_=false ; 
    7374      bool isAttachedWindows_=false ; 
    7475      double latency_=0 ; 
  • XIOS3/trunk/src/buffer_server.cpp

    r2309 r2547  
    33#include "buffer_server.hpp" 
    44#include "timer.hpp" 
     5#include "window_dynamic.hpp" 
    56 
    67 
     
    89{ 
    910 
    10   CServerBuffer::CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize)  
     11  CServerBuffer::CServerBuffer(vector<CWindowDynamic*>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize)  
    1112  : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress) 
    1213  { 
     
    1819    MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer) ; 
    1920    currentWindows=1 ; 
    20     if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 
     21    if (windows[0]==nullptr && windows[1]==nullptr) hasWindows=false ; 
    2122  } 
    2223 
     
    239240    bool ok=false ; 
    240241     
    241     MPI_Group group ; 
    242     int groupSize,groupRank ; 
    243     MPI_Win_get_group(windows_[currentWindows], &group) ; 
    244     MPI_Group_size(group, &groupSize) ; 
    245     MPI_Group_rank(group, &groupRank) ; 
    246      
     242    
    247243    lockBuffer();  
    248244    CTimer::get("getBufferFromClient_locked").resume() ;    
    249245// lock is acquired 
    250246 
    251     MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    252     MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    253     MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
    254  
     247    windows_[currentWindows]->get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ; 
     248    windows_[currentWindows]->get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ; 
     249    windows_[currentWindows]->flush(windowsRank_) ; 
     250    
    255251    if (timeLine==clientTimeline) 
    256252    { 
    257253      buffer=(char*)getBuffer(clientCount) ; 
    258254      count=clientCount ; 
    259       MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ; 
     255      windows_[currentWindows]->get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR) ; 
     256       
    260257      clientTimeline = 0 ; 
    261258      clientCount = 0 ; 
    262       MPI_Put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    263       MPI_Put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     259      windows_[currentWindows]->put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ; 
     260      windows_[currentWindows]->put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ; 
    264261 
    265262// release lock 
     
    298295  { 
    299296    if (!hasWindows) return ; 
    300     MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
     297    //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
     298    windows_[currentWindows]->lockExclusive(windowsRank_) ; 
    301299  } 
    302300 
     
    304302  { 
    305303    if (!hasWindows) return ; 
    306     MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ; 
     304    windows_[currentWindows]->unlockExclusive(windowsRank_) ; 
    307305  } 
    308306   
     
    313311    lockBuffer();  
    314312// lock is acquired 
    315     MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     313    windows_[currentWindows]->put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ; 
    316314    unlockBuffer() ; 
    317315  } 
     
    324322    lockBuffer();  
    325323// lock is acquired 
    326     MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     324    windows_[currentWindows]->put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ; 
    327325    unlockBuffer() ; 
    328326  } 
  • XIOS3/trunk/src/buffer_server.hpp

    r2323 r2547  
    77#include "mpi.hpp" 
    88#include "cxios.hpp" 
     9#include "window_dynamic.hpp" 
     10 
    911 
    1012namespace xios 
     
    1315  { 
    1416    public: 
    15       CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize bufSize) ; 
     17      CServerBuffer(vector<CWindowDynamic*>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize bufSize) ; 
    1618      ~CServerBuffer() ; 
    1719 
     
    3739      size_t size; 
    3840      size_t used ;  // count of element occupied 
    39       std::vector<MPI_Win> windows_ ; 
     41      std::vector<CWindowDynamic*> windows_ ; 
    4042      std::vector<MPI_Aint> winAddress_ ; 
    4143      bool resizingBuffer_ = false ; 
  • XIOS3/trunk/src/client.cpp

    r2535 r2547  
    180180      MPI_Comm_size(CXios::getXiosComm(), &xiosCommSize) ; 
    181181      MPI_Comm_size(clientsComm_, &clientsCommSize) ; 
    182       if (xiosCommSize==clientsCommSize) CXios::setUsingServer() ; 
    183       else CXios::setNotUsingServer() ; 
     182      if (xiosCommSize==clientsCommSize) CXios::setNotUsingServer() ; 
     183      else CXios::setUsingServer() ; 
    184184 
    185185      ///////////////////////////////////////// 
  • XIOS3/trunk/src/cxios.cpp

    r2535 r2547  
    313313  } 
    314314 
    315    
     315  void CXios::launchThreadManager(bool isXiosServer) 
     316  { 
     317    CThreadManager::initialize(isXiosServer) ; 
     318  } 
     319 
    316320  void CXios::finalizeRegistryManager() 
    317321  { 
     
    339343  } 
    340344   
     345  void CXios::finalizeThreadManager() 
     346  { 
     347    CThreadManager::finalize()  ; 
     348  } 
     349 
    341350  void CXios::finalizeDaemonsManager() 
    342351  { 
  • XIOS3/trunk/src/cxios.hpp

    r2535 r2547  
    1111#include "coupler_manager.hpp" 
    1212#include "registry_manager.hpp" 
     13#include "thread_manager.hpp" 
    1314#include "mpi_garbage_collector.hpp" 
    1415 
     
    106107     static void launchCouplerManager(bool isXiosServer) ; 
    107108     static void launchRegistryManager(bool isXiosServer) ; 
     109     static void launchThreadManager(bool isXiosServer) ; 
    108110     
    109111     static void finalizeServicesManager() ; 
     
    113115     static void finalizeCouplerManager() ; 
    114116     static void finalizeRegistryManager() ; 
     117     static void finalizeThreadManager() ; 
    115118 
    116119     static CRegistryManager*   getRegistryManager(void) { return registryManager_ ;} 
  • XIOS3/trunk/src/manager/daemons_manager.cpp

    r2333 r2547  
    2424    CXios::launchContextsManager(isXiosServer) ; 
    2525    CXios::launchCouplerManager(isXiosServer) ; 
     26    CXios::launchThreadManager(isXiosServer) ; 
    2627  
    2728    if (isXiosServer) CServer::launchServersRessource(splitComm) ; 
     
    4142    CXios::getServicesManager()->eventLoop() ; 
    4243    CXios::getContextsManager()->eventLoop() ; 
    43     if (isServer_) { 
    44         if (CServer::isRoot) { 
    45             CServer::listenOasisEnddef() ; 
    46             CServer::listenRootOasisEnddef() ; 
    47         } 
    48         else { 
    49             CServer::listenRootOasisEnddef() ; 
    50         } 
    51         return CServer::getServersRessource()->eventLoop(false) ; 
     44    if (isServer_)  
     45    { 
     46      if (CServer::isRoot)  
     47      { 
     48        CServer::listenOasisEnddef() ; 
     49        CServer::listenRootOasisEnddef() ; 
     50      } 
     51      else CServer::listenRootOasisEnddef() ; 
     52       
     53      if (CThreadManager::isUsingThreads()) return CServer::getServersRessource()->isFinished() ; 
     54      else return CServer::getServersRessource()->eventLoop(false) ; 
    5255    } 
    5356    else  return CXios::getPoolRessource()->eventLoop(false) ; 
     
    7679      CXios::finalizeRessourcesManager() ; 
    7780      CXios::finalizeRegistryManager() ; 
     81      CXios::finalizeThreadManager() ; 
    7882      isFinalized_=true ; 
    7983    } 
  • XIOS3/trunk/src/manager/pool_ressource.cpp

    r2523 r2547  
    88#include "timer.hpp" 
    99#include "event_scheduler.hpp" 
     10#include "thread_manager.hpp" 
    1011 
    1112namespace xios 
     
    3334    else eventScheduler_= make_shared<CEventScheduler>(poolComm) ; 
    3435    freeRessourceEventScheduler_ = eventScheduler_ ; 
     36    std::hash<string> hashString ; 
     37    hashId_ = hashString("CPoolRessource::"+Id) ; 
     38    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CPoolRessource::threadEventLoop, this) ; 
     39  } 
     40 
     41  void CPoolRessource::synchronize(void) 
     42  { 
     43    bool out=false ;  
     44    size_t timeLine=0 ; 
     45           
     46    eventScheduler_->registerEvent(timeLine, hashId_) ; 
     47    while (!out) 
     48    { 
     49      CThreadManager::yield() ; 
     50      out = eventScheduler_->queryEvent(timeLine,hashId_) ; 
     51      if (out) eventScheduler_->popEvent() ; 
     52    } 
    3553  } 
    3654 
     
    122140    MPI_Comm_rank(poolComm_, &commRank) ; 
    123141    winNotify_->popFromExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ; 
    124     if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ; 
    125     else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ; 
     142    if (notifyType_==NOTIFY_CREATE_SERVICE)  
     143    { 
     144      if (CThreadManager::isUsingThreads()) synchronize() ; 
     145      createService() ; 
     146    } 
     147    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)  
     148    { 
     149      if (CThreadManager::isUsingThreads()) synchronize() ; 
     150      createServiceOnto() ; 
     151    } 
    126152  } 
    127153 
     
    233259    } 
    234260    CTimer::get("CPoolRessource::eventLoop").suspend(); 
    235     if (services_.empty() && finalizeSignal_) return true ; 
    236     else return false ; 
    237   } 
     261    if (services_.empty() && finalizeSignal_) finished_=true ; 
     262    return finished_ ; 
     263  } 
     264 
     265  void CPoolRessource::threadEventLoop(void) 
     266  { 
     267    CTimer::get("CPoolRessource::eventLoop").resume(); 
     268    info(100)<<"Launch Thread for  CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ; 
     269    CThreadManager::threadInitialize() ;  
     270     
     271    do 
     272    { 
     273 
     274      double time=MPI_Wtime() ; 
     275      int flag ; 
     276      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE); 
     277      if (time-lastEventLoop_ > eventLoopLatency_)  
     278      { 
     279        //checkCreateServiceNotification() ; 
     280        checkNotifications() ; 
     281        lastEventLoop_=time ; 
     282      } 
     283     
     284      for(auto it=services_.begin();it!=services_.end();++it)  
     285      { 
     286        if (it->second->isFinished()) 
     287        { 
     288          delete it->second ;  
     289          services_.erase(it) ; 
     290          // destroy server_context -> to do later 
     291          break ; 
     292        } ; 
     293      } 
     294 
     295      CTimer::get("CPoolRessource::eventLoop").suspend(); 
     296      if (services_.empty() && finalizeSignal_) finished_=true ; 
     297       
     298      if (!finished_) CThreadManager::yield() ; 
     299     
     300    } while (!finished_) ; 
     301 
     302    CThreadManager::threadFinalize() ; 
     303    info(100)<<"Close thread for  CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ; 
     304  } 
     305 
    238306/* 
    239307  void CPoolRessource::checkCreateServiceNotification(void) 
     
    347415  } 
    348416 
    349   void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached 
     417  void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients 
    350418  { 
    351419    services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, eventScheduler, Id_, serviceId, partitionId, type, nbPartitions) ; 
  • XIOS3/trunk/src/manager/pool_ressource.hpp

    r2523 r2547  
    3535    void createServiceOnto(const std::string& serviceId, int type, const std::string& OnServiceId) ; 
    3636    bool eventLoop(bool serviceOnly=false) ; 
     37    void threadEventLoop(void) ; 
    3738    bool hasService(const std::string serviceId, int partitionId) {return services_.count(make_tuple(serviceId,partitionId))>0 ;} 
    3839    CService* getService(const std::string serviceId, int partitionId) { return services_[make_tuple(serviceId,partitionId)]; } 
     
    5051    void createService(void) ; 
    5152    void createServiceOnto(void) ; 
     53    void synchronize(void) ; 
    5254     
    5355//    void createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions, bool in) ; 
     
    5860  public: 
    5961    void createNewServiceOnto(const std::string& serviceId, int type, const string& onServiceId) ; 
    60      
    61     private: 
     62   
     63  private: 
     64    bool finished_=false ; 
     65  public: 
     66    bool isFinished(void) { return finished_ ;} 
     67 
     68  private: 
    6269    MPI_Comm poolComm_ ; 
    6370     
     
    7481    std::string Id_ ; 
    7582    bool finalizeSignal_ ; 
    76      
     83         
    7784    const double eventLoopLatency_=0;  
    7885    double lastEventLoop_=0. ; 
    7986 
    8087    private: 
     88      size_t hashId_ ; 
    8189      shared_ptr<CEventScheduler> eventScheduler_ ; 
    8290      shared_ptr<CEventScheduler> freeRessourceEventScheduler_ ; 
  • XIOS3/trunk/src/manager/server_context.cpp

    r2517 r2547  
    66#include "register_context_info.hpp" 
    77#include "services.hpp" 
     8#include "thread_manager.hpp" 
    89#include "timer.hpp" 
    910 
     
    5051    info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank 
    5152                        <<" and global rank "<<globalRank<<endl  ; 
     53    
     54    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServerContext::threadEventLoop, this) ; 
    5255  } 
    5356 
     
    8487    } 
    8588     
    86     MPI_Request req ; 
    87     MPI_Status status ; 
    88     MPI_Ibarrier(intraComm,&req) ; 
    89      
    90     int flag=false ; 
    91     while(!flag)  
    92     { 
    93       CXios::getDaemonsManager()->servicesEventLoop() ; 
    94       MPI_Test(&req,&flag,&status) ; 
    95     } 
    96  
     89    if (wait) 
     90    { 
     91      MPI_Request req ; 
     92      MPI_Status status ; 
     93      MPI_Ibarrier(intraComm,&req) ; 
     94     
     95      int flag=false ; 
     96      while(!flag)  
     97      { 
     98        CXios::getDaemonsManager()->servicesEventLoop() ; 
     99        MPI_Test(&req,&flag,&status) ; 
     100      } 
     101    } 
     102     
    97103    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ; 
    98104 
     
    117123      MPI_Comm_size(contextComm_,&commSize ) ; 
    118124*/ 
    119       if (nOverlap> 0 ) 
    120       { 
    121         while (get<0>(overlapedComm_[name_])==false) CXios::getDaemonsManager()->servicesEventLoop() ; 
    122         isAttachedMode_=true ; 
    123         cout<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ; 
    124         interCommClient=newInterCommClient ; 
    125         interCommServer=newInterCommServer ; 
    126       } 
    127       else if (nOverlap==0) 
     125      if (nOverlap==0) 
    128126      {  
    129         cout<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ; 
    130         isAttachedMode_=false ; 
    131127        MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ; 
    132128        MPI_Comm_dup(interCommClient, &interCommServer) ; 
     
    136132      else 
    137133      { 
    138         cout<<"CServerContext::createIntercomm : partial overlap ==> not managed"<<endl ; 
     134        ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ; 
    139135      } 
    140136    } 
     
    259255  } 
    260256 
     257  void CServerContext::threadEventLoop(void) 
     258  { 
     259     
     260    info(100)<<"Launch Thread for CServerContext::threadEventLoop, context id = "<<context_->getId()<<endl ; 
     261    CThreadManager::threadInitialize() ;  
     262    do 
     263    { 
     264      CTimer::get("CServerContext::eventLoop").resume(); 
     265      int flag ; 
     266      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE); 
     267 
     268      if (winNotify_!=nullptr) checkNotifications() ; 
     269 
     270 
     271      if (context_!=nullptr)   
     272      { 
     273        if (context_->eventLoop()) 
     274        { 
     275          info(100)<<"Remove context server with id "<<context_->getId()<<endl ; 
     276          CContext::removeContext(context_->getId()) ; 
     277          context_=nullptr ; 
     278          // destroy context ??? --> later 
     279        } 
     280      } 
     281      CTimer::get("CServerContext::eventLoop").suspend(); 
     282      if (context_==nullptr && finalizeSignal_) finished_=true ; 
     283  
     284      if (!finished_) CThreadManager::yield() ; 
     285    } 
     286    while (!finished_) ; 
     287     
     288    CThreadManager::threadFinalize() ; 
     289    info(100)<<"Close thread for CServerContext::threadEventLoop"<<endl ; 
     290  } 
     291 
    261292  void CServerContext::createIntercomm(void) 
    262293  { 
     
    280311     MPI_Comm_size(contextComm_,&commSize ) ; 
    281312 
    282     if (nOverlap==commSize) 
    283     { 
    284       info(10)<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ; 
    285       isAttachedMode_=true ; 
    286       interCommClient=get<2>(it->second) ; 
    287       interCommServer=get<1>(it->second) ; 
    288       context_ -> createClientInterComm(interCommClient, interCommServer ) ; 
    289       clientsInterComm_.push_back(interCommClient) ; 
    290       clientsInterComm_.push_back(interCommServer) ; 
    291     } 
    292     else if (nOverlap==0) 
     313    if (nOverlap==0) 
    293314    {  
    294315      info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ; 
    295       isAttachedMode_=false ; 
    296316      MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ; 
    297317      MPI_Comm_dup(interCommServer,&interCommClient) ; 
     
    302322    else 
    303323    { 
    304       ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : partial overlap ==> not managed") ; 
     324      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ; 
    305325    } 
    306326    
  • XIOS3/trunk/src/manager/server_context.hpp

    r2274 r2547  
    2929 
    3030    bool eventLoop(bool serviceOnly=false) ; 
     31    void threadEventLoop(void) ; 
    3132    void notificationsDumpOut(CBufferOut& buffer) ; 
    3233    void notificationsDumpIn(CBufferIn& buffer) ; 
    3334    void finalizeSignal(void) ; 
    3435    void freeComm(void) ; 
    35     bool isAttachedMode(void) { return isAttachedMode_ ;} 
    3636    CService* getParentService(void) {return parentService_ ; } 
    37  
     37     
     38    private :  
     39      bool finished_=false ; 
     40    public: 
     41      bool isFinished(void) { return finished_ ; } 
    3842    private: 
    3943    void createIntercomm(void) ; 
     
    6064    bool finalizeSignal_ ; 
    6165    bool hasNotification_ ; 
    62     bool isAttachedMode_ ; 
    6366 
    6467    const double eventLoopLatency_=0;  
  • XIOS3/trunk/src/manager/servers_ressource.cpp

    r2523 r2547  
    99#include <vector> 
    1010#include <string> 
     11#include "thread_manager.hpp" 
    1112 
    1213 
     
    4344    eventScheduler_ = make_shared<CEventScheduler>(freeRessourcesComm_) ; 
    4445    freeRessourceEventScheduler_ = eventScheduler_ ; 
     46    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServersRessource::threadEventLoop, this) ; 
    4547  } 
    4648 
     
    131133    if (poolRessource_!=nullptr)  
    132134    { 
    133       if (poolRessource_->eventLoop(serviceOnly)) 
     135      poolRessource_->eventLoop(serviceOnly) ; 
     136      if (poolRessource_->isFinished()) 
    134137      { 
    135138        delete poolRessource_ ; 
     
    139142    } 
    140143    CTimer::get("CServersRessource::eventLoop").suspend(); 
    141     if (poolRessource_==nullptr && finalizeSignal_) return true ; 
    142     else return false ; 
    143   } 
     144    if (poolRessource_==nullptr && finalizeSignal_) finished_=true ; 
     145    return finished_ ; 
     146  } 
     147 
     148  void CServersRessource::threadEventLoop(void) 
     149  { 
     150    CTimer::get("CServersRessource::eventLoop").resume(); 
     151    info(100)<<"Launch Thread for  CServersRessource::threadEventLoop"<<endl ; 
     152    CThreadManager::threadInitialize() ;  
     153 
     154    do 
     155    { 
     156      double time=MPI_Wtime() ; 
     157      int flag ; 
     158      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE); 
     159 
     160      if (time-lastEventLoop_ > eventLoopLatency_)  
     161      { 
     162        checkNotifications() ; 
     163        lastEventLoop_=time ; 
     164      } 
     165 
     166      if (poolRessource_!=nullptr)  
     167      { 
     168        if (poolRessource_->isFinished()) 
     169        { 
     170          delete poolRessource_ ; 
     171          poolRessource_=nullptr ; 
     172          // don't forget to free pool ressource later 
     173        }  
     174      } 
     175      CTimer::get("CServersRessource::eventLoop").suspend(); 
     176      if (poolRessource_==nullptr && finalizeSignal_) finished_=true ; 
     177      if (!finished_) CThreadManager::yield() ; 
     178     
     179    } while (!finished_) ; 
     180 
     181    CThreadManager::threadFinalize() ; 
     182    info(100)<<"Close thread for CServersRessource::threadEventLoop"<<endl ; ; 
     183  } 
     184 
    144185 
    145186  void CServersRessource::checkNotifications(void) 
     
    148189    MPI_Comm_rank(serverComm_, &commRank) ; 
    149190    winNotify_->popFromExclusiveWindow(commRank, this, &CServersRessource::notificationsDumpIn) ; 
    150     if (notifyInType_==NOTIFY_CREATE_POOL) createPool() ; 
     191    if (notifyInType_==NOTIFY_CREATE_POOL)  
     192    { 
     193      if (CThreadManager::isUsingThreads()) synchronize() ; 
     194      createPool() ; 
     195    } 
    151196    else if (notifyInType_==NOTIFY_FINALIZE) finalizeSignal() ; 
     197  } 
     198 
     199  void CServersRessource::synchronize(void) 
     200  { 
     201    bool out=false ;  
     202    size_t timeLine=0 ; 
     203    std::hash<string> hashString ; 
     204    int commSize ; 
     205    MPI_Comm_size(freeRessourcesComm_,&commSize) ; 
     206    size_t hashId = hashString("CServersRessource::"+to_string(commSize)) ; 
     207    freeRessourceEventScheduler_->registerEvent(timeLine, hashId) ; 
     208    while (!out) 
     209    { 
     210      CThreadManager::yield() ; 
     211      out = eventScheduler_->queryEvent(timeLine,hashId) ; 
     212      if (out) eventScheduler_->popEvent() ; 
     213    } 
    152214  } 
    153215 
  • XIOS3/trunk/src/manager/servers_ressource.hpp

    r2523 r2547  
    5555    const double eventLoopLatency_=0;  
    5656    double lastEventLoop_=0. ; 
    57  
     57     
    5858    private: 
     59      bool finished_=false; 
     60    public: 
     61      bool isFinished(void) { return finished_ ;} 
     62    private: 
     63      void synchronize(void) ; 
     64      void threadEventLoop(void) ; 
    5965      shared_ptr<CEventScheduler> eventScheduler_ ; 
    6066      shared_ptr<CEventScheduler> freeRessourceEventScheduler_ ; 
  • XIOS3/trunk/src/manager/services.cpp

    r2523 r2547  
    55#include "server_context.hpp" 
    66#include "event_scheduler.hpp" 
     7#include "thread_manager.hpp" 
    78#include "timer.hpp" 
    89 
     
    4142    oss<<partitionId; 
    4243    name_= poolId+"__"+serviceId+"_"+oss.str(); 
     44     
     45    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CService::threadEventLoop, this) ; 
    4346  } 
    4447 
     
    124127 
    125128    eventScheduler_->checkEvent() ; 
     129    
    126130    for(auto it=contexts_.begin();it!=contexts_.end();++it)  
    127131    { 
     
    134138      } ; 
    135139    } 
     140   
    136141    CTimer::get("CService::eventLoop").suspend(); 
    137142    if (contexts_.empty() && finalizeSignal_) return true ; 
    138143    else return false ; 
    139144  } 
     145 
     146  void CService::threadEventLoop(void) 
     147  { 
     148    info(100)<<"Launch Thread for  CService::threadEventLoop, service id = "<<name_<<endl ; 
     149    CThreadManager::threadInitialize() ;  
     150     
     151    do 
     152    { 
     153      CTimer::get("CService::eventLoop").resume(); 
     154      int flag ; 
     155      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE); 
     156    
     157//    double time=MPI_Wtime() ; 
     158//    if (time-lastEventLoop_ > eventLoopLatency_)  
     159//    { 
     160      checkNotifications() ; 
     161//      lastEventLoop_=time ; 
     162//    } 
     163 
     164 
     165      eventScheduler_->checkEvent() ; 
     166    
     167      for(auto it=contexts_.begin();it!=contexts_.end();++it)  
     168      { 
     169        if (it->second->isFinished()) 
     170        { 
     171          delete it->second ;  
     172          contexts_.erase(it) ; 
     173          // destroy server_context -> to do later 
     174          break ; 
     175        } ; 
     176      } 
     177 
     178      CTimer::get("CService::eventLoop").suspend(); 
     179      if (contexts_.empty() && finalizeSignal_) finished_=true ; 
     180      if (!finished_) CThreadManager::yield() ; 
     181    } while (!finished_) ; 
     182     
     183    CThreadManager::threadFinalize() ; 
     184    info(100)<<"Close thread for  CService::threadEventLoop, service id = "<<name_<<endl ; 
     185  } 
     186 
    140187 
    141188  void CService::sendNotification(int rank) 
  • XIOS3/trunk/src/manager/services.hpp

    r2523 r2547  
    2424 
    2525    bool eventLoop(bool serviceOnly=false) ; 
     26    void threadEventLoop(void) ; 
    2627    void createContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId) ; 
    2728    void checkCreateContextNotification(void) ; 
     
    4142    const MPI_Comm& getCommunicator(void) { return serviceComm_ ;} 
    4243     
     44    private:  
     45    bool finished_=false ; 
     46    public: 
     47    bool isFinished(void) { return finished_; } 
     48 
    4349    private: 
    4450    void sendNotification(int rank) ; 
  • XIOS3/trunk/src/manager/token_manager.hpp

    r2498 r2547  
    2626          memset( winBufferRetrieved_, 0, windowSize ); 
    2727        } 
     28        MPI_Win_lock_all(0, winCurrentToken_) ; 
     29        MPI_Win_lock_all(0, winRetrievedToken_) ; 
    2830      } 
    29  
     31       
     32      ~CTokenManager() 
     33      { 
     34        MPI_Win_unlock_all(winCurrentToken_) ; 
     35        MPI_Win_unlock_all(winRetrievedToken_) ; 
     36        MPI_Win_free(&winCurrentToken_) ; 
     37        MPI_Win_free(&winRetrievedToken_) ; 
     38      } 
     39       
    3040      size_t getToken(void) 
    3141      { 
    3242        size_t inc=1 ; 
    3343        size_t token ; 
    34         MPI_Win_lock(MPI_LOCK_EXCLUSIVE, leader_, 0, winCurrentToken_) ; 
    3544        MPI_Fetch_and_op(&inc, &token, MPI_SIZE_T, leader_, 0, MPI_SUM, winCurrentToken_) ; 
    36         MPI_Win_unlock(leader_, winCurrentToken_) ; 
     45        MPI_Win_flush(leader_, winCurrentToken_); 
    3746        return token ; 
    3847      } 
    3948 
    40       bool lockToken(size_t token) 
     49      bool checkToken(size_t token) 
    4150      { 
    4251        size_t tokenRead ; 
    43         MPI_Win_lock(MPI_LOCK_SHARED, leader_, 0, winRetrievedToken_) ; 
    44         MPI_Get(&tokenRead, 1, MPI_SIZE_T, leader_, 0, 1, MPI_SIZE_T, winRetrievedToken_ ) ; 
    45         MPI_Win_unlock(leader_, winRetrievedToken_) ; 
    46         if (token==tokenRead) return true ; 
    47         else return false ; 
     52        size_t inc=0 ; 
     53        MPI_Fetch_and_op(&inc, &tokenRead, MPI_SIZE_T, leader_, 0, MPI_NO_OP, winRetrievedToken_) ; 
     54        MPI_Win_flush(leader_, winRetrievedToken_); 
     55        return tokenRead==token ; 
    4856      } 
    49  
    50       void unlockToken(size_t token) 
     57       
     58      void updateToken(size_t token) 
     59      { 
     60        size_t inc=1 ; 
     61        size_t tokenRead ; 
     62        MPI_Fetch_and_op(&inc, &tokenRead, MPI_SIZE_T, leader_, 0, MPI_SUM, winRetrievedToken_) ; 
     63        MPI_Win_flush(leader_, winRetrievedToken_); 
     64        if (token!=tokenRead)  ERROR("void CTokenManager::unlockToken(size_t token)",<<"Cannot release token="<<token<< 
     65                                     " that is not corresponding to the locked token="<<tokenRead) ;      
     66      } 
     67/*      void unlockToken(size_t token) 
    5168      { 
    5269        size_t inc=1 ; 
     
    5976                                     " that is not corresponding to the locked token="<<tokenRead) ;      
    6077      } 
    61  
     78*/ 
    6279    private: 
    6380 
  • XIOS3/trunk/src/manager/window_base.hpp

    r2517 r2547  
    137137      return MPI_Get(origin_addr, origin_count, origin_datatype, target_rank, target_disp + OFFSET_BUFFER, target_count, target_datatype, window_) ; 
    138138    } 
    139  
     139     
     140    int fetchAndOp(const void *origin_addr, void *result_addr, MPI_Datatype datatype, int target_rank, MPI_Aint target_disp, MPI_Op op) 
     141    { 
     142      return MPI_Fetch_and_op(origin_addr, result_addr, datatype, target_rank, target_disp + OFFSET_BUFFER, op, window_ ) ; 
     143    } 
     144     
    140145    int compareAndSwap(const void *origin_addr, const void *compare_addr, void *result_addr, MPI_Datatype datatype, 
    141146                       int target_rank, MPI_Aint target_disp) 
  • XIOS3/trunk/src/node/context.cpp

    r2507 r2547  
    3434#include "services.hpp" 
    3535#include "contexts_manager.hpp" 
     36#include "thread_manager.hpp" 
    3637#include <chrono> 
    3738#include <random> 
     
    488489      } 
    489490      contextId_ = getId() ; 
    490        
    491       attached_mode=true ; 
    492       if (!CXios::isUsingServer()) attached_mode=false ; 
    493  
    494491 
    495492      string contextRegistryId=getId() ; 
     
    544541    MPI_Comm_dup(intraComm_, &intraCommClient); 
    545542    comms.push_back(intraCommClient); 
    546     // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context 
    547543 
    548544    CContextServer* server ; 
     
    578574    if (commRank==0) 
    579575    { 
    580       CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ; 
    581       for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ; 
    582     } 
    583     setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble (attached ???) 
     576      while (! CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions)) yield() ; 
     577      for(int i=0 ; i<nbPartitions; i++)   
     578        while (!CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId())) yield() ; 
     579    } 
     580    synchronize() ; 
     581    setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble  
    584582    MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ; 
    585583       
     
    587585    for(int i=0 ; i<nbPartitions; i++) 
    588586    { 
    589       parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 
     587      while (!parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer)) yield() ; 
    590588      int type ;  
    591       if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ; 
     589      if (commRank==0) while (!CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type)) yield(); 
     590      synchronize() ; 
    592591      MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 
     592 
    593593      string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ; 
    594594 
     
    620620    if (serviceType_ == CServicesManager::CLIENT) 
    621621    { 
    622       if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultWriterId, clientServers) ; 
    623       else if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ; 
     622      if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ; 
    624623      else createServerInterComm(CXios::defaultPoolId, CXios::defaultWriterId, clientServers) ; 
    625624       
     
    629628      clientServers.clear() ; 
    630629    
    631       if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultReaderId, clientServers) ; 
    632       else createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ; 
     630      createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ; 
    633631      readerClientOut_.push_back(clientServers[0].second.first) ;  
    634632      readerServerOut_.push_back(clientServers[0].second.second) ; 
     
    658656    else 
    659657    { 
    660       if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ; 
    661       else createServerInterComm(poolId, serviceId, retClientServers) ; 
     658      createServerInterComm(poolId, serviceId, retClientServers) ; 
    662659      for(auto& retClientServer : retClientServers)  clientServers.push_back(retClientServer.second) ; 
    663660       
    664661      int serviceType ; 
    665       if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType, true) ; 
     662      if (intraCommRank_==0) while(!CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType)) yield();  
     663      synchronize() ; 
    666664      MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ; 
    667665       
     
    694692  void CContext::globalEventLoop(void) 
    695693  { 
    696     lockContext() ; 
    697     CXios::getDaemonsManager()->eventLoop() ; 
    698     unlockContext() ; 
    699     setCurrent(getId()) ; 
     694    if (CThreadManager::isUsingThreads()) CThreadManager::yield(); 
     695    else 
     696    { 
     697      lockContext() ; 
     698      CXios::getDaemonsManager()->eventLoop() ; 
     699      unlockContext() ; 
     700      setCurrent(getId()) ; 
     701    } 
    700702  } 
    701703 
     704  void CContext::yield(void) 
     705  { 
     706    if (CThreadManager::isUsingThreads())  
     707    { 
     708      CThreadManager::yield(); 
     709      setCurrent(getId()) ; 
     710    } 
     711    else 
     712    { 
     713      lockContext() ; 
     714      CXios::getDaemonsManager()->eventLoop() ; 
     715      unlockContext() ; 
     716      setCurrent(getId()) ; 
     717    } 
     718  } 
     719 
     720  void CContext::synchronize(void) 
     721  { 
     722    bool out, finished;  
     723    size_t timeLine=timeLine_ ; 
     724       
     725    timeLine_++ ; 
     726    eventScheduler_->registerEvent(timeLine, hashId_) ; 
     727       
     728    out = eventScheduler_->queryEvent(timeLine,hashId_) ; 
     729    if (out) eventScheduler_->popEvent() ; 
     730    while (!out) 
     731    { 
     732      yield() ; 
     733      out = eventScheduler_->queryEvent(timeLine,hashId_) ; 
     734      if (out) eventScheduler_->popEvent() ; 
     735    } 
     736  } 
     737   
    702738  bool CContext::scheduledEventLoop(bool enableEventsProcessing)  
    703739  { 
     
    761797       if (couplerOutClient_.find(fullContextId)==couplerOutClient_.end())  
    762798       { 
    763          bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ; 
     799         while(!CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm())) yield(); 
     800         synchronize() ; 
    764801      
    765802         MPI_Comm interComm, interCommClient, interCommServer  ; 
    766803         MPI_Comm intraCommClient, intraCommServer ; 
    767804 
    768          if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 
     805         MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 
    769806 
    770807        MPI_Comm_dup(intraComm_, &intraCommClient) ; 
     
    783820    else if (couplerInClient_.find(fullContextId)==couplerInClient_.end()) 
    784821    { 
    785       bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ; 
     822      while(!CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm())) yield() ; 
     823      synchronize() ; 
    786824      
    787825       MPI_Comm interComm, interCommClient, interCommServer  ; 
    788826       MPI_Comm intraCommClient, intraCommServer ; 
    789827 
    790        if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 
     828       MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 
    791829 
    792830       MPI_Comm_dup(intraComm_, &intraCommClient) ; 
     
    824862          couplersInFinalized=true ; 
    825863          for(auto& couplerOutClient : couplerOutClient_) couplersInFinalized &= isCouplerInContextFinalized(couplerOutClient.second) ;  
    826           globalEventLoop() ; 
     864          if (CThreadManager::isUsingThreads()) yield() ; 
     865          else globalEventLoop() ; 
    827866        } while (!couplersInFinalized) ; 
    828867 
     
    900939          info(100)<<"DEBUG: context "<<getId()<<" release client reader ok"<<endl ; 
    901940        } 
     941        closeAllFile() ; 
    902942      } 
    903943      else if (serviceType_==CServicesManager::GATHERER) 
    904944      { 
    905          for(auto& client : writerClientOut_) 
    906          { 
    907            client->finalize(); 
    908            bool bufferReleased; 
    909            do 
    910            { 
    911              client->eventLoop(); 
    912              bufferReleased = !client->havePendingRequests(); 
    913            } while (!bufferReleased); 
     945        CContextClient* client ; 
     946        CContextServer* server ; 
     947         
     948        for(int n=0; n<writerClientOut_.size() ; n++) 
     949        { 
     950          client=writerClientOut_[n] ; 
     951          server=writerServerOut_[n] ; 
     952         
     953          client->finalize(); 
     954          bool bufferReleased; 
     955          do 
     956          { 
     957            client->eventLoop(); 
     958            bufferReleased = !client->havePendingRequests(); 
     959          } while (!bufferReleased); 
    914960            
    915            bool notifiedFinalized=false ; 
    916            do 
    917            { 
    918              notifiedFinalized=client->isNotifiedFinalized() ; 
    919            } while (!notifiedFinalized) ; 
    920            client->releaseBuffers(); 
     961          bool notifiedFinalized=false ; 
     962          do 
     963          { 
     964            notifiedFinalized=client->isNotifiedFinalized() ; 
     965          } while (!notifiedFinalized) ; 
     966          server->releaseBuffers(); 
     967          client->releaseBuffers(); 
    921968         } 
    922969         closeAllFile(); 
     970         writerClientIn_[0]->releaseBuffers(); 
     971         writerServerIn_[0]->releaseBuffers();          
    923972         //ym writerClientIn & writerServerIn not released here ==> to check !! 
    924973      } 
     
    9571006   void CContext::setDefaultServices(void) 
    9581007   { 
    959      defaultPoolWriterId_ = CXios::defaultPoolId ; 
    960      defaultPoolReaderId_ = CXios::defaultPoolId ; 
    961      defaultPoolGathererId_ = CXios::defaultPoolId ; 
    962      defaultWriterId_ = CXios::defaultWriterId ; 
    963      defaultReaderId_ = CXios::defaultReaderId ; 
    964      defaultGathererId_ = CXios::defaultGathererId ; 
    965      defaultUsingServer2_ = CXios::usingServer2 ; 
     1008     if (!CXios::isUsingServer()) 
     1009     { 
     1010       defaultPoolWriterId_ = CXios::defaultPoolId ; 
     1011       defaultPoolReaderId_ = CXios::defaultPoolId ; 
     1012       defaultPoolGathererId_ = CXios::defaultPoolId ; 
     1013       defaultWriterId_ = "attached" ; 
     1014       defaultReaderId_ = "attached" ; 
     1015       defaultGathererId_ =  "attached" ; 
     1016       defaultUsingServer2_ = false; 
     1017     } 
     1018     else 
     1019     { 
     1020       defaultPoolWriterId_ = CXios::defaultPoolId ; 
     1021       defaultPoolReaderId_ = CXios::defaultPoolId ; 
     1022       defaultPoolGathererId_ = CXios::defaultPoolId ; 
     1023       defaultWriterId_ = CXios::defaultWriterId ; 
     1024       defaultReaderId_ = CXios::defaultReaderId ; 
     1025       defaultGathererId_ = CXios::defaultGathererId ; 
     1026       defaultUsingServer2_ = CXios::usingServer2 ; 
    9661027      
    967      if (!default_pool.isEmpty())  defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 
    968      if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 
    969      if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 
    970      if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 
    971      if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 
    972      if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 
    973      if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 
    974      if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 
     1028       if (!default_pool.isEmpty())  defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 
     1029       if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 
     1030       if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 
     1031       if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 
     1032       if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 
     1033       if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 
     1034       if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 
     1035       if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 
     1036     } 
    9751037   } 
    9761038 
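The pattern above replaces the single blocking lookup of the context leader with a loop that retries the query and yields between attempts, so other server threads can progress while the registration completes. A minimal sketch of that cooperative wait, assuming nothing XIOS-specific: an std::atomic flag stands in for the registration state and std::this_thread::yield() stands in for the context/thread-manager yield().

    // Cooperative wait sketch: poll a predicate and yield instead of blocking.
    #include <atomic>
    #include <chrono>
    #include <thread>

    std::atomic<bool> leaderKnown{false};   // set once the registration event has been processed

    void waitForLeader()
    {
      // Retry the query and hand control back instead of blocking, so the
      // other server threads keep running their event loops in the meantime.
      while (!leaderKnown.load()) std::this_thread::yield();
    }

    int main()
    {
      std::thread registration([]{
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        leaderKnown = true;                 // registration completed elsewhere
      });
      waitForLeader();
      registration.join();
      return 0;
    }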
  • XIOS3/trunk/src/node/context.hpp

    r2509 r2547  
    112112         bool scheduledEventLoop(bool enableEventsProcessing=true) ;  
    113113         void globalEventLoop(void); 
     114         void yield(void) ; 
     115         void synchronize(void) ; 
    114116 
    115117         // Finalize a context 
     
    368370        int getIntraCommRank(void) {return intraCommRank_;} 
    369371        int getIntraCommSize(void) {return intraCommSize_;} 
     372       
     373      public: 
     374        shared_ptr<CEventScheduler> getEventScheduler(void) {return eventScheduler_ ;} 
    370375      private: 
    371376         shared_ptr<CEventScheduler> eventScheduler_ ; //! The local event scheduler for context 
  • XIOS3/trunk/src/node/pool_node.cpp

    r2458 r2547  
    11#include "pool_node.hpp" 
    22#include "cxios.hpp" 
     3#include "thread_manager.hpp" 
    34#include<cmath> 
    45 
     
    6162    else ERROR("void CPoolNode::allocateRessources(void)",<<"Pool has no name or id, attributes <id> or <name> must be specified") 
    6263    ressourcesManager->createPool(poolId, nbRessources) ; 
    63     ressourcesManager->waitPoolRegistration(poolId) ; 
     64    if (CThreadManager::isUsingThreads())  
     65      while(!ressourcesManager->hasPool(CXios::defaultPoolId))  
     66      { 
     67        CXios::getDaemonsManager()->eventLoop() ; 
     68        CThreadManager::yield() ; 
     69      } 
     70    else ressourcesManager->waitPoolRegistration(poolId) ; 
    6471    auto services=this->getAllServiceNodes() ; 
    6572    for(auto& service : services) service->allocateRessources(poolId) ; 
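With threads enabled, the blocking waitPoolRegistration() call is bypassed: the node keeps driving the daemons' event loop and yields until the pool shows up. A minimal sketch of that wait loop, with hypothetical stubs pollEvents() and hasPool() standing in for CXios::getDaemonsManager()->eventLoop() and ressourcesManager->hasPool():

    #include <thread>

    static int passes = 0;
    bool hasPool()    { return passes > 3; }  // stub: the pool appears after a few event-loop passes
    void pollEvents() { ++passes; }           // stub for one pass of the daemons' event loop

    void waitPoolRegistration()
    {
      while (!hasPool())
      {
        pollEvents();               // keep messages flowing so the registration can complete
        std::this_thread::yield();  // cooperate with the other server threads
      }
    }

    int main() { waitPoolRegistration(); return 0; }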
  • XIOS3/trunk/src/node/service_node.cpp

    r2458 r2547  
    6060    else if (!hasAutoGeneratedId() ) serviceId=getId() ; 
    6161    else ERROR("void CServiceNode::allocateRessources(const string& poolId)",<<"Service has no name or id, attributes <id> or <name> must be specified") 
     62     
    6263    servicesManager->createServices(poolId, serviceId, serviceType, nbRessources, nb_partitions, true) ; 
     64    if (CThreadManager::isUsingThreads()) 
     65      for(int i=0; i<nb_partitions; i++) 
     66        while(!servicesManager->hasService(poolId, serviceId, i))  
     67        { 
     68          CXios::getDaemonsManager()->eventLoop() ; 
     69          CThreadManager::yield() ; 
     70        } 
     71    else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ; 
     72 
    6373  } 
    6474 
  • XIOS3/trunk/src/server.cpp

    r2535 r2547  
    2525#include "workflow_graph.hpp" 
    2626#include "release_static_allocation.hpp" 
     27#include "thread_manager.hpp" 
    2728#include <sys/stat.h> 
    2829#include <unistd.h> 
     
    6869      if (!CXios::usingOasis) 
    6970      { 
    70         if (!is_MPI_Initialized) MPI_Init(NULL, NULL); 
     71        if (!is_MPI_Initialized)  
     72        { 
     73          int required = MPI_THREAD_SERIALIZED ; 
     74          int provided ; 
     75          MPI_Init_thread(NULL,NULL, required, &provided) ; 
     76        } 
    7177        
    7278        // split the global communicator 
     
    99105      else // using OASIS 
    100106      { 
    101         if (!is_MPI_Initialized) driver_ = new CThirdPartyDriver(); 
     107         
     108        if (!is_MPI_Initialized)  
     109        { 
     110          int required = MPI_THREAD_SERIALIZED ; 
     111          int provided ; 
     112          MPI_Init_thread(NULL,NULL, required, &provided) ; 
     113        } 
     114 
     115        driver_ = new CThirdPartyDriver(); 
    102116 
    103117        driver_->getComponentCommunicator( serverComm ); 
     
    200214          { 
    201215            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
    202             ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 
     216            if (CThreadManager::isUsingThreads())  
     217              while(!ressourcesManager->hasPool(CXios::defaultPoolId))  
     218              { 
     219                daemonsManager->eventLoop() ; 
     220                CThreadManager::yield() ; 
     221              } 
     222            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 
     223           
    203224            servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ; 
    204             servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ; 
     225            if (CThreadManager::isUsingThreads())  
     226              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,0))  
     227              { 
     228                daemonsManager->eventLoop() ; 
     229                CThreadManager::yield() ; 
     230              } 
     231            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ; 
     232             
    205233            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ; 
    206             servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ; 
     234            if (CThreadManager::isUsingThreads())  
     235            { 
     236              daemonsManager->eventLoop() ; 
     237              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) CThreadManager::yield() ; 
     238            } 
     239            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ; 
    207240          } 
    208241          else 
     
    214247            if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer; 
    215248            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
    216             ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 
     249            if (CThreadManager::isUsingThreads())  
     250              while(!ressourcesManager->hasPool(CXios::defaultPoolId))  
     251              { 
     252                daemonsManager->eventLoop() ; 
     253                CThreadManager::yield() ; 
     254              } 
     255            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 
     256 
    217257            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 
    218             servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ; 
     258            if (CThreadManager::isUsingThreads())  
     259              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultGathererId,0))  
     260              { 
     261                daemonsManager->eventLoop() ; 
     262                CThreadManager::yield() ; 
     263              } 
     264            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ; 
     265 
    219266            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ; 
    220             servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ; 
     267            if (CThreadManager::isUsingThreads())  
     268              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0))  
     269              { 
     270                daemonsManager->eventLoop() ; 
     271                CThreadManager::yield() ; 
     272              } 
     273            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ; 
     274             
    221275            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ; 
    222             servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ; 
     276            if (CThreadManager::isUsingThreads()) 
     277              for(int i=0; i<nbPoolsServer2; i++) 
     278                while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,i))  
     279                { 
     280                  daemonsManager->eventLoop() ; 
     281                  CThreadManager::yield() ; 
     282                } 
     283            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ; 
    223284          } 
    224285        } 
     
    233294      { 
    234295        daemonsManager->eventLoop() ; 
     296        if (CThreadManager::isUsingThreads()) CThreadManager::yield(); 
    235297        MPI_Test(&req,&ok,&status) ; 
    236298      } 
    237299 
    238300 
    239       testingEventScheduler() ; 
     301      //testingEventScheduler() ; 
    240302/* 
    241303      MPI_Request req ; 
     
    262324      { 
    263325        finished=daemonsManager->eventLoop() ; 
     326        if (CThreadManager::isUsingThreads()) CThreadManager::yield() ; 
    264327      } 
    265328      CTimer::get("XIOS event loop").suspend() ; 
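Because the server can now run several service threads that all issue MPI calls, MPI is initialised with MPI_Init_thread requesting MPI_THREAD_SERIALIZED instead of plain MPI_Init. A minimal standalone sketch; the check on the provided level is an illustration added here, not something the changeset itself does:

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      int required = MPI_THREAD_SERIALIZED;   // several threads, MPI calls serialized
      int provided = MPI_THREAD_SINGLE;
      MPI_Init_thread(&argc, &argv, required, &provided);

      if (provided < required)
      {
        std::fprintf(stderr, "MPI library provides thread level %d < %d\n", provided, required);
        MPI_Abort(MPI_COMM_WORLD, 1);
      }

      // ... run the server event loop here ...

      MPI_Finalize();
      return 0;
    }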
  • XIOS3/trunk/src/transport/context_client.cpp

    r2507 r2547  
    4040 
    4141      int flag; 
    42       MPI_Comm_test_inter(interComm, &flag); 
    43       if (flag) isAttached_=false ; 
    44       else  isAttached_=true ; 
    45  
    46       if (flag) MPI_Comm_remote_size(interComm, &serverSize); 
    47       else  MPI_Comm_size(interComm, &serverSize); 
    48  
     42       
     43      MPI_Comm_remote_size(interComm, &serverSize); 
     44       
    4945      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader); 
    5046 
  • XIOS3/trunk/src/transport/context_client.hpp

    r2507 r2547  
    4040      const std::list<int>& getRanksServerLeader(void) const; 
    4141      const std::list<int>& getRanksServerNotLeader(void) const; 
    42       bool isAttachedModeEnabled() const { return isAttached_ ; }  
    4342      static void computeLeader(int clientRank, int clientSize, int serverSize, 
    4443                                std::list<int>& rankRecvLeader, 
     
    9392      size_t hashId_ ; //!< hash id on the context client that will be used for context server to identify the remote calling context client. 
    9493 
    95       bool isAttached_ ; 
    9694      CContextServer* associatedServer_ ; //!< The server associated to the pair client/server 
    9795  }; 
  • XIOS3/trunk/src/transport/context_server.cpp

    r2343 r2547  
    4444 
    4545    interComm=interComm_; 
    46     int flag; 
    47     MPI_Comm_test_inter(interComm,&flag); 
    48  
    49     if (flag) attachedMode=false ; 
    50     else  attachedMode=true ; 
     46    MPI_Comm_remote_size(interComm,&clientSize_); 
    5147     
    52     if (flag) MPI_Comm_remote_size(interComm,&clientSize_); 
    53     else  MPI_Comm_size(interComm,&clientSize_); 
    54  
    5548    SRegisterContextInfo contextInfo ; 
    5649    CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ; 
     
    8982  } 
    9083 
    91 //! Attached mode is used ? 
    92 //! \return true if attached mode is used, false otherwise 
    93   bool CContextServer::isAttachedModeEnabled() const 
    94   { 
    95     return attachedMode ; 
    96   } 
    97  
    9884} 
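With attached mode removed, the communicator handed to the context server is always an inter-communicator, which is why the MPI_Comm_test_inter branch can be dropped and MPI_Comm_remote_size called unconditionally. A small self-contained sketch of that situation (assumption: the inter-communicator is built here from an even/odd split of MPI_COMM_WORLD; run with at least two ranks):

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      MPI_Comm local, inter;
      int color = rank % 2;                       // even ranks = "clients", odd ranks = "servers"
      MPI_Comm_split(MPI_COMM_WORLD, color, rank, &local);
      int remoteLeader = (color == 0) ? 1 : 0;    // world rank of the other group's leader
      MPI_Intercomm_create(local, 0, MPI_COMM_WORLD, remoteLeader, 0, &inter);

      int isInter = 0, remoteSize = 0;
      MPI_Comm_test_inter(inter, &isInter);       // always true here, so no intra-comm branch needed
      MPI_Comm_remote_size(inter, &remoteSize);
      std::printf("rank %d: inter=%d remote group size=%d\n", rank, isInter, remoteSize);

      MPI_Comm_free(&inter);
      MPI_Comm_free(&local);
      MPI_Finalize();
      return 0;
    }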
  • XIOS3/trunk/src/transport/context_server.hpp

    r2404 r2547  
    2323    CContextServer(CContext* parent,MPI_Comm intraComm,MPI_Comm interComm) ; 
    2424    ~CContextServer() {}  
    25     bool isAttachedModeEnabled() const ; 
    2625    void setAssociatedClient(CContextClient* associatedClient) {associatedClient_=associatedClient ;} 
    2726    CContextClient* getAssociatedClient(void) { return associatedClient_ ;} 
     
    4140      int commSize ; 
    4241      int clientSize_ ; 
    43  
    44       bool attachedMode ;  //! true if attached mode is enabled otherwise false 
    4542 
    4643      CContext* context ; 
  • XIOS3/trunk/src/transport/legacy_context_client.cpp

    r2528 r2547  
    2424    \param [in] intraComm_ communicator of group client 
    2525    \param [in] interComm_ communicator of group server 
    26     \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode). 
     26    \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode --> obsolete). 
    2727    */ 
    2828    CLegacyContextClient::CLegacyContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) 
     
    3131    { 
    3232      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
    33       if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 
    34  
    35       if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ; 
    36       else interCommMerged_ = interComm_; // interComm_ is yet an intracommunicator in attached 
    37        
     33      MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ; 
    3834      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows 
    39  
     35      eventScheduler_ = parent->getEventScheduler() ;   
    4036      timeLine = 1; 
    4137    } 
     
    4440 
    4541    /*! 
    46     In case of attached mode, the current context must be reset to context for client 
    4742    \param [in] event Event sent to server 
    4843    */ 
     
    105100      } 
    106101       
    107       if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
    108       { 
    109         while (checkBuffers(ranks)) callGlobalEventLoop() ; 
    110        
    111         CXios::getDaemonsManager()->scheduleContext(hashId_) ; 
    112         while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ; 
    113       } 
    114        
    115       MPI_Request req ; 
    116       MPI_Status status ; 
    117       MPI_Ibarrier(intraComm,&req) ; 
    118       int flag ; 
    119       MPI_Test(&req,&flag,&status) ; 
    120       while(!flag)  
    121       { 
    122         callGlobalEventLoop() ; 
    123         MPI_Test(&req,&flag,&status) ; 
    124       } 
    125  
    126  
     102      synchronize() ; 
    127103      timeLine++; 
    128104    } 
     
    148124      list<CClientBuffer*>::iterator itBuffer; 
    149125      bool areBuffersFree; 
    150       
     126/*      
    151127      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) 
    152128      { 
     
    156132          CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ; 
    157133          size_t token = tokenManager->getToken() ; 
    158           while (!tokenManager->lockToken(token)) callGlobalEventLoop() ; 
     134          while (!tokenManager->checkToken(token)) callGlobalEventLoop() ; 
    159135          newBuffer(*itServer); 
    160136          it = buffers.find(*itServer); 
    161137          checkAttachWindows(it->second,it->first) ; 
    162           tokenManager->unlockToken(token) ; 
     138          tokenManager->updateToken(token) ; 
    163139        } 
    164140        bufferList.push_back(it->second); 
    165141      } 
     142*/ 
     143      map<int,MPI_Request> attachList ; 
     144      
     145      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) 
     146      { 
     147        it = buffers.find(*itServer); 
     148        if (it == buffers.end()) 
     149        { 
     150          newBuffer(*itServer); 
     151          it = buffers.find(*itServer); 
     152          checkAttachWindows(it->second, it->first, attachList) ; 
     153        } 
     154        bufferList.push_back(it->second); 
     155      } 
     156       
     157      while(!attachList.empty()) 
     158      { 
     159        auto it = attachList.begin() ; 
     160        while(it!=attachList.end()) 
     161        { 
     162          if (checkAttachWindows(buffers[it->first], it->first, attachList)) it=attachList.erase(it) ; 
     163          else ++it ; 
     164        } 
     165 
     166        yield() ; 
     167      } 
     168 
    166169 
    167170      double lastTimeBuffersNotFree=0. ; 
     
    193196          checkBuffers(); 
    194197 
    195           callGlobalEventLoop() ; 
     198          yield() ; 
    196199        } 
    197200 
     
    203206   } 
    204207 
     208 
     209   bool CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank, map<int, MPI_Request>& attachList) 
     210   { 
     211      int dummy; 
     212      bool ret=true;  
     213 
     214      if (!buffer->isAttachedWindows()) 
     215      { 
     216           // create windows dynamically for one-sided 
     217          /* 
     218          CTimer::get("create Windows").resume() ; 
     219          MPI_Comm interComm ; 
     220          int tag = 0 ; 
     221          MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, tag, &interComm) ; 
     222          MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 
     223          MPI_Comm_free(&interComm) ; 
     224                 
     225          buffer->attachWindows(winComm_[rank]) ; 
     226          CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 
     227          MPI_Barrier(winComm_[rank]) ; 
     228        */ 
     229        if (attachList.count(rank)==0)  
     230        { 
     231          MPI_Irecv(&dummy,0,MPI_INT,clientSize+rank, 21, interCommMerged_, &attachList[rank]) ; 
     232          ret = false ; 
     233        } 
     234        else 
     235        { 
     236          MPI_Status status ; 
     237          int flag ; 
     238          MPI_Test(&attachList[rank],&flag, &status) ; 
     239          if (flag) 
     240          { 
     241            CTimer::get("create Windows").resume() ; 
     242            MPI_Comm interComm ; 
     243            int tag = 0 ; 
     244            MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, tag, &interComm) ; 
     245            MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 
     246            MPI_Comm_free(&interComm) ; 
     247               
     248            buffer->attachWindows(winComm_[rank]) ; 
     249            CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 
     250            MPI_Barrier(winComm_[rank]) ; 
     251            ret = true ; 
     252          } 
     253          else ret=false ; 
     254        } 
     255      } 
     256      return ret ; 
     257    } 
     258 
     259 
    205260   void CLegacyContextClient::eventLoop(void) 
    206261   { 
     
    211266   { 
    212267     locked_=true ; 
    213      context_->globalEventLoop() ; 
     268     context_->yield() ; 
    214269     locked_=false ; 
     270   } 
     271 
     272   void CLegacyContextClient::yield(void) 
     273   { 
     274     locked_=true ; 
     275     context_->yield() ; 
     276     locked_=false ; 
     277   } 
     278 
     279   void CLegacyContextClient::synchronize(void) 
     280   { 
     281     if (context_->getServiceType()!=CServicesManager::CLIENT) 
     282     { 
     283       locked_=true ; 
     284       context_->synchronize() ; 
     285       locked_=false ; 
     286     }     
    215287   } 
    216288   /*! 
     
    226298        maxEventSizes[rank] = CXios::minBufferSize; 
    227299      } 
    228        
    229       int considerServers = 1; 
    230       if (isAttachedModeEnabled()) considerServers = 0; 
    231       CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interCommMerged_, considerServers*clientSize+rank, mapBufferSize_[rank], maxEventSizes[rank]); 
     300      bool hasWindows = true ; 
     301      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interCommMerged_, clientSize+rank, mapBufferSize_[rank], hasWindows); 
    232302      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ; 
    233303      else buffer->fixBuffer() ; 
    234304      // Notify the server 
     305      
    235306      CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint)); 
    236307      MPI_Aint sendBuff[4] ; 
    237308      sendBuff[0]=hashId_; 
    238309      sendBuff[1]=mapBufferSize_[rank]; 
    239       sendBuff[2]=buffers[rank]->getWinAddress(0);  
    240       sendBuff[3]=buffers[rank]->getWinAddress(1);  
    241       info(100)<<"CLegacyContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinAddress(1)<<endl; 
    242       bufOut->put(sendBuff, 4);  
     310      sendBuff[2]=buffers[rank]->getWinBufferAddress(0);  
     311      sendBuff[3]=buffers[rank]->getWinBufferAddress(1);  
     312      info(100)<<"CLegacyContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinBufferAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinBufferAddress(1)<<endl; 
     313      bufOut->put(sendBuff,4);  
    243314      buffer->checkBuffer(true); 
    244 /* 
    245        // create windows dynamically for one-sided 
    246       if (!isAttachedModeEnabled()) 
    247       {  
    248         CTimer::get("create Windows").resume() ; 
    249         MPI_Comm interComm ; 
    250         MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ; 
    251         MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 
    252         CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 
    253         MPI_Comm_free(&interComm) ; 
    254         windows_[rank].resize(2) ; 
    255          
    256         MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 
    257         CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 
    258          
    259         MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);    
    260         CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 
    261  
    262         CTimer::get("create Windows").suspend() ; 
    263       } 
    264       else 
    265       { 
    266         winComm_[rank] = MPI_COMM_NULL ; 
    267         windows_[rank].resize(2) ; 
    268         windows_[rank][0] = MPI_WIN_NULL ; 
    269         windows_[rank][1] = MPI_WIN_NULL ; 
    270       } 
    271       buffer->attachWindows(windows_[rank]) ; 
    272       if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ; 
    273   */      
    274    } 
    275  
    276    void CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank) 
    277    { 
    278       if (!buffer->isAttachedWindows()) 
    279       { 
    280            // create windows dynamically for one-sided 
    281         if (!isAttachedModeEnabled()) 
    282         {  
    283           CTimer::get("create Windows").resume() ; 
    284           MPI_Comm interComm ; 
    285           MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ; 
    286           MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 
    287           CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 
    288           MPI_Comm_free(&interComm) ; 
    289           windows_[rank].resize(2) ; 
    290        
    291           MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 
    292           CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 
    293        
    294           MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);    
    295           CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 
    296  
    297           CTimer::get("create Windows").suspend() ; 
    298           buffer->attachWindows(windows_[rank]) ; 
    299           MPI_Barrier(winComm_[rank]) ; 
    300         } 
    301         else 
    302         { 
    303           winComm_[rank] = MPI_COMM_NULL ; 
    304           windows_[rank].resize(2) ; 
    305           windows_[rank][0] = MPI_WIN_NULL ; 
    306           windows_[rank][1] = MPI_WIN_NULL ; 
    307           buffer->attachWindows(windows_[rank]) ; 
    308         } 
    309  
    310       } 
    311     } 
    312  
    313  
     315 
     316   } 
     317 
     318   
    314319   
    315320   /*! 
     
    336341      buffers.clear(); 
    337342 
    338 // don't know when release windows 
    339  
    340       //if (!isAttachedModeEnabled()) 
    341       //{   
    342       //  for(auto& it : winComm_) 
    343       //  { 
    344       //    int rank = it.first ; 
    345       //    MPI_Win_free(&windows_[rank][0]); 
    346       //    MPI_Win_free(&windows_[rank][1]); 
    347       //    MPI_Comm_free(&winComm_[rank]) ; 
    348       //  } 
    349       //}  
     343      for(auto& it : winComm_) 
     344      { 
     345        int rank = it.first ; 
     346      } 
    350347   } 
    351348 
     
    469466  bool CLegacyContextClient::isNotifiedFinalized(void) 
    470467  { 
    471     if (isAttachedModeEnabled()) return true ; 
    472  
    473468    bool finalized = true; 
    474469    map<int,CClientBuffer*>::iterator itBuff; 
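checkAttachWindows() now performs the window attachment as a non-blocking handshake: the client posts an MPI_Irecv with tag 21, keeps yielding while MPI_Test reports that the server is not yet ready, and only then builds the window communicator. A minimal two-rank sketch of that handshake, with rank 0 playing the client and rank 1 the server (tag 21 mirrors the changeset; everything else is illustrative):

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      int dummy = 0;

      if (rank == 0)        // client side
      {
        MPI_Request req;
        MPI_Irecv(&dummy, 0, MPI_INT, 1, 21, MPI_COMM_WORLD, &req);

        int flag = 0;
        while (!flag)
        {
          MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
          // while the server is not ready, keep servicing other work here
          // instead of blocking (stands in for yield() / the event loop)
        }
        std::printf("client: server ready, windows can be attached\n");
      }
      else if (rank == 1)   // server side
      {
        // signal readiness with an empty message, as done with tag 21 above
        MPI_Send(&dummy, 0, MPI_INT, 0, 21, MPI_COMM_WORLD);
      }

      MPI_Finalize();
      return 0;
    }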
  • XIOS3/trunk/src/transport/legacy_context_client.hpp

    r2507 r2547  
    1111#include "registry.hpp" 
    1212#include "context_client.hpp" 
     13#include "window_dynamic.hpp" 
    1314 
    1415namespace xios 
     
    4849      void getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers); 
    4950      void newBuffer(int rank); 
    50       void checkAttachWindows(CClientBuffer* buffer , int rank) ; 
     51      bool checkAttachWindows(CClientBuffer* buffer , int rank, map<int,MPI_Request>& attachList) ; 
    5152      bool checkBuffers(list<int>& ranks); 
    5253      bool checkBuffers(void); 
    5354      void callGlobalEventLoop() ; 
     55      void yield(void) ; 
     56      void synchronize(void) ; 
    5457      bool havePendingRequests(list<int>& ranks) ; 
    5558      void setGrowableBuffer(void) { isGrowableBuffer_=true;} 
     
    7679 
    7780      std::map<int, MPI_Comm> winComm_ ; //! Window communicators 
    78       std::map<int, std::vector<MPI_Win> >windows_ ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2] 
     81      std::map<int, std::vector<CWindowDynamic*> >windows_ ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2] 
    7982      bool isGrowableBuffer_ = true ; 
    8083 
     
    8285 
    8386      bool locked_ = false ; //!< The context client is locked to avoid recursive checkBuffer 
     87      shared_ptr<CEventScheduler> eventScheduler_ ; 
    8488  }; 
    8589} 
  • XIOS3/trunk/src/transport/legacy_context_server.cpp

    r2528 r2547  
    4343    finished=false; 
    4444 
    45     if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 
    46     else interCommMerged_ = interComm_; // interComm_ is yet an intracommunicator in attached 
     45    MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 
    4746    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows 
    4847     
     
    5049 
    5150    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
    52     if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 
    53        
    5451  } 
    5552  
     
    115112      remoteHashId_ = recvBuff[0] ; 
    116113      StdSize buffSize = recvBuff[1]; 
    117       vector<MPI_Aint> winAdress(2) ; 
    118       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ; 
     114      vector<MPI_Aint> winBufferAddress(2) ; 
     115      winBufferAddress[0]=recvBuff[2] ; winBufferAddress[1]=recvBuff[3] ; 
    119116      mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
    120117 
    121118      // create windows dynamically for one-sided 
    122       if (!isAttachedModeEnabled()) 
    123       {  
    124         CTimer::get("create Windows").resume() ; 
    125         MPI_Comm interComm ; 
    126         MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ; 
    127         MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ; 
    128         CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 
    129         MPI_Comm_free(&interComm) ; 
    130         windows_[rank].resize(2) ; 
    131         MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 
    132         CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 
    133         MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 
    134         CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 
    135         CTimer::get("create Windows").suspend() ; 
    136         MPI_Barrier(winComm_[rank]) ; 
    137       } 
    138       else 
    139       { 
    140         winComm_[rank] = MPI_COMM_NULL ; 
    141         windows_[rank].resize(2) ; 
    142         windows_[rank][0] = MPI_WIN_NULL ; 
    143         windows_[rank][1] = MPI_WIN_NULL ; 
    144       }    
    145  
    146       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first; 
     119      int dummy ; 
     120      MPI_Send(&dummy, 0, MPI_INT, rank, 21,interCommMerged_) ; 
     121      CTimer::get("create Windows").resume() ; 
     122      MPI_Comm interComm ; 
     123      int tag = 0 ; 
     124      MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, tag , &interComm) ; 
     125      MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ; 
     126      MPI_Comm_free(&interComm) ; 
     127      windows_[rank].resize(2) ; 
     128      //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 
     129      //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 
     130      //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 
     131      //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 
     132      windows_[rank][0] = new CWindowDynamic() ; 
     133      windows_[rank][1] = new CWindowDynamic() ; 
     134      windows_[rank][0] -> create(winComm_[rank]) ; 
     135      windows_[rank][1] -> create(winComm_[rank]) ; 
     136      windows_[rank][0] -> setWinBufferAddress(winBufferAddress[0],0) ; 
     137      windows_[rank][1] -> setWinBufferAddress(winBufferAddress[1],0) ; 
     138      CTimer::get("create Windows").suspend() ; 
     139      CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 
     140      MPI_Barrier(winComm_[rank]) ; 
     141 
     142      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winBufferAddress, 0, buffSize)))).first; 
    147143      lastTimeLine[rank]=0 ; 
    148144      itLastTimeLine=lastTimeLine.begin() ; 
     
    230226  { 
    231227    CTimer::get("CLegacyContextServer::getBufferFromClient").resume() ; 
    232     if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 
    233     {   
    234       int rank ; 
    235       char *buffer ; 
    236       size_t count ;  
    237  
    238       if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 
    239       for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 
    240       { 
    241         rank=itLastTimeLine->first ; 
    242         if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 
    243         { 
    244           if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 
    245           if (count >= 0) ++itLastTimeLine ; 
    246           break ; 
    247         } 
     228 
     229    int rank ; 
     230    char *buffer ; 
     231    size_t count ;  
     232 
     233    if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 
     234    for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 
     235    { 
     236      rank=itLastTimeLine->first ; 
     237      if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 
     238      { 
     239        if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 
     240        if (count >= 0) ++itLastTimeLine ; 
     241        break ; 
    248242      } 
    249243    } 
     
    280274      { 
    281275        size_t newSize ; 
    282         vector<MPI_Aint> winAdress(2) ; 
    283         newBuffer>>newSize>>winAdress[0]>>winAdress[1] ; 
     276        vector<MPI_Aint> winBufferAdress(2) ; 
     277        newBuffer>>newSize>>winBufferAdress[0]>>winBufferAdress[1] ; 
    284278        buffers[rank]->freeBuffer(count) ; 
    285279        delete buffers[rank] ; 
    286         buffers[rank] = new CServerBuffer(windows_[rank], winAdress, 0, newSize) ; 
     280        windows_[rank][0] -> setWinBufferAddress(winBufferAdress[0],0) ; 
     281        windows_[rank][1] -> setWinBufferAddress(winBufferAdress[1],0) ; 
     282        buffers[rank] = new CServerBuffer(windows_[rank], winBufferAdress, 0, newSize) ; 
    287283        info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank 
    288                  <<"  newSize : "<<newSize<<" Address : "<<winAdress[0]<<" & "<<winAdress[1]<<endl ; 
     284                 <<"  newSize : "<<newSize<<" Address : "<<winBufferAdress[0]<<" & "<<winBufferAdress[1]<<endl ; 
    289285      } 
    290286      else 
     
    309305    CEventServer* event; 
    310306     
    311 //    if (context->isProcessingEvent()) return ; 
    312307    if (isProcessingEvent_) return ; 
    313     if (isAttachedModeEnabled()) 
    314       if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ; 
    315308 
    316309    it=events.find(currentTimeLine); 
     
    321314      if (event->isFull()) 
    322315      { 
    323         if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side 
     316        if (!scheduled) 
    324317        { 
    325318          eventScheduler_->registerEvent(currentTimeLine,hashId); 
     
    327320          scheduled=true; 
    328321        } 
    329         else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) ) 
     322        else if (eventScheduler_->queryEvent(currentTimeLine,hashId) ) 
    330323        { 
    331324          if (!enableEventsProcessing && isCollectiveEvent(*event)) return ; 
     
    346339          } 
    347340           
    348           if (CXios::checkEventSync) 
     341          if (CXios::checkEventSync && context->getServiceType()!=CServicesManager::CLIENT) 
    349342          { 
    350343            int typeId, classId, typeId_in, classId_in; 
     
    364357          } 
    365358 
    366           if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ; 
    367           //MPI_Barrier(intraComm) ; 
    368          // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes 
    369          // The best way to properly solve this problem will be to use the event scheduler also in attached mode 
    370          // for now just set up a MPI barrier 
    371 //ym to be check later 
    372 //         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ; 
    373  
    374 //         context->setProcessingEvent() ; 
    375          isProcessingEvent_=true ; 
    376          CTimer::get("Process events").resume(); 
    377          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ; 
    378          dispatchEvent(*event); 
    379          CTimer::get("Process events").suspend(); 
    380          isProcessingEvent_=false ; 
    381 //         context->unsetProcessingEvent() ; 
    382          pendingEvent=false; 
    383          delete event; 
    384          events.erase(it); 
    385          currentTimeLine++; 
    386          scheduled = false; 
    387          if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ; 
     359          isProcessingEvent_=true ; 
     360          CTimer::get("Process events").resume(); 
     361          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ; 
     362          eventScheduler_->popEvent() ; 
     363          dispatchEvent(*event); 
     364          CTimer::get("Process events").suspend(); 
     365          isProcessingEvent_=false ; 
     366          pendingEvent=false; 
     367          delete event; 
     368          events.erase(it); 
     369          currentTimeLine++; 
     370          scheduled = false; 
    388371        } 
    389372      } 
     
    409392  void CLegacyContextServer::freeWindows() 
    410393  { 
    411     //if (!isAttachedModeEnabled()) 
    412     //{ 
    413     //  for(auto& it : winComm_) 
    414     //  { 
    415     //    int rank = it.first ; 
    416     //    MPI_Win_free(&windows_[rank][0]); 
    417     //    MPI_Win_free(&windows_[rank][1]); 
    418     //    MPI_Comm_free(&winComm_[rank]) ; 
    419     //  } 
    420     //} 
     394    for(auto& it : winComm_) 
     395    { 
     396      int rank = it.first ; 
     397      delete windows_[rank][0]; 
     398      delete windows_[rank][1]; 
     399    } 
    421400  } 
    422401 
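On the server side the windows are now CWindowDynamic objects created on a per-client communicator and pointed at the buffer addresses received from the client. A plausible reading is that this wraps MPI dynamic windows, where the exposed buffer's absolute address is exchanged and used as the target displacement; the two-rank sketch below shows only that standard MPI mechanism (MPI_Win_create_dynamic, MPI_Win_attach, MPI_Win_lock/MPI_Get), not the actual CWindowDynamic implementation:

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      MPI_Win win;
      MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);   // no memory attached yet

      int buffer = 42;
      MPI_Aint remoteAddr = 0;

      if (rank == 0)
      {
        // Rank 0 exposes its buffer and publishes its absolute address,
        // the same role as the window buffer addresses sent in sendBuff[2..3].
        MPI_Win_attach(win, &buffer, sizeof(buffer));
        MPI_Aint addr;
        MPI_Get_address(&buffer, &addr);
        MPI_Send(&addr, 1, MPI_AINT, 1, 0, MPI_COMM_WORLD);
      }
      else if (rank == 1)
      {
        MPI_Recv(&remoteAddr, 1, MPI_AINT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        int value = 0;
        // With dynamic windows the target displacement is the absolute address.
        MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);
        MPI_Get(&value, 1, MPI_INT, 0, remoteAddr, 1, MPI_INT, win);
        MPI_Win_unlock(0, win);
        std::printf("rank 1 read %d from rank 0\n", value);
      }

      MPI_Barrier(MPI_COMM_WORLD);
      if (rank == 0) MPI_Win_detach(win, &buffer);
      MPI_Win_free(&win);
      MPI_Finalize();
      return 0;
    }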
  • XIOS3/trunk/src/transport/legacy_context_server.hpp

    r2343 r2547  
    6161      std::map<int, StdSize> mapBufferSize_; 
    6262      std::map<int,MPI_Comm> winComm_ ; //! Window communicators 
    63       std::map<int,std::vector<MPI_Win> >windows_ ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side. 
     63      std::map<int,std::vector<CWindowDynamic*> >windows_ ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side. 
    6464      bool isProcessingEvent_ ; 
    6565      size_t remoteHashId_; //!< the hash is of the calling context client 
  • XIOS3/trunk/src/transport/one_sided_client_buffer.hpp

    r2526 r2547  
    182182      int serverRank_ ; 
    183183 
    184       MPI_Comm interCommMerged_;  
    185       int intraServerRank_ ; 
     184      MPI_Comm interCommMerged_;   
     185      int intraServerRank_ ;  
    186186 
    187187      std::list<CBuffer*> buffers_ ; 
  • XIOS3/trunk/src/transport/one_sided_context_client.cpp

    r2399 r2547  
    2222    \param [in] intraComm_ communicator of group client 
    2323    \param [in] interComm_ communicator of group server 
    24     \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode). 
     24    \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode --> obsolete ). 
    2525    */ 
    2626    COneSidedContextClient::COneSidedContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) 
     
    3030       
    3131      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
    32       if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 
    33  
    34       if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ; 
     32 
     33      MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ; 
    3534       
    3635      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows 
    37  
     36      eventScheduler_ = parent->getEventScheduler() ;   
    3837      timeLine = 1; 
    3938    } 
     
    109108        { 
    110109          if (CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").resume() ; 
    111           callGlobalEventLoop() ; 
     110          yield() ; 
    112111        }  
    113112      } 
    114113      if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").suspend() ; 
    115114 
    116       if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
    117       { 
    118         while (checkBuffers(ranks)) callGlobalEventLoop() ; 
    119        
    120         CXios::getDaemonsManager()->scheduleContext(hashId_) ; 
    121         while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ; 
    122       } 
     115 
     116      synchronize() ; 
    123117       
    124118      timeLine++; 
     
    137131     locked_=false ; 
    138132   } 
     133 
     134   void COneSidedContextClient::yield(void) 
     135   { 
     136     locked_=true ; 
     137     context_->yield() ; 
     138     locked_=false ; 
     139   } 
     140 
     141   void COneSidedContextClient::synchronize(void) 
     142   { 
     143     if (context_->getServiceType()!=CServicesManager::CLIENT) 
     144     { 
     145       locked_=true ; 
     146       context_->synchronize() ; 
     147       locked_=false ; 
     148     }     
     149   } 
     150 
    139151   /*! 
    140152   Make a new buffer for a certain connection to server with specific rank 
     
    271283  bool COneSidedContextClient::isNotifiedFinalized(void) 
    272284  { 
    273     if (isAttachedModeEnabled()) return true ; 
    274285 
    275286    bool finalized = true; 
  • XIOS3/trunk/src/transport/one_sided_context_client.hpp

    r2507 r2547  
    4848      void eventLoop(void) ; 
    4949      void callGlobalEventLoop() ; 
     50      void yield() ; 
     51      void synchronize() ; 
    5052      bool havePendingRequests(list<int>& ranks) ; 
    5153 
     
    8183 
    8284      bool locked_ = false ; //!< The context client is locked to avoid recursive checkBuffer 
     85      shared_ptr<CEventScheduler> eventScheduler_ ; 
    8386  }; 
    8487} 
  • XIOS3/trunk/src/transport/one_sided_context_server.cpp

    r2526 r2547  
    4242    finished=false; 
    4343 
    44     if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 
     44    MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 
    4545    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows 
    4646     
     
    4848 
    4949    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
    50     if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 
    5150       
    5251  } 
     
    165164   
    166165    if (isProcessingEvent_) return ; 
    167     if (isAttachedModeEnabled()) 
    168       if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ; 
    169166 
    170167    auto it=completedEvents_.find(currentTimeLine); 
     
    174171      if (it->second.nbSenders == it->second.currentNbSenders) 
    175172      { 
    176         if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side 
     173        if (!scheduled)  
    177174        { 
    178175          eventScheduler_->registerEvent(currentTimeLine,hashId); 
    179176          scheduled=true; 
    180177        } 
    181         else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) ) 
     178        else if (eventScheduler_->queryEvent(currentTimeLine,hashId) ) 
    182179        { 
    183180          //if (!enableEventsProcessing && isCollectiveEvent(event)) return ; 
     
    198195          } 
    199196 
    200           if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ; 
     197          eventScheduler_->popEvent() ; 
    201198 
    202199          isProcessingEvent_=true ; 
     
    214211          currentTimeLine++; 
    215212          scheduled = false; 
    216           if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ; 
    217213        } 
    218214      } 
     
    235231  void COneSidedContextServer::freeWindows() 
    236232  { 
    237     //if (!isAttachedModeEnabled()) 
    238     //{ 
    239233    //  for(auto& it : winComm_) 
    240234    //  { 
     
    244238    //    MPI_Comm_free(&winComm_[rank]) ; 
    245239    //  } 
    246     //} 
    247240  } 
    248241 