Ignore:
Timestamp:
05/12/20 11:52:13 (4 years ago)
Author:
ymipsl
Message:

XIOS coupling branch
Some updates.

First coupling test is beginning to work...

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp

    r1872 r1875  
    133133          return true; 
    134134          break; 
    135  
     135      
     136        case EVENT_ID_GRID_COMPLETED : 
     137          recvGridCompleted(event); 
     138          return true; 
     139          break; 
    136140        default : 
    137141          ERROR("bool CField::dispatchEvent(CEventServer& event)", << "Unknown Event"); 
     
    142146  CATCH 
    143147 
    144   void CField::sendUpdateData(const CArray<double,1>& data, CContextClient* client) 
     148  void CField::sendUpdateData(Time timeStamp, const CArray<double,1>& data, CContextClient* client) 
    145149  TRY 
    146150  { 
     
    169173            for (int n = 0; n < data_tmp.numElements(); n++) data_tmp(n) = data(index(n)); 
    170174 
    171             list_msg.back() << getId() << data_tmp; 
     175            list_msg.back() << getId() << timeStamp << data_tmp; 
    172176            event.push(rank, 1, list_msg.back()); 
    173177          } 
     
    189193        for (int n = 0; n < data_tmp.numElements(); n++) data_tmp(n) = data(index(n)); 
    190194 
    191         list_msg.back() << getId() << data_tmp; 
     195        list_msg.back() << getId() << timeStamp << data_tmp; 
    192196        event.push(rank, grid_->nbSenders_[receiverSize][rank], list_msg.back()); 
    193197      } 
     
    219223  CATCH 
    220224 
     225 
    221226  void  CField::recvUpdateData(std::map<int,CBufferIn*>& rankBuffers) 
    222227  TRY 
    223228  { 
     229    if (hasCouplerIn()) recvUpdateDataFromCoupler(rankBuffers) ; 
     230    else recvUpdateDataFromClient(rankBuffers) ; 
     231  } 
     232  CATCH 
     233 
     234  void  CField::recvUpdateDataFromClient(std::map<int,CBufferIn*>& rankBuffers) 
     235  TRY 
     236  { 
    224237    CContext* context = CContext::getCurrent(); 
    225  
     238    Time timeStamp ; 
    226239    size_t sizeData = 0; 
    227240    if (0 == recvDataSrv.numElements()) 
     
    249262        CArray<double,1> tmp; 
    250263        CArray<size_t,1>& indexTmp = it->second; 
    251         *(rankBuffers[it->first]) >> tmp; 
     264        *(rankBuffers[it->first]) >> timeStamp >> tmp; 
    252265        for (int idx = 0; idx < indexTmp.numElements(); ++idx) recv_data_tmp(indexTmp(idx)) = tmp(idx); 
    253266      } 
     
    552565  CATCH 
    553566 
     567   
     568  void CField::recvUpdateDataFromCoupler(std::map<int,CBufferIn*>& rankBuffers) 
     569  TRY 
     570  { 
     571    CContext* context = CContext::getCurrent(); 
     572    Time timeStamp ; 
     573    if (wasDataAlreadyReceivedFromServer) 
     574    {   
     575      lastDataReceivedFromServer = lastDataReceivedFromServer + freq_op; 
     576    } 
     577    else 
     578    { 
     579      // unlikely to input from file server where data are received at ts=0 
     580      // for coupling, it would be after the first freq_op, because for now we don't have 
     581      // restart mecanism to send the value at ts=0. It mus be changed in future 
     582      lastDataReceivedFromServer = context->getCalendar()->getInitDate()+freq_op; 
     583      wasDataAlreadyReceivedFromServer = true; 
     584    } 
     585 
     586    CArray<int,1>& storeClient = grid_->getStoreIndex_client(); 
     587    CArray<double,1> recv_data_tmp(storeClient.numElements());   
     588 
     589    auto& outLocalIndexStoreOnClient = grid_-> getOutLocalIndexStoreOnClient() ; 
     590    for (auto it = outLocalIndexStoreOnClient.begin(); it != outLocalIndexStoreOnClient.end(); ++it) 
     591    { 
     592      CArray<double,1> tmp; 
     593      CArray<size_t,1>& indexTmp = it->second; 
     594      *(rankBuffers[it->first]) >> timeStamp >> tmp; 
     595      for (int idx = 0; idx < indexTmp.numElements(); ++idx) recv_data_tmp(indexTmp(idx)) = tmp(idx); 
     596    } 
     597     
     598    clientSourceFilter->streamData(lastDataReceivedFromServer, recv_data_tmp); 
     599     
     600  } 
     601  CATCH_DUMP_ATTR 
    554602  /*! 
    555603    Receive read data from server 
     
    596644  } 
    597645  CATCH_DUMP_ATTR 
     646 
     647  void CField::checkForLateDataFromCoupler(void) 
     648  TRY 
     649  { 
     650    CContext* context = CContext::getCurrent(); 
     651    const CDate& currentDate = context->getCalendar()->getCurrentDate(); 
     652 
     653    CTimer timer("CField::checkForLateDataFromCoupler"); 
     654    timer.resume(); 
     655    traceOff() ; 
     656    timer.suspend(); 
     657       
     658    bool isDataLate; 
     659    do 
     660    { 
     661      const CDate nextDataDue = wasDataAlreadyReceivedFromServer ? (lastDataReceivedFromServer + freq_op) : context->getCalendar()->getInitDate(); 
     662      isDataLate = (nextDataDue <= currentDate); 
     663 
     664      if (isDataLate) 
     665      { 
     666        timer.resume(); 
     667//ym          context->checkBuffersAndListen(); 
     668//ym            context->eventLoop(); 
     669        context->globalEventLoop(); 
     670 
     671        timer.suspend(); 
     672      } 
     673    } while (isDataLate && timer.getCumulatedTime() < CXios::recvFieldTimeout); 
     674     
     675    timer.resume(); 
     676    traceOn() ; 
     677    timer.suspend() ; 
     678 
     679    if (isDataLate) ERROR("void CField::checkForLateDataFromCoupler(void)", 
     680                            << "Late data at timestep = " << currentDate); 
     681  } 
     682  CATCH_DUMP_ATTR 
     683 
    598684 
    599685  void CField::checkForLateDataFromServer(void) 
     
    640726  } 
    641727  CATCH_DUMP_ATTR 
     728 
     729  void CField::triggerLateField(void) 
     730  TRY 
     731  { 
     732    if (hasFileIn())  
     733    { 
     734      checkForLateDataFromServer() ; 
     735      serverSourceFilter->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 
     736    }  
     737    else if (hasCouplerIn()) 
     738    { 
     739      checkForLateDataFromCoupler() ; 
     740      clientSourceFilter->trigger(CContext::getCurrent()->getCalendar()->getCurrentDate()) ; 
     741    } 
     742  } 
     743  CATCH_DUMP_ATTR 
     744 
    642745 
    643746  void CField::checkIfMustAutoTrigger(void) 
     
    11401243  { 
    11411244    if (buildWorkflowGraphDone_) return true ; 
     1245     
    11421246    const bool detectMissingValues = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 
    11431247    const double defaultValue  = detectMissingValues ? default_value : (!default_value.isEmpty() ? default_value : 0.0); 
     
    11521256    } 
    11531257 
     1258    // now construct grid and check if element are enabled 
     1259    solveGridReference() ; // grid_ is now defined 
     1260    if (!isGridCompleted()) return false; 
     1261 
    11541262    // Check if we have an expression to parse 
    11551263    std::shared_ptr<COutputPin> filterExpr ; 
     
    11611269    } 
    11621270     
    1163     // now construct grid and check if element are enabled 
    1164     solveGridReference() ; // grid_ is now defined 
    1165      
    11661271    // prepare transformation. Need to know before if workflow of auxillary field can be built 
    11671272    if (hasDirectFieldReference()) 
     
    11731278      for(auto grid : gridPath) 
    11741279      { 
    1175         if (!grid->checkIfCompleted()) return false ; 
    11761280        grid->solveElementsRefInheritance() ; 
    1177         grid_->completeGrid(gridSrc); // grid generation, to be checked 
     1281        grid->completeGrid(gridSrc); // grid generation, to be checked 
     1282        grid->checkElementsAttributes() ; 
    11781283        grid->prepareTransformGrid(gridSrc) ; // prepare the grid tranformation 
    11791284        for(auto fieldId : grid->getAuxInputTransformGrid()) // try to build workflow graph for auxillary field tranformation 
     
    12051310    else  
    12061311    { 
    1207       if (!grid_->checkIfCompleted()) return false ; 
    1208        
    12091312      if (hasFileIn()) // input file, attemp to read the grid from file 
    12101313      { 
     
    12211324         instantDataFilter=inputFilter ; 
    12221325      }   
    1223       else  
     1326      else if (hasCouplerIn()) 
     1327      { 
     1328        grid_->checkElementsAttributes() ; 
     1329        instantDataFilter=inputFilter ; 
     1330      } 
     1331      else 
    12241332      { 
    12251333        setModelIn() ; // no reference, the field is potentially a source field from model 
     
    12561364    // insert temporal filter before sending to files 
    12571365    getTemporalDataFilter(gc, fileOut_->output_freq)->connectOutput(fileWriterFilter, 0); 
     1366  }  
     1367 
     1368  void CField::connectToCouplerOut(CGarbageCollector& gc) 
     1369  { 
     1370    // insert temporal filter before sending to files 
     1371    fileWriterFilter = std::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this, client)); 
     1372    instantDataFilter->connectOutput(fileWriterFilter, 0); 
    12581373  }  
    12591374 
     
    13031418    serverSourceFilter -> connectOutput(inputFilter,0) ; 
    13041419  }  
     1420 
     1421  /*! 
     1422   * Connect field to a source filter to receive data from coupler (on client side). 
     1423   */ 
     1424   void CField::connectToCouplerIn(CGarbageCollector& gc) 
     1425  { 
     1426    CContext* context = CContext::getCurrent(); 
     1427 
     1428    if (freq_op.isEmpty()) freq_op.setValue(TimeStep); 
     1429    if (freq_offset.isEmpty()) freq_offset.setValue(freq_op.getValue() - TimeStep); 
     1430 
     1431    freq_operation_srv = freq_op ; 
     1432    last_operation_srv = context->getCalendar()->getInitDate(); 
     1433    const CDuration toffset = freq_operation_srv - freq_offset.getValue() - context->getCalendar()->getTimeStep(); 
     1434    last_operation_srv     = last_operation_srv - toffset; 
     1435 
     1436    clientSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, false, false, freq_offset, true)) ; 
     1437    clientSourceFilter -> connectOutput(inputFilter,0) ; 
     1438  }  
     1439 
    13051440 
    13061441  /*! 
     
    20212156  CATCH_DUMP_ATTR 
    20222157 
    2023   CContextClient* CField::getContextClient() 
    2024   TRY 
    2025   { 
    2026     return client; 
    2027   } 
    2028   CATCH 
    2029  
    2030    
     2158  void CField::sendCloseDefinition(void) 
     2159  { 
     2160    CContext::getCurrent()->sendCloseDefinition(client) ; 
     2161  } 
     2162 
    20312163  void CField::sendFieldToFileServer(void) 
    20322164  { 
     
    20372169    this->sendAddAllVariables(client); 
    20382170  } 
     2171 
     2172  void CField::sendFieldToCouplerOut(void) 
     2173  { 
     2174    if (sendFieldToCouplerOut_done_) return ; 
     2175    else sendFieldToCouplerOut_done_=true ; 
     2176    grid_->sendGridToCouplerOut(client, this->getId()); 
     2177    this->sendGridCompleted(); 
     2178 
     2179  } 
    20392180   
     2181  void CField::makeGridAliasForCoupling(void)  
     2182  {  
     2183    grid_->makeAliasForCoupling(this->getId());  
     2184  } 
     2185 
     2186 //! Client side: Send a message  announcing that the grid definition has been received from a coupling context 
     2187   void CField::sendGridCompleted(void) 
     2188   TRY 
     2189   { 
     2190      CEventClient event(getType(),EVENT_ID_GRID_COMPLETED); 
     2191 
     2192      if (client->isServerLeader()) 
     2193      { 
     2194        CMessage msg; 
     2195        msg<<this->getId(); 
     2196        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg); 
     2197        client->sendEvent(event); 
     2198      } 
     2199      else client->sendEvent(event); 
     2200   } 
     2201   CATCH_DUMP_ATTR 
     2202 
     2203   //! Server side: Receive a message announcing that the grid definition has been received from a coupling context 
     2204   void CField::recvGridCompleted(CEventServer& event) 
     2205   TRY 
     2206   { 
     2207      CBufferIn* buffer=event.subEvents.begin()->buffer; 
     2208      string id; 
     2209      *buffer>>id ; 
     2210      get(id)->recvGridCompleted(*buffer); 
     2211   } 
     2212   CATCH 
     2213 
     2214   //! Server side: Receive a message  message  announcing that the grid definition has been received from a coupling context 
     2215   void CField::recvGridCompleted(CBufferIn& buffer) 
     2216   TRY 
     2217   { 
     2218      setGridCompleted() ; 
     2219   } 
     2220   CATCH_DUMP_ATTR 
     2221 
     2222 
     2223 
     2224 
     2225 
    20402226  void CField::sendFieldToInputFileServer(void) 
    20412227  { 
     
    21452331  TRY 
    21462332  { 
    2147     bool isFieldRead  = getRelFile() && !getRelFile()->mode.isEmpty() && getRelFile()->mode == CFile::mode_attr::read; 
    2148     bool isFieldWrite = getRelFile() && ( getRelFile()->mode.isEmpty() ||  getRelFile()->mode == CFile::mode_attr::write); 
    2149     if (isFieldRead && !(operation.getValue() == "instant" || operation.getValue() == "once") )      
     2333    if (hasFileIn() && !(operation.getValue() == "instant" || operation.getValue() == "once") )      
    21502334      ERROR("void CField::checkTimeAttributes(void)", 
    21512335         << "Unsupported operation for field '" << getFieldOutputName() << "'." << std::endl 
     
    21562340      if (operation.getValue() == "instant") 
    21572341      { 
    2158         if (isFieldRead || isFieldWrite) freq_op.setValue(getRelFile()->output_freq.getValue()); 
     2342        if (hasFileIn() || hasFileOut()) freq_op.setValue(getRelFile()->output_freq.getValue()); 
    21592343        else freq_op=*freqOp ; 
    21602344      } 
    21612345      else freq_op.setValue(TimeStep); 
    21622346    } 
    2163     if (freq_offset.isEmpty()) freq_offset.setValue(isFieldRead ? NoneDu : (freq_op.getValue() - TimeStep)); 
     2347    if (freq_offset.isEmpty()) freq_offset.setValue(hasFileIn() ? NoneDu : (freq_op.getValue() - TimeStep)); 
    21642348  } 
    21652349  CATCH_DUMP_ATTR 
Note: See TracChangeset for help on using the changeset viewer.