Ignore:
Timestamp:
04/20/21 09:49:47 (3 years ago)
Author:
ymipsl
Message:

New management of client-server buffers.

  • buffers can grow automatically in intialization phase
  • buffers is evaluated after the close context definition phase and fixed at optimal value.

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp

    r2123 r2130  
    2020#include "services.hpp" 
    2121#include "contexts_manager.hpp" 
     22#include "timeline_events.hpp" 
    2223 
    2324#include <boost/functional/hash.hpp> 
     
    188189    if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
    189190    { 
    190        MPI_Aint recvBuff[3] ; 
    191        MPI_Recv(recvBuff, 3, MPI_AINT, rank, 20, interComm, &status); 
    192        StdSize buffSize = recvBuff[0]; 
     191       MPI_Aint recvBuff[4] ; 
     192       MPI_Recv(recvBuff, 4, MPI_AINT, rank, 20, interComm, &status); 
     193       remoteHashId_ = recvBuff[0] ; 
     194       StdSize buffSize = recvBuff[1]; 
    193195       vector<MPI_Aint> winAdress(2) ; 
    194        winAdress[0]=recvBuff[1] ; winAdress[1]=recvBuff[2] ; 
     196       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ; 
    195197       mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
    196198       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first; 
     
    290292      CBufferIn newBuffer(startBuffer,buffer.remain()); 
    291293      newBuffer>>size>>timeLine; 
    292       it=events.find(timeLine); 
    293       if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first; 
    294       it->second->push(rank,buffers[rank],startBuffer,size); 
    295  
     294 
     295      if (timeLine==timelineEventNotifyChangeBufferSize) 
     296      { 
     297        buffers[rank]->notifyBufferResizing() ; 
     298        buffers[rank]->updateCurrentWindows() ; 
     299      }  
     300      else if (timeLine==timelineEventChangeBufferSize) 
     301      { 
     302        size_t newSize ; 
     303        vector<MPI_Aint> winAdress(2) ; 
     304        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ; 
     305        buffers.erase(rank) ; 
     306        buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, newSize))); 
     307      } 
     308      else 
     309      { 
     310        it=events.find(timeLine); 
     311        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first; 
     312        it->second->push(rank,buffers[rank],startBuffer,size); 
     313        if (timeLine>0) lastTimeLine[rank]=timeLine ; 
     314      } 
    296315      buffer.advance(size); 
    297316      count=buffer.remain(); 
    298317    } 
    299  
    300     if (timeLine>0) lastTimeLine[rank]=timeLine ; 
    301318     
    302319    CTimer::get("Process request").suspend(); 
     
    310327//    if (context->isProcessingEvent()) return ; 
    311328    if (isProcessingEvent_) return ; 
     329    if (isAttachedModeEnabled()) 
     330      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ; 
    312331 
    313332    it=events.find(currentTimeLine); 
     
    356375         currentTimeLine++; 
    357376         scheduled = false; 
     377         if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ; 
    358378        } 
    359379      } 
Note: See TracChangeset for help on using the changeset viewer.