Ignore:
Timestamp:
10/18/19 14:55:57 (5 years ago)
Author:
ymipsl
Message:

Implement one sided communication in client/server protocol to avoid dead-lock when some buffer are full.

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.cpp

    r885 r1757  
    77{ 
    88 
    9   CServerBuffer::CServerBuffer(StdSize buffSize) 
     9  CServerBuffer::CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize)  
     10  : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress) 
    1011  { 
    1112    size = 3 * buffSize; 
     
    1314    current = 1; 
    1415    end = size; 
     16    used=0 ; 
    1517    buffer = new char[size]; // use MPI_ALLOC_MEM later? 
     18    currentWindows=1 ; 
     19    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 
    1620  } 
    1721 
     
    2125  } 
    2226 
     27  void CServerBuffer::updateCurrentWindows(void) 
     28  { 
     29    if (currentWindows==0) currentWindows=1 ; 
     30    else currentWindows=0 ; 
     31  } 
     32 
     33/* 
     34  void CServerBuffer::createWindows(MPI_Comm oneSidedComm) 
     35  { 
     36    MPI_Barrier(oneSidedComm) ; 
     37    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
     38    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
     39    hasWindows=true ; 
     40    updateCurrentWindows() ; 
     41    MPI_Barrier(oneSidedComm) ; 
     42  } 
     43*/ 
     44 
     45/* 
     46  bool CServerBuffer::freeWindows() 
     47  { 
     48    if (hasWindows) 
     49    { 
     50      size_t header[3] ; 
     51      size_t& control=header[2] ; 
     52      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows_[0]) ; 
     53      MPI_Get(&control, 1, MPI_LONG_LONG_INT, windowsRank , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ; 
     54      MPI_Win_unlock(0,windows[0]) ; 
     55      if (control==2)  // ok for free windows 
     56      { 
     57        MPI_Win_free( &(windows[0])) ; 
     58        MPI_Win_free( &(windows[1])) ; 
     59        hasWindows=false ; 
     60        return true ; 
     61      } 
     62      else return false ; 
     63    } 
     64    else return true ; 
     65  } 
     66*/ 
    2367 
    2468  bool CServerBuffer::isBufferFree(size_t count) 
     
    72116  } 
    73117 
     118  bool CServerBuffer::isBufferEmpty(void) 
     119  { 
     120    if (used==0) return true ; 
     121    else return false; 
     122  } 
    74123 
    75124  void* CServerBuffer::getBuffer(size_t count) 
     
    128177    } 
    129178 
     179    used+=count ; 
    130180    return ret ; 
    131181  } 
     
    167217      } 
    168218    } 
    169   } 
    170  
     219    used-=count ; 
     220  } 
     221 
     222  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 
     223  { 
     224    if (!hasWindows) return false ; 
     225 
     226     
     227    size_t header[3] ; 
     228    size_t& clientTimeline=header[0] ; 
     229    size_t& clientCount=header[1] ; 
     230    size_t& control=header[2] ; 
     231    bool ok=false ; 
     232     
     233    MPI_Group group ; 
     234    int groupSize,groupRank ; 
     235    MPI_Win_get_group(windows_[currentWindows], &group) ; 
     236    MPI_Group_size(group, &groupSize) ; 
     237    MPI_Group_rank(group, &groupRank) ; 
     238     
     239    lockBuffer();  
     240 
     241// lock is acquired 
     242 
     243    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],0), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     244    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     245    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
     246 
     247//    control=1 ; 
     248//    MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     249    
     250//    MPI_Win_unlock(windowsRank_, windows_[currentWindows]) ; 
     251    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
     252//    info(100)<<"getBufferFromClient : windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ; 
     253    if (timeLine==clientTimeline) 
     254    { 
     255//      info(50)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ; 
     256  
     257//      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
     258      buffer=(char*)getBuffer(clientCount) ; 
     259      count=clientCount ; 
     260      MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ; 
     261      clientTimeline = 0 ; 
     262      clientCount = 0 ; 
     263//      control=0 ; 
     264      MPI_Put(&header[0], 2, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],0) , 2, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     265 
     266// release lock 
     267     unlockBuffer() ; 
     268 
     269      ok=true ; 
     270      char checksum=0 ; 
     271      for(size_t i=0;i<count;i++) checksum=checksum+buffer[i] ; 
     272      char checksumFirst=0 ; 
     273      for(size_t i=5; i<10 && i<count ;i++) checksumFirst=checksumFirst+buffer[i] ; 
     274      char checksumLast=0 ; 
     275      for(size_t i=(count<10)?0:count-10; i<count ; i++) checksumLast=checksumLast+buffer[i] ; 
     276       
     277      info(40)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline " 
     278              <<clientTimeline<<" clientCount "<<count<<" checksum "<<(int)checksum<<" " 
     279              <<(int)buffer[0]<<" "<<(int)buffer[1]<<" "<<(int)buffer[2]<<" "<<(int)buffer[3]<<" "<<(int)buffer[4]<<" "<<(int)buffer[5]<<" "  
     280              <<(int)buffer[6]<<" "<<(int)buffer[7]<<" "<<(int)buffer[8]<<" "<<(int)buffer[9]<<" "<<(int)buffer[10]<<" "<<(int)buffer[11]<<endl ; 
     281 
     282    } 
     283    else 
     284    { 
     285      //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
     286      //control=0 ; 
     287      //MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     288  
     289 // release lock 
     290      unlockBuffer() ; 
     291    } 
     292 
     293    if (ok) return true ; 
     294 
     295    return false ; 
     296  } 
     297   
     298  void CServerBuffer::lockBuffer(void) 
     299  { 
     300    if (!hasWindows) return ; 
     301 
     302    long long int lock=1 ; 
     303    long long int zero=0, one=1 ; 
     304//    control=1 ; 
     305    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
     306    while(lock!=0) 
     307    { 
     308      MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 
     309                           windows_[currentWindows]) ; 
     310      MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
     311    } 
     312  } 
     313 
     314  void CServerBuffer::unlockBuffer(void) 
     315  { 
     316    if (!hasWindows) return ; 
     317    long long int lock=1 ; 
     318    long long int zero=0, one=1 ; 
     319     
     320    MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 
     321                          windows_[currentWindows]) ; 
     322    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;  
     323    MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ; 
     324  } 
     325   
     326  void CServerBuffer::notifyClientFinalize(void) 
     327  { 
     328    if (!hasWindows) return ; 
     329    size_t finalize=1 ; 
     330    lockBuffer();  
     331// lock is acquired 
     332    MPI_Put(&finalize, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],3*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     333    unlockBuffer() ; 
     334  } 
    171335} 
Note: See TracChangeset for help on using the changeset viewer.