Changeset 2482 for XIOS3/dev/XIOS_ATTACHED/src/node/field.cpp
- Timestamp:
- 03/28/23 16:42:11 (15 months ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/dev/XIOS_ATTACHED/src/node/field.cpp
r2433 r2482 20 20 #include "temporal_filter.hpp" 21 21 #include "server_from_client_source_filter.hpp" 22 #include "client_online_reader_filter.hpp" 22 23 #include "file_reader_source_filter.hpp" 24 #include "grid_redistribute_filter.hpp" 23 25 #include "tracer.hpp" 24 26 #include "graph_package.hpp" … … 205 207 TRY 206 208 { 207 return clientFromServerSourceFilter_->sendReadDataRequest(tsDataRequested) ; 209 if (clientFromServerSourceFilter_) return clientFromServerSourceFilter_->sendReadDataRequest(tsDataRequested) ; 210 else if (clientOnlineReaderFilter_) return clientOnlineReaderFilter_->sendReadDataRequest(tsDataRequested) ; 211 else ERROR("bool CField::sendReadDataRequest(const CDate& tsDataRequested)", << "uninitialized source filter"); 208 212 } 209 213 CATCH_DUMP_ATTR … … 217 221 TRY 218 222 { 219 return clientFromServerSourceFilter_->sendReadDataRequestIfNeeded() ; 223 if (clientFromServerSourceFilter_) return clientFromServerSourceFilter_->sendReadDataRequestIfNeeded() ; 224 else if (clientOnlineReaderFilter_) return clientOnlineReaderFilter_->sendReadDataRequestIfNeeded() ; 225 else ERROR("bool CField::sendReadDataRequestIfNeeded(void)", << "uninitialized source filter"); 220 226 } 221 227 CATCH_DUMP_ATTR … … 304 310 TRY 305 311 { 306 clientFromServerSourceFilter_->checkForLateData() ; 312 if (clientFromServerSourceFilter_) return clientFromServerSourceFilter_->checkForLateData() ; 313 else if (clientOnlineReaderFilter_) return clientOnlineReaderFilter_->checkForLateData() ; 314 else ERROR("void CField::checkForLateDataFromServer(void)", << "uninitialized source filter"); 307 315 } 308 316 CATCH_DUMP_ATTR … … 315 323 { 316 324 checkForLateDataFromServer() ; 317 clientFromServerSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 325 if (clientFromServerSourceFilter_) clientFromServerSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 326 else if (clientOnlineReaderFilter_) clientOnlineReaderFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 318 327 } 319 328 else if (hasCouplerIn()) … … 325 334 CATCH_DUMP_ATTR 326 335 327 328 void CField::checkIfMustAutoTrigger(void) 329 TRY 330 { 331 mustAutoTrigger = clientFromServerSourceFilter_ ? clientFromServerSourceFilter_->mustAutoTrigger() : false; 332 } 333 CATCH_DUMP_ATTR 334 335 void CField::autoTriggerIfNeeded(void) 336 TRY 337 { 338 if (mustAutoTrigger) 339 clientFromServerSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()); 340 } 341 CATCH_DUMP_ATTR 342 336 343 337 344 338 //---------------------------------------------------------------- … … 506 500 bool bufferForWriting) 507 501 { 508 auto& contextBufferSize = bufferSize[client ] ;509 auto& contextMaxEventSize = maxEventSize[client ] ;510 const std::map<int, size_t> mapSize = grid_->getDataBufferSize(client , getId(), bufferForWriting);502 auto& contextBufferSize = bufferSize[client_] ; 503 auto& contextMaxEventSize = maxEventSize[client_] ; 504 const std::map<int, size_t> mapSize = grid_->getDataBufferSize(client_, getId(), bufferForWriting); 511 505 for(auto& it : mapSize ) 512 506 { … … 525 519 bool bufferForWriting) 526 520 { 527 auto& contextBufferSize = bufferSize[client ] ;528 auto& contextMaxEventSize = maxEventSize[client ] ;529 const std::map<int, size_t> mapSize = grid_->getAttributesBufferSize(client , bufferForWriting);521 auto& contextBufferSize = bufferSize[client_] ; 522 auto& contextMaxEventSize = maxEventSize[client_] ; 523 const std::map<int, size_t> mapSize = grid_->getAttributesBufferSize(client_, bufferForWriting); 530 524 for(auto& it : mapSize ) 531 525 { … … 797 791 { 798 792 // insert temporal filter before sending to files 799 clientToServerStoreFilter_ = std::shared_ptr<CClientToServerStoreFilter>(new CClientToServerStoreFilter(gc, this, client ));793 clientToServerStoreFilter_ = std::shared_ptr<CClientToServerStoreFilter>(new CClientToServerStoreFilter(gc, this, client_)); 800 794 // insert temporal filter before sending to files 801 795 getTemporalDataFilter(gc, fileOut_->output_freq)->connectOutput(clientToServerStoreFilter_, 0); … … 809 803 } 810 804 805 void CField::connectToOnlineWriter(CGarbageCollector& gc) 806 { 807 // insert temporal filter before sending to files 808 CField* fieldOut ; 809 redistributeFilter_ = std::shared_ptr<CGridRedistributeFilter>(new CGridRedistributeFilter(gc, this, fieldOut)); 810 fieldOut->setFileOut(this->getFileOut()); 811 fileOut_->replaceEnabledFields(this, fieldOut) ; 812 // insert temporal filter before sending to files 813 getTemporalDataFilter(gc, fileOut_->output_freq)->connectOutput(redistributeFilter_, 0); 814 fieldOut->inputFilter = std::shared_ptr<CPassThroughFilter>(new CPassThroughFilter(gc)); 815 fieldOut->instantDataFilter = fieldOut->inputFilter ; 816 redistributeFilter_->connectOutput(fieldOut->inputFilter, 0); 817 fieldOut->connectToFileWriter(gc) ; 818 fieldOut->solveServerOperation() ; // might not be called, create a new time functor.... find a better solution later 819 const bool buildGraph_ = !build_workflow_graph.isEmpty() && build_workflow_graph == true ; 820 821 if(buildGraph_) 822 { 823 clientToServerStoreFilter_->graphPackage = new CGraphPackage; 824 clientToServerStoreFilter_->graphEnabled = true; 825 clientToServerStoreFilter_->graphPackage->inFields.push_back(this); 826 } 827 } 828 829 void CField::connectToOnlineReader(CGarbageCollector& gc) 830 { 831 // insert temporal filter before sending to files 832 clientOnlineReaderFilter_ = std::shared_ptr<CClientOnlineReaderFilter>(new CClientOnlineReaderFilter(gc,this)) ; 833 clientOnlineReaderFilter_ -> connectOutput(inputFilter,0) ; 834 } 835 811 836 void CField::connectToCouplerOut(CGarbageCollector& gc) 812 837 { 813 838 // insert temporal filter before sending to files 814 clientToServerStoreFilter_ = std::shared_ptr<CClientToServerStoreFilter>(new CClientToServerStoreFilter(gc, this, client ));839 clientToServerStoreFilter_ = std::shared_ptr<CClientToServerStoreFilter>(new CClientToServerStoreFilter(gc, this, client_)); 815 840 instantDataFilter->connectOutput(clientToServerStoreFilter_, 0); 816 841 const bool buildGraph_ = !build_workflow_graph.isEmpty() && build_workflow_graph == true ; … … 951 976 void CField::connectToServerToClient(CGarbageCollector& gc) 952 977 { 953 serverToClientStoreFilter_ = std::shared_ptr<CServerToClientStoreFilter>(new CServerToClientStoreFilter(gc, this, client ));978 serverToClientStoreFilter_ = std::shared_ptr<CServerToClientStoreFilter>(new CServerToClientStoreFilter(gc, this, client_)); 954 979 instantDataFilter->connectOutput(serverToClientStoreFilter_, 0); 955 980 const bool buildGraph_ = !build_workflow_graph.isEmpty() && build_workflow_graph == true ; … … 1435 1460 { 1436 1461 CContext* context = CContext::getCurrent(); 1437 client = contextClient;1462 client_ = contextClient; 1438 1463 1439 1464 // A grid is sent by a client (both for read or write) or by primary server (write only) … … 1441 1466 { 1442 1467 if (getRelFile()->mode.isEmpty() || (!getRelFile()->mode.isEmpty() && getRelFile()->mode == CFile::mode_attr::write)) 1443 grid_->setContextClient(contextClient);1468 /*grid_->setContextClient(contextClient) */; // => nothing to do with thats now, to remove... 1444 1469 } 1445 1470 else if (context->getServiceType()==CServicesManager::CLIENT) 1446 1471 { 1447 1472 if (grid_) 1448 grid_->setContextClient(contextClient);1473 /*grid_->setContextClient(contextClient)*/; // => nothing to do with thats now, to remove... 1449 1474 else 1475 1450 1476 ERROR( "CField::setContextClient(contextClient)", 1451 1477 << "Grid not defined for " << getId() … … 1459 1485 void CField::sendFieldToFileServer(void) 1460 1486 { 1461 CContext::getCurrent()->sendContextToFileServer(client );1462 getRelFile()->sendFileToFileServer(client );1487 CContext::getCurrent()->sendContextToFileServer(client_); 1488 getRelFile()->sendFileToFileServer(client_); 1463 1489 sentGrid_ = grid_-> duplicateSentGrid() ; 1464 sentGrid_->sendGridToFileServer(client , false);1490 sentGrid_->sendGridToFileServer(client_, false); 1465 1491 name = getFieldOutputName() ; 1466 this->sendAllAttributesToServer(client );1467 this->sendAddAllVariables(client );1492 this->sendAllAttributesToServer(client_); 1493 this->sendAddAllVariables(client_); 1468 1494 } 1469 1495 1470 1496 void CField::sendFieldToInputFileServer(void) 1471 1497 { 1472 CContext::getCurrent()->sendContextToFileServer(client );1473 getRelFile()->sendFileToFileServer(client );1498 CContext::getCurrent()->sendContextToFileServer(client_); 1499 getRelFile()->sendFileToFileServer(client_); 1474 1500 sentGrid_ = grid_-> duplicateSentGrid() ; 1475 sentGrid_->sendGridToFileServer(client , true);1501 sentGrid_->sendGridToFileServer(client_, true); 1476 1502 read_access=true ; // not the best solution, but on server side, the field must be a starting point of the workflow 1477 1503 // must be replace by a better solution when implementing filters for reading and send to client 1478 1504 // on server side 1479 this->sendAllAttributesToServer(client );1480 this->sendAddAllVariables(client );1505 this->sendAllAttributesToServer(client_); 1506 this->sendAddAllVariables(client_); 1481 1507 } 1482 1508 … … 1486 1512 else sendFieldToCouplerOut_done_=true ; 1487 1513 sentGrid_ = grid_-> duplicateSentGrid() ; 1488 sentGrid_->sendGridToCouplerOut(client , this->getId());1514 sentGrid_->sendGridToCouplerOut(client_, this->getId()); 1489 1515 this->sendGridCompleted(); 1490 1516 … … 1502 1528 CEventClient event(getType(),EVENT_ID_GRID_COMPLETED); 1503 1529 1504 if (client ->isServerLeader())1530 if (client_->isServerLeader()) 1505 1531 { 1506 1532 CMessage msg; 1507 1533 msg<<this->getId(); 1508 for (auto& rank : client ->getRanksServerLeader()) event.push(rank,1,msg);1509 client ->sendEvent(event);1510 } 1511 else client ->sendEvent(event);1534 for (auto& rank : client_->getRanksServerLeader()) event.push(rank,1,msg); 1535 client_->sendEvent(event); 1536 } 1537 else client_->sendEvent(event); 1512 1538 } 1513 1539 CATCH_DUMP_ATTR
Note: See TracChangeset
for help on using the changeset viewer.