Ignore:
Timestamp:
08/29/23 17:24:04 (10 months ago)
Author:
ymipsl
Message:

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/buffer_client.cpp

    r2458 r2547  
    1414  size_t CClientBuffer::maxRequestSize = 0; 
    1515 
    16   CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) 
     16  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, bool hasWindows) 
    1717    : interComm(interComm) 
    1818    , clientRank_(0) 
    1919    , serverRank(serverRank) 
    2020    , bufferSize(bufferSize) 
    21     , estimatedMaxEventSize(estimatedMaxEventSize) 
    2221    , maxEventSize(0) 
    2322    , current(0) 
    2423    , count(0) 
    2524    , pending(false) 
    26     , hasWindows(false)  
    27   { 
    28      /* 
    29       if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 
    30       else hasWindows=true ; 
    31      */ 
    32  
    33       MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 
    34       MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 
     25    , hasWindows_(hasWindows)  
     26  { 
     27      if (hasWindows_) 
     28      {  
     29        windows_.resize(2) ; 
     30        windows_[0] = new CWindowDynamic() ; 
     31        windows_[0]->allocateBuffer(bufferSize+headerSize_) ; 
     32        bufferHeader[0] = (char*) windows_[0]->getBufferAddress() ; 
     33        windows_[1] = new CWindowDynamic() ; 
     34        windows_[1]->allocateBuffer(bufferSize+headerSize_) ; 
     35        bufferHeader[1] = (char*) windows_[1]->getBufferAddress() ; 
     36      } 
     37      else 
     38      { 
     39        MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 
     40        MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 
     41      } 
     42 
    3543      buffer[0] = bufferHeader[0]+headerSize_ ; 
    3644      buffer[1] = bufferHeader[1]+headerSize_ ; 
     
    5563      winState[1]=false ; 
    5664 
    57  
    58     if (hasWindows) 
    59     {   
    60      
    61       MPI_Aint buffSize=bufferSize+headerSize_ ; 
    62       MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ; 
    63       MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ; 
    64      
    65       MPI_Group group ; 
    66       int groupSize,groupRank ; 
    67       MPI_Win_get_group(windows_[0], &group) ; 
    68       MPI_Group_size(group, &groupSize) ; 
    69       MPI_Group_rank(group, &groupRank) ; 
    70       if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
    71  
    72       MPI_Win_get_group(windows_[1], &group) ; 
    73       MPI_Group_size(group, &groupSize) ; 
    74       MPI_Group_rank(group, &groupRank) ; 
    75       if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
    76  
    77       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
    78       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
    79  
    80       MPI_Win_unlock(clientRank_, windows_[1]) ; 
    81       MPI_Win_unlock(clientRank_, windows_[0]) ; 
    82     }  
    83     retBuffer = new CBufferOut(buffer[current], bufferSize); 
    84     info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 
     65     
     66      retBuffer = new CBufferOut(buffer[current], bufferSize); 
     67      info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 
    8568  } 
    8669 
     
    9174    return address ; 
    9275  } 
    93  
    94   void CClientBuffer::attachWindows(vector<MPI_Win>& windows) 
     76   
     77  MPI_Aint CClientBuffer::getWinBufferAddress(int i) 
     78  { 
     79    return windows_[i]->getWinBufferAddress() ; 
     80  } 
     81 
     82  void CClientBuffer::attachWindows(MPI_Comm& winComm) 
    9583  { 
    9684    isAttachedWindows_=true ; 
    97     windows_=windows ; 
    98     if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ; 
    99     else hasWindows=true ; 
    100  
    101     if (hasWindows) 
     85 
     86    if (hasWindows_) 
    10287    {   
    10388      MPI_Aint buffSize=bufferSize+headerSize_ ; 
    104       MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ; 
    105       MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ; 
    106      
    107       MPI_Group group ; 
    108       int groupSize,groupRank ; 
    109       MPI_Win_get_group(windows_[0], &group) ; 
    110       MPI_Group_size(group, &groupSize) ; 
    111       MPI_Group_rank(group, &groupRank) ; 
    112       if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
    113  
    114       MPI_Win_get_group(windows_[1], &group) ; 
    115       MPI_Group_size(group, &groupSize) ; 
    116       MPI_Group_rank(group, &groupRank) ; 
    117       if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
    118  
    119       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
    120       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
    121  
    122       MPI_Win_unlock(clientRank_, windows_[1]) ; 
    123       MPI_Win_unlock(clientRank_, windows_[0]) ; 
     89      windows_[0]->create(winComm) ; 
     90      windows_[0]->attach() ; 
     91      windows_[1]->create(winComm) ; 
     92      windows_[1]->attach() ; 
     93 
     94      windows_[0]->lockExclusive(clientRank_) ; 
     95      windows_[1]->lockExclusive(clientRank_) ; 
     96 
     97      windows_[0]->unlockExclusive(clientRank_) ; 
     98      windows_[1]->unlockExclusive(clientRank_) ; 
     99 
    124100    }  
    125101 
     
    129105  CClientBuffer::~CClientBuffer() 
    130106  { 
    131      //freeWindows() ; 
    132      if (hasWindows) 
    133      { 
    134        MPI_Win_detach(windows_[0],bufferHeader[0]); 
    135        MPI_Win_detach(windows_[1],bufferHeader[1]); 
    136        MPI_Free_mem(bufferHeader[0]) ; 
    137        MPI_Free_mem(bufferHeader[1]) ; 
    138      } 
    139      delete retBuffer; 
     107    if (hasWindows_) 
     108    { 
     109      windows_[0]->detach() ; 
     110      windows_[1]->detach() ; 
     111      delete windows_[0] ; 
     112      delete windows_[1] ; 
     113    } 
     114    else  
     115    { 
     116      MPI_Free_mem(bufferHeader[0]) ; 
     117      MPI_Free_mem(bufferHeader[1]) ; 
     118    } 
     119    delete retBuffer; 
    140120  } 
    141121 
     
    143123  { 
    144124    CTimer::get("lock buffer").resume(); 
    145     if (hasWindows) 
     125    if (isAttachedWindows_)     
    146126    { 
    147127      if (winState[current]==true) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo lock client buffer but winState said it is already locked") ; 
    148       MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 
     128      //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 
     129      windows_[current]->lockExclusive(clientRank_) ; 
    149130      winState[current]=true ; 
    150131    } 
     
    155136  { 
    156137    CTimer::get("unlock buffer").resume(); 
    157     if (hasWindows) 
     138    if (isAttachedWindows_) 
    158139    { 
    159140      if (winState[current]==false) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo unlock client buffer but winState said it is already unlocked") ; 
    160       MPI_Win_unlock(clientRank_, windows_[current]) ; 
     141      //MPI_Win_unlock(clientRank_, windows_[current]) ; 
     142      windows_[current]->unlockExclusive(clientRank_) ; 
    161143      winState[current]=false ; 
    162144    } 
     
    189171    { 
    190172      maxEventSize = size; 
    191  
    192       if (size > estimatedMaxEventSize) 
    193         error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank 
    194                  << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl; 
    195173 
    196174      if (size > maxRequestSize) maxRequestSize = size; 
     
    319297  { 
    320298 
    321     if (hasWindows) 
     299    bufferSize=newSize ; 
     300 
     301    if (hasWindows_) 
    322302    {  
    323       MPI_Win_detach(windows_[0], bufferHeader[0]) ; 
    324       MPI_Win_detach(windows_[1], bufferHeader[1]) ; 
    325     } 
    326     MPI_Free_mem(bufferHeader[0]) ; 
    327     MPI_Free_mem(bufferHeader[1]) ; 
    328  
    329     bufferSize=newSize ; 
    330     MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 
    331     MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 
     303      windows_[0]->detach(); 
     304      windows_[1]->detach(); 
     305       
     306      windows_[0]->attach(bufferSize+headerSize_) ; 
     307      bufferHeader[0] = (char*) windows_[0] -> getBufferAddress() ; 
     308      windows_[1]->attach(bufferSize+headerSize_) ; 
     309      bufferHeader[1] = (char*) windows_[1] -> getBufferAddress() ; 
     310    } 
     311     
     312     
    332313    buffer[0] = bufferHeader[0]+headerSize_ ; 
    333314    buffer[1] = bufferHeader[1]+headerSize_ ; 
     
    353334    current=0 ; 
    354335     
    355     if (hasWindows) 
     336    if (hasWindows_) 
    356337    {   
    357      
    358       MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ; 
    359       MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ; 
    360            
    361       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
    362       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
    363  
    364       MPI_Win_unlock(clientRank_, windows_[1]) ; 
    365       MPI_Win_unlock(clientRank_, windows_[0]) ; 
     338 
     339      windows_[0]->lockExclusive(clientRank_) ; 
     340      windows_[1]->lockExclusive(clientRank_) ; 
     341       
     342      windows_[1]->unlockExclusive(clientRank_) ; 
     343      windows_[0]->unlockExclusive(clientRank_) ; 
    366344    }  
    367345 
     
    373351    bufOut->put(timelineEventChangeBufferSize); 
    374352    bufOut->put(newBufferSize_); 
    375     bufOut->put(this->getWinAddress(0)); 
    376     bufOut->put(this->getWinAddress(1)); 
     353     
     354    bufOut->put(this->getWinBufferAddress(0)); 
     355    bufOut->put(this->getWinBufferAddress(1)); 
    377356 
    378357    resizingBufferStep_=4; 
    379358    unlockBuffer() ; 
    380     info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinAddress(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl; 
     359    info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinBufferAddress(0)<<" winAdress[1] "<<this->getWinBufferAddress(1)<<endl; 
    381360  } 
    382361 
     
    397376    lockBuffer() ; 
    398377    ret=*notify[current] == notifyResizeBuffer_ ? true : false ; 
    399     if (ret || !hasWindows)  
     378    if (ret || !hasWindows_)  
    400379    { 
    401380      *notify[current] = notifyNothing_ ; 
Note: See TracChangeset for help on using the changeset viewer.