Changeset 1930 for XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp
- Timestamp:
- 09/10/20 13:51:02 (4 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp
r1883 r1930 26 26 #include "file_server_reader_filter.hpp" 27 27 #include "server_to_client_filter.hpp" 28 #include "server_from_client_source_filter.hpp" 29 #include "file_reader_source_filter.hpp" 28 30 #include "tracer.hpp" 29 31 … … 148 150 CATCH 149 151 152 /* obsolete old interface 150 153 void CField::sendUpdateData(Time timeStamp, const CArray<double,1>& data, CContextClient* client) 151 154 TRY … … 204 207 } 205 208 CATCH_DUMP_ATTR 206 209 */ 210 211 void CField::sendUpdateData(Time timeStamp, const CArray<double,1>& data, CContextClient* client) 212 TRY 213 { 214 CTimer::get("Field : send data").resume(); 215 CEventClient event(getType(), EVENT_ID_UPDATE_DATA); 216 CMessage message ; 217 218 message<<getId() << timeStamp ; 219 this->getGrid()->getClientToServerConnector(client)->transfer(data, client, event, message) ; 220 CTimer::get("Field : send data").suspend(); 221 } 222 CATCH_DUMP_ATTR 223 224 /* old version obsolete 207 225 void CField::recvUpdateData(CEventServer& event) 208 226 TRY … … 224 242 } 225 243 CATCH 226 227 228 void CField::recvUpdateData(std::map<int,CBufferIn*>& rankBuffers) 229 TRY 230 { 231 if (hasCouplerIn()) recvUpdateDataFromCoupler(rankBuffers) ; 232 else recvUpdateDataFromClient(rankBuffers) ; 244 */ 245 246 void CField::recvUpdateData(CEventServer& event) 247 TRY 248 { 249 string fieldId; 250 for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> fieldId ; 251 get(fieldId)->receiveUpdateData(event); 233 252 } 234 253 CATCH 235 254 255 void CField::receiveUpdateData(CEventServer& event) 256 TRY 257 { 258 if (hasCouplerIn()) clientFromClientSourceFilter_->streamData(event) ; 259 else serverFromClientSourceFilter_->streamData(event) ; 260 } 261 CATCH 262 /* 263 void CField::recvUpdateDataFromClient(CEventServer& event) 264 TRY 265 { 266 Time timeStamp ; 267 for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> timeStamp ; 268 269 CArray<double,1> recvData ; 270 getGrid()->getServerFromClientConnector()->transfer(event,recvData) ; 271 this->setData(recvData); 272 } 273 CATCH 274 */ 275 276 /* 277 void CField::recvUpdateDataFromCoupler(CEventServer& event) 278 TRY 279 { 280 CContext* context = CContext::getCurrent(); 281 Time timeStamp ; 282 if (wasDataAlreadyReceivedFromServer) 283 { 284 lastDataReceivedFromServer = lastDataReceivedFromServer + freq_op; 285 } 286 else 287 { 288 // unlikely to input from file server where data are received at ts=0 289 // for coupling, it would be after the first freq_op, because for now we don't have 290 // restart mecanism to send the value at ts=0. It must be changed in future 291 lastDataReceivedFromServer = context->getCalendar()->getInitDate(); 292 wasDataAlreadyReceivedFromServer = true; 293 } 294 295 CArray<double,1> recvData ; 296 getGrid()->getServerFromClientConnector()->transfer(event,recvData) ; 297 clientSourceFilter->streamData(lastDataReceivedFromServer, recvData); 298 299 } 300 CATCH_DUMP_ATTR 301 */ 302 303 304 305 306 /* old interface to be removed.... */ 236 307 void CField::recvUpdateDataFromClient(std::map<int,CBufferIn*>& rankBuffers) 237 308 TRY … … 330 401 const CDate writeDate = last_Write_srv + freq_write_srv; 331 402 last_Write_srv = writeDate; 332 grid_->computeWrittenIndex();403 // grid_->computeWrittenIndex(); -> obselete function need to be removed 333 404 /* 334 405 recvDataSrv.resize(data.numElements()) ; … … 424 495 StdString fieldId; 425 496 *buffer >> fieldId; 426 get(fieldId)->recvReadDataRequest( event.getContextServer());497 get(fieldId)->recvReadDataRequest(); 427 498 } 428 499 CATCH 429 500 430 501 /*! 431 502 Receive data request sent from client and process it … … 434 505 In the future, this should (only) be done by the last level servers. 435 506 */ 507 void CField::recvReadDataRequest(void) 508 TRY 509 { 510 fileReaderSourceFilter_->streamData() ; 511 } 512 CATCH_DUMP_ATTR 513 514 /* old interface -> to remove 436 515 void CField::recvReadDataRequest(CContextServer* server) 437 516 TRY … … 446 525 } 447 526 CATCH_DUMP_ATTR 448 527 */ 449 528 450 529 void CField::sendUpdateDataServerToClient(bool isEOF, const CArray<double,1>& data, CContextClient* client) … … 509 588 \return State of field can be read from a file 510 589 */ 590 // obsolete to remove 591 /* 511 592 CField::EReadField CField::readField(CArray<double,1>& data) 512 593 TRY … … 561 642 } 562 643 CATCH_DUMP_ATTR 644 */ 563 645 564 646 /* … … 572 654 { 573 655 string fieldId; 574 vector<int> ranks; 575 vector<CBufferIn*> buffers; 576 577 list<CEventServer::SSubEvent>::iterator it; 578 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 579 { 580 ranks.push_back(it->rank); 581 CBufferIn* buffer = it->buffer; 582 *buffer >> fieldId; 583 buffers.push_back(buffer); 584 } 585 get(fieldId)->recvReadDataReady(ranks, buffers); 656 for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> fieldId ; 657 get(fieldId)->recvReadDataReady(event); 586 658 } 587 659 CATCH 588 660 589 661 662 /* old interface to be removed ..*/ 590 663 void CField::recvUpdateDataFromCoupler(std::map<int,CBufferIn*>& rankBuffers) 591 664 TRY … … 622 695 } 623 696 CATCH_DUMP_ATTR 697 698 624 699 /*! 625 700 Receive read data from server … … 627 702 \param [in] buffers buffers containing read data 628 703 */ 704 // old interface to remove 629 705 void CField::recvReadDataReady(vector<int> ranks, vector<CBufferIn*> buffers) 630 706 TRY … … 667 743 CATCH_DUMP_ATTR 668 744 745 746 747 void CField::receiveReadDataReady(CEventServer& event) 748 TRY 749 { 750 clientFromServerSourceFilter_->streamData(event) ; 751 } 752 CATCH_DUMP_ATTR 753 754 755 669 756 void CField::checkForLateDataFromCoupler(void) 670 757 TRY … … 677 764 traceOff() ; 678 765 timer.suspend(); 679 680 bool isDataLate; 766 767 bool isDataLate; 681 768 do 682 769 { 683 if (wasDataAlreadyReceivedFromServer) isDataLate = lastDataReceivedFromServer + freq_offset + freq_op <= currentDate ; 684 else isDataLate = context->getCalendar()->getInitDate()+freq_offset <= currentDate ; 685 770 isDataLate=clientFromClientSourceFilter_->isDataLate() ; 686 771 if (isDataLate) 687 772 { 688 773 timer.resume(); 689 //ym context->checkBuffersAndListen();690 //ym context->eventLoop();691 774 context->globalEventLoop(); 692 693 775 timer.suspend(); 694 776 } … … 709 791 { 710 792 CContext* context = CContext::getCurrent(); 711 const CDate& currentDate = context->getCalendar()->getCurrentDate();712 713 793 // Check if data previously requested has been received as expected 714 if (wasDataRequestedFromServer && ! isEOF)794 if (wasDataRequestedFromServer && !clientFromServerSourceFilter_->isEOF()) 715 795 { 716 796 CTimer timer("CField::checkForLateDataFromServer"); … … 722 802 do 723 803 { 724 const CDate nextDataDue = wasDataAlreadyReceivedFromServer ? (lastDataReceivedFromServer + fileIn_->output_freq) : context->getCalendar()->getInitDate(); 725 isDataLate = (nextDataDue <= currentDate); 726 804 isDataLate=clientFromServerSourceFilter_->isDataLate(); 727 805 if (isDataLate) 728 806 { … … 744 822 if (isDataLate) 745 823 ERROR("void CField::checkForLateDataFromServer(void)", 746 << "Late data at timestep = " << c urrentDate);824 << "Late data at timestep = " << context->getCalendar()->getCurrentDate()); 747 825 } 748 826 } … … 755 833 { 756 834 checkForLateDataFromServer() ; 757 serverSourceFilter->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ;835 clientFromServerSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 758 836 } 759 837 else if (hasCouplerIn()) 760 838 { 761 839 checkForLateDataFromCoupler() ; 762 client SourceFilter->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ;840 clientFromClientSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 763 841 } 764 842 } … … 769 847 TRY 770 848 { 771 mustAutoTrigger = serverSourceFilter ? serverSourceFilter->mustAutoTrigger() : false;849 mustAutoTrigger = clientFromServerSourceFilter_ ? clientFromServerSourceFilter_->mustAutoTrigger() : false; 772 850 } 773 851 CATCH_DUMP_ATTR … … 777 855 { 778 856 if (mustAutoTrigger) 779 serverSourceFilter->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate());857 clientFromServerSourceFilter_->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()); 780 858 } 781 859 CATCH_DUMP_ATTR … … 860 938 TRY 861 939 { 862 if (clientSourceFilter) 863 return atCurrentTimestep ? clientSourceFilter->isDataExpected(CContext::getCurrent()->getCalendar()->getCurrentDate()) : true; 864 else if (storeFilter) 865 return true; 940 if (modelToClientSourceFilter_) 941 return atCurrentTimestep ? modelToClientSourceFilter_->isDataExpected(CContext::getCurrent()->getCalendar()->getCurrentDate()) : true; 942 else if (clientToModelStoreFilter_) return true; 866 943 else if (instantDataFilter) 867 944 ERROR("bool CField::isActive(bool atCurrentTimestep)", … … 1412 1489 1413 1490 if (check_if_active.isEmpty()) check_if_active = false; 1414 clientSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, false, true, NoneDu, false, detectMissingValues, defaultValue));1415 clientSourceFilter-> connectOutput(inputFilter,0) ;1491 modelToClientSourceFilter_ = std::shared_ptr<CModelToClientSourceFilter>(new CModelToClientSourceFilter(gc, grid_, detectMissingValues, defaultValue)); 1492 modelToClientSourceFilter_ -> connectOutput(inputFilter,0) ; 1416 1493 } 1417 1494 … … 1421 1498 void CField::connectToClientInput(CGarbageCollector& gc) 1422 1499 { 1423 clientSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, false, false));1424 clientSourceFilter-> connectOutput(inputFilter,0) ;1500 serverFromClientSourceFilter_ = std::shared_ptr<CServerFromClientSourceFilter>(new CServerFromClientSourceFilter(gc, grid_)); 1501 serverFromClientSourceFilter_ -> connectOutput(inputFilter,0) ; 1425 1502 } 1426 1503 … … 1450 1527 if (freq_op.isEmpty()) freq_op.setValue(TimeStep); 1451 1528 if (freq_offset.isEmpty()) freq_offset.setValue(freq_op.getValue() - TimeStep); 1529 1530 /* old 1452 1531 1453 1532 freq_operation_srv = freq_op ; … … 1458 1537 clientSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, false, false, freq_offset, true)) ; 1459 1538 clientSourceFilter -> connectOutput(inputFilter,0) ; 1539 1540 */ 1541 // new 1542 1543 clientFromClientSourceFilter_ = std::shared_ptr<CClientFromClientSourceFilter>(new CClientFromClientSourceFilter(gc, this)) ; 1544 clientFromClientSourceFilter_ -> connectOutput(inputFilter,0) ; 1545 1460 1546 } 1461 1462 1547 1463 1548 /*! … … 1469 1554 instantDataFilter->connectOutput(fileServerWriterFilter, 0); 1470 1555 } 1471 1556 1472 1557 /*! 1473 1558 * Connect field to a file reader filter to read data from file (on server side). … … 1475 1560 void CField::connectToFileReader(CGarbageCollector& gc) 1476 1561 { 1477 fileServerReaderFilter_ = std::shared_ptr<CFileServerReaderFilter>(new CFileServerReaderFilter(gc, this)); 1478 fileServerReaderFilter_->connectOutput(inputFilter, 0); 1479 } 1562 fileReaderSourceFilter_ = std::shared_ptr<CFileReaderSourceFilter>(new CFileReaderSourceFilter(gc, this)); 1563 instantDataFilter->connectOutput(inputFilter, 0); 1564 } 1565 1480 1566 1481 1567 /*! … … 1484 1570 void CField::connectToModelOutput(CGarbageCollector& gc) 1485 1571 { 1486 const bool detectMissingValues = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 1487 const double defaultValue = detectMissingValues ? default_value : (!default_value.isEmpty() ? default_value : 0.0); 1488 1489 storeFilter = std::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid_, detectMissingValues, defaultValue)); 1490 instantDataFilter->connectOutput(storeFilter, 0); 1572 clientToModelStoreFilter_ = std::shared_ptr<CClientToModelStoreFilter>(new CClientToModelStoreFilter(gc, this)); 1573 instantDataFilter->connectOutput(clientToModelStoreFilter_, 0); 1491 1574 } 1492 1575 … … 1565 1648 1566 1649 // If the field data is to be read by the client or/and written to a file 1567 if (enableOutput && ! storeFilter&& !fileWriterFilter)1650 if (enableOutput && !clientToModelStoreFilter_ && !fileWriterFilter) 1568 1651 { 1569 1652 if (getRelFile() && (getRelFile()->mode.isEmpty() || getRelFile()->mode == CFile::mode_attr::write)) … … 1580 1663 1581 1664 // If the field data is to be read by the client or/and written to a file 1582 if (enableOutput && ! storeFilter&& !fileWriterFilter)1665 if (enableOutput && !clientToModelStoreFilter_ && !fileWriterFilter) 1583 1666 { 1584 1667 if (getRelFile() && (getRelFile()->mode.isEmpty() || getRelFile()->mode == CFile::mode_attr::write)) … … 1633 1716 1634 1717 // If the field data is to be read by the client or/and written to a file 1635 if (enableOutput && ! storeFilter&& !fileWriterFilter)1718 if (enableOutput && !clientToModelStoreFilter_ && !fileWriterFilter) 1636 1719 { 1637 1720 if (!read_access.isEmpty() && read_access) 1638 1721 { 1639 storeFilter = std::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid_, 1640 detectMissingValues, defaultValue)); 1641 instantDataFilter->connectOutput(storeFilter, 0); 1722 clientToModelStoreFilter_ = std::shared_ptr<CClientToModelStoreFilter>(new CClientToModelStoreFilter(gc, this)); 1723 instantDataFilter->connectOutput(clientToModelStoreFilter_, 0); 1642 1724 } 1643 1725
Note: See TracChangeset
for help on using the changeset viewer.