source: XIOS2/trunk/src/filter/temporal_filter.cpp @ 2428

Last change on this file since 2428 was 2428, checked in by jderouillat, 19 months ago

Backport the XIOS3 system to log the memory consumption (commit ID [2418-2420,2425-2426])

File size: 6.5 KB
Line 
1#include "temporal_filter.hpp"
2#include "functor_type.hpp"
3#include "calendar_util.hpp"
4#include "workflow_graph.hpp"
5#include "file.hpp"
6#include "mem_checker.hpp"
7
8namespace xios
9{
10  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData);
11
12  CTemporalFilter::CTemporalFilter(CGarbageCollector& gc, const std::string& opId,
13                                   const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq,
14                                   bool ignoreMissingValue /*= false*/)
15    : CFilter(gc, 1, this)
16    , functor(createFunctor(opId, ignoreMissingValue, tmpData))
17    , isOnceOperation(functor->timeType() == func::CFunctor::once)
18    , isInstantOperation(functor->timeType() == func::CFunctor::instant)
19    , samplingFreq(samplingFreq)
20    , samplingOffset(samplingOffset)
21    , opFreq(opFreq)
22    , offsetMonth(0, this->samplingOffset.month, 0, 0, 0, 0, 0)
23    , offsetAllButMonth(this->samplingOffset.year, 0 , this->samplingOffset.day,
24                        this->samplingOffset.hour, this->samplingOffset.minute,
25                        this->samplingOffset.second, this->samplingOffset.timestep)
26    , initDate(initDate)
27    , nextSamplingDate(initDate + offsetMonth + ( offsetAllButMonth + initDate.getRelCalendar().getTimeStep()))
28    , nbOperationDates(1)
29    , nbSamplingDates(0)
30//    , nextOperationDate(initDate + opFreq + this->samplingOffset)
31    , isFirstOperation(true)
32    , temp_op(opId)
33  {
34  }
35
36 
37
38
39
40  bool CTemporalFilter::buildGraph(std::vector<CDataPacketPtr> data)
41  {
42    bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false;
43   
44    if(building_graph)
45    {
46      if(this->filterIDoutputs.size()==0) this->filterID = InvalidableObject::filterIdGenerator++;
47      int edgeID = InvalidableObject::edgeIdGenerator++;
48     
49      // std::cout<<"CTemporalFilter::apply filter tag = "<<this->tag<<" start = "<<this->start_graph<<" end = "<<this->end_graph<<std::endl;
50
51      CWorkflowGraph::allocNodeEdge();
52
53      if(this->filterIDoutputs.size()==0)
54      {
55        CWorkflowGraph::addNode(this->filterID, "Temporal Filter\\n("+this->temp_op+")", 5, 1, 0, data[0]);   
56        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].transform_type = this->temp_op;   
57        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = false ;
58        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].clusterID = 1 ;
59        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = (data[0]->distance);
60
61
62        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
63        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes();
64      }
65
66      if(CWorkflowGraph::build_begin)
67      {
68
69        CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);
70
71        (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;
72        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb += 1 ;
73        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = max(data[0]->distance+1, (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance);
74      }
75
76
77      this->filterIDoutputs.push_back(data[0]->src_filterID); 
78    }
79
80    return building_graph;
81  }
82
83
84  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data)
85  {
86    bool BG = buildGraph(data);
87
88    CDataPacketPtr packet=0;
89
90    if (data[0]->status != CDataPacket::END_OF_STREAM)
91    {
92      bool usePacket, outputResult, copyLess;
93      if (isOnceOperation)
94        usePacket = outputResult = copyLess = isFirstOperation;
95      else
96      {
97        usePacket = (data[0]->date >= nextSamplingDate);
98        outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
99        copyLess = (isInstantOperation && usePacket && outputResult);
100      }
101
102      if (usePacket)
103      {
104        nbSamplingDates ++;
105        if (!copyLess)
106        {
107          if (!tmpData.numElements())
108            tmpData.resize(data[0]->data.numElements());
109
110          (*functor)(data[0]->data);
111        }
112
113        nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep();
114      }
115
116      if (outputResult)
117      {
118        nbOperationDates ++;
119        if (!copyLess)
120        {
121          functor->final();
122
123          packet = CDataPacketPtr(new CDataPacket);
124          packet->date = data[0]->date;
125          packet->timestamp = data[0]->timestamp;
126          packet->status = data[0]->status;
127          packet->data.resize(tmpData.numElements());
128          packet->data = tmpData;
129        }
130        else
131          packet = data[0];
132
133        CMemChecker::logMem( "CTemporalFilter::apply" );
134
135        isFirstOperation = false;
136       
137        packet->field = this->field;
138       
139        if(BG)
140        {
141          packet->src_filterID=this->filterID;
142          packet->distance = data[0]->distance+1;
143          this->filterIDoutputs.clear();
144          CWorkflowGraph::build_begin=true;
145          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ;
146        }
147      }
148    }
149
150    return packet;
151  }
152
153  bool CTemporalFilter::mustAutoTrigger() const
154  {
155    return true;
156  }
157
158  bool CTemporalFilter::isDataExpected(const CDate& date) const
159  {
160//    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date + samplingFreq > nextOperationDate);
161    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
162  }
163
164  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData)
165  {
166    func::CFunctor* functor = NULL;
167
168    double defaultValue = std::numeric_limits<double>::quiet_NaN();
169
170#define DECLARE_FUNCTOR(MType, mtype) \
171    if (opId.compare(#mtype) == 0) \
172    { \
173      if (ignoreMissingValue) \
174      { \
175        functor = new func::C##MType(tmpData, defaultValue); \
176      } \
177      else \
178      { \
179        functor = new func::C##MType(tmpData); \
180      } \
181    }
182
183#include "functor_type.conf"
184
185    if (!functor)
186      ERROR("createFunctor(const std::string& opId, ...)",
187            << "\"" << opId << "\" is not a valid operation.");
188
189    return functor;
190  }
191} // namespace xios
Note: See TracBrowser for help on using the repository browser.