Ignore:
Timestamp:
09/16/20 18:34:23 (4 years ago)
Author:
ymipsl
Message:

Big update on on going work related to data distribution and transfer between clients and servers.
Revisite of the source and store filter using "connectors".

-> inputs work again

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_server_source_filter.cpp

    r1930 r1934  
    66#include "calendar_util.hpp" 
    77#include "context.hpp" 
     8#include "event_client.hpp" 
     9#include "timer.hpp" 
     10#include "tracer.hpp" 
    811#include <limits>  
    912 
     
    1417  { 
    1518    CContext* context = CContext::getCurrent(); 
     19    field_ = field ; 
    1620    grid_= field->getGrid(); 
    17     freqOp_ = field->fileIn_->output_freq ; 
     21    freqOp_ = field->getRelFile()->output_freq ; 
     22    client_= field->getRelFile()->getContextClient() ; 
    1823    lastDateReceived_ = context->getCalendar()->getInitDate(); 
    1924    offset_ = field->freq_offset ; 
     
    3944      if (!wasEOF) dateEOF_ = lastDateReceived_; 
    4045      packet->status = CDataPacket::END_OF_STREAM; 
     46      info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ; 
     47      info(20)<<"lastDateReceived_ "<<lastDateReceived_<< "  date "<<packet->date<<"  ----> EOF"<<endl;  
     48 
    4149    } 
    4250    else  
    4351    { 
    44       grid_->getServerFromClientConnector()->transfer(event, packet->data) ; 
     52      CContextClient* client = event.getContextServer()->getAssociatedClient() ; 
     53      grid_->getClientFromServerConnector(client)->transfer(event, packet->data) ; // to avoid to make a search in map for corresponding client connector,  
     54      
     55      info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ; 
     56      info(20)<<"lastDateReceived_ "<<lastDateReceived_<< "  date "<<packet->date<<endl;                                                                                    // make a registration at initialization once 
    4557      packet->status = CDataPacket::NO_ERROR; 
    4658    } 
     
    4961  } 
    5062  
     63  int CClientFromServerSourceFilter::sendReadDataRequest(const CDate& tsDataRequested) 
     64  { 
     65    CContext* context = CContext::getCurrent(); 
     66    lastDataRequestedFromServer_ = tsDataRequested; 
     67 
     68    // No need to send the request if we are sure that we are already at EOF 
     69    if (!isEOF_ || context->getCalendar()->getCurrentDate() <= dateEOF_) 
     70    { 
     71      CEventClient event(field_->getType(), CField::EVENT_ID_READ_DATA); 
     72      if (client_->isServerLeader()) 
     73      { 
     74        CMessage msg; 
     75        msg << field_->getId(); 
     76        for(auto& rank : client_->getRanksServerLeader()) event.push(rank, 1, msg); 
     77        client_->sendEvent(event); 
     78      } 
     79      else client_->sendEvent(event); 
     80    } 
     81    else  
     82    { 
     83      CDataPacketPtr packet(new CDataPacket); 
     84      packet->date = tsDataRequested; 
     85      packet->timestamp = packet->date ; 
     86      packet->status = CDataPacket::END_OF_STREAM; 
     87      onOutputReady(packet); 
     88    } 
     89 
     90    wasDataRequestedFromServer_ = true; 
     91 
     92    return !isEOF_; 
     93  } 
     94 
     95  bool CClientFromServerSourceFilter::sendReadDataRequestIfNeeded(void) 
     96  TRY 
     97  { 
     98    const CDate& currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate(); 
     99 
     100    bool dataRequested = false; 
     101 
     102    while (currentDate >= lastDataRequestedFromServer_) 
     103    { 
     104      info(20) << "currentDate : " << currentDate << endl ; 
     105      info(20) << "Field : " << field_->getId() << endl ; 
     106      info(20) << "lastDataRequestedFromServer : " << lastDataRequestedFromServer_ << endl ; 
     107      info(20) << "freqOp : " << freqOp_ << endl ; 
     108      info(20) << "lastDataRequestedFromServer + fileIn_->output_freq.getValue() : " << lastDataRequestedFromServer_ + freqOp_ << endl ; 
     109 
     110      dataRequested |= sendReadDataRequest(lastDataRequestedFromServer_ + freqOp_); 
     111    } 
     112 
     113    return dataRequested; 
     114  } 
     115  CATCH 
     116 
     117  void CClientFromServerSourceFilter::checkForLateData(void) 
     118  TRY 
     119  { 
     120    CContext* context = CContext::getCurrent(); 
     121    // Check if data previously requested has been received as expected 
     122    if (wasDataRequestedFromServer_ && ! isEOF_) 
     123    { 
     124      CTimer timer("CClientFromServerSourceFilter::checkForLateDataFromServer"); 
     125      timer.resume(); 
     126      traceOff() ; 
     127      timer.suspend(); 
     128       
     129      bool isLate; 
     130      do 
     131      { 
     132        isLate = isDataLate(); 
     133        if (isLate) 
     134        { 
     135          timer.resume(); 
     136          context->globalEventLoop(); 
     137          timer.suspend(); 
     138        } 
     139      } 
     140      while (isLate && timer.getCumulatedTime() < CXios::recvFieldTimeout); 
     141      timer.resume(); 
     142      traceOn() ; 
     143      timer.suspend() ; 
     144 
     145 
     146      if (isLate) 
     147        ERROR("void CClientFromServerSourceFilter::checkForLateDataFromServer(void)", 
     148              << "Late data at timestep = " << context->getCalendar()->getCurrentDate()); 
     149    } 
     150  } 
     151  CATCH 
     152 
     153   
    51154  bool CClientFromServerSourceFilter::isDataLate(void) 
    52155  { 
     
    60163     
    61164  } 
     165 
     166 
    62167} // namespace xios 
Note: See TracChangeset for help on using the changeset viewer.