Ignore:
Timestamp:
04/20/21 09:49:47 (3 years ago)
Author:
ymipsl
Message:

New management of client-server buffers.

  • buffers can grow automatically during the initialization phase
  • buffer sizes are evaluated after the context-definition close phase and fixed at their optimal values.

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.cpp

    r1853 r2130  
    1212#include "cxios.hpp" 
    1313#include "server.hpp" 
     14#include "services.hpp" 
     15#include <boost/functional/hash.hpp> 
     16#include <random> 
     17#include <chrono> 
    1418 
    1519namespace xios 
     
    2529    { 
    2630       
    27       context = parent; 
     31      context_ = parent; 
    2832      intraComm = intraComm_; 
    2933      interComm = interComm_; 
     
    6670      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 
    6771 
     72      auto time=chrono::system_clock::now().time_since_epoch().count() ; 
     73      std::default_random_engine rd(time); // not reproducible from a run to another 
     74      std::uniform_int_distribution<size_t> dist; 
     75      hashId_=dist(rd) ; 
     76      MPI_Bcast(&hashId_,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all server of the context 
     77 
    6878      timeLine = 1; 
    6979    } 
     
    124134    { 
    125135      list<int> ranks = event.getRanks(); 
    126       info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<endl ; 
     136      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<endl ; 
    127137      if (CXios::checkEventSync) 
    128138      { 
     
    154164 
    155165        unlockBuffers(ranks) ; 
    156         info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<"  sent"<<endl ; 
     166        info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  sent"<<endl ; 
    157167           
    158168        checkBuffers(ranks); 
     
    161171      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
    162172      { 
    163         waitEvent(ranks); 
    164         CContext::setCurrent(context->getId()); 
     173        while (checkBuffers(ranks)) context_->globalEventLoop() ; 
     174       
     175        CXios::getDaemonsManager()->scheduleContext(hashId_) ; 
     176        while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) context_->globalEventLoop() ; 
    165177      } 
    166178       
     
    177189      while (checkBuffers(ranks)) 
    178190      { 
    179         CXios::getDaemonsManager()->eventLoop() ; 
     191        context_->eventLoop() ; 
    180192      } 
    181193 
     
    256268          for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 
    257269          checkBuffers(); 
    258            
     270/*           
    259271          context->server->listen(); 
    260272 
     
    265277            context->globalEventLoop() ; 
    266278          } 
    267  
    268         } 
     279*/ 
     280           context_->globalEventLoop() ; 
     281        } 
     282 
    269283      } while (!areBuffersFree && !nonBlocking); 
    270284      CTimer::get("Blocking time").suspend(); 
     
    295309   
    296310      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, Wins, clientRank, rank, mapBufferSize_[rank], maxEventSizes[rank]); 
     311      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ; 
     312      else buffer->fixBuffer() ; 
    297313      // Notify the server 
    298       CBufferOut* bufOut = buffer->getBuffer(0, 3*sizeof(MPI_Aint)); 
    299       MPI_Aint sendBuff[3] ; 
    300       sendBuff[0]=mapBufferSize_[rank]; // Stupid C++ 
    301       sendBuff[1]=buffers[rank]->getWinAddress(0);  
    302       sendBuff[2]=buffers[rank]->getWinAddress(1);  
     314      CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint)); 
     315      MPI_Aint sendBuff[4] ; 
     316      sendBuff[0]=hashId_; 
     317      sendBuff[1]=mapBufferSize_[rank]; 
     318      sendBuff[2]=buffers[rank]->getWinAddress(0);  
     319      sendBuff[3]=buffers[rank]->getWinAddress(1);  
    303320      info(100)<<"CContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinAddress(1)<<endl; 
    304       bufOut->put(sendBuff, 3); // Stupid C++ 
     321      bufOut->put(sendBuff, 4);  
    305322      buffer->checkBuffer(true); 
    306323 
     
    383400    * \param [in] maxEventSize maps the rank of the connected servers to the size of the biggest event 
    384401   */ 
    385    void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize) 
    386    { 
    387      mapBufferSize_ = mapSize; 
    388      maxEventSizes = maxEventSize; 
     402   void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize) 
     403   { 
     404     for(auto& it : mapSize) {buffers[it.first]->fixBufferSize(std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0));} 
    389405   } 
    390406 
     
    463479    for (itMap = itbMap; itMap != iteMap; ++itMap) 
    464480    { 
    465       report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl 
     481      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl 
    466482                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 
    467483      totalBuf += itMap->second; 
    468484    } 
    469     report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 
     485    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 
    470486 
    471487  } 
Note: See TracChangeset for help on using the changeset viewer.