Ignore:
Timestamp:
03/28/23 16:42:11 (15 months ago)
Author:
ymipsl
Message:

First guess in supression of attached mode replaced by online reader and write filters

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/dev/XIOS_ATTACHED/src/node/field.cpp

    r2433 r2482  
    2020#include "temporal_filter.hpp" 
    2121#include "server_from_client_source_filter.hpp" 
     22#include "client_online_reader_filter.hpp" 
    2223#include "file_reader_source_filter.hpp" 
     24#include "grid_redistribute_filter.hpp" 
    2325#include "tracer.hpp" 
    2426#include "graph_package.hpp" 
     
    205207  TRY 
    206208  { 
    207     return clientFromServerSourceFilter_->sendReadDataRequest(tsDataRequested) ; 
     209    if (clientFromServerSourceFilter_) return clientFromServerSourceFilter_->sendReadDataRequest(tsDataRequested) ; 
     210    else if (clientOnlineReaderFilter_) return clientOnlineReaderFilter_->sendReadDataRequest(tsDataRequested) ; 
     211    else ERROR("bool CField::sendReadDataRequest(const CDate& tsDataRequested)", << "uninitialized source filter"); 
    208212  } 
    209213  CATCH_DUMP_ATTR 
     
    217221  TRY 
    218222  { 
    219     return clientFromServerSourceFilter_->sendReadDataRequestIfNeeded() ; 
     223    if (clientFromServerSourceFilter_) return clientFromServerSourceFilter_->sendReadDataRequestIfNeeded() ; 
     224    else if (clientOnlineReaderFilter_) return clientOnlineReaderFilter_->sendReadDataRequestIfNeeded() ; 
     225    else ERROR("bool CField::sendReadDataRequestIfNeeded(void)", << "uninitialized source filter"); 
    220226  } 
    221227  CATCH_DUMP_ATTR 
     
    304310  TRY 
    305311  { 
    306     clientFromServerSourceFilter_->checkForLateData() ; 
     312    if (clientFromServerSourceFilter_) return clientFromServerSourceFilter_->checkForLateData() ; 
     313    else if (clientOnlineReaderFilter_) return clientOnlineReaderFilter_->checkForLateData() ; 
     314    else ERROR("void CField::checkForLateDataFromServer(void)", << "uninitialized source filter"); 
    307315  } 
    308316  CATCH_DUMP_ATTR  
     
    315323    { 
    316324      checkForLateDataFromServer() ; 
    317       clientFromServerSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 
     325      if (clientFromServerSourceFilter_) clientFromServerSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 
     326      else if (clientOnlineReaderFilter_) clientOnlineReaderFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 
    318327    }  
    319328    else if (hasCouplerIn()) 
     
    325334  CATCH_DUMP_ATTR 
    326335 
    327  
    328   void CField::checkIfMustAutoTrigger(void) 
    329   TRY 
    330   { 
    331     mustAutoTrigger = clientFromServerSourceFilter_ ? clientFromServerSourceFilter_->mustAutoTrigger() : false; 
    332   } 
    333   CATCH_DUMP_ATTR 
    334  
    335   void CField::autoTriggerIfNeeded(void) 
    336   TRY 
    337   { 
    338     if (mustAutoTrigger) 
    339       clientFromServerSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()); 
    340   } 
    341   CATCH_DUMP_ATTR 
    342  
     336   
    343337 
    344338  //---------------------------------------------------------------- 
     
    506500                                              bool bufferForWriting) 
    507501  { 
    508     auto& contextBufferSize = bufferSize[client] ; 
    509     auto& contextMaxEventSize = maxEventSize[client] ; 
    510     const std::map<int, size_t> mapSize = grid_->getDataBufferSize(client, getId(), bufferForWriting); 
     502    auto& contextBufferSize = bufferSize[client_] ; 
     503    auto& contextMaxEventSize = maxEventSize[client_] ; 
     504    const std::map<int, size_t> mapSize = grid_->getDataBufferSize(client_, getId(), bufferForWriting); 
    511505    for(auto& it : mapSize ) 
    512506    { 
     
    525519                                                   bool bufferForWriting) 
    526520  { 
    527     auto& contextBufferSize = bufferSize[client] ; 
    528     auto& contextMaxEventSize = maxEventSize[client] ; 
    529     const std::map<int, size_t> mapSize = grid_->getAttributesBufferSize(client, bufferForWriting); 
     521    auto& contextBufferSize = bufferSize[client_] ; 
     522    auto& contextMaxEventSize = maxEventSize[client_] ; 
     523    const std::map<int, size_t> mapSize = grid_->getAttributesBufferSize(client_, bufferForWriting); 
    530524    for(auto& it : mapSize ) 
    531525    { 
     
    797791  { 
    798792    // insert temporal filter before sending to files 
    799     clientToServerStoreFilter_ = std::shared_ptr<CClientToServerStoreFilter>(new CClientToServerStoreFilter(gc, this, client)); 
     793    clientToServerStoreFilter_ = std::shared_ptr<CClientToServerStoreFilter>(new CClientToServerStoreFilter(gc, this, client_)); 
    800794    // insert temporal filter before sending to files 
    801795    getTemporalDataFilter(gc, fileOut_->output_freq)->connectOutput(clientToServerStoreFilter_, 0); 
     
    809803  }  
    810804 
     805  void CField::connectToOnlineWriter(CGarbageCollector& gc) 
     806  { 
     807    // insert temporal filter before sending to files 
     808    CField* fieldOut ; 
     809    redistributeFilter_ = std::shared_ptr<CGridRedistributeFilter>(new CGridRedistributeFilter(gc, this, fieldOut)); 
     810    fieldOut->setFileOut(this->getFileOut()); 
     811    fileOut_->replaceEnabledFields(this, fieldOut) ; 
     812    // insert temporal filter before sending to files 
     813    getTemporalDataFilter(gc, fileOut_->output_freq)->connectOutput(redistributeFilter_, 0); 
     814    fieldOut->inputFilter = std::shared_ptr<CPassThroughFilter>(new CPassThroughFilter(gc));  
     815    fieldOut->instantDataFilter = fieldOut->inputFilter ; 
     816    redistributeFilter_->connectOutput(fieldOut->inputFilter, 0); 
     817    fieldOut->connectToFileWriter(gc) ; 
     818    fieldOut->solveServerOperation() ; // might not be called, create a new time functor.... find a better solution later 
     819    const bool buildGraph_ = !build_workflow_graph.isEmpty() && build_workflow_graph == true ; 
     820   
     821    if(buildGraph_)  
     822    { 
     823      clientToServerStoreFilter_->graphPackage = new CGraphPackage; 
     824      clientToServerStoreFilter_->graphEnabled = true; 
     825      clientToServerStoreFilter_->graphPackage->inFields.push_back(this); 
     826    } 
     827  }  
     828 
     829  void CField::connectToOnlineReader(CGarbageCollector& gc) 
     830  { 
     831    // insert temporal filter before sending to files 
     832    clientOnlineReaderFilter_ = std::shared_ptr<CClientOnlineReaderFilter>(new CClientOnlineReaderFilter(gc,this)) ; 
     833    clientOnlineReaderFilter_ -> connectOutput(inputFilter,0) ; 
     834  }  
     835 
    811836  void CField::connectToCouplerOut(CGarbageCollector& gc) 
    812837  { 
    813838    // insert temporal filter before sending to files 
    814     clientToServerStoreFilter_ = std::shared_ptr<CClientToServerStoreFilter>(new CClientToServerStoreFilter(gc, this, client)); 
     839    clientToServerStoreFilter_ = std::shared_ptr<CClientToServerStoreFilter>(new CClientToServerStoreFilter(gc, this, client_)); 
    815840    instantDataFilter->connectOutput(clientToServerStoreFilter_, 0); 
    816841    const bool buildGraph_ = !build_workflow_graph.isEmpty() && build_workflow_graph == true ; 
     
    951976  void CField::connectToServerToClient(CGarbageCollector& gc) 
    952977  { 
    953     serverToClientStoreFilter_ = std::shared_ptr<CServerToClientStoreFilter>(new CServerToClientStoreFilter(gc, this, client)); 
     978    serverToClientStoreFilter_ = std::shared_ptr<CServerToClientStoreFilter>(new CServerToClientStoreFilter(gc, this, client_)); 
    954979    instantDataFilter->connectOutput(serverToClientStoreFilter_, 0); 
    955980    const bool buildGraph_ = !build_workflow_graph.isEmpty() && build_workflow_graph == true ; 
     
    14351460  { 
    14361461    CContext* context = CContext::getCurrent(); 
    1437     client = contextClient; 
     1462    client_ = contextClient; 
    14381463   
    14391464    // A grid is sent by a client (both for read or write) or by primary server (write only) 
     
    14411466    { 
    14421467      if (getRelFile()->mode.isEmpty() || (!getRelFile()->mode.isEmpty() && getRelFile()->mode == CFile::mode_attr::write)) 
    1443         grid_->setContextClient(contextClient); 
     1468        /*grid_->setContextClient(contextClient) */; // => nothing to do with thats now, to remove... 
    14441469    } 
    14451470    else if (context->getServiceType()==CServicesManager::CLIENT) 
    14461471    { 
    14471472      if (grid_) 
    1448         grid_->setContextClient(contextClient); 
     1473        /*grid_->setContextClient(contextClient)*/; // => nothing to do with thats now, to remove... 
    14491474      else 
     1475 
    14501476        ERROR( "CField::setContextClient(contextClient)", 
    14511477               << "Grid not defined for " << getId() 
     
    14591485  void CField::sendFieldToFileServer(void) 
    14601486  { 
    1461     CContext::getCurrent()->sendContextToFileServer(client); 
    1462     getRelFile()->sendFileToFileServer(client); 
     1487    CContext::getCurrent()->sendContextToFileServer(client_); 
     1488    getRelFile()->sendFileToFileServer(client_); 
    14631489    sentGrid_ = grid_-> duplicateSentGrid() ; 
    1464     sentGrid_->sendGridToFileServer(client, false); 
     1490    sentGrid_->sendGridToFileServer(client_, false); 
    14651491    name = getFieldOutputName() ; 
    1466     this->sendAllAttributesToServer(client); 
    1467     this->sendAddAllVariables(client); 
     1492    this->sendAllAttributesToServer(client_); 
     1493    this->sendAddAllVariables(client_); 
    14681494  } 
    14691495 
    14701496  void CField::sendFieldToInputFileServer(void) 
    14711497  { 
    1472     CContext::getCurrent()->sendContextToFileServer(client); 
    1473     getRelFile()->sendFileToFileServer(client); 
     1498    CContext::getCurrent()->sendContextToFileServer(client_); 
     1499    getRelFile()->sendFileToFileServer(client_); 
    14741500    sentGrid_ = grid_-> duplicateSentGrid() ; 
    1475     sentGrid_->sendGridToFileServer(client, true); 
     1501    sentGrid_->sendGridToFileServer(client_, true); 
    14761502    read_access=true ; // not the best solution, but on server side, the field must be a starting point of the workflow 
    14771503                       // must be replace by a better solution when implementing filters for reading and send to client 
    14781504                       // on server side 
    1479     this->sendAllAttributesToServer(client); 
    1480     this->sendAddAllVariables(client); 
     1505    this->sendAllAttributesToServer(client_); 
     1506    this->sendAddAllVariables(client_); 
    14811507  } 
    14821508 
     
    14861512    else sendFieldToCouplerOut_done_=true ; 
    14871513    sentGrid_ = grid_-> duplicateSentGrid() ; 
    1488     sentGrid_->sendGridToCouplerOut(client, this->getId()); 
     1514    sentGrid_->sendGridToCouplerOut(client_, this->getId()); 
    14891515    this->sendGridCompleted(); 
    14901516 
     
    15021528      CEventClient event(getType(),EVENT_ID_GRID_COMPLETED); 
    15031529 
    1504       if (client->isServerLeader()) 
     1530      if (client_->isServerLeader()) 
    15051531      { 
    15061532        CMessage msg; 
    15071533        msg<<this->getId(); 
    1508         for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg); 
    1509         client->sendEvent(event); 
    1510       } 
    1511       else client->sendEvent(event); 
     1534        for (auto& rank : client_->getRanksServerLeader()) event.push(rank,1,msg); 
     1535        client_->sendEvent(event); 
     1536      } 
     1537      else client_->sendEvent(event); 
    15121538   } 
    15131539   CATCH_DUMP_ATTR 
Note: See TracChangeset for help on using the changeset viewer.