Changeset 2594


Ignore:
Timestamp:
10/25/23 11:40:07 (7 months ago)
Author:
jderouillat
Message:

Update the p2p protocol as a mirror protocol : the servers buffers will strictly mirror (number of buffers, positions of messages in the buffers) the clients buffers. The memory consumption of servers will be capped impplicitly by the clients behavior where the time spent to wait for free buffers could be present again.

Location:
XIOS3/trunk/src/transport
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/transport/p2p_client_buffer.cpp

    r2589 r2594  
    4141    //} 
    4242    currentWindow_=-1 ; 
     43    currentMirror_=-1 ; 
    4344     
    4445    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_); 
     
    5960  void CP2pClientBuffer::newBuffer(size_t size, bool fixed) 
    6061  {  
     62    currentMirror_++; 
    6163    currentWindow_=(currentWindow_+1)%maxWindows_ ; 
    62     if (usedWindows_[currentWindow_])  
    63     { 
    64       ERROR("void CP2pClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ; 
    65     } 
    66     else usedWindows_[currentWindow_]=true ; 
    67     buffers_.push_back(new CBuffer(windows_[currentWindow_], size, fixed));  
     64    //if (usedWindows_[currentWindow_])  
     65    //{ 
     66    //  ERROR("void CP2pClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ; 
     67    //} 
     68    //else usedWindows_[currentWindow_]=true ; 
     69    buffers_.push_back(new CBuffer(windows_[currentMirror_], size, fixed));  
    6870    currentBuffer_=buffers_.back() ; 
    6971    info(logProtocol)<<"   Nb attached memory blocs="<<buffers_.size()<<endl ; 
     
    106108        if (buffers_.empty()) 
    107109        { 
    108           if (fixed_) newBuffer(fixedSize_,fixed_) ; 
     110          if (fixed_) { 
     111            currentBufferSize_=fixedSize_; 
     112            newBuffer(fixedSize_,fixed_) ; 
     113          } 
    109114          else 
    110115          {  
     
    119124        if (count > 0)  
    120125        { 
    121           blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ; 
     126          //info(logProtocol) << "Using currentMirror_ 1 : "<<currentMirror_ << endl; 
     127          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentMirror_}) ; 
    122128          nbBlocs++ ;  
    123129        } 
     
    128134        if (count > 0)  
    129135        { 
    130           blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ; 
     136          //info(logProtocol) << "Using currentMirror_ 2 : "<<currentMirror_ << endl; 
     137          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentMirror_}) ; 
    131138          nbBlocs++ ;  
    132139        } 
     
    181188      if (buffers_.size()>1)  
    182189      {   
    183         usedWindows_[bloc.window]=false ; 
     190        //usedWindows_[bloc.window]=false ; 
    184191        delete buffers_.front() ; 
    185192        buffers_.pop_front() ; 
     
    279286    ostringstream outStr ; 
    280287    SRequest request ; 
    281     request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int))*nbBlocs) ;  
     288    request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int)+sizeof(size_t))*nbBlocs) ;  
    282289    *(request.buffer)<<timeline<<nbSenders<<nbBlocs ; 
    283290    if (info.isActive(logProtocol))  outStr<<"New timeline event sent to server rank "<<serverRank_<<" : timeLine="<<timeline<<"  nbSenders="<<nbSenders<<"  nbBlocs="<<nbBlocs<<endl ; 
     
    286293    for(int i=0 ; i<nbBlocs; ++i,++it)  
    287294    { 
    288       *(request.buffer) << it->addr << it->count << it->window; 
     295      *(request.buffer) << it->addr << it->count << it->window << it->start; 
    289296     
    290297      if (info.isActive(logProtocol)) 
     
    292299        size_t checksum=0 ; 
    293300        for(size_t j=0;j<it->count;j++) checksum+=((unsigned char*)(it->addr))[j] ; 
    294         outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  ;  " ; 
     301        outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  window="<<it->window<<"  start="<<it->start<<"  ;  " ; 
    295302      } 
    296303 
     
    300307    } 
    301308    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ; 
     309    //info(logProtocol) << "Send event : " << request.buffer->count() << endl; 
    302310    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ; 
    303311    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ; 
     
    311319    request.buffer = new CBufferOut(sizeof(EVENT_BUFFER_RESIZE)+sizeof(timeline)+sizeof(size)) ;  
    312320    *(request.buffer)<<EVENT_BUFFER_RESIZE<<timeline<<size ; 
     321    //info(logProtocol) << "Send resize : " << request.buffer->count() << endl; 
    313322    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ; 
    314323    requests_.push_back(request) ; 
  • XIOS3/trunk/src/transport/p2p_client_buffer.hpp

    r2558 r2594  
    5353            if (start_-end_ >= size) 
    5454            { 
     55              //info(logProtocol)<<" CASE 0 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 
    5556              count=size ; 
    5657              size = 0 ; 
     
    6465            else 
    6566            { 
     67              //info(logProtocol)<<" CASE 1 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 
    6668              count = start_-end_ ; 
    6769              size -= count ; 
     
    7880            if (size_-end_ >= size) 
    7981            { 
     82              //info(logProtocol)<<" CASE 2 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 
    8083              count = size ; 
    8184              size = 0; 
     
    8992            else 
    9093            { 
     94              //info(logProtocol)<<" CASE 3 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 
    9195              count = size_ - end_ ; 
    9296              size -= count ; 
     
    185189      vector<bool> usedWindows_ ; 
    186190      int currentWindow_ ; 
     191      int currentMirror_ ; 
    187192      int maxWindows_ ; 
    188193 
  • XIOS3/trunk/src/transport/p2p_server_buffer.cpp

    r2589 r2594  
    1919    //bufferIn >> controlAddr_; 
    2020    createWindow(commSelf, interCommMerged) ; 
     21    countDeletedBuffers_ = 0; 
    2122  } 
    2223 
     
    8081      int count ; 
    8182      int window ; 
     83      size_t start ;  
    8284      bufferIn >> nbBlocs ; 
    8385      MPI_Aint bloc ; 
     
    8587      for(int i=0;i<nbBlocs;++i)  
    8688      { 
    87         bufferIn >> bloc >> count >> window; 
    88         blocs.push_back({bloc, count, window}) ; 
     89        bufferIn >> bloc >> count >> window >> start; 
     90        //info(logProtocol) << "Receiving window : "<<window << endl; 
     91        blocs.push_back({bloc, count, window,start}) ; 
    8992      } 
    9093    } 
     
    115118    { 
    116119      if (buffers_.front()->getCount()==0) { 
     120        // If the front buffer is empty and if another buffer become the active one (buffers_.size()>1) 
     121        //     the front buffer can be deleted, no new message will be sent through the front buffer 
    117122        delete buffers_.front(); 
    118         buffers_.pop_front() ; // if buffer is empty free buffer 
     123        buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer 
     124        //info(logProtocol) << "Deleting win : " << countDeletedBuffers_  << endl; 
     125        countDeletedBuffers_++; 
    119126      } 
    120127    } 
     
    223230    } 
    224231  } 
     232   
     233  size_t CP2pServerBuffer::remainSize(int bufferId) 
     234  { 
     235    if (bufferId-countDeletedBuffers_>=buffers_.size()) 
     236    { 
     237      //info(logProtocol) << "The buffer " << bufferId << " is not yet allocated" << endl; 
     238      return 0; 
     239    }       
     240    return buffers_[bufferId-countDeletedBuffers_]->remain() ; 
     241  } 
     242 
    225243 
    226244  void CP2pServerBuffer::transferEvents(void) 
     
    232250 
    233251      size_t timeline =  pendingBlocs_.begin()->first ; 
    234       auto& blocs = pendingBlocs_.begin()->second ; 
    235        
    236       if (!bufferResize_.empty())  
    237       { 
    238         if (bufferResize_.front().first==timeline) 
    239         { 
    240           currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 
    241           info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ; 
    242           bufferResize_.pop_front() ; 
    243           newBuffer(currentBufferSize_,fixed_) ; 
    244         } 
    245       } 
     252      auto& blocs = pendingBlocs_.begin()->second ; // map<size_t  , list<tuple<MPI_Aint,int ,int,size_t>>> pendingBlocs_; 
     253                                                    //     timeline,            addr    ,size,win,start 
     254      // addr   = std::get<0>(bloc) ; 
     255      // size   = std::get<1>(bloc) ; 
     256      // window = std::get<2>(bloc) ; 
     257      // start  = std::get<3>(bloc) ; // start : used to check mirror behavior 
    246258 
    247259      size_t eventSize=0 ; 
    248       for(auto& bloc : blocs) eventSize+=get<1>(bloc) ; 
    249        
    250       if (eventSize > remain)  
    251       { 
    252         if ( eventSize <= currentBufferSize_) return ; // wait for free storage ; 
    253         else  
    254         { 
    255           if (currentBuffer_==nullptr) remain = eventSize ; 
    256           else remain = currentBuffer_->remain() + fixedSize_ ; 
    257         } 
    258       } 
    259260       
    260261      //if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked"); 
     
    275276      //} 
    276277      //isLocked_=true ; 
    277       do 
     278//      do 
     279 
     280      bool spaceForAllblocks = true; 
     281      int lastBufferUsed = -1; 
     282      if (blocs.size()==0) spaceForAllblocks = false; 
     283      else 
     284      { 
     285        for(auto& bloc : blocs) 
     286        { 
     287          //info(logProtocol) << "blocSize = " << get<1>(bloc) 
     288          //                  << " - remain in win : " << get<2>(bloc) << " : " << remainSize( get<2>(bloc) ) 
     289          //                  << "; bufferResize_ = " <<  bufferResize_.size() << endl; 
     290           
     291          // if the active buffer change, the new buffer must be considered as empty 
     292          if (lastBufferUsed != get<2>(bloc) ) eventSize = 0; 
     293 
     294          // if the targeted buffer does not exist 
     295          if ( get<2>(bloc)-countDeletedBuffers_>=buffers_.size() ) 
     296          { 
     297            if ( bufferResize_.empty() ) // no resize order 
     298            { 
     299              spaceForAllblocks = false; 
     300              break; 
     301            } 
     302          } 
     303          else if ( ( get<1>(bloc) > (remainSize(get<2>(bloc))-eventSize) ) )  // if there is no enough place in the targeted bloc 
     304          { 
     305            spaceForAllblocks = false; 
     306            break; 
     307          } 
     308          else 
     309          { 
     310            // if there is enough place in the targeted bloc, store the  
     311            lastBufferUsed = get<2>(bloc); 
     312            eventSize += get<1>(bloc); 
     313          } 
     314        } 
     315      }       
     316 
     317      if (spaceForAllblocks) 
    278318      { 
    279319        transferEvent() ; // ok enough storage for this bloc 
     
    284324        //  break ; // transfering just one event temporary => to remove 
    285325         
    286         if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop 
    287  
    288         timeline =  pendingBlocs_.begin()->first ; 
    289         auto& blocs=pendingBlocs_.begin()->second ; 
    290          
    291         if (!bufferResize_.empty())  
    292         { 
    293           if (bufferResize_.front().first==timeline) 
    294           { 
    295             currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 
    296             info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ; 
    297             bufferResize_.pop_front() ; 
    298             newBuffer(currentBufferSize_,fixed_) ; 
    299           } 
    300         } 
    301  
    302         for(auto& bloc : blocs) eventSize+=get<1>(bloc) ; 
    303         if (transferedSize+eventSize<=remain) 
     326//        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop 
     327// 
     328//        timeline =  pendingBlocs_.begin()->first ; 
     329//        auto& blocs=pendingBlocs_.begin()->second ; 
     330//         
     331// 
     332//        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ; 
     333//        if (transferedSize+eventSize<=remain) 
    304334        { 
    305335          //for(auto& bloc : blocs)  
     
    317347        } 
    318348      } 
    319       while(transferedSize+eventSize<=remain) ; 
     349//      while(transferedSize+eventSize<=remain) ; 
    320350       
    321351    } 
     
    341371      size = std::get<1>(bloc) ; 
    342372      window = std::get<2>(bloc) ; 
     373      start = std::get<3>(bloc) ; // start : used to check mirror behavior 
    343374 
    344375      offset=0 ; 
    345376 
     377      // Need to keep loop even if a given bloc will not be split. 
     378      // To mimic client behavior, especially if (size_==end_) reset end_ = 0 ; 
    346379      do 
    347380      { 
    348         if (currentBuffer_!=nullptr) 
    349         { 
    350           currentBuffer_->reserve(size, start, count) ; 
     381        //if ( (currentBuffer_!=nullptr) || (window-countDeletedBuffers_ == buffers_.size() ) ) 
     382        { 
     383          if (window-countDeletedBuffers_ >= buffers_.size()) 
     384            { 
     385              if (!bufferResize_.empty())  
     386                { 
     387                  if (bufferResize_.front().first==timeline) 
     388                    { 
     389                      currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 
     390                      //info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ; 
     391                      bufferResize_.pop_front() ; 
     392                      newBuffer(currentBufferSize_,fixed_) ; 
     393                    } 
     394                } 
     395            } 
     396           
     397          buffers_[window-countDeletedBuffers_]->reserve(size, start, count) ; 
    351398       
    352399          if ( count > 0) 
    353400          { 
    354             transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ; 
     401            transferRmaRequest(timeline, addr, offset, buffers_[window-countDeletedBuffers_], start, count, window) ; 
    355402            offset=MPI_Aint_add(offset, count) ; 
    356           } 
    357           //currentBuffer_->reserve(size, start, count) ; 
    358        
    359           //if ( count > 0) 
    360           //{ 
    361           //  transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ; 
    362           //  offset=MPI_Aint_add(offset, count) ; 
    363           //} 
     403             
     404          } 
    364405        } 
    365406 
    366         if (size>0)  
    367         { 
    368           if (fixed_) newBuffer(std::max(fixedSize_, size),fixed_) ; 
    369           else 
    370           { 
    371             currentBufferSize_ = std::max((size_t)(currentBufferSize_*growingFactor_), size) ; 
    372             newBuffer(currentBufferSize_,fixed_) ; 
    373           } 
    374         } 
    375407      } while (size > 0 ) ; 
    376408    } 
     
    385417    if (info.isActive(logProtocol)) 
    386418    { 
    387       info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<endl ; 
     419      info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<"  window="<<window<<endl ; 
    388420      info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1) 
    389421               <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ; 
     
    425457 
    426458      size+=bloc.count ; 
     459      //info(logProtocol) << "Free from : " << bloc.start << ", size : " << bloc.count<< endl; 
    427460      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc 
    428461      addr=bloc.addr ; 
     
    431464        if (buffers_.size() > 1) 
    432465        { 
     466          // If the front buffer is empty and if another buffer become the active one (buffers_.size()>1) 
     467          //     the front buffer can be deleted, no new message will be sent through the front buffer 
    433468          delete buffers_.front(); 
    434           buffers_.pop_front() ; // if buffer is empty free buffer 
     469          buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer 
     470          //info(logProtocol) << "Deleting win : " << countDeletedBuffers_  << endl; 
     471          countDeletedBuffers_++; 
    435472        } 
    436473      } 
  • XIOS3/trunk/src/transport/p2p_server_buffer.hpp

    r2585 r2594  
    4646              if (start_-end_ >= size) 
    4747              { 
     48                //if (start!=end_) info(logProtocol)<<" CASE 0, recv/computed : " << start << "/" << end_ << endl; 
    4849                count=size ; 
    4950                size = 0 ; 
     
    5455              else 
    5556              { 
     57                // Server buffers get blocs from clients, they should not be splitted (mirrors the client buffers) 
     58                //if (start!=end_) info(logProtocol)<<" CASE 1"<< endl; 
     59                //info(logProtocol)<<" CASE 1 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 
     60                ERROR("COneSidedServerBuffer::reserve()",<<"This should be the case of the 2nd part of a splitted message"<<std::endl); 
    5661                //count = start_-end_ ; 
    5762                //size -= count ; 
     
    6671              if (size_-end_ >= size) 
    6772              { 
     73                //if (start!=end_ ) info(logProtocol)<<" CASE 2, recv/computed : "<< start << "/" << end_ << endl; 
    6874                count = size ; 
    6975                size = 0; 
     
    7480              else 
    7581              { 
     82                // Server buffers get blocs from clients, they should not be splitted (mirrors the client buffers) 
     83                //     1st part of a splitted message, fill the end of the buffer 
     84                //     end_ must be set to 0 like on clients 
     85                //if (start!=end_) info(logProtocol)<<" CASE 3"<< endl; 
     86                //  info(logProtocol)<<" CASE 3 : start/end/count - size : "<< start_ << ", " << end_ << ", " << count_ << " " << size_ << " vs size " << size << endl; 
    7687                //count = size_ - end_ ; 
    7788                //size -= count ; 
    7889                //start=end_ ; 
    79                 //end_ = 0 ; 
     90                end_ = 0 ; 
    8091                //count_+= count ; 
    8192                count = 0 ; 
     
    94105          } 
    95106 
    96           size_t remain(void) { return size_-count_; } 
     107          size_t remain(void) { 
     108            if (count_==0) 
     109              return size_; 
     110            else if (end_<start_) 
     111              return start_-end_; 
     112            else 
     113              return size_-end_; 
     114          } 
    97115          size_t getSize(void) { return size_ ;} 
    98116          size_t getCount(void) {return count_ ;} 
     
    110128          while (!buffers_.empty()) { 
    111129              delete buffers_.front(); 
    112               buffers_.pop_front() ; // if buffer is empty free buffer 
     130              buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer 
     131              countDeletedBuffers_++; 
    113132          } 
    114133      }; 
     
    135154      void transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window) ; 
    136155      size_t remainSize(void) ; 
     156      size_t remainSize(int bufferId) ; 
    137157 
    138158 
     
    144164      double bufferServerFactor_=1. ; 
    145165       
    146       std::list<CBuffer*> buffers_ ; 
     166      std::vector<CBuffer*> buffers_ ; 
    147167      CBuffer* currentBuffer_=nullptr ; 
    148168 
     
    151171 
    152172      map<size_t, int> nbSenders_ ; 
    153       map<size_t, list<tuple<MPI_Aint,int,int>>> pendingBlocs_; 
     173      map<size_t, list<tuple<MPI_Aint,int,int,size_t>>> pendingBlocs_; 
    154174      
    155175      vector<MPI_Request> pendingRmaRequests_ ; 
     
    176196      const int windowRank_=0 ; 
    177197      MPI_Aint lastBlocToFree_=0 ; 
     198      int countDeletedBuffers_; 
    178199 
    179200  } ; 
Note: See TracChangeset for help on using the changeset viewer.