source: XIOS/dev/dev_olga/src/filter/source_filter.cpp @ 1686

Last change on this file since 1686 was 1686, checked in by yushan, 5 years ago

backup for trunk with graph

File size: 5.2 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6#include "workflow_graph.hpp"
7
8namespace xios
9{
10  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
11                               bool compression /*= true*/, bool mask /*= false*/,
12                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
13                               bool hasMissingValue /*= false*/,
14                               double defaultValue /*= 0.0*/)
15    : COutputPin(gc, manualTrigger)
16    , grid(grid)
17    , compression(compression)
18    , mask(mask)
19    , offset(offset)
20    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
21  {
22    if (!grid)
23      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
24            "Impossible to construct a source filter without providing a grid.");
25  }
26
27  void CSourceFilter::buildGraph(CDataPacketPtr packet)
28  {
29    bool building_graph = this->tag ? packet->timestamp >= this->field->field_graph_start && packet->timestamp <= this->field->field_graph_end : false;
30   
31    if(building_graph)
32    {
33      this->filterID = InvalidableObject::filterIdGenerator++; 
34      packet->src_filterID=this->filterID;
35      packet->field = this->field;
36      packet->distance = 1;
37     
38   
39      CWorkflowGraph::allocNodeEdge();
40
41      CWorkflowGraph::addNode(this->filterID, "Source Filter ", 1, 1, 0, packet);
42      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
43      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].field_id = this->field->getId();
44      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = 1;
45
46      CWorkflowGraph::build_begin = true;
47    }
48
49  }
50
51
52  template <int N>
53  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
54  {
55    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
56
57    CDataPacketPtr packet(new CDataPacket);
58    packet->date = date;
59    packet->timestamp = date;
60    packet->status = CDataPacket::NO_ERROR;
61
62    packet->data.resize(grid->storeIndex_client.numElements());   
63   
64    if (compression)
65    {
66      packet->data = defaultValue;
67      grid->uncompressField(data, packet->data);   
68    }
69    else
70    {
71      if (mask)
72        grid->maskField(data, packet->data);
73      else
74        grid->inputField(data, packet->data);
75    }
76    // Convert missing values to NaN
77    if (hasMissingValue)
78    {
79      const double nanValue = std::numeric_limits<double>::quiet_NaN();
80      const size_t nbData = packet->data.numElements();
81      for (size_t idx = 0; idx < nbData; ++idx)
82      {
83        if (defaultValue == packet->data(idx))
84          packet->data(idx) = nanValue;
85      }
86    }
87
88    if(CXios::isClient) buildGraph(packet);
89   
90
91
92    onOutputReady(packet);
93  }
94
95  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
96  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
97  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
98  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
99  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
100  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
101  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
102
103  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
104  {
105    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
106
107    CDataPacketPtr packet(new CDataPacket);
108    packet->date = date;
109    packet->timestamp = date;
110    packet->status = CDataPacket::NO_ERROR;
111   
112    if (data.size() != grid->storeIndex_fromSrv.size())
113      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
114            << "Incoherent data received from servers,"
115            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
116
117    packet->data.resize(grid->storeIndex_client.numElements());
118    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
119    for (it = data.begin(); it != itEnd; it++)
120    {     
121      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
122      for (int n = 0; n < index.numElements(); n++)
123        packet->data(index(n)) = it->second(n);
124    }
125
126    // Convert missing values to NaN
127    if (hasMissingValue)
128    {
129      const double nanValue = std::numeric_limits<double>::quiet_NaN();
130      const size_t nbData = packet->data.numElements();
131      for (size_t idx = 0; idx < nbData; ++idx)
132      {
133        if (defaultValue == packet->data(idx))
134          packet->data(idx) = nanValue;
135      }
136    }
137
138    onOutputReady(packet);
139  }
140
141  void CSourceFilter::signalEndOfStream(CDate date)
142  {
143    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
144
145    CDataPacketPtr packet(new CDataPacket);
146    packet->date = date;
147    packet->timestamp = date;
148    packet->status = CDataPacket::END_OF_STREAM;
149    onOutputReady(packet);
150  }
151} // namespace xios
Note: See TracBrowser for help on using the repository browser.