Ignore:
Timestamp:
06/06/17 17:58:16 (7 years ago)
Author:
oabramkina
Message:

Two server levels: merging with trunk r1137.
There are bugs.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_olga/src/node/field.cpp

    r1144 r1158  
    3838      , areAllReferenceSolved(false), isReferenceSolved(false), isReferenceSolvedAndTransformed(false) 
    3939      , useCompressedOutput(false) 
     40      , hasTimeInstant(false) 
     41      , hasTimeCentered(false) 
    4042      , wasDataAlreadyReceivedFromServer(false) 
     43      , isEOF(false) 
    4144   { setVirtualVariableGroup(CVariableGroup::create(getId() + "_virtual_variable_group")); } 
    4245 
     
    5053      , areAllReferenceSolved(false), isReferenceSolved(false), isReferenceSolvedAndTransformed(false) 
    5154      , useCompressedOutput(false) 
     55      , hasTimeInstant(false) 
     56      , hasTimeCentered(false) 
    5257      , wasDataAlreadyReceivedFromServer(false) 
     58      , isEOF(false) 
    5359   { setVirtualVariableGroup(CVariableGroup::create(getId() + "_virtual_variable_group")); } 
    5460 
     
    122128  void CField::sendUpdateData(const CArray<double,1>& data) 
    123129  { 
    124     CTimer::get("XIOS Send Data").resume(); 
     130    CTimer::get("Field : send data").resume(); 
    125131 
    126132    CContext* context = CContext::getCurrent(); 
     
    174180      }     
    175181 
    176     CTimer::get("XIOS Send Data").suspend(); 
     182    CTimer::get("Field : send data").suspend(); 
    177183  } 
    178184 
     
    183189    list<CEventServer::SSubEvent>::iterator it; 
    184190    string fieldId; 
    185  
     191    CTimer::get("Field : recv data").resume(); 
    186192    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 
    187193    { 
     
    192198    } 
    193199    get(fieldId)->recvUpdateData(rankBuffers); 
     200    CTimer::get("Field : recv data").suspend(); 
    194201  } 
    195202 
     
    252259    } 
    253260  } 
    254    
    255 //   void  CField::recvUpdateData(vector<int>& ranks, vector<CBufferIn*>& buffers) 
    256 //   { 
    257 //     CContext* context = CContext::getCurrent(); 
    258  
    259 //     if (data_srv.empty()) 
    260 //     { 
    261 //       for (map<int, CArray<size_t, 1> >::iterator it = grid->outIndexFromClient.begin(); it != grid->outIndexFromClient.end(); ++it) 
    262 //       { 
    263 //         int rank = it->first; 
    264 //         data_srv.insert(std::make_pair(rank, CArray<double,1>(it->second.numElements()))); 
    265 //         foperation_srv.insert(pair<int,boost::shared_ptr<func::CFunctor> >(rank,boost::shared_ptr<func::CFunctor>(new func::CInstant(data_srv[rank])))); 
    266 //       } 
    267 //     } 
    268  
    269 //     const CDate& currDate = context->getCalendar()->getCurrentDate(); 
    270 //     const CDate opeDate      = last_operation_srv +freq_op + freq_operation_srv - freq_op; 
    271 //     const CDate writeDate    = last_Write_srv     + freq_write_srv; 
    272  
    273 //     if (opeDate <= currDate) 
    274 //     { 
    275 //       for (int n = 0; n < ranks.size(); n++) 
    276 //       { 
    277 //         CArray<double,1> data_tmp; 
    278 //         *buffers[n] >> data_tmp; 
    279 //         (*foperation_srv[ranks[n]])(data_tmp); 
    280 //       } 
    281 //       last_operation_srv = currDate; 
    282 //     } 
    283  
    284 //     if (writeDate < (currDate + freq_operation_srv)) 
    285 //     { 
    286 //       for (int n = 0; n < ranks.size(); n++) 
    287 //       { 
    288 //         this->foperation_srv[ranks[n]]->final(); 
    289 //       } 
    290  
    291 //       last_Write_srv = writeDate; 
    292 //     } 
    293  
    294 //     if (context->hasClient && context->hasServer) 
    295 //     { 
    296 //       size_t writtenSize; 
    297 // //      if (field->getUseCompressedOutput()) 
    298 // //        writtenSize = grid->getNumberWrittenIndexes(); 
    299 // //      else 
    300 //         writtenSize = grid->getWrittenDataSize(); 
    301  
    302 //       CArray<double,1> fieldData(writtenSize); 
    303 // //      if (!field->default_value.isEmpty()) fieldData = field->default_value; 
    304  
    305 // //      if (field->getUseCompressedOutput()) 
    306 // //        field->outputCompressedField(fieldData); 
    307 // //      else 
    308 //         this->outputField(fieldData); 
    309 //       sendUpdateData(fieldData); 
    310 //     } 
    311 //     if (!context->hasClient && context->hasServer) 
    312 //     { 
    313 //       writeField(); 
    314 //     } 
    315  
    316 //     lastlast_Write_srv = last_Write_srv; 
    317  
    318 //   } 
    319261 
    320262  void CField::writeField(void) 
     
    331273  } 
    332274 
    333   void CField::sendReadDataRequest(const CDate& tsDataRequested) 
     275  bool CField::sendReadDataRequest(const CDate& tsDataRequested) 
    334276  { 
    335277    CContext* context = CContext::getCurrent(); 
     
    339281    lastDataRequestedFromServer = tsDataRequested; 
    340282 
    341     CEventClient event(getType(), EVENT_ID_READ_DATA); 
    342     if (client->isServerLeader()) 
     283    if (!isEOF) // No need to send the request if we already know we are at EOF 
    343284    { 
    344       CMessage msg; 
    345       msg << getId(); 
    346       const std::list<int>& ranks = client->getRanksServerLeader(); 
    347       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
    348         event.push(*itRank, 1, msg); 
    349       client->sendEvent(event); 
    350     } 
    351     else client->sendEvent(event); 
     285      CEventClient event(getType(), EVENT_ID_READ_DATA); 
     286      if (client->isServerLeader()) 
     287      { 
     288        CMessage msg; 
     289        msg << getId(); 
     290        const std::list<int>& ranks = client->getRanksServerLeader(); 
     291        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
     292          event.push(*itRank, 1, msg); 
     293        client->sendEvent(event); 
     294      } 
     295      else client->sendEvent(event); 
     296    } 
     297    else 
     298      serverSourceFilter->signalEndOfStream(tsDataRequested); 
     299 
     300    return !isEOF; 
    352301  } 
    353302 
     
    361310 
    362311    bool dataRequested = false; 
     312 
    363313    while (currentDate >= lastDataRequestedFromServer) 
    364314    { 
     
    368318      info(20) << "lastDataRequestedFromServer + file->output_freq.getValue() : " << lastDataRequestedFromServer + file->output_freq << endl ; 
    369319 
    370       sendReadDataRequest(lastDataRequestedFromServer + file->output_freq); 
    371  
    372       dataRequested = true; 
    373     } 
     320      dataRequested |= sendReadDataRequest(lastDataRequestedFromServer + file->output_freq); 
     321    } 
     322 
    374323    return dataRequested; 
    375324  } 
     
    386335  { 
    387336    CContext* context = CContext::getCurrent(); 
    388     CContextClient* client = context->client;     
     337    CContextClient* client = context->client; 
    389338 
    390339    CEventClient event(getType(), EVENT_ID_READ_DATA_READY); 
     
    393342    bool hasData = readField(); 
    394343 
    395      
     344    map<int, CArray<double,1> >::iterator it; 
    396345    if (!grid->doGridHaveDataDistributed()) 
    397346    { 
     
    413362            } 
    414363          } 
    415  
    416364          client->sendEvent(event); 
    417365       } 
     
    500448    std::map<int, CArray<double,1> > data; 
    501449 
    502     bool isEOF = false; 
    503  
    504450    for (int i = 0; i < ranks.size(); i++) 
    505451    { 
     
    585531   //---------------------------------------------------------------- 
    586532 
    587    bool CField::isActive(void) const 
    588    { 
    589       return (instantDataFilter != NULL); 
     533   bool CField::isActive(bool atCurrentTimestep /*= false*/) const 
     534   { 
     535      if (clientSourceFilter) 
     536        return atCurrentTimestep ? clientSourceFilter->isDataExpected(CContext::getCurrent()->getCalendar()->getCurrentDate()) : true; 
     537      else if (storeFilter) 
     538        return true; 
     539      else if (instantDataFilter) 
     540        ERROR("bool CField::isActive(bool atCurrentTimestep)", 
     541              << "Impossible to check if field [ id = " << getId() << " ] is active as it cannot be used to receive nor send data."); 
     542 
     543      return false; 
    590544   } 
    591545 
     
    815769          if (hasDirectFieldReference()) getDirectFieldReference()->solveAllReferenceEnabledField(false); 
    816770        } 
    817        else if (context->hasServer)         
     771        else if (context->hasServer) 
    818772          solveServerOperation(); 
    819773 
     
    940894       { 
    941895         // Check if we have an expression to parse 
    942        if (hasExpression()) 
     896         if (hasExpression()) 
    943897         { 
    944          boost::scoped_ptr<IFilterExprNode> expr(parseExpr(getExpression() + '\0')); 
    945          boost::shared_ptr<COutputPin> filter = expr->reduce(gc, *this); 
    946  
    947          // Check if a spatial transformation is needed 
    948          if (!field_ref.isEmpty()) 
    949          { 
    950            CGrid* gridRef = CField::get(field_ref)->grid; 
    951  
    952            if (grid && grid != gridRef && grid->hasTransform()) 
     898           boost::scoped_ptr<IFilterExprNode> expr(parseExpr(getExpression() + '\0')); 
     899           boost::shared_ptr<COutputPin> filter = expr->reduce(gc, *this); 
     900 
     901           // Check if a spatial transformation is needed 
     902           if (!field_ref.isEmpty()) 
    953903           { 
    954              double defaultValue = !default_value.isEmpty() ? default_value : 0.0; 
    955              std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters = CSpatialTransformFilter::buildFilterGraph(gc, gridRef, grid, defaultValue); 
    956  
    957              filter->connectOutput(filters.first, 0); 
    958              filter = filters.second; 
     904             CGrid* gridRef = CField::get(field_ref)->grid; 
     905 
     906             if (grid && grid != gridRef && grid->hasTransform()) 
     907             { 
     908               bool hasMissingValue = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 
     909               double defaultValue  = hasMissingValue ? default_value : (!default_value.isEmpty() ? default_value : 0.0); 
     910               std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters = CSpatialTransformFilter::buildFilterGraph(gc, gridRef, grid, hasMissingValue, defaultValue); 
     911 
     912               filter->connectOutput(filters.first, 0); 
     913               filter = filters.second; 
     914             } 
    959915           } 
    960          } 
    961  
    962          instantDataFilter = filter; 
     916 
     917           instantDataFilter = filter; 
    963918         } 
    964919         // Check if we have a reference on another field 
     
    967922         // Check if the data is to be read from a file 
    968923         else if (file && !file->mode.isEmpty() && file->mode == CFile::mode_attr::read) 
    969          instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 
    970                                                                                                      freq_offset.isEmpty() ? NoneDu : freq_offset, 
    971                                                                                                      true)); 
     924           instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 
     925                                                                                                       freq_offset.isEmpty() ? NoneDu : freq_offset, 
     926                                                                                                       true)); 
    972927         else // The data might be passed from the model 
    973          instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 
    974        } 
    975  
    976        // If the field data is to be read by the client or/and written to a file 
    977        if (enableOutput && !storeFilter && !fileWriterFilter) 
    978        { 
    979          if (!read_access.isEmpty() && read_access) 
    980928         { 
    981            storeFilter = boost::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid)); 
    982            instantDataFilter->connectOutput(storeFilter, 0); 
     929            bool ignoreMissingValue = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 
     930            double defaultValue  = ignoreMissingValue ? default_value : (!default_value.isEmpty() ? default_value : 0.0); 
     931            instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, NoneDu, false, 
     932                                                                                                        ignoreMissingValue, defaultValue)); 
    983933         } 
    984  
    985          if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 
    986          { 
    987            fileWriterFilter = boost::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this)); 
    988            getTemporalDataFilter(gc, file->output_freq)->connectOutput(fileWriterFilter, 0); 
    989          } 
    990        } 
    991      } 
    992    } 
    993  
     934       } 
     935     } 
     936 
     937     // If the field data is to be read by the client or/and written to a file 
     938     if (enableOutput && !storeFilter && !fileWriterFilter) 
     939     { 
     940       if (!read_access.isEmpty() && read_access) 
     941       { 
     942         storeFilter = boost::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid)); 
     943         instantDataFilter->connectOutput(storeFilter, 0); 
     944       } 
     945 
     946       if (file && (file->mode.isEmpty() || file->mode == CFile::mode_attr::write)) 
     947       { 
     948         fileWriterFilter = boost::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this)); 
     949         getTemporalDataFilter(gc, file->output_freq)->connectOutput(fileWriterFilter, 0); 
     950       } 
     951     } 
     952   } 
    994953 
    995954   /*! 
     
    1000959    * \return the output pin corresponding to the field reference 
    1001960    */ 
    1002      boost::shared_ptr<COutputPin> CField::getFieldReference(CGarbageCollector& gc) 
    1003      { 
    1004        if (instantDataFilter || field_ref.isEmpty()) 
    1005          ERROR("COutputPin* CField::getFieldReference(CGarbageCollector& gc)", 
    1006                "Impossible to get the field reference for a field which has already been parsed or which does not have a field_ref."); 
    1007  
    1008        CField* fieldRef = CField::get(field_ref); 
    1009        fieldRef->buildFilterGraph(gc, false); 
    1010  
    1011        std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters; 
    1012        // Check if a spatial transformation is needed 
    1013        if (grid && grid != fieldRef->grid && grid->hasTransform()) 
    1014        { 
    1015          double defaultValue = !default_value.isEmpty() ? default_value : 0.0; 
    1016          filters = CSpatialTransformFilter::buildFilterGraph(gc, fieldRef->grid, grid, defaultValue); 
    1017        } 
    1018        else 
    1019          filters.first = filters.second = boost::shared_ptr<CFilter>(new CPassThroughFilter(gc)); 
    1020  
    1021        fieldRef->getInstantDataFilter()->connectOutput(filters.first, 0); 
    1022  
    1023        return filters.second; 
    1024      } 
     961   boost::shared_ptr<COutputPin> CField::getFieldReference(CGarbageCollector& gc) 
     962   { 
     963     if (instantDataFilter || field_ref.isEmpty()) 
     964       ERROR("COutputPin* CField::getFieldReference(CGarbageCollector& gc)", 
     965             "Impossible to get the field reference for a field which has already been parsed or which does not have a field_ref."); 
     966 
     967     CField* fieldRef = CField::get(field_ref); 
     968     fieldRef->buildFilterGraph(gc, false); 
     969 
     970     std::pair<boost::shared_ptr<CFilter>, boost::shared_ptr<CFilter> > filters; 
     971     // Check if a spatial transformation is needed 
     972     if (grid && grid != fieldRef->grid && grid->hasTransform()) 
     973     {        
     974       bool hasMissingValue = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 
     975       double defaultValue  = hasMissingValue ? default_value : (!default_value.isEmpty() ? default_value : 0.0);                                 
     976       filters = CSpatialTransformFilter::buildFilterGraph(gc, fieldRef->grid, grid, hasMissingValue, defaultValue); 
     977     } 
     978     else 
     979       filters.first = filters.second = boost::shared_ptr<CFilter>(new CPassThroughFilter(gc)); 
     980 
     981     fieldRef->getInstantDataFilter()->connectOutput(filters.first, 0); 
     982 
     983     return filters.second; 
     984   } 
    1025985 
    1026986   /*! 
     
    10581018       { 
    10591019         if (!clientSourceFilter) 
    1060            clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 
     1020         { 
     1021           bool ignoreMissingValue = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 
     1022           double defaultValue  = ignoreMissingValue ? default_value : (!default_value.isEmpty() ? default_value : 0.0);  
     1023           clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, NoneDu, false, 
     1024                                                                                   ignoreMissingValue, defaultValue)); 
     1025         } 
    10611026 
    10621027         selfReferenceFilter = clientSourceFilter; 
     
    11041069     return it->second; 
    11051070   } 
     1071 
     1072  /*! 
     1073    * Returns the temporal filter corresponding to the field's temporal operation 
     1074    * for the specified operation frequency. 
     1075    * 
     1076    * \param gc the garbage collector to use 
     1077    * \param outFreq the operation frequency, i.e. the frequency at which the output data will be computed 
     1078    * \return the output pin corresponding to the requested temporal filter 
     1079    */ 
     1080    
     1081   boost::shared_ptr<COutputPin> CField::getSelfTemporalDataFilter(CGarbageCollector& gc, CDuration outFreq) 
     1082   { 
     1083     if (instantDataFilter || !hasExpression()) 
     1084       ERROR("COutputPin* CField::getSelfTemporalDataFilter(CGarbageCollector& gc)", 
     1085             "Impossible to add a self reference to a field which has already been parsed or which does not have an expression."); 
     1086 
     1087     if (!selfReferenceFilter) getSelfReference(gc) ; 
     1088 
     1089     if (serverSourceFilter || clientSourceFilter) 
     1090     { 
     1091       if (operation.isEmpty()) 
     1092         ERROR("void CField::getSelfTemporalDataFilter(CGarbageCollector& gc, CDuration outFreq)", 
     1093               << "An operation must be defined for field \"" << getId() << "\"."); 
     1094 
     1095       if (freq_op.isEmpty()) freq_op.setValue(TimeStep); 
     1096       if (freq_offset.isEmpty()) freq_offset.setValue(NoneDu); 
     1097 
     1098       const bool ignoreMissingValue = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 
     1099 
     1100       boost::shared_ptr<CTemporalFilter> temporalFilter(new CTemporalFilter(gc, operation, 
     1101                                                                             CContext::getCurrent()->getCalendar()->getInitDate(), 
     1102                                                                             freq_op, freq_offset, outFreq, 
     1103                                                                             ignoreMissingValue, ignoreMissingValue ? default_value : 0.0)); 
     1104       selfReferenceFilter->connectOutput(temporalFilter, 0); 
     1105       return temporalFilter ; 
     1106     } 
     1107     else if (!field_ref.isEmpty()) 
     1108     { 
     1109       CField* fieldRef = CField::get(field_ref); 
     1110       fieldRef->buildFilterGraph(gc, false);  
     1111       return fieldRef->getTemporalDataFilter(gc, outFreq) ; 
     1112     } 
     1113  } 
    11061114 
    11071115   //---------------------------------------------------------------- 
Note: See TracChangeset for help on using the changeset viewer.