Ignore:
Timestamp:
10/25/23 11:40:07 (8 months ago)
Author:
jderouillat
Message:

Update the p2p protocol as a mirror protocol : the servers buffers will strictly mirror (number of buffers, positions of messages in the buffers) the clients buffers. The memory consumption of servers will be capped impplicitly by the clients behavior where the time spent to wait for free buffers could be present again.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/transport/p2p_client_buffer.cpp

    r2589 r2594  
    4141    //} 
    4242    currentWindow_=-1 ; 
     43    currentMirror_=-1 ; 
    4344     
    4445    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_); 
     
    5960  void CP2pClientBuffer::newBuffer(size_t size, bool fixed) 
    6061  {  
     62    currentMirror_++; 
    6163    currentWindow_=(currentWindow_+1)%maxWindows_ ; 
    62     if (usedWindows_[currentWindow_])  
    63     { 
    64       ERROR("void CP2pClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ; 
    65     } 
    66     else usedWindows_[currentWindow_]=true ; 
    67     buffers_.push_back(new CBuffer(windows_[currentWindow_], size, fixed));  
     64    //if (usedWindows_[currentWindow_])  
     65    //{ 
     66    //  ERROR("void CP2pClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ; 
     67    //} 
     68    //else usedWindows_[currentWindow_]=true ; 
     69    buffers_.push_back(new CBuffer(windows_[currentMirror_], size, fixed));  
    6870    currentBuffer_=buffers_.back() ; 
    6971    info(logProtocol)<<"   Nb attached memory blocs="<<buffers_.size()<<endl ; 
     
    106108        if (buffers_.empty()) 
    107109        { 
    108           if (fixed_) newBuffer(fixedSize_,fixed_) ; 
     110          if (fixed_) { 
     111            currentBufferSize_=fixedSize_; 
     112            newBuffer(fixedSize_,fixed_) ; 
     113          } 
    109114          else 
    110115          {  
     
    119124        if (count > 0)  
    120125        { 
    121           blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ; 
     126          //info(logProtocol) << "Using currentMirror_ 1 : "<<currentMirror_ << endl; 
     127          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentMirror_}) ; 
    122128          nbBlocs++ ;  
    123129        } 
     
    128134        if (count > 0)  
    129135        { 
    130           blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ; 
     136          //info(logProtocol) << "Using currentMirror_ 2 : "<<currentMirror_ << endl; 
     137          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentMirror_}) ; 
    131138          nbBlocs++ ;  
    132139        } 
     
    181188      if (buffers_.size()>1)  
    182189      {   
    183         usedWindows_[bloc.window]=false ; 
     190        //usedWindows_[bloc.window]=false ; 
    184191        delete buffers_.front() ; 
    185192        buffers_.pop_front() ; 
     
    279286    ostringstream outStr ; 
    280287    SRequest request ; 
    281     request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int))*nbBlocs) ;  
     288    request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int)+sizeof(size_t))*nbBlocs) ;  
    282289    *(request.buffer)<<timeline<<nbSenders<<nbBlocs ; 
    283290    if (info.isActive(logProtocol))  outStr<<"New timeline event sent to server rank "<<serverRank_<<" : timeLine="<<timeline<<"  nbSenders="<<nbSenders<<"  nbBlocs="<<nbBlocs<<endl ; 
     
    286293    for(int i=0 ; i<nbBlocs; ++i,++it)  
    287294    { 
    288       *(request.buffer) << it->addr << it->count << it->window; 
     295      *(request.buffer) << it->addr << it->count << it->window << it->start; 
    289296     
    290297      if (info.isActive(logProtocol)) 
     
    292299        size_t checksum=0 ; 
    293300        for(size_t j=0;j<it->count;j++) checksum+=((unsigned char*)(it->addr))[j] ; 
    294         outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  ;  " ; 
     301        outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  window="<<it->window<<"  start="<<it->start<<"  ;  " ; 
    295302      } 
    296303 
     
    300307    } 
    301308    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ; 
     309    //info(logProtocol) << "Send event : " << request.buffer->count() << endl; 
    302310    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ; 
    303311    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ; 
     
    311319    request.buffer = new CBufferOut(sizeof(EVENT_BUFFER_RESIZE)+sizeof(timeline)+sizeof(size)) ;  
    312320    *(request.buffer)<<EVENT_BUFFER_RESIZE<<timeline<<size ; 
     321    //info(logProtocol) << "Send resize : " << request.buffer->count() << endl; 
    313322    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ; 
    314323    requests_.push_back(request) ; 
Note: See TracChangeset for help on using the changeset viewer.