Changeset 1875 for XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp
- Timestamp:
- 05/12/20 11:52:13 (4 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp
r1872 r1875 133 133 return true; 134 134 break; 135 135 136 case EVENT_ID_GRID_COMPLETED : 137 recvGridCompleted(event); 138 return true; 139 break; 136 140 default : 137 141 ERROR("bool CField::dispatchEvent(CEventServer& event)", << "Unknown Event"); … … 142 146 CATCH 143 147 144 void CField::sendUpdateData( const CArray<double,1>& data, CContextClient* client)148 void CField::sendUpdateData(Time timeStamp, const CArray<double,1>& data, CContextClient* client) 145 149 TRY 146 150 { … … 169 173 for (int n = 0; n < data_tmp.numElements(); n++) data_tmp(n) = data(index(n)); 170 174 171 list_msg.back() << getId() << data_tmp;175 list_msg.back() << getId() << timeStamp << data_tmp; 172 176 event.push(rank, 1, list_msg.back()); 173 177 } … … 189 193 for (int n = 0; n < data_tmp.numElements(); n++) data_tmp(n) = data(index(n)); 190 194 191 list_msg.back() << getId() << data_tmp;195 list_msg.back() << getId() << timeStamp << data_tmp; 192 196 event.push(rank, grid_->nbSenders_[receiverSize][rank], list_msg.back()); 193 197 } … … 219 223 CATCH 220 224 225 221 226 void CField::recvUpdateData(std::map<int,CBufferIn*>& rankBuffers) 222 227 TRY 223 228 { 229 if (hasCouplerIn()) recvUpdateDataFromCoupler(rankBuffers) ; 230 else recvUpdateDataFromClient(rankBuffers) ; 231 } 232 CATCH 233 234 void CField::recvUpdateDataFromClient(std::map<int,CBufferIn*>& rankBuffers) 235 TRY 236 { 224 237 CContext* context = CContext::getCurrent(); 225 238 Time timeStamp ; 226 239 size_t sizeData = 0; 227 240 if (0 == recvDataSrv.numElements()) … … 249 262 CArray<double,1> tmp; 250 263 CArray<size_t,1>& indexTmp = it->second; 251 *(rankBuffers[it->first]) >> t mp;264 *(rankBuffers[it->first]) >> timeStamp >> tmp; 252 265 for (int idx = 0; idx < indexTmp.numElements(); ++idx) recv_data_tmp(indexTmp(idx)) = tmp(idx); 253 266 } … … 552 565 CATCH 553 566 567 568 void CField::recvUpdateDataFromCoupler(std::map<int,CBufferIn*>& rankBuffers) 569 TRY 570 { 571 CContext* context = CContext::getCurrent(); 572 Time timeStamp ; 573 if (wasDataAlreadyReceivedFromServer) 574 { 575 lastDataReceivedFromServer = lastDataReceivedFromServer + freq_op; 576 } 577 else 578 { 579 // unlikely to input from file server where data are received at ts=0 580 // for coupling, it would be after the first freq_op, because for now we don't have 581 // restart mecanism to send the value at ts=0. It mus be changed in future 582 lastDataReceivedFromServer = context->getCalendar()->getInitDate()+freq_op; 583 wasDataAlreadyReceivedFromServer = true; 584 } 585 586 CArray<int,1>& storeClient = grid_->getStoreIndex_client(); 587 CArray<double,1> recv_data_tmp(storeClient.numElements()); 588 589 auto& outLocalIndexStoreOnClient = grid_-> getOutLocalIndexStoreOnClient() ; 590 for (auto it = outLocalIndexStoreOnClient.begin(); it != outLocalIndexStoreOnClient.end(); ++it) 591 { 592 CArray<double,1> tmp; 593 CArray<size_t,1>& indexTmp = it->second; 594 *(rankBuffers[it->first]) >> timeStamp >> tmp; 595 for (int idx = 0; idx < indexTmp.numElements(); ++idx) recv_data_tmp(indexTmp(idx)) = tmp(idx); 596 } 597 598 clientSourceFilter->streamData(lastDataReceivedFromServer, recv_data_tmp); 599 600 } 601 CATCH_DUMP_ATTR 554 602 /*! 555 603 Receive read data from server … … 596 644 } 597 645 CATCH_DUMP_ATTR 646 647 void CField::checkForLateDataFromCoupler(void) 648 TRY 649 { 650 CContext* context = CContext::getCurrent(); 651 const CDate& currentDate = context->getCalendar()->getCurrentDate(); 652 653 CTimer timer("CField::checkForLateDataFromCoupler"); 654 timer.resume(); 655 traceOff() ; 656 timer.suspend(); 657 658 bool isDataLate; 659 do 660 { 661 const CDate nextDataDue = wasDataAlreadyReceivedFromServer ? (lastDataReceivedFromServer + freq_op) : context->getCalendar()->getInitDate(); 662 isDataLate = (nextDataDue <= currentDate); 663 664 if (isDataLate) 665 { 666 timer.resume(); 667 //ym context->checkBuffersAndListen(); 668 //ym context->eventLoop(); 669 context->globalEventLoop(); 670 671 timer.suspend(); 672 } 673 } while (isDataLate && timer.getCumulatedTime() < CXios::recvFieldTimeout); 674 675 timer.resume(); 676 traceOn() ; 677 timer.suspend() ; 678 679 if (isDataLate) ERROR("void CField::checkForLateDataFromCoupler(void)", 680 << "Late data at timestep = " << currentDate); 681 } 682 CATCH_DUMP_ATTR 683 598 684 599 685 void CField::checkForLateDataFromServer(void) … … 640 726 } 641 727 CATCH_DUMP_ATTR 728 729 void CField::triggerLateField(void) 730 TRY 731 { 732 if (hasFileIn()) 733 { 734 checkForLateDataFromServer() ; 735 serverSourceFilter->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 736 } 737 else if (hasCouplerIn()) 738 { 739 checkForLateDataFromCoupler() ; 740 clientSourceFilter->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 741 } 742 } 743 CATCH_DUMP_ATTR 744 642 745 643 746 void CField::checkIfMustAutoTrigger(void) … … 1140 1243 { 1141 1244 if (buildWorkflowGraphDone_) return true ; 1245 1142 1246 const bool detectMissingValues = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 1143 1247 const double defaultValue = detectMissingValues ? default_value : (!default_value.isEmpty() ? default_value : 0.0); … … 1152 1256 } 1153 1257 1258 // now construct grid and check if element are enabled 1259 solveGridReference() ; // grid_ is now defined 1260 if (!isGridCompleted()) return false; 1261 1154 1262 // Check if we have an expression to parse 1155 1263 std::shared_ptr<COutputPin> filterExpr ; … … 1161 1269 } 1162 1270 1163 // now construct grid and check if element are enabled1164 solveGridReference() ; // grid_ is now defined1165 1166 1271 // prepare transformation. Need to know before if workflow of auxillary field can be built 1167 1272 if (hasDirectFieldReference()) … … 1173 1278 for(auto grid : gridPath) 1174 1279 { 1175 if (!grid->checkIfCompleted()) return false ;1176 1280 grid->solveElementsRefInheritance() ; 1177 grid_->completeGrid(gridSrc); // grid generation, to be checked 1281 grid->completeGrid(gridSrc); // grid generation, to be checked 1282 grid->checkElementsAttributes() ; 1178 1283 grid->prepareTransformGrid(gridSrc) ; // prepare the grid tranformation 1179 1284 for(auto fieldId : grid->getAuxInputTransformGrid()) // try to build workflow graph for auxillary field tranformation … … 1205 1310 else 1206 1311 { 1207 if (!grid_->checkIfCompleted()) return false ;1208 1209 1312 if (hasFileIn()) // input file, attemp to read the grid from file 1210 1313 { … … 1221 1324 instantDataFilter=inputFilter ; 1222 1325 } 1223 else 1326 else if (hasCouplerIn()) 1327 { 1328 grid_->checkElementsAttributes() ; 1329 instantDataFilter=inputFilter ; 1330 } 1331 else 1224 1332 { 1225 1333 setModelIn() ; // no reference, the field is potentially a source field from model … … 1256 1364 // insert temporal filter before sending to files 1257 1365 getTemporalDataFilter(gc, fileOut_->output_freq)->connectOutput(fileWriterFilter, 0); 1366 } 1367 1368 void CField::connectToCouplerOut(CGarbageCollector& gc) 1369 { 1370 // insert temporal filter before sending to files 1371 fileWriterFilter = std::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this, client)); 1372 instantDataFilter->connectOutput(fileWriterFilter, 0); 1258 1373 } 1259 1374 … … 1303 1418 serverSourceFilter -> connectOutput(inputFilter,0) ; 1304 1419 } 1420 1421 /*! 1422 * Connect field to a source filter to receive data from coupler (on client side). 1423 */ 1424 void CField::connectToCouplerIn(CGarbageCollector& gc) 1425 { 1426 CContext* context = CContext::getCurrent(); 1427 1428 if (freq_op.isEmpty()) freq_op.setValue(TimeStep); 1429 if (freq_offset.isEmpty()) freq_offset.setValue(freq_op.getValue() - TimeStep); 1430 1431 freq_operation_srv = freq_op ; 1432 last_operation_srv = context->getCalendar()->getInitDate(); 1433 const CDuration toffset = freq_operation_srv - freq_offset.getValue() - context->getCalendar()->getTimeStep(); 1434 last_operation_srv = last_operation_srv - toffset; 1435 1436 clientSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, false, false, freq_offset, true)) ; 1437 clientSourceFilter -> connectOutput(inputFilter,0) ; 1438 } 1439 1305 1440 1306 1441 /*! … … 2021 2156 CATCH_DUMP_ATTR 2022 2157 2023 CContextClient* CField::getContextClient() 2024 TRY 2025 { 2026 return client; 2027 } 2028 CATCH 2029 2030 2158 void CField::sendCloseDefinition(void) 2159 { 2160 CContext::getCurrent()->sendCloseDefinition(client) ; 2161 } 2162 2031 2163 void CField::sendFieldToFileServer(void) 2032 2164 { … … 2037 2169 this->sendAddAllVariables(client); 2038 2170 } 2171 2172 void CField::sendFieldToCouplerOut(void) 2173 { 2174 if (sendFieldToCouplerOut_done_) return ; 2175 else sendFieldToCouplerOut_done_=true ; 2176 grid_->sendGridToCouplerOut(client, this->getId()); 2177 this->sendGridCompleted(); 2178 2179 } 2039 2180 2181 void CField::makeGridAliasForCoupling(void) 2182 { 2183 grid_->makeAliasForCoupling(this->getId()); 2184 } 2185 2186 //! Client side: Send a message announcing that the grid definition has been received from a coupling context 2187 void CField::sendGridCompleted(void) 2188 TRY 2189 { 2190 CEventClient event(getType(),EVENT_ID_GRID_COMPLETED); 2191 2192 if (client->isServerLeader()) 2193 { 2194 CMessage msg; 2195 msg<<this->getId(); 2196 for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg); 2197 client->sendEvent(event); 2198 } 2199 else client->sendEvent(event); 2200 } 2201 CATCH_DUMP_ATTR 2202 2203 //! Server side: Receive a message announcing that the grid definition has been received from a coupling context 2204 void CField::recvGridCompleted(CEventServer& event) 2205 TRY 2206 { 2207 CBufferIn* buffer=event.subEvents.begin()->buffer; 2208 string id; 2209 *buffer>>id ; 2210 get(id)->recvGridCompleted(*buffer); 2211 } 2212 CATCH 2213 2214 //! Server side: Receive a message message announcing that the grid definition has been received from a coupling context 2215 void CField::recvGridCompleted(CBufferIn& buffer) 2216 TRY 2217 { 2218 setGridCompleted() ; 2219 } 2220 CATCH_DUMP_ATTR 2221 2222 2223 2224 2225 2040 2226 void CField::sendFieldToInputFileServer(void) 2041 2227 { … … 2145 2331 TRY 2146 2332 { 2147 bool isFieldRead = getRelFile() && !getRelFile()->mode.isEmpty() && getRelFile()->mode == CFile::mode_attr::read; 2148 bool isFieldWrite = getRelFile() && ( getRelFile()->mode.isEmpty() || getRelFile()->mode == CFile::mode_attr::write); 2149 if (isFieldRead && !(operation.getValue() == "instant" || operation.getValue() == "once") ) 2333 if (hasFileIn() && !(operation.getValue() == "instant" || operation.getValue() == "once") ) 2150 2334 ERROR("void CField::checkTimeAttributes(void)", 2151 2335 << "Unsupported operation for field '" << getFieldOutputName() << "'." << std::endl … … 2156 2340 if (operation.getValue() == "instant") 2157 2341 { 2158 if ( isFieldRead || isFieldWrite) freq_op.setValue(getRelFile()->output_freq.getValue());2342 if (hasFileIn() || hasFileOut()) freq_op.setValue(getRelFile()->output_freq.getValue()); 2159 2343 else freq_op=*freqOp ; 2160 2344 } 2161 2345 else freq_op.setValue(TimeStep); 2162 2346 } 2163 if (freq_offset.isEmpty()) freq_offset.setValue( isFieldRead? NoneDu : (freq_op.getValue() - TimeStep));2347 if (freq_offset.isEmpty()) freq_offset.setValue(hasFileIn() ? NoneDu : (freq_op.getValue() - TimeStep)); 2164 2348 } 2165 2349 CATCH_DUMP_ATTR
Note: See TracChangeset
for help on using the changeset viewer.