Ignore:
Timestamp:
09/10/20 13:51:02 (4 years ago)
Author:
ymipsl
Message:

Big update on on going work related to data distribution and transfer between clients and servers.
Revisite of the source and store filter using "connectors".

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp

    r1883 r1930  
    2626#include "file_server_reader_filter.hpp" 
    2727#include "server_to_client_filter.hpp" 
     28#include "server_from_client_source_filter.hpp" 
     29#include "file_reader_source_filter.hpp" 
    2830#include "tracer.hpp" 
    2931 
     
    148150  CATCH 
    149151 
     152/* obsolete old interface 
    150153  void CField::sendUpdateData(Time timeStamp, const CArray<double,1>& data, CContextClient* client) 
    151154  TRY 
     
    204207  } 
    205208  CATCH_DUMP_ATTR 
    206  
     209*/ 
     210 
     211  void CField::sendUpdateData(Time timeStamp, const CArray<double,1>& data, CContextClient* client) 
     212  TRY 
     213  { 
     214    CTimer::get("Field : send data").resume(); 
     215    CEventClient event(getType(), EVENT_ID_UPDATE_DATA); 
     216    CMessage message ; 
     217 
     218    message<<getId() << timeStamp ; 
     219    this->getGrid()->getClientToServerConnector(client)->transfer(data, client, event, message) ; 
     220    CTimer::get("Field : send data").suspend(); 
     221  } 
     222  CATCH_DUMP_ATTR 
     223 
     224  /* old version obsolete 
    207225  void CField::recvUpdateData(CEventServer& event) 
    208226  TRY 
     
    224242  } 
    225243  CATCH 
    226  
    227  
    228   void  CField::recvUpdateData(std::map<int,CBufferIn*>& rankBuffers) 
    229   TRY 
    230   { 
    231     if (hasCouplerIn()) recvUpdateDataFromCoupler(rankBuffers) ; 
    232     else recvUpdateDataFromClient(rankBuffers) ; 
     244*/ 
     245 
     246  void CField::recvUpdateData(CEventServer& event) 
     247  TRY 
     248  { 
     249    string fieldId; 
     250    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> fieldId  ; 
     251    get(fieldId)->receiveUpdateData(event); 
    233252  } 
    234253  CATCH 
    235254 
     255  void  CField::receiveUpdateData(CEventServer& event) 
     256  TRY 
     257  { 
     258    if (hasCouplerIn()) clientFromClientSourceFilter_->streamData(event) ; 
     259    else serverFromClientSourceFilter_->streamData(event) ; 
     260  } 
     261  CATCH 
     262/* 
     263  void  CField::recvUpdateDataFromClient(CEventServer& event) 
     264  TRY 
     265  { 
     266    Time timeStamp ; 
     267    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> timeStamp  ; 
     268 
     269    CArray<double,1> recvData ; 
     270    getGrid()->getServerFromClientConnector()->transfer(event,recvData) ; 
     271    this->setData(recvData); 
     272  } 
     273  CATCH 
     274*/ 
     275 
     276/*   
     277  void CField::recvUpdateDataFromCoupler(CEventServer& event) 
     278  TRY 
     279  { 
     280    CContext* context = CContext::getCurrent(); 
     281    Time timeStamp ; 
     282    if (wasDataAlreadyReceivedFromServer) 
     283    {   
     284      lastDataReceivedFromServer = lastDataReceivedFromServer + freq_op; 
     285    } 
     286    else 
     287    { 
     288      // unlikely to input from file server where data are received at ts=0 
     289      // for coupling, it would be after the first freq_op, because for now we don't have 
     290      // restart mecanism to send the value at ts=0. It must be changed in future 
     291      lastDataReceivedFromServer = context->getCalendar()->getInitDate(); 
     292      wasDataAlreadyReceivedFromServer = true; 
     293    } 
     294 
     295    CArray<double,1> recvData ; 
     296    getGrid()->getServerFromClientConnector()->transfer(event,recvData) ; 
     297    clientSourceFilter->streamData(lastDataReceivedFromServer, recvData); 
     298 
     299  } 
     300  CATCH_DUMP_ATTR 
     301*/ 
     302 
     303 
     304 
     305 
     306  /* old interface to be removed.... */ 
    236307  void  CField::recvUpdateDataFromClient(std::map<int,CBufferIn*>& rankBuffers) 
    237308  TRY 
     
    330401    const CDate writeDate = last_Write_srv + freq_write_srv; 
    331402    last_Write_srv = writeDate; 
    332     grid_->computeWrittenIndex(); 
     403    // grid_->computeWrittenIndex(); -> obselete function need to be removed 
    333404    /* 
    334405    recvDataSrv.resize(data.numElements()) ; 
     
    424495    StdString fieldId; 
    425496    *buffer >> fieldId; 
    426     get(fieldId)->recvReadDataRequest(event.getContextServer()); 
     497    get(fieldId)->recvReadDataRequest(); 
    427498  } 
    428499  CATCH 
    429  
     500   
    430501  /*! 
    431502    Receive data request sent from client and process it 
     
    434505    In the future, this should (only) be done by the last level servers. 
    435506  */ 
     507  void CField::recvReadDataRequest(void) 
     508  TRY 
     509  { 
     510    fileReaderSourceFilter_->streamData() ; 
     511  } 
     512  CATCH_DUMP_ATTR   
     513 
     514/* old interface -> to remove 
    436515  void CField::recvReadDataRequest(CContextServer* server) 
    437516  TRY 
     
    446525  } 
    447526  CATCH_DUMP_ATTR 
    448  
     527*/ 
    449528 
    450529  void CField::sendUpdateDataServerToClient(bool isEOF, const CArray<double,1>& data, CContextClient* client) 
     
    509588    \return State of field can be read from a file 
    510589  */ 
     590  // obsolete to remove 
     591  /* 
    511592  CField::EReadField CField::readField(CArray<double,1>& data) 
    512593  TRY 
     
    561642  } 
    562643  CATCH_DUMP_ATTR 
     644  */ 
    563645 
    564646  /* 
     
    572654  { 
    573655    string fieldId; 
    574     vector<int> ranks; 
    575     vector<CBufferIn*> buffers; 
    576  
    577     list<CEventServer::SSubEvent>::iterator it; 
    578     for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 
    579     { 
    580       ranks.push_back(it->rank); 
    581       CBufferIn* buffer = it->buffer; 
    582       *buffer >> fieldId; 
    583       buffers.push_back(buffer); 
    584     } 
    585     get(fieldId)->recvReadDataReady(ranks, buffers); 
     656    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> fieldId  ; 
     657    get(fieldId)->recvReadDataReady(event); 
    586658  } 
    587659  CATCH 
    588660 
    589    
     661 
     662  /* old interface to be removed ..*/ 
    590663  void CField::recvUpdateDataFromCoupler(std::map<int,CBufferIn*>& rankBuffers) 
    591664  TRY 
     
    622695  } 
    623696  CATCH_DUMP_ATTR 
     697  
     698 
    624699  /*! 
    625700    Receive read data from server 
     
    627702    \param [in] buffers buffers containing read data 
    628703  */ 
     704  // old interface to remove  
    629705  void CField::recvReadDataReady(vector<int> ranks, vector<CBufferIn*> buffers) 
    630706  TRY 
     
    667743  CATCH_DUMP_ATTR 
    668744 
     745 
     746 
     747  void CField::receiveReadDataReady(CEventServer& event) 
     748  TRY 
     749  { 
     750    clientFromServerSourceFilter_->streamData(event) ;     
     751  } 
     752  CATCH_DUMP_ATTR 
     753 
     754 
     755 
    669756  void CField::checkForLateDataFromCoupler(void) 
    670757  TRY 
     
    677764    traceOff() ; 
    678765    timer.suspend(); 
    679        
    680     bool isDataLate; 
     766     
     767    bool isDataLate;   
    681768    do 
    682769    { 
    683       if (wasDataAlreadyReceivedFromServer) isDataLate = lastDataReceivedFromServer + freq_offset + freq_op <= currentDate ; 
    684       else isDataLate = context->getCalendar()->getInitDate()+freq_offset <= currentDate ; 
    685  
     770      isDataLate=clientFromClientSourceFilter_->isDataLate() ; 
    686771      if (isDataLate) 
    687772      { 
    688773        timer.resume(); 
    689 //ym          context->checkBuffersAndListen(); 
    690 //ym            context->eventLoop(); 
    691774        context->globalEventLoop(); 
    692  
    693775        timer.suspend(); 
    694776      } 
     
    709791  { 
    710792    CContext* context = CContext::getCurrent(); 
    711     const CDate& currentDate = context->getCalendar()->getCurrentDate(); 
    712  
    713793    // Check if data previously requested has been received as expected 
    714     if (wasDataRequestedFromServer && !isEOF) 
     794    if (wasDataRequestedFromServer && !clientFromServerSourceFilter_->isEOF()) 
    715795    { 
    716796      CTimer timer("CField::checkForLateDataFromServer"); 
     
    722802      do 
    723803      { 
    724         const CDate nextDataDue = wasDataAlreadyReceivedFromServer ? (lastDataReceivedFromServer + fileIn_->output_freq) : context->getCalendar()->getInitDate(); 
    725         isDataLate = (nextDataDue <= currentDate); 
    726  
     804        isDataLate=clientFromServerSourceFilter_->isDataLate(); 
    727805        if (isDataLate) 
    728806        { 
     
    744822      if (isDataLate) 
    745823        ERROR("void CField::checkForLateDataFromServer(void)", 
    746               << "Late data at timestep = " << currentDate); 
     824              << "Late data at timestep = " << context->getCalendar()->getCurrentDate()); 
    747825    } 
    748826  } 
     
    755833    { 
    756834      checkForLateDataFromServer() ; 
    757       serverSourceFilter->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 
     835      clientFromServerSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 
    758836    }  
    759837    else if (hasCouplerIn()) 
    760838    { 
    761839      checkForLateDataFromCoupler() ; 
    762       clientSourceFilter->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 
     840      clientFromClientSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 
    763841    } 
    764842  } 
     
    769847  TRY 
    770848  { 
    771     mustAutoTrigger = serverSourceFilter ? serverSourceFilter->mustAutoTrigger() : false; 
     849    mustAutoTrigger = clientFromServerSourceFilter_ ? clientFromServerSourceFilter_->mustAutoTrigger() : false; 
    772850  } 
    773851  CATCH_DUMP_ATTR 
     
    777855  { 
    778856    if (mustAutoTrigger) 
    779       serverSourceFilter->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()); 
     857      clientFromServerSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()); 
    780858  } 
    781859  CATCH_DUMP_ATTR 
     
    860938  TRY 
    861939  { 
    862     if (clientSourceFilter) 
    863       return atCurrentTimestep ? clientSourceFilter->isDataExpected(CContext::getCurrent()->getCalendar()->getCurrentDate()) : true; 
    864     else if (storeFilter) 
    865       return true; 
     940    if (modelToClientSourceFilter_)  
     941      return atCurrentTimestep ? modelToClientSourceFilter_->isDataExpected(CContext::getCurrent()->getCalendar()->getCurrentDate()) : true; 
     942    else if (clientToModelStoreFilter_)  return true; 
    866943    else if (instantDataFilter) 
    867944      ERROR("bool CField::isActive(bool atCurrentTimestep)", 
     
    14121489 
    14131490    if (check_if_active.isEmpty()) check_if_active = false;  
    1414     clientSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, false, true, NoneDu, false, detectMissingValues, defaultValue)); 
    1415     clientSourceFilter -> connectOutput(inputFilter,0) ; 
     1491    modelToClientSourceFilter_ = std::shared_ptr<CModelToClientSourceFilter>(new CModelToClientSourceFilter(gc, grid_, detectMissingValues, defaultValue)); 
     1492    modelToClientSourceFilter_ -> connectOutput(inputFilter,0) ; 
    14161493  }  
    14171494  
     
    14211498  void CField::connectToClientInput(CGarbageCollector& gc) 
    14221499  { 
    1423     clientSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc,  grid_, false, false)); 
    1424     clientSourceFilter -> connectOutput(inputFilter,0) ; 
     1500    serverFromClientSourceFilter_ = std::shared_ptr<CServerFromClientSourceFilter>(new CServerFromClientSourceFilter(gc,  grid_)); 
     1501    serverFromClientSourceFilter_ -> connectOutput(inputFilter,0) ; 
    14251502  }  
    14261503 
     
    14501527    if (freq_op.isEmpty()) freq_op.setValue(TimeStep); 
    14511528    if (freq_offset.isEmpty()) freq_offset.setValue(freq_op.getValue() - TimeStep); 
     1529     
     1530    /* old  
    14521531 
    14531532    freq_operation_srv = freq_op ; 
     
    14581537    clientSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, false, false, freq_offset, true)) ; 
    14591538    clientSourceFilter -> connectOutput(inputFilter,0) ; 
     1539 
     1540    */ 
     1541    // new 
     1542 
     1543    clientFromClientSourceFilter_ = std::shared_ptr<CClientFromClientSourceFilter>(new CClientFromClientSourceFilter(gc, this)) ; 
     1544    clientFromClientSourceFilter_ -> connectOutput(inputFilter,0) ; 
     1545    
    14601546  }  
    1461  
    14621547 
    14631548  /*! 
     
    14691554    instantDataFilter->connectOutput(fileServerWriterFilter, 0); 
    14701555  }  
    1471    
     1556 
    14721557  /*! 
    14731558   * Connect field to a file reader filter to read data from file (on server side). 
     
    14751560  void CField::connectToFileReader(CGarbageCollector& gc) 
    14761561  { 
    1477     fileServerReaderFilter_ = std::shared_ptr<CFileServerReaderFilter>(new CFileServerReaderFilter(gc, this)); 
    1478     fileServerReaderFilter_->connectOutput(inputFilter, 0); 
    1479   }  
     1562    fileReaderSourceFilter_ = std::shared_ptr<CFileReaderSourceFilter>(new CFileReaderSourceFilter(gc, this)); 
     1563    instantDataFilter->connectOutput(inputFilter, 0); 
     1564  } 
     1565 
    14801566 
    14811567  /*! 
     
    14841570  void CField::connectToModelOutput(CGarbageCollector& gc) 
    14851571  { 
    1486     const bool detectMissingValues = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 
    1487     const double defaultValue  = detectMissingValues ? default_value : (!default_value.isEmpty() ? default_value : 0.0); 
    1488  
    1489     storeFilter = std::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid_, detectMissingValues, defaultValue)); 
    1490     instantDataFilter->connectOutput(storeFilter, 0); 
     1572    clientToModelStoreFilter_ = std::shared_ptr<CClientToModelStoreFilter>(new CClientToModelStoreFilter(gc, this)); 
     1573    instantDataFilter->connectOutput(clientToModelStoreFilter_, 0); 
    14911574  } 
    14921575 
     
    15651648 
    15661649      // If the field data is to be read by the client or/and written to a file 
    1567       if (enableOutput && !storeFilter && !fileWriterFilter) 
     1650      if (enableOutput && !clientToModelStoreFilter_ && !fileWriterFilter) 
    15681651      { 
    15691652        if (getRelFile() && (getRelFile()->mode.isEmpty() || getRelFile()->mode == CFile::mode_attr::write)) 
     
    15801663 
    15811664      // If the field data is to be read by the client or/and written to a file 
    1582       if (enableOutput && !storeFilter && !fileWriterFilter) 
     1665      if (enableOutput && !clientToModelStoreFilter_ && !fileWriterFilter) 
    15831666      { 
    15841667        if (getRelFile() && (getRelFile()->mode.isEmpty() || getRelFile()->mode == CFile::mode_attr::write)) 
     
    16331716 
    16341717      // If the field data is to be read by the client or/and written to a file 
    1635       if (enableOutput && !storeFilter && !fileWriterFilter) 
     1718      if (enableOutput && !clientToModelStoreFilter_ && !fileWriterFilter) 
    16361719      { 
    16371720        if (!read_access.isEmpty() && read_access) 
    16381721        { 
    1639           storeFilter = std::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid_, 
    1640                                                                           detectMissingValues, defaultValue)); 
    1641           instantDataFilter->connectOutput(storeFilter, 0); 
     1722          clientToModelStoreFilter_ = std::shared_ptr<CClientToModelStoreFilter>(new CClientToModelStoreFilter(gc, this)); 
     1723          instantDataFilter->connectOutput(clientToModelStoreFilter_, 0); 
    16421724        } 
    16431725 
Note: See TracChangeset for help on using the changeset viewer.