source: XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/temporal_filter.cpp @ 2195

Last change on this file since 2195 was 2195, checked in by yushan, 3 years ago

workflow graph : enable temporal splitting filter

File size: 5.6 KB
RevLine 
[643]1#include "temporal_filter.hpp"
2#include "functor_type.hpp"
3#include "calendar_util.hpp"
[2143]4#include "workflow_graph.hpp"
[643]5
6namespace xios
7{
[1440]8  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData);
[1158]9
[643]10  CTemporalFilter::CTemporalFilter(CGarbageCollector& gc, const std::string& opId,
11                                   const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq,
[1440]12                                   bool ignoreMissingValue /*= false*/)
[643]13    : CFilter(gc, 1, this)
[1440]14    , functor(createFunctor(opId, ignoreMissingValue, tmpData))
[1158]15    , isOnceOperation(functor->timeType() == func::CFunctor::once)
16    , isInstantOperation(functor->timeType() == func::CFunctor::instant)
[1278]17    , samplingFreq(samplingFreq)
18    , samplingOffset(samplingOffset)
[643]19    , opFreq(opFreq)
[1472]20    , offsetMonth(0, this->samplingOffset.month, 0, 0, 0, 0, 0)
21    , offsetAllButMonth(this->samplingOffset.year, 0 , this->samplingOffset.day,
22                        this->samplingOffset.hour, this->samplingOffset.minute,
23                        this->samplingOffset.second, this->samplingOffset.timestep)
[1302]24    , initDate(initDate)
[1523]25//    , nextSamplingDate(initDate + (this->samplingOffset + initDate.getRelCalendar().getTimeStep()))
26    , nextSamplingDate(initDate + offsetMonth + ( offsetAllButMonth + initDate.getRelCalendar().getTimeStep()))
[1302]27    , nbOperationDates(1)
[1473]28    , nbSamplingDates(0)
[1302]29//    , nextOperationDate(initDate + opFreq + this->samplingOffset)
[643]30    , isFirstOperation(true)
[2143]31    , graphCycleCompleted(true)
[2193]32    , temporalOperation(opId)
[643]33  {
34  }
35
[2193]36  std::string CTemporalFilter::getTemporalOperation()
37  {
38    return this->temporalOperation;
39  }
40
[2143]41  void CTemporalFilter::buildWorkflowGraph(std::vector<CDataPacketPtr> data)
42  {
43    if(this->graphEnabled )
44    {
45      if(!data[0]->graphPackage)
46      {
47        data[0]->graphPackage = new CGraphDataPackage;
48      }
49     
50      if(graphCycleCompleted)
51      { 
52        this->graphPackage->filterId = CWorkflowGraph::getNodeSize();
[2193]53        CWorkflowGraph::addNode("Temporal filter \\n("+getTemporalOperation()+")", 3, false, 0, data[0]);
[2143]54        graphCycleCompleted = false;
55      }
56     
57      data[0]->graphPackage->currentField = this->graphPackage->inFields[0];
58      std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end());
59     
60      CWorkflowGraph::addEdge(data[0]->graphPackage->fromFilter, this->graphPackage->filterId, data[0]);
61      data[0]->graphPackage->fromFilter = this->graphPackage->filterId;
[2193]62      // this->graphPackage->sourceFilterIds.push_back(data[0]->graphPackage->fromFilter);
[2143]63      data[0]->graphPackage->currentField = this->graphPackage->inFields[0];
64      std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end());
65    }
66
67  }
[2195]68 
[643]69  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data)
70  {
[2143]71    buildWorkflowGraph(data);
72 
[643]73    CDataPacketPtr packet;
74
75    if (data[0]->status != CDataPacket::END_OF_STREAM)
76    {
[1158]77      bool usePacket, outputResult, copyLess;
78      if (isOnceOperation)
79        usePacket = outputResult = copyLess = isFirstOperation;
80      else
81      {
82        usePacket = (data[0]->date >= nextSamplingDate);
[1302]83        outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
[1158]84        copyLess = (isInstantOperation && usePacket && outputResult);
85      }
86
[643]87      if (usePacket)
88      {
[1473]89        nbSamplingDates ++;
[1158]90        if (!copyLess)
91        {
92          if (!tmpData.numElements())
93            tmpData.resize(data[0]->data.numElements());
[643]94
[1158]95          (*functor)(data[0]->data);
96        }
[643]97
[1473]98        nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep();
[643]99      }
100
101      if (outputResult)
102      {
[1302]103        nbOperationDates ++;
[1158]104        if (!copyLess)
105        {
106          functor->final();
[643]107
[1158]108          packet = CDataPacketPtr(new CDataPacket);
109          packet->date = data[0]->date;
110          packet->timestamp = data[0]->timestamp;
111          packet->status = data[0]->status;
112          packet->data.resize(tmpData.numElements());
113          packet->data = tmpData;
[2143]114          packet->graphPackage = data[0]->graphPackage;
[1158]115        }
116        else
117          packet = data[0];
[643]118
[1285]119        isFirstOperation = false;
[2143]120        graphCycleCompleted = true;
121     }
[643]122    }
123
124    return packet;
125  }
[1158]126
[1358]127  bool CTemporalFilter::mustAutoTrigger() const
128  {
129    return true;
130  }
131
[1158]132  bool CTemporalFilter::isDataExpected(const CDate& date) const
133  {
[1302]134//    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date + samplingFreq > nextOperationDate);
135    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
[1158]136  }
137
[1440]138  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData)
[1158]139  {
140    func::CFunctor* functor = NULL;
141
[1440]142    double defaultValue = std::numeric_limits<double>::quiet_NaN();
[1158]143
144#define DECLARE_FUNCTOR(MType, mtype) \
145    if (opId.compare(#mtype) == 0) \
146    { \
147      if (ignoreMissingValue) \
148      { \
149        functor = new func::C##MType(tmpData, defaultValue); \
150      } \
151      else \
152      { \
153        functor = new func::C##MType(tmpData); \
154      } \
155    }
156
157#include "functor_type.conf"
158
159    if (!functor)
160      ERROR("createFunctor(const std::string& opId, ...)",
161            << "\"" << opId << "\" is not a valid operation.");
162
163    return functor;
164  }
[643]165} // namespace xios
Note: See TracBrowser for help on using the repository browser.