Changeset 2130


Ignore:
Timestamp:
04/20/21 09:49:47 (3 years ago)
Author:
ymipsl
Message:

New management of client-server buffers.

  • buffers can grow automatically in intialization phase
  • buffers is evaluated after the close context definition phase and fixed at optimal value.

YM

Location:
XIOS/dev/dev_ym/XIOS_COUPLING/src
Files:
1 added
21 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp

    r1757 r2130  
    77#include "mpi.hpp" 
    88#include "tracer.hpp" 
     9#include "timeline_events.hpp" 
    910 
    1011namespace xios 
     
    200201   
    201202    lockBuffer(); 
     203    count=*bufferCount[current] ; 
     204     
     205    if (resizingBufferStep_ > 0 ) return false ; 
     206 
    202207    if (size > bufferSize) 
    203       ERROR("bool CClientBuffer::isBufferFree(StdSize size)", 
    204             << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl); 
     208    { 
     209      // ERROR("bool CClientBuffer::isBufferFree(StdSize size)", 
     210      //      << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl); 
     211      resizingBufferStep_=1 ; 
     212      newBufferSize_=size ; 
     213      return false ; 
     214    } 
    205215 
    206216    if (size > maxEventSize) 
     
    214224      if (size > maxRequestSize) maxRequestSize = size; 
    215225    } 
    216  
    217       count=*bufferCount[current] ; 
    218       return (size <= remain()); 
     226     
     227    if (size > remain()) 
     228    { 
     229      if (isGrowableBuffer_) 
     230      { 
     231        resizingBufferStep_ = 1 ; 
     232        newBufferSize_ = (count+size)*growFactor_ ; 
     233      }   
     234      return false ; 
     235    } 
     236    else return true ; 
    219237  } 
    220238 
     
    276294    if (!pending) 
    277295    { 
    278       if (!send) return false ; 
     296      if (!send && resizingBufferStep_==0 ) return false ; 
     297 
    279298      if (count > 0) 
    280299      { 
     
    284303        { 
    285304          MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
     305          if (resizingBufferStep_==3) resizingBufferStep_=0 ; 
    286306          pending = true; 
    287307//          *control[current]=0 ; 
     
    295315          count = 0; 
    296316        } 
    297         else unlockBuffer() ; 
     317        else  
     318        { 
     319          unlockBuffer() ; 
     320        } 
    298321      } 
     322      else 
     323      { 
     324        if (resizingBufferStep_==2) resizeBuffer(newBufferSize_) ; 
     325        if (resizingBufferStep_==1) resizeBufferNotify() ; 
     326      } 
    299327    } 
    300328 
    301329    return pending; 
     330  } 
     331 
     332  void CClientBuffer::resizeBufferNotify(void) 
     333  { 
     334    // notify server of changing buffers size 
     335    lockBuffer() ; 
     336    int size=sizeof(int)+sizeof(size_t) ; 
     337    CBufferOut* bufOut = this->getBuffer(timelineEventNotifyChangeBufferSize, size); 
     338    bufOut->put(size); 
     339    bufOut->put(timelineEventNotifyChangeBufferSize); 
     340    resizingBufferStep_ = 2 ; 
     341    unlockBuffer() ; 
     342  } 
     343 
     344  void CClientBuffer::resizeBuffer(size_t newSize) 
     345  { 
     346    if (hasWindows) 
     347    {  
     348      MPI_Win_detach(windows_[0], bufferHeader[0]) ; 
     349      MPI_Win_detach(windows_[1], bufferHeader[1]) ; 
     350    } 
     351    MPI_Free_mem(bufferHeader[0]) ; 
     352    MPI_Free_mem(bufferHeader[1]) ; 
     353 
     354    bufferSize=newSize ; 
     355    MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 
     356    MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 
     357    buffer[0] = bufferHeader[0]+headerSize ; 
     358    buffer[1] = bufferHeader[1]+headerSize ; 
     359    firstTimeLine[0]=(size_t*)bufferHeader[0] ; 
     360    firstTimeLine[1]=(size_t*)bufferHeader[1] ; 
     361    bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 
     362    bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 
     363    control[0]=(size_t*)bufferHeader[0] +2 ; 
     364    control[1]=(size_t*)bufferHeader[1] +2 ; 
     365    finalize[0]=(size_t*)bufferHeader[0] +3 ; 
     366    finalize[1]=(size_t*)bufferHeader[1] +3 ; 
     367 
     368    *firstTimeLine[0]=0 ; 
     369    *firstTimeLine[1]=0 ; 
     370    *bufferCount[0]=0 ; 
     371    *bufferCount[1]=0 ; 
     372    *control[0]=0 ; 
     373    *control[1]=0 ; 
     374    *finalize[0]=0 ; 
     375    *finalize[1]=0 ; 
     376    winState[0]=false ; 
     377    winState[1]=false ; 
     378    current=0 ; 
     379     
     380    if (hasWindows) 
     381    {   
     382     
     383      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ; 
     384      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ; 
     385           
     386      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
     387      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
     388 
     389      MPI_Win_unlock(clientRank_, windows_[1]) ; 
     390      MPI_Win_unlock(clientRank_, windows_[0]) ; 
     391    }  
     392 
     393    lockBuffer() ; 
     394  
     395    int size=sizeof(int)+2*sizeof(size_t)+2*sizeof(MPI_AINT) ; 
     396    CBufferOut* bufOut = this->getBuffer(timelineEventChangeBufferSize, size); 
     397    bufOut->put(size); 
     398    bufOut->put(timelineEventChangeBufferSize); 
     399    bufOut->put(newBufferSize_); 
     400    bufOut->put(this->getWinAddress(0)); 
     401    bufOut->put(this->getWinAddress(1)); 
     402 
     403    resizingBufferStep_=3; 
     404    unlockBuffer() ; 
    302405  } 
    303406 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.hpp

    r1757 r2130  
    2929      void infoBuffer(void) ; 
    3030      bool isNotifiedFinalized(void) ; 
     31      void setGrowableBuffer(double growFactor) { growFactor_=growFactor ; isGrowableBuffer_=true ;} 
     32      void fixBufferSize(size_t bufferSize) { newBufferSize_=bufferSize ; isGrowableBuffer_=false ; resizingBufferStep_=1 ;} 
     33      void fixBuffer(void) { isGrowableBuffer_=false ;} 
    3134    private: 
     35       void resizeBuffer(size_t newSize) ; 
     36       void resizeBufferNotify(void) ; 
     37 
     38 
    3239      char* buffer[2]; 
    3340      char* bufferHeader[2]; 
     
    3845      bool winState[2] ; 
    3946      int current; 
     47       
     48      double growFactor_=1.2 ; 
     49      bool isGrowableBuffer_=true ; 
    4050 
     51      int resizingBufferStep_ = 0 ; 
     52      size_t newBufferSize_ ; 
    4153      StdSize count; 
    4254      StdSize maxEventSize; 
    43       const StdSize bufferSize; 
     55      StdSize bufferSize; 
    4456      const StdSize estimatedMaxEventSize; 
    4557 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.cpp

    r1757 r2130  
    1515    end = size; 
    1616    used=0 ; 
    17     buffer = new char[size]; // use MPI_ALLOC_MEM later? 
     17    MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer) ; 
    1818    currentWindows=1 ; 
    1919    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 
     
    2222  CServerBuffer::~CServerBuffer() 
    2323  { 
    24     delete [] buffer ; 
     24    MPI_Free_mem(buffer) ; 
    2525  } 
    2626 
     
    222222  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 
    223223  { 
    224     if (!hasWindows) return false ; 
     224    if (!hasWindows || resizingBuffer_) return false ; 
    225225 
    226226     
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.hpp

    r1757 r2130  
    2626      void unlockBuffer(void) ; 
    2727      void notifyClientFinalize(void) ; 
     28      void notifyBufferResizing(void) { resizingBuffer_=true ;} 
    2829    private: 
    2930      char* buffer; 
     
    3536      std::vector<MPI_Win> windows_ ; 
    3637      std::vector<MPI_Aint> winAddress_ ; 
    37  
     38      bool resizingBuffer_ = false ; 
    3839      int currentWindows ; 
    3940      bool hasWindows ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.cpp

    r1853 r2130  
    1212#include "cxios.hpp" 
    1313#include "server.hpp" 
     14#include "services.hpp" 
     15#include <boost/functional/hash.hpp> 
     16#include <random> 
     17#include <chrono> 
    1418 
    1519namespace xios 
     
    2529    { 
    2630       
    27       context = parent; 
     31      context_ = parent; 
    2832      intraComm = intraComm_; 
    2933      interComm = interComm_; 
     
    6670      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 
    6771 
     72      auto time=chrono::system_clock::now().time_since_epoch().count() ; 
     73      std::default_random_engine rd(time); // not reproducible from a run to another 
     74      std::uniform_int_distribution<size_t> dist; 
     75      hashId_=dist(rd) ; 
     76      MPI_Bcast(&hashId_,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all server of the context 
     77 
    6878      timeLine = 1; 
    6979    } 
     
    124134    { 
    125135      list<int> ranks = event.getRanks(); 
    126       info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<endl ; 
     136      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<endl ; 
    127137      if (CXios::checkEventSync) 
    128138      { 
     
    154164 
    155165        unlockBuffers(ranks) ; 
    156         info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<"  sent"<<endl ; 
     166        info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  sent"<<endl ; 
    157167           
    158168        checkBuffers(ranks); 
     
    161171      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
    162172      { 
    163         waitEvent(ranks); 
    164         CContext::setCurrent(context->getId()); 
     173        while (checkBuffers(ranks)) context_->globalEventLoop() ; 
     174       
     175        CXios::getDaemonsManager()->scheduleContext(hashId_) ; 
     176        while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) context_->globalEventLoop() ; 
    165177      } 
    166178       
     
    177189      while (checkBuffers(ranks)) 
    178190      { 
    179         CXios::getDaemonsManager()->eventLoop() ; 
     191        context_->eventLoop() ; 
    180192      } 
    181193 
     
    256268          for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 
    257269          checkBuffers(); 
    258            
     270/*           
    259271          context->server->listen(); 
    260272 
     
    265277            context->globalEventLoop() ; 
    266278          } 
    267  
    268         } 
     279*/ 
     280           context_->globalEventLoop() ; 
     281        } 
     282 
    269283      } while (!areBuffersFree && !nonBlocking); 
    270284      CTimer::get("Blocking time").suspend(); 
     
    295309   
    296310      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, Wins, clientRank, rank, mapBufferSize_[rank], maxEventSizes[rank]); 
     311      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ; 
     312      else buffer->fixBuffer() ; 
    297313      // Notify the server 
    298       CBufferOut* bufOut = buffer->getBuffer(0, 3*sizeof(MPI_Aint)); 
    299       MPI_Aint sendBuff[3] ; 
    300       sendBuff[0]=mapBufferSize_[rank]; // Stupid C++ 
    301       sendBuff[1]=buffers[rank]->getWinAddress(0);  
    302       sendBuff[2]=buffers[rank]->getWinAddress(1);  
     314      CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint)); 
     315      MPI_Aint sendBuff[4] ; 
     316      sendBuff[0]=hashId_; 
     317      sendBuff[1]=mapBufferSize_[rank]; 
     318      sendBuff[2]=buffers[rank]->getWinAddress(0);  
     319      sendBuff[3]=buffers[rank]->getWinAddress(1);  
    303320      info(100)<<"CContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinAddress(1)<<endl; 
    304       bufOut->put(sendBuff, 3); // Stupid C++ 
     321      bufOut->put(sendBuff, 4);  
    305322      buffer->checkBuffer(true); 
    306323 
     
    383400    * \param [in] maxEventSize maps the rank of the connected servers to the size of the biggest event 
    384401   */ 
    385    void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize) 
    386    { 
    387      mapBufferSize_ = mapSize; 
    388      maxEventSizes = maxEventSize; 
     402   void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize) 
     403   { 
     404     for(auto& it : mapSize) {buffers[it.first]->fixBufferSize(std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0));} 
    389405   } 
    390406 
     
    463479    for (itMap = itbMap; itMap != iteMap; ++itMap) 
    464480    { 
    465       report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl 
     481      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl 
    466482                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 
    467483      totalBuf += itMap->second; 
    468484    } 
    469     report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 
     485    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 
    470486 
    471487  } 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.hpp

    r1918 r2130  
    6363      void finalize(void); 
    6464 
    65       void setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize); 
     65      void setBufferSize(const std::map<int,StdSize>& mapSize); 
    6666 
    6767      int getRemoteSize(void) {return serverSize;} 
     
    7575      /*! get the associated server (dual chanel client/server) */       
    7676      CContextServer* getAssociatedServer(void) { return associatedServer_;} 
    77  
     77      void setGrowableBuffer(void) { isGrowableBuffer_=true;} 
     78      void setFixedBuffer(void) { isGrowableBuffer_=false;} 
    7879    public: 
    79       CContext* context; //!< Context for client 
     80      CContext* context_; //!< Context for client 
    8081 
    8182      size_t timeLine; //!< Timeline of each event 
     
    9899 
    99100      bool pureOneSided ; //!< if true, client will communicated with servers only trough one sided communication. Otherwise the hybrid mode P2P /One sided is used. 
     101 
     102      size_t hashId_ ; //!< hash id on the context client that will be used for context server to identify the remote calling context client. 
    100103 
    101104    private: 
     
    122125      bool isAttached_ ; 
    123126      CContextServer* associatedServer_ ; //!< The server associated to the pair client/server 
    124  
     127      bool isGrowableBuffer_ = true ; 
    125128  }; 
    126129} 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp

    r2123 r2130  
    2020#include "services.hpp" 
    2121#include "contexts_manager.hpp" 
     22#include "timeline_events.hpp" 
    2223 
    2324#include <boost/functional/hash.hpp> 
     
    188189    if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
    189190    { 
    190        MPI_Aint recvBuff[3] ; 
    191        MPI_Recv(recvBuff, 3, MPI_AINT, rank, 20, interComm, &status); 
    192        StdSize buffSize = recvBuff[0]; 
     191       MPI_Aint recvBuff[4] ; 
     192       MPI_Recv(recvBuff, 4, MPI_AINT, rank, 20, interComm, &status); 
     193       remoteHashId_ = recvBuff[0] ; 
     194       StdSize buffSize = recvBuff[1]; 
    193195       vector<MPI_Aint> winAdress(2) ; 
    194        winAdress[0]=recvBuff[1] ; winAdress[1]=recvBuff[2] ; 
     196       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ; 
    195197       mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
    196198       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first; 
     
    290292      CBufferIn newBuffer(startBuffer,buffer.remain()); 
    291293      newBuffer>>size>>timeLine; 
    292       it=events.find(timeLine); 
    293       if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first; 
    294       it->second->push(rank,buffers[rank],startBuffer,size); 
    295  
     294 
     295      if (timeLine==timelineEventNotifyChangeBufferSize) 
     296      { 
     297        buffers[rank]->notifyBufferResizing() ; 
     298        buffers[rank]->updateCurrentWindows() ; 
     299      }  
     300      else if (timeLine==timelineEventChangeBufferSize) 
     301      { 
     302        size_t newSize ; 
     303        vector<MPI_Aint> winAdress(2) ; 
     304        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ; 
     305        buffers.erase(rank) ; 
     306        buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, newSize))); 
     307      } 
     308      else 
     309      { 
     310        it=events.find(timeLine); 
     311        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first; 
     312        it->second->push(rank,buffers[rank],startBuffer,size); 
     313        if (timeLine>0) lastTimeLine[rank]=timeLine ; 
     314      } 
    296315      buffer.advance(size); 
    297316      count=buffer.remain(); 
    298317    } 
    299  
    300     if (timeLine>0) lastTimeLine[rank]=timeLine ; 
    301318     
    302319    CTimer::get("Process request").suspend(); 
     
    310327//    if (context->isProcessingEvent()) return ; 
    311328    if (isProcessingEvent_) return ; 
     329    if (isAttachedModeEnabled()) 
     330      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ; 
    312331 
    313332    it=events.find(currentTimeLine); 
     
    356375         currentTimeLine++; 
    357376         scheduled = false; 
     377         if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ; 
    358378        } 
    359379      } 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.hpp

    r1853 r2130  
    7272      bool isProcessingEvent_ ; 
    7373      CContextClient* associatedClient_ ; 
     74      size_t remoteHashId_; //!< the hash is of the calling context client 
    7475  } ; 
    7576 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/distribution/grid_scatterer_connector.hpp

    r1934 r2130  
    4242      }      
    4343 
    44  
     44      const map<int,int>& getTransferedDataSize(void) {return dstSize_;} 
     45      
    4546      template<typename T, int N>  
    4647      void transfer(const CArray<T,N>& input, map<int, CArray<T,1>>& output) 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_to_server_store_filter.cpp

    r2022 r2130  
    2828    CTimer::get("Field : send data").suspend(); 
    2929  } 
     30   
     31  CContextClient* CClientToServerStoreFilter::getTransferedDataSize(map<int,int>& size) 
     32  { 
     33    size = field_->getSentGrid()->getClientToServerConnector(client_)->getTransferedDataSize() ; 
     34    return client_ ; 
     35  } 
    3036 
    3137  bool CClientToServerStoreFilter::mustAutoTrigger() const 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_to_server_store_filter.hpp

    r1935 r2130  
    2323       */ 
    2424      CClientToServerStoreFilter(CGarbageCollector& gc, CField* field, CContextClient* client); 
     25      /*! 
     26       * Get the size of data transfered by call. Needed for context client buffer size evaluation 
     27       * 
     28       * \param size : map returning the size for each server rank   
     29       * \return the associated context client 
     30       */ 
     31      CContextClient* getTransferedDataSize(map<int,int>& size) ; 
    2532 
    2633      /*! 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/server_to_client_store_filter.cpp

    r1934 r2130  
    4444  } 
    4545 
     46  CContextClient* CServerToClientStoreFilter::getTransferedDataSize(map<int,int>& size) 
     47  { 
     48    size = grid_->getServerToClientConnector()->getTransferedDataSize() ; 
     49    return client_ ; 
     50  } 
    4651 
    4752  bool CServerToClientStoreFilter::mustAutoTrigger() const 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/server_to_client_store_filter.hpp

    r1934 r2130  
    2424       */ 
    2525      CServerToClientStoreFilter(CGarbageCollector& gc, CField* field, CContextClient* client); 
     26 
     27      /*! 
     28       * Get the size of data transfered by call. Needed for context client buffer size evaluation 
     29       * 
     30       * \param size : map returning the size for each server rank   
     31       * \return the associated context client 
     32       */ 
     33      CContextClient* getTransferedDataSize(map<int,int>& size) ; 
    2634 
    2735      /*! 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/daemons_manager.hpp

    r1764 r2130  
    11#ifndef __DAEMONS_MANAGER_HPP__ 
    22#define __DAEMONS_MANAGER_HPP__ 
     3#include <cstddef> 
    34 
    45namespace xios 
     
    1617    bool servicesEventLoop(void) ; 
    1718     
     19    void scheduleContext(size_t hashId) { scheduledContext_=hashId ;} //!< for attached mode, give the hand to the associated context server 
     20    bool isScheduledContext(size_t hashId) { return scheduledContext_==hashId ;} //!< for attached mode, return true if context server is sceduled 
     21    void unscheduleContext(void) { scheduledContext_=0 ;} //!< for attached mode : unschedule context 
     22 
    1823    private: 
    1924    bool isServer_ ; 
     25    size_t scheduledContext_ = 0 ; //!< Hash id of the next scehduled context for attached mode 
    2026  } ; 
    2127} 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.hpp

    r1764 r2130  
    3333    void freeComm(void) ; 
    3434    bool isAttachedMode(void) { return isAttachedMode_ ;} 
     35    CService* getParentService(void) {return parentService_ ; } 
     36 
    3537    private: 
    3638    void createIntercomm(void) ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.hpp

    r1764 r2130  
    3737    int getType(void) {return type_;} 
    3838    int getNbPartitions(void) {return nbPartitions_;} 
    39  
     39     
    4040    private: 
    4141    void sendNotification(int rank) ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp

    r2124 r2130  
    292292 
    293293 
    294    void CContext::setClientServerBuffer(vector<CField*>& fields, bool bufferForWriting) 
    295    TRY 
    296    { 
    297       // Estimated minimum event size for small events (20 is an arbitrary constant just for safety) 
    298      const size_t minEventSize = CEventClient::headerSize + 20 * sizeof(int); 
    299       // Ensure there is at least some room for 20 of such events in the buffers 
    300      size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize); 
    301  
    302 #define DECLARE_NODE(Name_, name_)    \ 
    303      if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition); 
    304 #define DECLARE_NODE_PAR(Name_, name_) 
    305 #include "node_type.conf" 
    306 #undef DECLARE_NODE 
    307 #undef DECLARE_NODE_PAR 
    308  
    309  
    310      map<CContextClient*,map<int,size_t>> dataSize ; 
    311      map<CContextClient*,map<int,size_t>> maxEventSize ; 
    312      map<CContextClient*,map<int,size_t>> attributesSize ;   
    313  
    314      for(auto field : fields) 
    315      { 
    316        field->setContextClientDataBufferSize(dataSize, maxEventSize, bufferForWriting) ; 
    317        field->setContextClientAttributesBufferSize(attributesSize, maxEventSize, bufferForWriting) ; 
    318      } 
    319       
    320  
    321      for(auto& it : attributesSize) 
    322      { 
    323        auto contextClient = it.first ; 
    324        auto& contextDataSize =  dataSize[contextClient] ; 
    325        auto& contextAttributesSize =  attributesSize[contextClient] ; 
    326        auto& contextMaxEventSize =  maxEventSize[contextClient] ; 
    327     
    328        for (auto& it : contextAttributesSize) 
    329        { 
    330          auto serverRank=it.first ; 
    331          auto& buffer = contextAttributesSize[serverRank] ; 
    332          if (contextDataSize[serverRank] > buffer) buffer=contextDataSize[serverRank] ; 
    333          buffer *= CXios::bufferSizeFactor; 
    334          if (buffer < minBufferSize) buffer = minBufferSize; 
    335          if (buffer > CXios::maxBufferSize ) buffer = CXios::maxBufferSize; 
    336        } 
    337  
    338        // Leaders will have to send some control events so ensure there is some room for those in the buffers 
    339        if (contextClient->isServerLeader()) 
    340          for(auto& rank : contextClient->getRanksServerLeader()) 
    341            if (!contextAttributesSize.count(rank)) 
    342            { 
    343              contextAttributesSize[rank] = minBufferSize; 
    344              contextMaxEventSize[rank] = minEventSize; 
    345            } 
    346        
    347        contextClient->setBufferSize(contextAttributesSize, contextMaxEventSize);     
    348      } 
    349    } 
    350    CATCH_DUMP_ATTR 
    351  
    352  
    353     /*! 
    354     Sets client buffers. 
    355     \param [in] contextClient 
    356     \param [in] bufferForWriting True if buffers are used for sending data for writing 
    357     This flag is only true for client and server-1 for communication with server-2 
    358   */ 
    359   // ym obsolete to be removed 
    360    void CContext::setClientServerBuffer(CContextClient* contextClient, bool bufferForWriting) 
    361    TRY 
    362    { 
    363       // Estimated minimum event size for small events (20 is an arbitrary constant just for safety) 
    364      const size_t minEventSize = CEventClient::headerSize + 20 * sizeof(int); 
    365  
    366       // Ensure there is at least some room for 20 of such events in the buffers 
    367       size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize); 
    368  
    369 #define DECLARE_NODE(Name_, name_)    \ 
    370      if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition); 
    371 #define DECLARE_NODE_PAR(Name_, name_) 
    372 #include "node_type.conf" 
    373 #undef DECLARE_NODE 
    374 #undef DECLARE_NODE_PAR 
    375  
    376      // Compute the buffer sizes needed to send the attributes and data corresponding to fields 
    377      std::map<int, StdSize> maxEventSize; 
    378      std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize, contextClient, bufferForWriting); 
    379      std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize, contextClient, bufferForWriting); 
    380  
    381      std::map<int, StdSize>::iterator it, ite = dataBufferSize.end(); 
    382      for (it = dataBufferSize.begin(); it != ite; ++it) 
    383        if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second; 
    384  
    385      // Apply the buffer size factor, check that we are above the minimum buffer size and below the maximum size 
    386      ite = bufferSize.end(); 
    387      for (it = bufferSize.begin(); it != ite; ++it) 
    388      { 
    389        it->second *= CXios::bufferSizeFactor; 
    390        if (it->second < minBufferSize) it->second = minBufferSize; 
    391        if (it->second > CXios::maxBufferSize) it->second = CXios::maxBufferSize; 
    392      } 
    393  
    394      // Leaders will have to send some control events so ensure there is some room for those in the buffers 
    395      if (contextClient->isServerLeader()) 
    396      { 
    397        const std::list<int>& ranks = contextClient->getRanksServerLeader(); 
    398        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
    399        { 
    400          if (!bufferSize.count(*itRank)) 
    401          { 
    402            bufferSize[*itRank] = minBufferSize; 
    403            maxEventSize[*itRank] = minEventSize; 
    404          } 
    405        } 
    406      } 
    407      contextClient->setBufferSize(bufferSize, maxEventSize); 
    408    } 
    409    CATCH_DUMP_ATTR 
    410  
    411294 /*! 
    412295    * Compute the required buffer size to send the fields data. 
     
    716599  CATCH_DUMP_ATTR 
    717600 
     601  void CContext::globalEventLoop(void) 
     602  { 
     603    lockContext() ; 
     604    CXios::getDaemonsManager()->eventLoop() ; 
     605    unlockContext() ; 
     606    setCurrent(getId()) ; 
     607  } 
     608 
    718609  bool CContext::scheduledEventLoop(bool enableEventsProcessing)  
    719610  { 
     
    739630  { 
    740631    bool  finished;  
     632    if (isLockedContext()) return ; 
     633     
    741634    setCurrent(getId()) ; 
    742635 
     
    792685        couplerOutClient_[fullContextId] = client ; 
    793686        couplerOutServer_[fullContextId] = server ; 
    794  
    795 /* 
    796       // for now, we don't now which beffer size must be used for client coupler 
    797       // It will be evaluated later. Fix a constant size for now... 
    798       // set to 10Mb for development 
    799        map<int,size_t> bufferSize, maxEventSize ; 
    800        for(int i=0;i<client->getRemoteSize();i++) 
    801        { 
    802          bufferSize[i]=10000000 ; 
    803          maxEventSize[i]=10000000 ; 
    804        } 
    805  
    806        client->setBufferSize(bufferSize, maxEventSize);     
    807 */ 
    808687      } 
    809688    } 
     
    827706       MPI_Comm_free(&interComm) ; 
    828707 
    829        map<int,size_t> bufferSize, maxEventSize ; 
    830        for(int i=0;i<client->getRemoteSize();i++) 
    831        { 
    832          bufferSize[i]=10000000 ; 
    833          maxEventSize[i]=10000000 ; 
    834        } 
    835  
    836        client->setBufferSize(bufferSize, maxEventSize);     
    837708       couplerInClient_[fullContextId] = client ; 
    838709       couplerInServer_[fullContextId] = server ;         
     
    840711  } 
    841712   
    842   void CContext::globalEventLoop(void) 
    843   { 
    844     CXios::getDaemonsManager()->eventLoop() ; 
    845     setCurrent(getId()) ; 
    846   } 
    847  
    848  
    849713   void CContext::finalize(void) 
    850714   TRY 
     
    1054918        // connect to couplerOut -> to do 
    1055919      } 
    1056       if (first) setClientServerBuffer(couplerOutField, true) ; // set buffer context --> to check 
    1057920 
    1058921      bool couplersReady ; 
     
    1099962        field->connectToFileServer(garbageCollector) ; // connect the field to server filter 
    1100963      } 
    1101       setClientServerBuffer(fileOutField, true) ; // set buffer context --> to review 
    1102964      for(auto field : fileOutField) field->sendFieldToFileServer() ; 
    1103965    } 
     
    12071069      this->scheduledEventLoop() ; 
    12081070    } while (!ok) ; 
     1071 
     1072    // Now evaluate the size of the context client buffers 
     1073    map<CContextClient*,map<int,size_t>> fieldBufferEvaluation ; 
     1074    for(auto field : fileOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to server 
     1075    for(auto field : couplerOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to coupler 
     1076    for(auto field : fieldModelIn) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // server to client (for io servers) 
     1077     
     1078    // fix size for each context client 
     1079    for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ; 
     1080 
    12091081 
    12101082     CTimer::get("Context : close definition").suspend() ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.hpp

    r2123 r2130  
    150150         std::map<int, StdSize> getAttributesBufferSize(std::map<int, StdSize>& maxEventSize, CContextClient* contextClient, bool bufferForWriting = false); 
    151151         std::map<int, StdSize> getDataBufferSize(std::map<int, StdSize>& maxEventSize, CContextClient* contextClient, bool bufferForWriting = false); 
    152          void setClientServerBuffer(CContextClient* contextClient, bool bufferForWriting = false); // old interface to be removed 
    153          void setClientServerBuffer(vector<CField*>& fields, bool bufferForWriting) ;  
    154152 
    155153         // Distribute files (in write mode) among secondary-server pools according to the estimated data flux 
     
    357355         string contextId_ ; //!< context client id for the servers. For clients this is same as getId()  
    358356         bool isProcessingEvent_ ; 
     357    private:      
    359358         CServerContext* parentServerContext_ ; 
    360  
     359    public: 
     360         CServerContext* getParentServerContext(void) { return parentServerContext_; } 
     361    private:  
     362      bool lockedContext_=false; 
     363    public:  
     364        void lockContext(void) {lockedContext_=true; } 
     365        void unlockContext(void) {lockedContext_=true; } 
     366        bool isLockedContext(void) { return lockedContext_;} 
    361367      public: // Some function maybe removed in the near future 
    362368        // virtual void toBinary  (StdOStream & os) const; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp

    r2022 r2130  
    497497  CATCH_DUMP_ATTR 
    498498 
     499  bool CField::evaluateBufferSize(map<CContextClient*,map<int,size_t>>& evaluateBuffer, bool isOptPerformance) 
     500  { 
     501    CContextClient* client=nullptr ; 
     502 
     503    for(int i=0;i<2;i++) 
     504    { 
     505      map<int,int> dataSize ; 
     506      if (i==0  && clientToServerStoreFilter_) client = clientToServerStoreFilter_-> getTransferedDataSize(dataSize) ; 
     507      if (i==1  && serverToClientStoreFilter_) client = serverToClientStoreFilter_-> getTransferedDataSize(dataSize) ; 
     508 
     509      if (client!=nullptr) 
     510      { 
     511        map<int,size_t> bufferSize ; 
     512    
     513        if (evaluateBuffer.count(client)!=0) bufferSize = evaluateBuffer[client] ; 
     514        if (isOptPerformance) 
     515        { 
     516          for(auto& it : dataSize)  
     517          { 
     518            if (bufferSize.count(it.first)==0) bufferSize[it.first]=it.second ; 
     519            else bufferSize[it.first]+=it.second ; 
     520          } 
     521        } 
     522        else 
     523        { 
     524          for(auto& it : dataSize)  
     525          { 
     526            if (bufferSize.count(it.first)==0) bufferSize[it.first]=it.second ; 
     527            else bufferSize[it.first]=std::max(bufferSize[it.first],(size_t)it.second) ; 
     528          } 
     529        } 
     530        evaluateBuffer[client] = bufferSize ; 
     531        client=nullptr ; 
     532      } 
     533    } 
     534    if (client==nullptr) return false ; 
     535    else return true; 
     536  }   
    499537 
    500538 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.hpp

    r2022 r2130  
    110110         // Grid data buffer size for each connection of contextclient 
    111111         std::map<int, StdSize> getGridDataBufferSize(CContextClient* client, bool bufferForWriting = false); 
    112  
     112          
     113         // evaluation the size of the buffer for the field 
     114         bool evaluateBufferSize(map<CContextClient*,map<int,size_t>>& evaluateBuffer, bool isOptPerformance) ; 
    113115       public: 
    114116          void makeGridAliasForCoupling(void) ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/object_factory_decl_macro.hpp

    r1869 r2130  
    1919 
    2020 
     21 
Note: See TracChangeset for help on using the changeset viewer.