Ignore:
Timestamp:
10/11/21 14:41:56 (3 years ago)
Author:
ymipsl
Message:
  • Update of the tranfer protocol using one sided communication
  • Introduce MPI_Improb/MPI_mrecv to listen incomming request
  • Introducing latency when looping over managers

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp

    r2221 r2246  
    88#include "tracer.hpp" 
    99#include "timeline_events.hpp" 
     10#include "timer.hpp" 
    1011 
    1112namespace xios 
     
    2930    else hasWindows=true ; 
    3031 
    31       MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 
    32       MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 
    33       buffer[0] = bufferHeader[0]+headerSize ; 
    34       buffer[1] = bufferHeader[1]+headerSize ; 
    35       firstTimeLine[0]=(size_t*)bufferHeader[0] ; 
    36       firstTimeLine[1]=(size_t*)bufferHeader[1] ; 
    37       bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 
    38       bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 
    39       control[0]=(size_t*)bufferHeader[0] +2 ; 
    40       control[1]=(size_t*)bufferHeader[1] +2 ; 
    41       finalize[0]=(size_t*)bufferHeader[0] +3 ; 
    42       finalize[1]=(size_t*)bufferHeader[1] +3 ; 
     32      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 
     33      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 
     34      buffer[0] = bufferHeader[0]+headerSize_ ; 
     35      buffer[1] = bufferHeader[1]+headerSize_ ; 
     36      firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_ ; 
     37      firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_ ; 
     38      bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ; 
     39      bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ; 
     40      control[0]=(size_t*)bufferHeader[0] + controlOffset_ ; 
     41      control[1]=(size_t*)bufferHeader[1] + controlOffset_ ; 
     42      notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ; 
     43      notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ; 
    4344 
    4445      *firstTimeLine[0]=0 ; 
     
    4849      *control[0]=0 ; 
    4950      *control[1]=0 ; 
    50       *finalize[0]=0 ; 
    51       *finalize[1]=0 ; 
     51      *notify[0]=notifyNothing_ ; 
     52      *notify[1]=notifyNothing_ ; 
    5253      winState[0]=false ; 
    5354      winState[1]=false ; 
     
    5758    {   
    5859     
    59       MPI_Aint buffSize=bufferSize+headerSize ; 
     60      MPI_Aint buffSize=bufferSize+headerSize_ ; 
    6061      MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ; 
    6162      MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ; 
     
    106107  } 
    107108 
    108 /*  void CClientBuffer::createWindows(MPI_Comm oneSidedComm) 
    109   { 
    110     MPI_Barrier(oneSidedComm) ; 
    111     MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
    112     MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
    113  
    114     MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 
    115     *firstTimeLine[0]=0 ; 
    116     *bufferCount[0]=0 ; 
    117     *control[0]=0 ; 
    118     MPI_Win_unlock(0, windows[0]) ; 
    119  
    120     MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 
    121     *firstTimeLine[1]=0 ; 
    122     *bufferCount[1]=0 ; 
    123     *control[1]=0 ; 
    124     MPI_Win_unlock(0, windows[1]) ; 
    125     winState[0]=false ; 
    126     winState[1]=false ; 
    127     MPI_Barrier(oneSidedComm) ; 
    128     hasWindows=true ; 
    129   } 
    130 */ 
    131  
    132 /*   
    133   void CClientBuffer::freeWindows() 
    134   { 
    135     if (hasWindows) 
    136     { 
    137       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[0]) ; 
    138       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[1]) ; 
    139       *control[0]=2 ; 
    140       *control[1]=2 ; 
    141       MPI_Win_unlock(0, windows_[1]) ; 
    142       MPI_Win_unlock(0, windows_[0]) ; 
    143        
    144       MPI_Win_free(&windows_[0]) ; 
    145       MPI_Win_free(&windows_[1]) ; 
    146       hasWindows=false ; 
    147     } 
    148   } 
    149 */  
    150109  void CClientBuffer::lockBuffer(void) 
    151110  { 
    152     if (hasWindows) 
    153     { 
    154    //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[current]) ; 
    155       long long int lock=1 ; 
    156       long long int zero=0, one=1 ; 
    157       
     111    CTimer::get("lock buffer").resume(); 
     112    if (hasWindows) 
     113    { 
    158114      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 
    159       
    160       while(lock!=0) 
    161       { 
    162         MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 
    163                              windows_[current]) ; 
    164         MPI_Win_flush(clientRank_, windows_[current]) ; 
    165       } 
    166  
    167 //      info(100)<<"Buffer locked "<<&windows_<<"  "<<current<<endl ; 
    168115      winState[current]=true ; 
    169116    } 
     117    CTimer::get("lock buffer").suspend(); 
    170118  } 
    171119 
    172120  void CClientBuffer::unlockBuffer(void) 
    173121  { 
    174     if (hasWindows) 
    175     { 
    176       long long int lock=1 ; 
    177       long long int zero=0, one=1 ; 
    178  
    179       MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 
    180                              windows_[current]) ; 
     122    CTimer::get("unlock buffer").resume(); 
     123    if (hasWindows) 
     124    { 
    181125      MPI_Win_unlock(clientRank_, windows_[current]) ; 
    182  
    183  //     info(100)<<"Buffer unlocked "<<&windows_<<"  "<<current<<endl ; 
    184126      winState[current]=false ; 
    185127    } 
     128    CTimer::get("unlock buffer").suspend(); 
    186129  } 
    187130 
     
    193136  bool CClientBuffer::isBufferFree(StdSize size) 
    194137  { 
    195 //    bool loop=true ; 
    196 //    while (loop)  
    197 //    { 
    198 //      lockBuffer(); 
    199 //      if (*control[current]==0) loop=false ; // attemp to read from server ? 
    200 //      else unlockBuffer() ; 
    201 //    } 
    202138   
    203139    lockBuffer(); 
     
    208144    if (size > bufferSize) 
    209145    { 
    210       // ERROR("bool CClientBuffer::isBufferFree(StdSize size)", 
    211       //      << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl); 
    212146      resizingBufferStep_=1 ; 
     147      *firstTimeLine[current]=0 ; 
    213148      newBufferSize_=size ; 
    214149      return false ; 
     
    231166      { 
    232167        resizingBufferStep_ = 1 ; 
     168        *firstTimeLine[current]=0 ; 
    233169        newBufferSize_ = (count+size)*growFactor_ ; 
    234170      }   
     
    247183      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 
    248184      *bufferCount[current]=count ; 
    249 /*      info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current 
    250               <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ; 
    251       if (!winState[current]) info(40)<<"CClientBuffer::getBuffer "<<" Windows Not Locked... "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current 
    252               <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;*/ 
    253185      return retBuffer; 
    254186    } 
     
    284216    MPI_Status status; 
    285217    int flag; 
     218     
     219    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
     220    MPI_Win_unlock(clientRank_, windows_[0]) ; 
     221 
     222    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
     223    MPI_Win_unlock(clientRank_, windows_[1]) ; 
    286224 
    287225    if (pending) 
     
    299237      if (count > 0) 
    300238      { 
    301         lockBuffer() ; 
    302  //       if (*control[current]==0 && bufferCount[current] > 0) 
    303         if (*bufferCount[current] > 0) 
     239        double time=MPI_Wtime() ; 
     240        if (time - lastCheckedWithNothing_ > latency_) 
    304241        { 
    305           MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
    306           if (resizingBufferStep_==3) resizingBufferStep_=0 ; 
    307           pending = true; 
    308 //          *control[current]=0 ; 
    309           *firstTimeLine[current]=0 ; 
    310           *bufferCount[current]=0 ; 
    311  
    312            unlockBuffer() ; 
    313  
    314           if (current == 1) current = 0; 
    315           else current = 1; 
    316           count = 0; 
    317         } 
    318         else  
    319         { 
    320           unlockBuffer() ; 
     242          lockBuffer() ; 
     243          if (*bufferCount[current] > 0) 
     244          { 
     245            MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
     246            if (resizingBufferStep_==4) resizingBufferStep_=0 ; 
     247            pending = true; 
     248            *firstTimeLine[current]=0 ; 
     249            *bufferCount[current]=0 ; 
     250 
     251             unlockBuffer() ; 
     252 
     253            if (current == 1) current = 0; 
     254            else current = 1; 
     255            count = 0; 
     256          } 
     257          else  
     258          { 
     259            unlockBuffer() ; 
     260            lastCheckedWithNothing_ = time ; 
     261          } 
    321262        } 
    322263      } 
    323264      else 
    324265      { 
    325         if (resizingBufferStep_==2) resizeBuffer(newBufferSize_) ; 
    326266        if (resizingBufferStep_==1) resizeBufferNotify() ; 
     267        else if (resizingBufferStep_==2) isNotifiedChangeBufferSize() ; 
     268        else if (resizingBufferStep_==3) resizeBuffer(newBufferSize_) ; 
    327269      } 
    328270    } 
     
    345287  void CClientBuffer::resizeBuffer(size_t newSize) 
    346288  { 
     289 
    347290    if (hasWindows) 
    348291    {  
     
    354297 
    355298    bufferSize=newSize ; 
    356     MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 
    357     MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 
    358     buffer[0] = bufferHeader[0]+headerSize ; 
    359     buffer[1] = bufferHeader[1]+headerSize ; 
    360     firstTimeLine[0]=(size_t*)bufferHeader[0] ; 
    361     firstTimeLine[1]=(size_t*)bufferHeader[1] ; 
    362     bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 
    363     bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 
    364     control[0]=(size_t*)bufferHeader[0] +2 ; 
    365     control[1]=(size_t*)bufferHeader[1] +2 ; 
    366     finalize[0]=(size_t*)bufferHeader[0] +3 ; 
    367     finalize[1]=(size_t*)bufferHeader[1] +3 ; 
     299    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 
     300    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 
     301    buffer[0] = bufferHeader[0]+headerSize_ ; 
     302    buffer[1] = bufferHeader[1]+headerSize_ ; 
     303    firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_; 
     304    firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_; 
     305    bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ; 
     306    bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ; 
     307    control[0]=(size_t*)bufferHeader[0] + controlOffset_ ;  // control=0 => nothing ; control=1 => changeBufferSize 
     308    control[1]=(size_t*)bufferHeader[1] + controlOffset_ ; 
     309    notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ; 
     310    notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ; 
    368311 
    369312    *firstTimeLine[0]=0 ; 
     
    373316    *control[0]=0 ; 
    374317    *control[1]=0 ; 
    375     *finalize[0]=0 ; 
    376     *finalize[1]=0 ; 
     318    *notify[0] = notifyNothing_ ; 
     319    *notify[1] = notifyNothing_ ; 
    377320    winState[0]=false ; 
    378321    winState[1]=false ; 
     
    382325    {   
    383326     
    384       MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ; 
    385       MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ; 
     327      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ; 
     328      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ; 
    386329           
    387330      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
     
    402345    bufOut->put(this->getWinAddress(1)); 
    403346 
    404     resizingBufferStep_=3; 
    405     unlockBuffer() ; 
     347    resizingBufferStep_=4; 
     348    unlockBuffer() ; 
     349    info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinAddress(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl; 
    406350  } 
    407351 
     
    416360  } 
    417361 
    418   bool CClientBuffer::isNotifiedFinalized(void) 
     362  bool CClientBuffer::isNotifiedChangeBufferSize(void) 
    419363  { 
    420364    
    421365    bool ret ; 
    422366    lockBuffer() ; 
    423     ret=*finalize[current] == 1 ? true : false ; 
     367    ret=*notify[current] == notifyResizeBuffer_ ? true : false ; 
     368    if (ret)  
     369    { 
     370      *notify[current] = notifyNothing_ ; 
     371      resizingBufferStep_=3;   
     372    } 
    424373    unlockBuffer() ; 
    425374 
     
    427376  } 
    428377 
     378  bool CClientBuffer::isNotifiedFinalized(void) 
     379  { 
     380    
     381    bool ret ; 
     382    lockBuffer() ; 
     383    ret=*notify[current] == notifyFinalize_ ? true : false ; 
     384    unlockBuffer() ; 
     385 
     386    return ret; 
     387  } 
     388 
    429389} 
Note: See TracChangeset for help on using the changeset viewer.