1 | #include "file_reader_source_filter.hpp" |
---|
2 | #include "grid.hpp" |
---|
3 | #include "exception.hpp" |
---|
4 | #include "calendar_util.hpp" |
---|
5 | #include "context.hpp" |
---|
6 | #include "field.hpp" |
---|
7 | #include "file.hpp" |
---|
8 | #include "context.hpp" |
---|
9 | #include "workflow_graph.hpp" |
---|
10 | |
---|
11 | namespace xios |
---|
12 | { |
---|
13 | CFileReaderSourceFilter::CFileReaderSourceFilter(CGarbageCollector& gc, CField* field) |
---|
14 | : COutputPin(gc) |
---|
15 | { |
---|
16 | field_ = field ; |
---|
17 | grid_ = field->getGrid() ; |
---|
18 | file_ = field->getFileIn() ; |
---|
19 | if (!file_->cyclic.isEmpty()) isCyclic_ = file_->cyclic ; |
---|
20 | if (!field_->scale_factor.isEmpty()) { hasScaleFactor_=true ; scaleFactor_ = field_->scale_factor ; } |
---|
21 | if (!field_->add_offset.isEmpty()) { hasAddOffset_=true ; addOffset_ = field_->add_offset ; } |
---|
22 | } |
---|
23 | |
---|
24 | void CFileReaderSourceFilter::streamData() |
---|
25 | { |
---|
26 | Time timeStamp ; |
---|
27 | CDataPacketPtr packet(new CDataPacket); |
---|
28 | packet->date = CContext::getCurrent()->getCalendar()->getCurrentDate(); |
---|
29 | packet->timestamp = timeStamp; |
---|
30 | packet->status = CDataPacket::NO_ERROR; |
---|
31 | |
---|
32 | if (!isInitialized_) initialize() ; |
---|
33 | CField::EReadField readState = CField::RF_DATA; |
---|
34 | if ( nStepMax_==0 || (nStep_ >= nStepMax_ && !isCyclic_)) readState = CField::RF_EOF; |
---|
35 | |
---|
36 | if (CField::RF_EOF != readState) |
---|
37 | { |
---|
38 | if (!file_->isEmptyZone()) readData(packet->data) ; |
---|
39 | else readState = CField::RF_NODATA; |
---|
40 | } |
---|
41 | nStep_++ ; |
---|
42 | |
---|
43 | if (readState == CField::RF_DATA) packet->status = CDataPacket::NO_ERROR; |
---|
44 | else packet->status = CDataPacket::END_OF_STREAM; |
---|
45 | |
---|
46 | info(20)<<"Read data from file : FieldId "<<field_->getId()<<" nStep "<<nStep_<<" date : "<<packet->date<<endl ; |
---|
47 | |
---|
48 | if(this->graphEnabled) |
---|
49 | { |
---|
50 | this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); |
---|
51 | CWorkflowGraph::addNode("File Reader Source filter", 1, false, 0, packet); |
---|
52 | } |
---|
53 | |
---|
54 | onOutputReady(packet); |
---|
55 | } |
---|
56 | |
---|
57 | void CFileReaderSourceFilter::initialize() |
---|
58 | { |
---|
59 | CContext* context = CContext::getCurrent(); |
---|
60 | file_->initRead(); |
---|
61 | if (!file_->isEmptyZone()) |
---|
62 | { |
---|
63 | file_->checkReadFile(); |
---|
64 | nStepMax_ = file_->getDataInput()->getFieldNbRecords(field_); |
---|
65 | nStep_ = file_->record_offset.isEmpty() ? 0 : file_->record_offset; ; |
---|
66 | } |
---|
67 | MPI_Allreduce(MPI_IN_PLACE, &nStepMax_, 1, MPI_INT, MPI_MAX, context->getIntraComm()); |
---|
68 | isInitialized_=true; |
---|
69 | } |
---|
70 | |
---|
71 | void CFileReaderSourceFilter::readData(CArray<double,1>& data) |
---|
72 | { |
---|
73 | shared_ptr<CGridLocalConnector> connector = grid_->getFullToWorkflowConnector() ; |
---|
74 | CArray<double,1> dataIn(connector->getSrcSize()) ; |
---|
75 | file_->getDataInput()->readFieldData(field_, nStep_%nStepMax_, dataIn); |
---|
76 | data.resize(connector->getDstSize()) ; |
---|
77 | connector->transfer(dataIn, data) ; |
---|
78 | |
---|
79 | if (hasScaleFactor_ || hasAddOffset_) data = data * scaleFactor_ + addOffset_; // possibility of optimization |
---|
80 | } |
---|
81 | |
---|
82 | |
---|
83 | } // namespace xios |
---|