Changeset 2558


Ignore:
Timestamp:
09/06/23 14:53:33 (8 months ago)
Author:
ymipsl
Message:

p2p transport protocol

  • bug fix
  • more diagnostics
  • set buffer parameters to realistic values

YM

Location:
XIOS3/trunk/src
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/buffer_server.cpp

    r2547 r2558  
    99{ 
    1010 
    11   CServerBuffer::CServerBuffer(vector<CWindowDynamic*>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize)  
    12   : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress) 
     11  CServerBuffer::CServerBuffer(int clientRank, vector<CWindowDynamic*>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize)  
     12  : hasWindows(true), clientRank_(clientRank), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress) 
    1313  { 
    1414    size = 3 * buffSize; 
     
    240240    bool ok=false ; 
    241241     
    242     
     242    info(100)<<"getBufferFromClient : check data in client buffer  : clientRank "<<clientRank_<<" timeline "<<timeLine<<" ??"<< endl ; 
    243243    lockBuffer();  
    244244    CTimer::get("getBufferFromClient_locked").resume() ;    
     
    272272      for(size_t i=(count<10)?0:count-10; i<count ; i++) checksumLast=checksumLast+buffer[i] ; 
    273273       
    274       info(40)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline " 
     274      info(40)<<"getBufferFromClient timeLine==clientTimeLine: clientRank "<<clientRank_<<" timeline "<<timeLine<<" clientTimeline " 
    275275              <<clientTimeline<<" clientCount "<<count<<" checksum "<<(int)checksum<<" " 
    276276              <<(int)buffer[0]<<" "<<(int)buffer[1]<<" "<<(int)buffer[2]<<" "<<(int)buffer[3]<<" "<<(int)buffer[4]<<" "<<(int)buffer[5]<<" "  
     
    286286      unlockBuffer() ; 
    287287    } 
    288     CTimer::get("getBufferFromClient").suspend() ;    
     288    CTimer::get("getBufferFromClient").suspend() ;   
     289    info(100)<<"getBufferFromClient : check data in client buffer ==> done"<<endl ; 
     290 
    289291    if (ok) return true ; 
    290292 
  • XIOS3/trunk/src/buffer_server.hpp

    r2547 r2558  
    1515  { 
    1616    public: 
    17       CServerBuffer(vector<CWindowDynamic*>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize bufSize) ; 
     17      CServerBuffer(int clientRank, vector<CWindowDynamic*>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize bufSize) ; 
    1818      ~CServerBuffer() ; 
    1919 
     
    4444      int currentWindows ; 
    4545      bool hasWindows ; 
     46      int clientRank_ ; // for debugging 
    4647      int windowsRank_ ; 
    4748      double bufferFromClientLatency_=1e-1 ; 
  • XIOS3/trunk/src/transport/legacy_context_server.cpp

    r2547 r2558  
    140140      MPI_Barrier(winComm_[rank]) ; 
    141141 
    142       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winBufferAddress, 0, buffSize)))).first; 
     142      it=(buffers.insert(pair<int,CServerBuffer*>(rank, new CServerBuffer(rank, windows_[rank], winBufferAddress, 0, buffSize)))).first; 
    143143      lastTimeLine[rank]=0 ; 
    144144      itLastTimeLine=lastTimeLine.begin() ; 
     
    280280        windows_[rank][0] -> setWinBufferAddress(winBufferAdress[0],0) ; 
    281281        windows_[rank][1] -> setWinBufferAddress(winBufferAdress[1],0) ; 
    282         buffers[rank] = new CServerBuffer(windows_[rank], winBufferAdress, 0, newSize) ; 
     282        buffers[rank] = new CServerBuffer(rank, windows_[rank], winBufferAdress, 0, newSize) ; 
    283283        info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank 
    284284                 <<"  newSize : "<<newSize<<" Address : "<<winBufferAdress[0]<<" & "<<winBufferAdress[1]<<endl ; 
  • XIOS3/trunk/src/transport/one_sided_client_buffer.hpp

    r2547 r2558  
    194194      size_t fixedSize_ = 0 ; 
    195195      size_t currentBufferSize_= 0  ; 
    196       double growingFactor_ = 2. ;  
     196      double growingFactor_ = 1.2 ;  
    197197      MPI_Aint lastFreedBloc_=0 ; 
    198198      bool isFinalized_ = false ; 
  • XIOS3/trunk/src/transport/one_sided_server_buffer.hpp

    r2547 r2558  
    131131      size_t fixedSize_ = 0 ; 
    132132      size_t currentBufferSize_=0 ; 
    133       double growingFactor_ = 2. ; 
    134       double bufferServerFactor_=10. ; 
     133      double growingFactor_ = 1.2 ; 
     134      double bufferServerFactor_=1. ; 
    135135       
    136136      std::list<CBuffer*> buffers_ ; 
  • XIOS3/trunk/src/transport/p2p_client_buffer.cpp

    r2557 r2558  
    7272  bool CP2pClientBuffer::isBufferFree(size_t size) 
    7373  { 
     74    if (sentBlocRequest_.size()> maxSentBlocRequests_) return false ; 
    7475    if (buffers_.size()>maxWindows_-1) return false ;  
    7576    CBuffer* buffer ; 
     
    168169  { 
    169170    SBloc& bloc = blocs_.front() ; 
     171     
     172    if (info.isActive(logProtocol)) 
     173    { 
     174      size_t checksum=0 ; 
     175      for(size_t j=0;j<bloc.count;j++) checksum+=((unsigned char*)(bloc.addr))[j] ; 
     176      info(logProtocol)<<"free bloc sent to server rank "<<serverRank_<<" : addr="<<bloc.addr<<"  start="<<bloc.start<<"  count="<<bloc.count<<"  checksum="<<checksum<<endl ; 
     177    } 
     178 
    170179    bloc.buffer->free(bloc.start, bloc.count) ; 
    171180    if (bloc.buffer->getCount()==0)  
     
    243252      if (flag==false)  
    244253      { 
    245         ++it ; 
     254//        ++it ; 
    246255        break ; 
    247256      } 
  • XIOS3/trunk/src/transport/p2p_client_buffer.hpp

    r2556 r2558  
    4141          if (count_>0) ERROR("COneSidedClientBuffer::~CBuffer()",<<"Try to delete buffer that is not empty"<<std::endl) ; 
    4242          //MPI_Win_detach(window_, buffer_) ; 
    43           //MPI_Free_mem(buffer_) ; 
     43          MPI_Free_mem(buffer_) ; 
    4444          info(logProtocol)<<"Detach memory from windows : addr="<<(MPI_Aint)buffer_<<"   count="<<size_<<endl ; 
    4545        } 
     
    205205      size_t fixedSize_ = 0 ; 
    206206      size_t currentBufferSize_= 0  ; 
    207       double growingFactor_ = 2. ;  
     207      double growingFactor_ = 1.2 ;  
    208208      MPI_Aint lastFreedBloc_=0 ; 
    209209      bool isFinalized_ = false ; 
     210      int maxSentBlocRequests_ = 10000 ; 
    210211 
    211212  } ; 
  • XIOS3/trunk/src/transport/p2p_server_buffer.cpp

    r2557 r2558  
    9797    if (pendingRmaRequests_.empty()) transferEvents() ; 
    9898 
    99     if (!isLocked_) 
    100     { 
     99    //if (!isLocked_) 
     100    //{ 
    101101      if (lastBlocToFree_!=0) 
    102102      { 
     
    110110        lastBlocToFree_ = 0 ;         
    111111      } 
    112     } 
     112    //} 
    113113 
    114114    if (buffers_.size()>1)  
     
    164164                           << "  Bandwith : "<< transferedSize/time<< "byte/s"<<endl ; 
    165165          CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ; 
     166          for(int i=0;i<pendingRmaAddr_.size();i++) 
     167          { 
     168            size_t checksum=0 ; 
     169            unsigned char* buffer = (unsigned char*) pendingRmaAddr_[i] ; 
     170            for(size_t j=0;j<pendingRmaCount_[i];j++) checksum += buffer[j] ; 
     171            info(logProtocol)<<"Bloc transfered to adrr="<<(void*) buffer<<"  count="<<pendingRmaCount_[i]<<"  checksum="<<checksum<<endl ; 
     172          } 
     173 
    166174         } 
    167175 
     
    170178        pendingRmaStatus_.clear() ; 
    171179        pendingRmaCount_.clear() ; 
     180        pendingRmaAddr_.clear() ; 
    172181        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ; 
    173182         
     
    381390    pendingRmaRequests_.push_back(request) ; 
    382391    pendingRmaCount_.push_back(count) ; 
     392    pendingRmaAddr_.push_back(buffer->getBuffer()+start) ; 
    383393    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ; 
    384394  } 
  • XIOS3/trunk/src/transport/p2p_server_buffer.hpp

    r2556 r2558  
    133133      size_t fixedSize_ = 0 ; 
    134134      size_t currentBufferSize_=0 ; 
    135       double growingFactor_ = 2. ; 
    136       double bufferServerFactor_=10. ; 
     135      double growingFactor_ = 1.2 ; 
     136      double bufferServerFactor_=1. ; 
    137137       
    138138      std::list<CBuffer*> buffers_ ; 
     
    147147      vector<MPI_Request> pendingRmaRequests_ ; 
    148148      vector<MPI_Status> pendingRmaStatus_ ; 
     149      vector<char*> pendingRmaAddr_ ; 
    149150      vector<int> pendingRmaCount_ ; 
    150151 
     
    164165 
    165166      MPI_Win winControl_ ; 
    166       bool isLocked_=false ; 
     167      //bool isLocked_=false ; 
    167168      const int windowRank_=0 ; 
    168169      MPI_Aint lastBlocToFree_=0 ; 
Note: See TracChangeset for help on using the changeset viewer.