source: XIOS3/branches/xios-3.0-beta/src/filter/temporal_filter.cpp @ 2427

Last change on this file since 2427 was 2427, checked in by jderouillat, 19 months ago

Backport the system to log the memory consumption (commit ID [2418-2420,2425-2426])

File size: 5.7 KB
RevLine 
[643]1#include "temporal_filter.hpp"
2#include "functor_type.hpp"
3#include "calendar_util.hpp"
[2143]4#include "workflow_graph.hpp"
[2427]5#include "mem_checker.hpp"
[643]6
7namespace xios
8{
// Forward declaration of the file-local functor factory (defined at the end of
// this file) so that the CTemporalFilter constructor below can call it.
[1440]9  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData);
[1158]10
[643]11  CTemporalFilter::CTemporalFilter(CGarbageCollector& gc, const std::string& opId,
12                                   const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq,
[1440]13                                   bool ignoreMissingValue /*= false*/)
[643]14    : CFilter(gc, 1, this)
[1440]15    , functor(createFunctor(opId, ignoreMissingValue, tmpData))
[1158]16    , isOnceOperation(functor->timeType() == func::CFunctor::once)
17    , isInstantOperation(functor->timeType() == func::CFunctor::instant)
[1278]18    , samplingFreq(samplingFreq)
19    , samplingOffset(samplingOffset)
[643]20    , opFreq(opFreq)
[1472]21    , offsetMonth(0, this->samplingOffset.month, 0, 0, 0, 0, 0)
22    , offsetAllButMonth(this->samplingOffset.year, 0 , this->samplingOffset.day,
23                        this->samplingOffset.hour, this->samplingOffset.minute,
24                        this->samplingOffset.second, this->samplingOffset.timestep)
[1302]25    , initDate(initDate)
[1523]26//    , nextSamplingDate(initDate + (this->samplingOffset + initDate.getRelCalendar().getTimeStep()))
27    , nextSamplingDate(initDate + offsetMonth + ( offsetAllButMonth + initDate.getRelCalendar().getTimeStep()))
[1302]28    , nbOperationDates(1)
[1473]29    , nbSamplingDates(0)
[1302]30//    , nextOperationDate(initDate + opFreq + this->samplingOffset)
[643]31    , isFirstOperation(true)
[2143]32    , graphCycleCompleted(true)
[2193]33    , temporalOperation(opId)
[643]34  {
35  }
36
[2193]37  std::string CTemporalFilter::getTemporalOperation()
38  {
39    return this->temporalOperation;
40  }
41
[2143]42  void CTemporalFilter::buildWorkflowGraph(std::vector<CDataPacketPtr> data)
43  {
44    if(this->graphEnabled )
45    {
46      if(!data[0]->graphPackage)
47      {
48        data[0]->graphPackage = new CGraphDataPackage;
49      }
50     
51      if(graphCycleCompleted)
52      { 
53        this->graphPackage->filterId = CWorkflowGraph::getNodeSize();
[2193]54        CWorkflowGraph::addNode("Temporal filter \\n("+getTemporalOperation()+")", 3, false, 0, data[0]);
[2143]55        graphCycleCompleted = false;
56      }
57     
58      data[0]->graphPackage->currentField = this->graphPackage->inFields[0];
59      std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end());
60     
61      CWorkflowGraph::addEdge(data[0]->graphPackage->fromFilter, this->graphPackage->filterId, data[0]);
62      data[0]->graphPackage->fromFilter = this->graphPackage->filterId;
[2193]63      // this->graphPackage->sourceFilterIds.push_back(data[0]->graphPackage->fromFilter);
[2143]64      data[0]->graphPackage->currentField = this->graphPackage->inFields[0];
65      std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end());
66    }
67
68  }
[2195]69 
[643]70  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data)
71  {
[2143]72    buildWorkflowGraph(data);
73 
[643]74    CDataPacketPtr packet;
75
76    if (data[0]->status != CDataPacket::END_OF_STREAM)
77    {
[1158]78      bool usePacket, outputResult, copyLess;
79      if (isOnceOperation)
80        usePacket = outputResult = copyLess = isFirstOperation;
81      else
82      {
83        usePacket = (data[0]->date >= nextSamplingDate);
[1302]84        outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
[1158]85        copyLess = (isInstantOperation && usePacket && outputResult);
86      }
87
[643]88      if (usePacket)
89      {
[1473]90        nbSamplingDates ++;
[1158]91        if (!copyLess)
92        {
93          if (!tmpData.numElements())
94            tmpData.resize(data[0]->data.numElements());
[643]95
[1158]96          (*functor)(data[0]->data);
97        }
[643]98
[1473]99        nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep();
[643]100      }
101
102      if (outputResult)
103      {
[1302]104        nbOperationDates ++;
[1158]105        if (!copyLess)
106        {
107          functor->final();
[643]108
[1158]109          packet = CDataPacketPtr(new CDataPacket);
110          packet->date = data[0]->date;
111          packet->timestamp = data[0]->timestamp;
112          packet->status = data[0]->status;
113          packet->data.resize(tmpData.numElements());
114          packet->data = tmpData;
[2143]115          packet->graphPackage = data[0]->graphPackage;
[1158]116        }
117        else
118          packet = data[0];
[643]119
[2427]120        CMemChecker::logMem( "CTemporalFilter::apply" );
121
[1285]122        isFirstOperation = false;
[2143]123        graphCycleCompleted = true;
124     }
[643]125    }
126
127    return packet;
128  }
[1158]129
[1358]130  bool CTemporalFilter::mustAutoTrigger() const
131  {
132    return true;
133  }
134
[1158]135  bool CTemporalFilter::isDataExpected(const CDate& date) const
136  {
[1302]137//    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date + samplingFreq > nextOperationDate);
138    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
[1158]139  }
140
[1440]141  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData)
[1158]142  {
143    func::CFunctor* functor = NULL;
144
[1440]145    double defaultValue = std::numeric_limits<double>::quiet_NaN();
[1158]146
147#define DECLARE_FUNCTOR(MType, mtype) \
148    if (opId.compare(#mtype) == 0) \
149    { \
150      if (ignoreMissingValue) \
151      { \
152        functor = new func::C##MType(tmpData, defaultValue); \
153      } \
154      else \
155      { \
156        functor = new func::C##MType(tmpData); \
157      } \
158    }
159
160#include "functor_type.conf"
161
162    if (!functor)
163      ERROR("createFunctor(const std::string& opId, ...)",
164            << "\"" << opId << "\" is not a valid operation.");
165
166    return functor;
167  }
[643]168} // namespace xios
Note: See TracBrowser for help on using the repository browser.