source: XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/file_reader_source_filter.cpp @ 1930

Last change on this file since 1930 was 1930, checked in by ymipsl, 4 years ago

Big update on on going work related to data distribution and transfer between clients and servers.
Revisite of the source and store filter using "connectors".

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 2.3 KB
Line 
1#include "file_reader_source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include "context.hpp"
6#include "field.hpp"
7#include "file.hpp"
8#include "context.hpp"
9
10
11namespace xios
12{
13  CFileReaderSourceFilter::CFileReaderSourceFilter(CGarbageCollector& gc, CField* field)
14    : COutputPin(gc)
15  {
16    field_ = field ;
17    grid_ = field->getGrid() ;
18    file_ = field->getFileIn() ;
19    if (!file_->cyclic.isEmpty()) isCyclic_ = file_->cyclic ;
20    if (!field_->scale_factor.isEmpty()) { hasScaleFactor_=true ; scaleFactor_ = field_->scale_factor ; }
21    if (!field_->add_offset.isEmpty()) { hasAddOffset_=true ; addOffset_ = field_->add_offset ; }
22  }
23
24  void CFileReaderSourceFilter::streamData()
25  {
26    Time timeStamp ;
27    CDataPacketPtr packet(new CDataPacket);
28    packet->date = CContext::getCurrent()->getCalendar()->getCurrentDate();
29    packet->timestamp = timeStamp;
30    packet->status = CDataPacket::NO_ERROR;
31
32    CArray<double,1> data ;
33    if (!isInitialized_)  initialize() ;
34    CField::EReadField readState = CField::RF_DATA;
35
36    if ( nStepMax_==0 || (nStep_ >= nStepMax_ && !isCyclic_)) readState = CField::RF_EOF;
37 
38    if (CField::RF_EOF != readState)
39    {
40      if (file_->isEmptyZone()) readData(data) ;
41      else readState = CField::RF_NODATA;
42    }
43    nStep_++ ;
44   
45    if (readState == CField::RF_DATA) packet->status = CDataPacket::NO_ERROR;
46    else packet->status = CDataPacket::END_OF_STREAM;
47           
48    onOutputReady(packet);
49  }
50
51  void CFileReaderSourceFilter::initialize()
52  {
53    CContext* context = CContext::getCurrent();
54    file_->initRead();
55    if (file_->isEmptyZone())
56    {     
57      file_->checkReadFile();
58      nStepMax_ = file_->getDataInput()->getFieldNbRecords(field_);
59      nStep_ = 0 ;
60    }
61    MPI_Allreduce(MPI_IN_PLACE, &nStepMax_, 1, MPI_INT, MPI_MAX, context->getIntraComm());
62  }
63
64  void CFileReaderSourceFilter::readData(CArray<double,1>& data)
65  {
66    CGridLocalConnector*  connector = grid_->getFullToWorkflowConnector() ;
67    CArray<double,1> dataIn(connector->getDstSize()) ;
68    file_->getDataInput()->readFieldData(field_, nStep_%nStepMax_, dataIn);
69    connector->transfer(dataIn, data) ; 
70
71    if (hasScaleFactor_ || hasAddOffset_) data = data * scaleFactor_ + addOffset_; // possibility of optimization
72  }
73 
74 
75} // namespace xios
Note: See TracBrowser for help on using the repository browser.