source: XIOS/dev/dev_trunk_graph/src/filter/temporal_filter.cpp @ 2142

Last change on this file since 2142 was 2137, checked in by yushan, 3 years ago

temporal commit for merging graph into XIOS_coupling

File size: 5.7 KB
Line 
1#include "temporal_filter.hpp"
2#include "functor_type.hpp"
3#include "calendar_util.hpp"
4#include "workflow_graph.hpp"
5
6namespace xios
7{
8  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData);
9
10  CTemporalFilter::CTemporalFilter(CGarbageCollector& gc, const std::string& opId,
11                                   const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq,
12                                   bool ignoreMissingValue /*= false*/)
13    : CFilter(gc, 1, this)
14    , functor(createFunctor(opId, ignoreMissingValue, tmpData))
15    , isOnceOperation(functor->timeType() == func::CFunctor::once)
16    , isInstantOperation(functor->timeType() == func::CFunctor::instant)
17    , samplingFreq(samplingFreq)
18    , samplingOffset(samplingOffset)
19    , opFreq(opFreq)
20    , offsetMonth(0, this->samplingOffset.month, 0, 0, 0, 0, 0)
21    , offsetAllButMonth(this->samplingOffset.year, 0 , this->samplingOffset.day,
22                        this->samplingOffset.hour, this->samplingOffset.minute,
23                        this->samplingOffset.second, this->samplingOffset.timestep)
24    , initDate(initDate)
25//    , nextSamplingDate(initDate + (this->samplingOffset + initDate.getRelCalendar().getTimeStep()))
26    , nextSamplingDate(initDate + offsetMonth + ( offsetAllButMonth + initDate.getRelCalendar().getTimeStep()))
27    , nbOperationDates(1)
28    , nbSamplingDates(0)
29//    , nextOperationDate(initDate + opFreq + this->samplingOffset)
30    , isFirstOperation(true)
31    , graphCycleCompleted(true)
32  {
33  }
34
35  void CTemporalFilter::buildWorkflowGraph(std::vector<CDataPacketPtr> data)
36  {
37    if(this->graphEnabled )
38    {
39      if(!data[0]->graphPackage)
40      {
41        data[0]->graphPackage = new CGraphDataPackage;
42      }
43     
44      //if(this->graphPackage->sourceFilterIds.size()==0)
45      if(graphCycleCompleted)
46      { 
47        this->graphPackage->filterId = CWorkflowGraph::getNodeSize();
48        CWorkflowGraph::addNode("Temporal filter", 3, false, 0, data[0]);
49        graphCycleCompleted = false;
50      }
51     
52      data[0]->graphPackage->currentField = this->graphPackage->inFields[0];
53      std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end());
54     
55      CWorkflowGraph::addEdge(data[0]->graphPackage->fromFilter, this->graphPackage->filterId, data[0]);
56      data[0]->graphPackage->fromFilter = this->graphPackage->filterId;
57      this->graphPackage->sourceFilterIds.push_back(data[0]->graphPackage->fromFilter); 
58      data[0]->graphPackage->currentField = this->graphPackage->inFields[0];
59      std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end());
60    }
61
62  }
63  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data)
64  {
65    buildWorkflowGraph(data);
66 
67    CDataPacketPtr packet;
68
69    if (data[0]->status != CDataPacket::END_OF_STREAM)
70    {
71      bool usePacket, outputResult, copyLess;
72      if (isOnceOperation)
73        usePacket = outputResult = copyLess = isFirstOperation;
74      else
75      {
76        usePacket = (data[0]->date >= nextSamplingDate);
77//        outputResult = (data[0]->date + samplingFreq > nextOperationDate);
78        outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
79        copyLess = (isInstantOperation && usePacket && outputResult);
80      }
81
82      if (usePacket)
83      {
84        nbSamplingDates ++;
85        if (!copyLess)
86        {
87          if (!tmpData.numElements())
88            tmpData.resize(data[0]->data.numElements());
89
90          (*functor)(data[0]->data);
91        }
92
93        nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep();
94      }
95
96      if (outputResult)
97      {
98        nbOperationDates ++;
99        if (!copyLess)
100        {
101          functor->final();
102
103          packet = CDataPacketPtr(new CDataPacket);
104          packet->date = data[0]->date;
105          packet->timestamp = data[0]->timestamp;
106          packet->status = data[0]->status;
107          packet->data.resize(tmpData.numElements());
108          packet->data = tmpData;
109          packet->graphPackage = data[0]->graphPackage;
110        }
111        else
112          packet = data[0];
113
114        isFirstOperation = false;
115//        nextOperationDate = initDate + samplingFreq + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth;
116        graphCycleCompleted = true;
117     }
118    }
119
120    return packet;
121  }
122
123  bool CTemporalFilter::mustAutoTrigger() const
124  {
125    return true;
126  }
127
128  bool CTemporalFilter::isDataExpected(const CDate& date) const
129  {
130//    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date + samplingFreq > nextOperationDate);
131    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
132  }
133
134  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData)
135  {
136    func::CFunctor* functor = NULL;
137
138    double defaultValue = std::numeric_limits<double>::quiet_NaN();
139
140#define DECLARE_FUNCTOR(MType, mtype) \
141    if (opId.compare(#mtype) == 0) \
142    { \
143      if (ignoreMissingValue) \
144      { \
145        functor = new func::C##MType(tmpData, defaultValue); \
146      } \
147      else \
148      { \
149        functor = new func::C##MType(tmpData); \
150      } \
151    }
152
153#include "functor_type.conf"
154
155    if (!functor)
156      ERROR("createFunctor(const std::string& opId, ...)",
157            << "\"" << opId << "\" is not a valid operation.");
158
159    return functor;
160  }
161} // namespace xios
Note: See TracBrowser for help on using the repository browser.