Changeset 1158 for XIOS/dev/dev_olga/src/node/field.cpp
- Timestamp:
- 06/06/17 17:58:16 (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_olga/src/node/field.cpp
r1144 r1158 38 38 , areAllReferenceSolved(false), isReferenceSolved(false), isReferenceSolvedAndTransformed(false) 39 39 , useCompressedOutput(false) 40 , hasTimeInstant(false) 41 , hasTimeCentered(false) 40 42 , wasDataAlreadyReceivedFromServer(false) 43 , isEOF(false) 41 44 { setVirtualVariableGroup(CVariableGroup::create(getId() + "_virtual_variable_group")); } 42 45 … … 50 53 , areAllReferenceSolved(false), isReferenceSolved(false), isReferenceSolvedAndTransformed(false) 51 54 , useCompressedOutput(false) 55 , hasTimeInstant(false) 56 , hasTimeCentered(false) 52 57 , wasDataAlreadyReceivedFromServer(false) 58 , isEOF(false) 53 59 { setVirtualVariableGroup(CVariableGroup::create(getId() + "_virtual_variable_group")); } 54 60 … … 122 128 void CField::sendUpdateData(const CArray<double,1>& data) 123 129 { 124 CTimer::get(" XIOS Send Data").resume();130 CTimer::get("Field : send data").resume(); 125 131 126 132 CContext* context = CContext::getCurrent(); … … 174 180 } 175 181 176 CTimer::get(" XIOS Send Data").suspend();182 CTimer::get("Field : send data").suspend(); 177 183 } 178 184 … … 183 189 list<CEventServer::SSubEvent>::iterator it; 184 190 string fieldId; 185 191 CTimer::get("Field : recv data").resume(); 186 192 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 187 193 { … … 192 198 } 193 199 get(fieldId)->recvUpdateData(rankBuffers); 200 CTimer::get("Field : recv data").suspend(); 194 201 } 195 202 … … 252 259 } 253 260 } 254 255 // void CField::recvUpdateData(vector<int>& ranks, vector<CBufferIn*>& buffers)256 // {257 // CContext* context = CContext::getCurrent();258 259 // if (data_srv.empty())260 // {261 // for (map<int, CArray<size_t, 1> >::iterator it = grid->outIndexFromClient.begin(); it != grid->outIndexFromClient.end(); ++it)262 // {263 // int rank = it->first;264 // data_srv.insert(std::make_pair(rank, CArray<double,1>(it->second.numElements())));265 // foperation_srv.insert(pair<int,boost::shared_ptr<func::CFunctor> >(rank,boost::shared_ptr<func::CFunctor>(new func::CInstant(data_srv[rank]))));266 // }267 // }268 269 // const CDate& currDate = context->getCalendar()->getCurrentDate();270 // const CDate opeDate = last_operation_srv +freq_op + freq_operation_srv - freq_op;271 // const CDate writeDate = last_Write_srv + freq_write_srv;272 273 // if (opeDate <= currDate)274 // {275 // for (int n = 0; n < ranks.size(); n++)276 // {277 // CArray<double,1> data_tmp;278 // *buffers[n] >> data_tmp;279 // (*foperation_srv[ranks[n]])(data_tmp);280 // }281 // last_operation_srv = currDate;282 // }283 284 // if (writeDate < (currDate + freq_operation_srv))285 // {286 // for (int n = 0; n < ranks.size(); n++)287 // {288 // this->foperation_srv[ranks[n]]->final();289 // }290 291 // last_Write_srv = writeDate;292 // }293 294 // if (context->hasClient && context->hasServer)295 // {296 // size_t writtenSize;297 // // if (field->getUseCompressedOutput())298 // // writtenSize = grid->getNumberWrittenIndexes();299 // // else300 // writtenSize = grid->getWrittenDataSize();301 302 // CArray<double,1> fieldData(writtenSize);303 // // if (!field->default_value.isEmpty()) fieldData = field->default_value;304 305 // // if (field->getUseCompressedOutput())306 // // field->outputCompressedField(fieldData);307 // // else308 // this->outputField(fieldData);309 // sendUpdateData(fieldData);310 // }311 // if (!context->hasClient && context->hasServer)312 // {313 // writeField();314 // }315 316 // lastlast_Write_srv = last_Write_srv;317 318 // }319 261 320 262 void CField::writeField(void) … … 331 273 } 332 274 333 voidCField::sendReadDataRequest(const CDate& tsDataRequested)275 bool CField::sendReadDataRequest(const CDate& tsDataRequested) 334 276 { 335 277 CContext* context = CContext::getCurrent(); … … 339 281 lastDataRequestedFromServer = tsDataRequested; 340 282 341 CEventClient event(getType(), EVENT_ID_READ_DATA); 342 if (client->isServerLeader()) 283 if (!isEOF) // No need to send the request if we already know we are at EOF 343 284 { 344 CMessage msg; 345 msg << getId(); 346 const std::list<int>& ranks = client->getRanksServerLeader(); 347 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 348 event.push(*itRank, 1, msg); 349 client->sendEvent(event); 350 } 351 else client->sendEvent(event); 285 CEventClient event(getType(), EVENT_ID_READ_DATA); 286 if (client->isServerLeader()) 287 { 288 CMessage msg; 289 msg << getId(); 290 const std::list<int>& ranks = client->getRanksServerLeader(); 291 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 292 event.push(*itRank, 1, msg); 293 client->sendEvent(event); 294 } 295 else client->sendEvent(event); 296 } 297 else 298 serverSourceFilter->signalEndOfStream(tsDataRequested); 299 300 return !isEOF; 352 301 } 353 302 … … 361 310 362 311 bool dataRequested = false; 312 363 313 while (currentDate >= lastDataRequestedFromServer) 364 314 { … … 368 318 info(20) << "lastDataRequestedFromServer + file->output_freq.getValue() : " << lastDataRequestedFromServer + file->output_freq << endl ; 369 319 370 sendReadDataRequest(lastDataRequestedFromServer + file->output_freq); 371 372 dataRequested = true; 373 } 320 dataRequested |= sendReadDataRequest(lastDataRequestedFromServer + file->output_freq); 321 } 322 374 323 return dataRequested; 375 324 } … … 386 335 { 387 336 CContext* context = CContext::getCurrent(); 388 CContextClient* client = context->client; 337 CContextClient* client = context->client; 389 338 390 339 CEventClient event(getType(), EVENT_ID_READ_DATA_READY); … … 393 342 bool hasData = readField(); 394 343 395 344 map<int, CArray<double,1> >::iterator it; 396 345 if (!grid->doGridHaveDataDistributed()) 397 346 { … … 413 362 } 414 363 } 415 416 364 client->sendEvent(event); 417 365 } … … 500 448 std::map<int, CArray<double,1> > data; 501 449 502 bool isEOF = false;503 504 450 for (int i = 0; i < ranks.size(); i++) 505 451 { … … 585 531 //---------------------------------------------------------------- 586 532 587 bool CField::isActive(void) const 588 { 589 return (instantDataFilter != NULL); 533 bool CField::isActive(bool atCurrentTimestep /*= false*/) const 534 { 535 if (clientSourceFilter) 536 return atCurrentTimestep ? clientSourceFilter->isDataExpected(CContext::getCurrent()->getCalendar()->getCurrentDate()) : true; 537 else if (storeFilter) 538 return true; 539 else if (instantDataFilter) 540 ERROR("bool CField::isActive(bool atCurrentTimestep)", 541 << "Impossible to check if field [ id = " << getId() << " ] is active as it cannot be used to receive nor send data."); 542 543 return false; 590 544 } 591 545 … … 815 769 if (hasDirectFieldReference()) getDirectFieldReference()->solveAllReferenceEnabledField(false); 816 770 } 817 else if (context->hasServer)771 else if (context->hasServer) 818 772 solveServerOperation(); 819 773 … … 940 894 { 941 895 // Check if we have an expression to parse 942 if (hasExpression())896 if (hasExpression()) 943 897 { 944 boost::scoped_ptr<IFilterExprNode> expr(parseExpr(getExpression() + '\0')); 945 boost::shared_ptr<COutputPin> filter = expr->reduce(gc, *this); 946 947 // Check if a spatial transformation is needed 948 if (!field_ref.isEmpty()) 949 { 950 CGrid* gridRef = CField::get(field_ref)->grid; 951 952 if (grid && grid != gridRef && grid->hasTransform()) 898 boost::scoped_ptr<IFilterExprNode> expr(parseExpr(getExpression() + '\0')); 899 boost::shared_ptr<COutputPin> filter = expr->reduce(gc, *this); 900 901 // Check if a spatial transformation is needed 902 if (!field_ref.isEmpty()) 953 903 { 954 double defaultValue = !default_value.isEmpty() ? default_value : 0.0; 955 std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters = CSpatialTransformFilter::buildFilterGraph(gc, gridRef, grid, defaultValue); 956 957 filter->connectOutput(filters.first, 0); 958 filter = filters.second; 904 CGrid* gridRef = CField::get(field_ref)->grid; 905 906 if (grid && grid != gridRef && grid->hasTransform()) 907 { 908 bool hasMissingValue = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 909 double defaultValue = hasMissingValue ? default_value : (!default_value.isEmpty() ? default_value : 0.0); 910 std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters = CSpatialTransformFilter::buildFilterGraph(gc, gridRef, grid, hasMissingValue, defaultValue); 911 912 filter->connectOutput(filters.first, 0); 913 filter = filters.second; 914 } 959 915 } 960 } 961 962 instantDataFilter = filter; 916 917 instantDataFilter = filter; 963 918 } 964 919 // Check if we have a reference on another field … … 967 922 // Check if the data is to be read from a file 968 923 else if (file && !file->mode.isEmpty() && file->mode == CFile::mode_attr::read) 969 instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid,970 freq_offset.isEmpty() ? NoneDu : freq_offset,971 true));924 instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 925 freq_offset.isEmpty() ? NoneDu : freq_offset, 926 true)); 972 927 else // The data might be passed from the model 973 instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid));974 }975 976 // If the field data is to be read by the client or/and written to a file977 if (enableOutput && !storeFilter && !fileWriterFilter)978 {979 if (!read_access.isEmpty() && read_access)980 928 { 981 storeFilter = boost::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid)); 982 instantDataFilter->connectOutput(storeFilter, 0); 929 bool ignoreMissingValue = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 930 double defaultValue = ignoreMissingValue ? default_value : (!default_value.isEmpty() ? default_value : 0.0); 931 instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, NoneDu, false, 932 ignoreMissingValue, defaultValue)); 983 933 } 984 985 if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 986 { 987 fileWriterFilter = boost::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this)); 988 getTemporalDataFilter(gc, file->output_freq)->connectOutput(fileWriterFilter, 0); 989 } 990 } 991 } 992 } 993 934 } 935 } 936 937 // If the field data is to be read by the client or/and written to a file 938 if (enableOutput && !storeFilter && !fileWriterFilter) 939 { 940 if (!read_access.isEmpty() && read_access) 941 { 942 storeFilter = boost::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid)); 943 instantDataFilter->connectOutput(storeFilter, 0); 944 } 945 946 if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 947 { 948 fileWriterFilter = boost::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this)); 949 getTemporalDataFilter(gc, file->output_freq)->connectOutput(fileWriterFilter, 0); 950 } 951 } 952 } 994 953 995 954 /*! … … 1000 959 * \return the output pin corresponding to the field reference 1001 960 */ 1002 boost::shared_ptr<COutputPin> CField::getFieldReference(CGarbageCollector& gc) 1003 { 1004 if (instantDataFilter || field_ref.isEmpty()) 1005 ERROR("COutputPin* CField::getFieldReference(CGarbageCollector& gc)", 1006 "Impossible to get the field reference for a field which has already been parsed or which does not have a field_ref."); 1007 1008 CField* fieldRef = CField::get(field_ref); 1009 fieldRef->buildFilterGraph(gc, false); 1010 1011 std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters; 1012 // Check if a spatial transformation is needed 1013 if (grid && grid != fieldRef->grid && grid->hasTransform()) 1014 { 1015 double defaultValue = !default_value.isEmpty() ? default_value : 0.0; 1016 filters = CSpatialTransformFilter::buildFilterGraph(gc, fieldRef->grid, grid, defaultValue); 1017 } 1018 else 1019 filters.first = filters.second = boost::shared_ptr<CFilter>(new CPassThroughFilter(gc)); 1020 1021 fieldRef->getInstantDataFilter()->connectOutput(filters.first, 0); 1022 1023 return filters.second; 1024 } 961 boost::shared_ptr<COutputPin> CField::getFieldReference(CGarbageCollector& gc) 962 { 963 if (instantDataFilter || field_ref.isEmpty()) 964 ERROR("COutputPin* CField::getFieldReference(CGarbageCollector& gc)", 965 "Impossible to get the field reference for a field which has already been parsed or which does not have a field_ref."); 966 967 CField* fieldRef = CField::get(field_ref); 968 fieldRef->buildFilterGraph(gc, false); 969 970 std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters; 971 // Check if a spatial transformation is needed 972 if (grid && grid != fieldRef->grid && grid->hasTransform()) 973 { 974 bool hasMissingValue = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 975 double defaultValue = hasMissingValue ? default_value : (!default_value.isEmpty() ? default_value : 0.0); 976 filters = CSpatialTransformFilter::buildFilterGraph(gc, fieldRef->grid, grid, hasMissingValue, defaultValue); 977 } 978 else 979 filters.first = filters.second = boost::shared_ptr<CFilter>(new CPassThroughFilter(gc)); 980 981 fieldRef->getInstantDataFilter()->connectOutput(filters.first, 0); 982 983 return filters.second; 984 } 1025 985 1026 986 /*! … … 1058 1018 { 1059 1019 if (!clientSourceFilter) 1060 clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 1020 { 1021 bool ignoreMissingValue = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 1022 double defaultValue = ignoreMissingValue ? default_value : (!default_value.isEmpty() ? default_value : 0.0); 1023 clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, NoneDu, false, 1024 ignoreMissingValue, defaultValue)); 1025 } 1061 1026 1062 1027 selfReferenceFilter = clientSourceFilter; … … 1104 1069 return it->second; 1105 1070 } 1071 1072 /*! 1073 * Returns the temporal filter corresponding to the field's temporal operation 1074 * for the specified operation frequency. 1075 * 1076 * \param gc the garbage collector to use 1077 * \param outFreq the operation frequency, i.e. the frequency at which the output data will be computed 1078 * \return the output pin corresponding to the requested temporal filter 1079 */ 1080 1081 boost::shared_ptr<COutputPin> CField::getSelfTemporalDataFilter(CGarbageCollector& gc, CDuration outFreq) 1082 { 1083 if (instantDataFilter || !hasExpression()) 1084 ERROR("COutputPin* CField::getSelfTemporalDataFilter(CGarbageCollector& gc)", 1085 "Impossible to add a self reference to a field which has already been parsed or which does not have an expression."); 1086 1087 if (!selfReferenceFilter) getSelfReference(gc) ; 1088 1089 if (serverSourceFilter || clientSourceFilter) 1090 { 1091 if (operation.isEmpty()) 1092 ERROR("void CField::getSelfTemporalDataFilter(CGarbageCollector& gc, CDuration outFreq)", 1093 << "An operation must be defined for field \"" << getId() << "\"."); 1094 1095 if (freq_op.isEmpty()) freq_op.setValue(TimeStep); 1096 if (freq_offset.isEmpty()) freq_offset.setValue(NoneDu); 1097 1098 const bool ignoreMissingValue = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 1099 1100 boost::shared_ptr<CTemporalFilter> temporalFilter(new CTemporalFilter(gc, operation, 1101 CContext::getCurrent()->getCalendar()->getInitDate(), 1102 freq_op, freq_offset, outFreq, 1103 ignoreMissingValue, ignoreMissingValue ? default_value : 0.0)); 1104 selfReferenceFilter->connectOutput(temporalFilter, 0); 1105 return temporalFilter ; 1106 } 1107 else if (!field_ref.isEmpty()) 1108 { 1109 CField* fieldRef = CField::get(field_ref); 1110 fieldRef->buildFilterGraph(gc, false); 1111 return fieldRef->getTemporalDataFilter(gc, outFreq) ; 1112 } 1113 } 1106 1114 1107 1115 //----------------------------------------------------------------
Note: See TracChangeset
for help on using the changeset viewer.