Ignore:
Timestamp:
10/18/19 14:55:57 (5 years ago)
Author:
ymipsl
Message:

Implement one-sided communication in the client/server protocol to avoid deadlock when some buffers are full.

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp

    r1639 r1757  
    1212  size_t CClientBuffer::maxRequestSize = 0; 
    1313 
    14   CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize, StdSize maxBufferedEvents) 
     14  CClientBuffer::CClientBuffer(MPI_Comm interComm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) 
    1515    : interComm(interComm) 
     16    , clientRank_(clientRank) 
    1617    , serverRank(serverRank) 
    1718    , bufferSize(bufferSize) 
     
    2021    , current(0) 
    2122    , count(0) 
    22     , bufferedEvents(0) 
    23     , maxBufferedEvents(maxBufferedEvents) 
    2423    , pending(false) 
    25   { 
    26     buffer[0] = new char[bufferSize]; // transform it with MPI_ALLOC_MEM later 
    27     buffer[1] = new char[bufferSize]; 
     24    , hasWindows(false)  
     25    , windows_(windows) 
     26  { 
     27    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 
     28    else hasWindows=true ; 
     29 
     30      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 
     31      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 
     32      buffer[0] = bufferHeader[0]+headerSize ; 
     33      buffer[1] = bufferHeader[1]+headerSize ; 
     34      firstTimeLine[0]=(size_t*)bufferHeader[0] ; 
     35      firstTimeLine[1]=(size_t*)bufferHeader[1] ; 
     36      bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 
     37      bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 
     38      control[0]=(size_t*)bufferHeader[0] +2 ; 
     39      control[1]=(size_t*)bufferHeader[1] +2 ; 
     40      finalize[0]=(size_t*)bufferHeader[0] +3 ; 
     41      finalize[1]=(size_t*)bufferHeader[1] +3 ; 
     42 
     43      *firstTimeLine[0]=0 ; 
     44      *firstTimeLine[1]=0 ; 
     45      *bufferCount[0]=0 ; 
     46      *bufferCount[1]=0 ; 
     47      *control[0]=0 ; 
     48      *control[1]=0 ; 
     49      *finalize[0]=0 ; 
     50      *finalize[1]=0 ; 
     51      winState[0]=false ; 
     52      winState[1]=false ; 
     53 
     54 
     55    if (hasWindows) 
     56    {   
     57     
     58      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ; 
     59      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ; 
     60     
     61      MPI_Group group ; 
     62      int groupSize,groupRank ; 
     63      MPI_Win_get_group(windows_[0], &group) ; 
     64      MPI_Group_size(group, &groupSize) ; 
     65      MPI_Group_rank(group, &groupRank) ; 
     66      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
     67 
     68      MPI_Win_get_group(windows_[1], &group) ; 
     69      MPI_Group_size(group, &groupSize) ; 
     70      MPI_Group_rank(group, &groupRank) ; 
     71      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
     72 
     73      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
     74      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
     75 
     76      MPI_Win_unlock(clientRank_, windows_[1]) ; 
     77      MPI_Win_unlock(clientRank_, windows_[0]) ; 
     78    }  
    2879    retBuffer = new CBufferOut(buffer[current], bufferSize); 
    29     info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << " with a maximum of " << maxBufferedEvents << " buffered events" << endl; 
     80    info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 
     81  } 
     82 
     // Returns the MPI address of bufferHeader[i], obtained with MPI_Get_address,
     // when hasWindows is set; returns 0 otherwise.
     // i: index of the header/buffer slot (the constructor shown in this changeset
     //    sets up slots 0 and 1).
     83  MPI_Aint CClientBuffer::getWinAddress(int i) 
     84  { 
     85     MPI_Aint address ; 
     86      
     87     if (hasWindows) MPI_Get_address(bufferHeader[i], &address) ; 
     88     else address=0 ; 
     89 
     90     return address ; 
    3091  } 
    3192 
    // Destructor. This is a diff view: rows numbered only in the left (r1639) column
    // appear to be the removed implementation (delete [] of both buffers), rows numbered
    // only in the right (r1757) column the added one.
    // New behaviour: when hasWindows is set, both header buffers are detached from their
    // windows and released with MPI_Free_mem; retBuffer is deleted in every case.
    // NOTE(review): the constructor in this changeset calls MPI_Alloc_mem unconditionally,
    // while MPI_Free_mem below only runs when hasWindows is true -- this looks like a leak
    // of both buffers when no windows are supplied; confirm and move the frees out of the if.
    3293  CClientBuffer::~CClientBuffer() 
    3394  { 
    34    delete [] buffer[0]; 
    35    delete [] buffer[1]; 
    36    delete retBuffer; 
     95     //freeWindows() ; 
     96     if (hasWindows) 
     97     { 
     98       MPI_Win_detach(windows_[0],bufferHeader[0]); 
     99       MPI_Win_detach(windows_[1],bufferHeader[1]); 
     100       MPI_Free_mem(bufferHeader[0]) ; 
     101       MPI_Free_mem(bufferHeader[1]) ; 
     102     } 
     103     delete retBuffer; 
     104  } 
     105 
     106/*  void CClientBuffer::createWindows(MPI_Comm oneSidedComm) 
     107  { 
     108    MPI_Barrier(oneSidedComm) ; 
     109    MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
     110    MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
     111 
     112    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 
     113    *firstTimeLine[0]=0 ; 
     114    *bufferCount[0]=0 ; 
     115    *control[0]=0 ; 
     116    MPI_Win_unlock(0, windows[0]) ; 
     117 
     118    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 
     119    *firstTimeLine[1]=0 ; 
     120    *bufferCount[1]=0 ; 
     121    *control[1]=0 ; 
     122    MPI_Win_unlock(0, windows[1]) ; 
     123    winState[0]=false ; 
     124    winState[1]=false ; 
     125    MPI_Barrier(oneSidedComm) ; 
     126    hasWindows=true ; 
     127  } 
     128*/ 
     129 
     130/*   
     131  void CClientBuffer::freeWindows() 
     132  { 
     133    if (hasWindows) 
     134    { 
     135      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[0]) ; 
     136      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[1]) ; 
     137      *control[0]=2 ; 
     138      *control[1]=2 ; 
     139      MPI_Win_unlock(0, windows_[1]) ; 
     140      MPI_Win_unlock(0, windows_[0]) ; 
     141       
     142      MPI_Win_free(&windows_[0]) ; 
     143      MPI_Win_free(&windows_[1]) ; 
     144      hasWindows=false ; 
     145    } 
     146  } 
     147*/  
     // Locks the current buffer; does nothing when hasWindows is false.
     // Steps: take an exclusive MPI_Win_lock on windows_[current] for rank clientRank_, then
     // repeat MPI_Compare_and_swap on the word located 2*sizeof(size_t) bytes after the header
     // start (the same offset the constructor assigns to control[current]) until the value
     // fetched back into `lock` is 0, i.e. until the swap of 0 -> 1 has taken effect.
     // winState[current] is set to true once the loop exits.
     // NOTE(review): the word is accessed as MPI_LONG_LONG_INT while the constructor treats it
     // as size_t -- assumes both have the same width; confirm.
     148  void CClientBuffer::lockBuffer(void) 
     149  { 
     150    if (hasWindows) 
     151    { 
     152   //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[current]) ; 
     153      long long int lock=1 ; 
     154      long long int zero=0, one=1 ; 
     155      
     156      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 
     157      
     158      while(lock!=0) 
     159      { 
                // origin=&one, compare=&zero, result=&lock: write 1 if the target holds 0,
                // and fetch the previous target value into lock.
     160        MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 
     161                             windows_[current]) ; 
                // Flush so the operation is completed and `lock` is valid before it is tested.
     162        MPI_Win_flush(clientRank_, windows_[current]) ; 
     163      } 
     164 
     165//      info(100)<<"Buffer locked "<<&windows_<<"  "<<current<<endl ; 
     166      winState[current]=true ; 
     167    } 
     168  } 
     169 
     // Counterpart of lockBuffer(); does nothing when hasWindows is false.
     // Issues an MPI_Compare_and_swap that writes 0 to the word at header offset
     // 2*sizeof(size_t) if it currently holds 1, then calls MPI_Win_unlock on
     // windows_[current] for rank clientRank_ and sets winState[current] to false.
     // NOTE(review): the value fetched into `lock` is not inspected, so a failed swap
     // (word not equal to 1) goes unnoticed -- confirm this is intended.
     170  void CClientBuffer::unlockBuffer(void) 
     171  { 
     172    if (hasWindows) 
     173    { 
     174      long long int lock=1 ; 
     175      long long int zero=0, one=1 ; 
     176 
              // origin=&zero, compare=&one, result=&lock: write 0 if the target holds 1.
     177      MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 
     178                             windows_[current]) ; 
     179      MPI_Win_unlock(clientRank_, windows_[current]) ; 
     180 
     181 //     info(100)<<"Buffer unlocked "<<&windows_<<"  "<<current<<endl ; 
     182      winState[current]=false ; 
     183    } 
    37184  } 
    38185 
     
    44191  bool CClientBuffer::isBufferFree(StdSize size) 
    45192  { 
     193//    bool loop=true ; 
     194//    while (loop)  
     195//    { 
     196//      lockBuffer(); 
     197//      if (*control[current]==0) loop=false ; // attemp to read from server ? 
     198//      else unlockBuffer() ; 
     199//    } 
     200   
     201    lockBuffer(); 
    46202    if (size > bufferSize) 
    47203      ERROR("bool CClientBuffer::isBufferFree(StdSize size)", 
     
    59215    } 
    60216 
    61  
    62     return (size <= remain() && bufferedEvents < maxBufferedEvents); 
    63   } 
    64  
    65  
    66   CBufferOut* CClientBuffer::getBuffer(StdSize size) 
     217      count=*bufferCount[current] ; 
     218      return (size <= remain()); 
     219  } 
     220 
     221 
     222  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size) 
    67223  { 
    68224    if (size <= remain()) 
     
    70226      retBuffer->realloc(buffer[current] + count, size); 
    71227      count += size; 
    72       bufferedEvents++; 
     228      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 
     229      *bufferCount[current]=count ; 
     230/*      info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current 
     231              <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ; 
     232      if (!winState[current]) info(40)<<"CClientBuffer::getBuffer "<<" Windows Not Locked... "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current 
     233              <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;*/ 
    73234      return retBuffer; 
    74235    } 
     
    81242  } 
    82243 
    83   bool CClientBuffer::checkBuffer(void) 
     // Debug helper: logs, at info level 45, the state of the current buffer.
     // Three sums are accumulated in a char over buffer[current]:
     //   checksum      - all *bufferCount[current] bytes,
     //   checksumFirst - bytes 5..9 (bounded by the stored count),
     //   checksumLast  - the last 10 bytes (or all of them when fewer than 10 are stored).
     // NOTE(review): only checksum is written to the log; checksumFirst and checksumLast are
     // computed but not used. The log line also prints buffer[current][0..11] regardless of
     // *bufferCount[current], so bytes that were never filled may be read -- confirm intent.
     244  void CClientBuffer::infoBuffer(void) 
     245  { 
     246       
     247      char checksum=0 ; 
     248      for(size_t i=0;i<*bufferCount[current];i++) checksum=checksum+buffer[current][i] ; 
     249  
     250      char checksumFirst=0 ; 
     251      for(size_t i=5; i<10 && i<*bufferCount[current] ;i++) checksumFirst=checksumFirst+buffer[current][i] ; 
     252  
     253      char checksumLast=0 ; 
     254      for(size_t i=(*bufferCount[current]<10)?0:*bufferCount[current]-10; i<*bufferCount[current] ; i++) checksumLast=checksumLast+buffer[current][i] ; 
     255  
     256      info(45)<<"CClientBuffer::infoBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current<<" WinState "<<winState[current] 
     257              <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<" checksum "<<(int)checksum<<" " 
     258              <<(int)buffer[current][0]<<" "<<(int)buffer[current][1]<<" "<<(int)buffer[current][2]<<" "<<(int)buffer[current][3]<<" "<<(int)buffer[current][4]<<" "<<(int)buffer[current][5]<<" " 
     259              <<(int)buffer[current][6]<<" "<<(int)buffer[current][7]<<" "<<(int)buffer[current][8]<<" "<<(int)buffer[current][9]<<" "<<(int)buffer[current][10]<<" "<<(int)buffer[current][11]<<endl ; 
     260 
     261  } 
     262 
     263  bool CClientBuffer::checkBuffer(bool send) 
    84264  { 
    85265    MPI_Status status; 
     
    96276    if (!pending) 
    97277    { 
     278      if (!send) return false ; 
    98279      if (count > 0) 
    99280      { 
    100         MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
    101         pending = true; 
    102         if (current == 1) current = 0; 
    103         else current = 1; 
    104         count = 0; 
    105         bufferedEvents = 0; 
     281        lockBuffer() ; 
     282 //       if (*control[current]==0 && bufferCount[current] > 0) 
     283        if (*bufferCount[current] > 0) 
     284        { 
     285          MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
     286          pending = true; 
     287//          *control[current]=0 ; 
     288          *firstTimeLine[current]=0 ; 
     289          *bufferCount[current]=0 ; 
     290 
     291           unlockBuffer() ; 
     292 
     293          if (current == 1) current = 0; 
     294          else current = 1; 
     295          count = 0; 
     296        } 
     297        else unlockBuffer() ; 
    106298      } 
    107299    } 
     
    // Returns true when a send is pending or the current buffer holds data.
    // The member `count` is first refreshed from *bufferCount[current], read between
    // lockBuffer() and unlockBuffer(); the result is (pending || count > 0).
    112304  bool CClientBuffer::hasPendingRequest(void) 
    113305  { 
     306    
     307    lockBuffer() ; 
     308    count=*bufferCount[current] ; 
     309    unlockBuffer() ; 
     310 
    114311    return (pending || count > 0); 
    115312  } 
     313 
     // Returns true when the finalize word of the current buffer header equals 1.
     // The word is read between lockBuffer() and unlockBuffer().
     // NOTE(review): presumably the server writes this word through the window to signal
     // finalization -- the writer is not visible in this changeset; confirm.
     314  bool CClientBuffer::isNotifiedFinalized(void) 
     315  { 
     316    
     317    bool ret ; 
     318    lockBuffer() ; 
     319    ret=*finalize[current] == 1 ? true : false ; 
     320    unlockBuffer() ; 
     321 
     322    return ret; 
     323  } 
     324 
    116325} 
Note: See TracChangeset for help on using the changeset viewer.