Ignore:
Timestamp:
05/26/15 16:13:47 (9 years ago)
Author:
rlacroix
Message:

Add the infrastructure to request fields from the server.

This will be used to read input files so add a new file attribute mode to define whether data is written or read from a file.

Currently the data is not actually read and random data is transfered for those fields in read mode.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/node/field.cpp

    r595 r598  
    2828      , active(false) , hasOutputFile(false),hasFieldOut(false), slotUpdateDate(NULL) 
    2929      , processed(false), domAxisIds_("", ""), areAllReferenceSolved(false), areAllExpressionBuilt(false) 
     30      , isReadDataRequestPending(false) 
    3031      { setVirtualVariableGroup(); } 
    3132 
     
    4041      , active(false), hasOutputFile(false), hasFieldOut(false), slotUpdateDate(NULL) 
    4142      , processed(false), domAxisIds_("", ""), areAllReferenceSolved(false), areAllExpressionBuilt(false) 
     43      , isReadDataRequestPending(false) 
    4244   { setVirtualVariableGroup(); } 
    4345 
     
    112114   } 
    113115 
    114    bool CField::dispatchEvent(CEventServer& event) 
    115   { 
    116  
     116  bool CField::dispatchEvent(CEventServer& event) 
     117  { 
    117118    if (SuperClass::dispatchEvent(event)) return true; 
    118119    else 
     
    125126          break; 
    126127 
    127             case EVENT_ID_ADD_VARIABLE : 
    128              recvAddVariable(event); 
    129              return true; 
    130              break; 
    131  
    132            case EVENT_ID_ADD_VARIABLE_GROUP : 
    133              recvAddVariableGroup(event); 
    134              return true; 
    135              break; 
     128        case EVENT_ID_READ_DATA : 
     129          recvReadDataRequest(event); 
     130          return true; 
     131          break; 
     132 
     133        case EVENT_ID_READ_DATA_READY : 
     134          recvReadDataReady(event); 
     135          return true; 
     136          break; 
     137 
     138        case EVENT_ID_ADD_VARIABLE : 
     139          recvAddVariable(event); 
     140          return true; 
     141          break; 
     142 
     143        case EVENT_ID_ADD_VARIABLE_GROUP : 
     144          recvAddVariableGroup(event); 
     145          return true; 
     146          break; 
    136147 
    137148        default : 
     
    263274      } 
    264275    } 
     276  } 
     277 
     278  void CField::sendReadDataRequest(void) 
     279  { 
     280    CContext* context = CContext::getCurrent(); 
     281    CContextClient* client = context->client; 
     282 
     283    CEventClient event(getType(), EVENT_ID_READ_DATA); 
     284    if (client->isServerLeader()) 
     285    { 
     286      CMessage msg; 
     287      msg << getId(); 
     288      const std::list<int>& ranks = client->getRanksServerLeader(); 
     289      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
     290        event.push(*itRank, 1, msg); 
     291      client->sendEvent(event); 
     292    } 
     293    else client->sendEvent(event); 
     294 
     295    lastDataRequestedFromServer = context->getCalendar()->getCurrentDate(); 
     296    isReadDataRequestPending = true; 
     297  } 
     298 
     299  /*! 
     300  Send request new data read from file if need be, that is the current data is out-of-date. 
     301  \return true if and only if some data was requested 
     302  */ 
     303  bool CField::sendReadDataRequestIfNeeded(void) 
     304  { 
     305    const CDate& currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate(); 
     306 
     307    bool requestData = (currentDate >= lastDataRequestedFromServer + file->output_freq.getValue()); 
     308 
     309    if (requestData) 
     310      sendReadDataRequest(); 
     311 
     312    return requestData; 
     313  } 
     314 
     315  void CField::recvReadDataRequest(CEventServer& event) 
     316  { 
     317    CBufferIn* buffer = event.subEvents.begin()->buffer; 
     318    StdString fieldId; 
     319    *buffer >> fieldId; 
     320    get(fieldId)->recvReadDataRequest(); 
     321  } 
     322 
     323  void CField::recvReadDataRequest(void) 
     324  { 
     325    CContext* context = CContext::getCurrent(); 
     326    CContextClient* client = context->client; 
     327 
     328    CEventClient event(getType(), EVENT_ID_READ_DATA_READY); 
     329    std::list<CMessage> msgs; 
     330 
     331    // TODO: Read the data from the file 
     332    if (data_srv.empty()) 
     333    { 
     334      for (map<int, CArray<size_t, 1>* >::iterator it = grid->outIndexFromClient.begin(); it != grid->outIndexFromClient.end(); ++it) 
     335        data_srv.insert( pair<int, CArray<double,1>* >(it->first, new CArray<double,1>(it->second->numElements()))); 
     336    } 
     337 
     338    map<int, CArray<double,1>* >::iterator it; 
     339    for (it = data_srv.begin(); it != data_srv.end(); it++) 
     340    { 
     341      msgs.push_back(CMessage()); 
     342      CMessage& msg = msgs.back(); 
     343      msg << getId() << getNStep() << *it->second; 
     344      event.push(it->first, grid->nbSenders[it->first], msg); 
     345    } 
     346    client->sendEvent(event); 
     347 
     348    incrementNStep(); 
     349  } 
     350 
     351  void CField::recvReadDataReady(CEventServer& event) 
     352  { 
     353    string fieldId; 
     354    vector<int> ranks; 
     355    vector<CBufferIn*> buffers; 
     356 
     357    list<CEventServer::SSubEvent>::iterator it; 
     358    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 
     359    { 
     360      ranks.push_back(it->rank); 
     361      CBufferIn* buffer = it->buffer; 
     362      *buffer >> fieldId; 
     363      buffers.push_back(buffer); 
     364    } 
     365    get(fieldId)->recvReadDataReady(ranks, buffers); 
     366  } 
     367 
     368  void CField::recvReadDataReady(vector<int> ranks, vector<CBufferIn*> buffers) 
     369  { 
     370    CContext* context = CContext::getCurrent(); 
     371    StdSize record; 
     372    for (int i = 0; i < ranks.size(); i++) 
     373    { 
     374      int rank = ranks[i]; 
     375      CArray<int,1>& index = *grid->storeIndex_toSrv[rank]; 
     376      CArray<double,1> data_tmp(index.numElements()); 
     377      *buffers[i] >> record >> data_tmp; 
     378 
     379      for (int n = 0; n < data_tmp.numElements(); n++) 
     380        instantData(index(n)) = data_tmp(n); 
     381    } 
     382 
     383    for (list< pair<CField*, int> >::iterator it = fieldDependency.begin(); it != fieldDependency.end(); ++it) 
     384      it->first->setSlot(it->second); 
     385 
     386    if (!hasExpression) // Should be always true ? 
     387    { 
     388      const std::vector<CField*>& refField = getAllReference(); 
     389      std::vector<CField*>::const_iterator it = refField.begin(), end = refField.end(); 
     390 
     391      for (; it != end; it++) (*it)->setDataFromExpression(instantData); 
     392      if (hasFieldOut) updateDataFromExpression(instantData); 
     393    } 
     394 
     395    isReadDataRequestPending = false; 
    265396  } 
    266397 
     
    477608         const CDuration toffset = this->freq_operation - freq_offset.getValue() - context->getCalendar()->getTimeStep(); 
    478609         *this->last_operation   = *this->last_operation - toffset; 
     610 
     611         lastDataRequestedFromServer.setRelCalendar(*context->getCalendar()); 
    479612 
    480613        if (operation.get() == "once") isOnceOperation = true; 
Note: See TracChangeset for help on using the changeset viewer.