source: XIOS/dev/dev_oa/src/filter/source_filter.cpp @ 1963

Last change on this file since 1963 was 1963, checked in by oabramkina, 4 years ago

dev_oa: adding interface to xios_send permitting sending a tile

File size: 6.3 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6#include "workflow_graph.hpp"
7
8namespace xios
9{
10  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
11                               bool compression /*= true*/, bool mask /*= false*/,
12                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
13                               bool hasMissingValue /*= false*/,
14                               double defaultValue /*= 0.0*/)
15    : COutputPin(gc, manualTrigger)
16    , grid(grid)
17    , compression(compression)
18    , mask(mask)
19    , offset(offset)
20    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
21    , ntiles(0)
22  {
23    if (!grid)
24      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
25            "Impossible to construct a source filter without providing a grid.");
26  }
27
28  void CSourceFilter::buildGraph(CDataPacketPtr packet)
29  {
30    bool filter_interval=false;
31    if (this->field)
32    {
33      if(this->field->field_graph_start == -1 && this->field->field_graph_end == -1) filter_interval = true;
34      else filter_interval = packet->timestamp >= this->field->field_graph_start && packet->timestamp <= this->field->field_graph_end;
35    }
36    bool building_graph = this->tag ? filter_interval : false;
37    if(building_graph)
38    {
39      this->filterID = InvalidableObject::filterIdGenerator++; 
40      packet->src_filterID=this->filterID;
41      packet->field = this->field;
42      packet->distance = 1;
43     
44      CWorkflowGraph::allocNodeEdge();
45     
46      CWorkflowGraph::addNode(this->filterID, "Source Filter ", 1, 1, 0, packet);
47      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
48      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].field_id = this->field->getId();
49      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = 1;
50
51      CWorkflowGraph::build_begin = true;
52    }
53
54  }
55
56  template <int N>
57  void CSourceFilter::streamTile(CDate date, const CArray<double, N>& data, int ntile)
58  {
59    // TO DO
60  }
61
62
63  template <int N>
64  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
65  {
66    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
67
68    CDataPacketPtr packet(new CDataPacket);
69    packet->date = date;
70    packet->timestamp = date;
71    packet->status = CDataPacket::NO_ERROR;
72
73    packet->data.resize(grid->storeIndex_client.numElements());   
74   
75    if (compression)
76    {
77      packet->data = defaultValue;
78      grid->uncompressField(data, packet->data);   
79    }
80    else
81    {
82      if (mask)
83        grid->maskField(data, packet->data);
84      else
85        grid->inputField(data, packet->data);
86    }
87    // Convert missing values to NaN
88    if (hasMissingValue)
89    {
90      const double nanValue = std::numeric_limits<double>::quiet_NaN();
91      const size_t nbData = packet->data.numElements();
92      for (size_t idx = 0; idx < nbData; ++idx)
93      {
94        if (defaultValue == packet->data(idx))
95          packet->data(idx) = nanValue;
96      }
97    }
98
99    if(CXios::isClient) buildGraph(packet);
100   
101
102
103    onOutputReady(packet);
104  }
105
106  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
107  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
108  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
109  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
110  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
111  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
112  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
113
114  template void CSourceFilter::streamTile<1>(CDate date, const CArray<double, 1>& data, int ntile);
115  template void CSourceFilter::streamTile<2>(CDate date, const CArray<double, 2>& data, int ntile);
116  template void CSourceFilter::streamTile<3>(CDate date, const CArray<double, 3>& data, int ntile);
117  template void CSourceFilter::streamTile<4>(CDate date, const CArray<double, 4>& data, int ntile);
118  template void CSourceFilter::streamTile<5>(CDate date, const CArray<double, 5>& data, int ntile);
119  template void CSourceFilter::streamTile<6>(CDate date, const CArray<double, 6>& data, int ntile);
120  template void CSourceFilter::streamTile<7>(CDate date, const CArray<double, 7>& data, int ntile);
121
122  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
123  {
124    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
125
126    CDataPacketPtr packet(new CDataPacket);
127    packet->date = date;
128    packet->timestamp = date;
129    packet->status = CDataPacket::NO_ERROR;
130   
131    if (data.size() != grid->storeIndex_fromSrv.size())
132      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
133            << "Incoherent data received from servers,"
134            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
135
136    packet->data.resize(grid->storeIndex_client.numElements());
137    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
138    for (it = data.begin(); it != itEnd; it++)
139    {     
140      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
141      for (int n = 0; n < index.numElements(); n++)
142        packet->data(index(n)) = it->second(n);
143    }
144
145    // Convert missing values to NaN
146    if (hasMissingValue)
147    {
148      const double nanValue = std::numeric_limits<double>::quiet_NaN();
149      const size_t nbData = packet->data.numElements();
150      for (size_t idx = 0; idx < nbData; ++idx)
151      {
152        if (defaultValue == packet->data(idx))
153          packet->data(idx) = nanValue;
154      }
155    }
156    if(CXios::isClient) buildGraph(packet);
157    onOutputReady(packet);
158  }
159
160  void CSourceFilter::signalEndOfStream(CDate date)
161  {
162    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
163
164    CDataPacketPtr packet(new CDataPacket);
165    packet->date = date;
166    packet->timestamp = date;
167    packet->status = CDataPacket::END_OF_STREAM; 
168    onOutputReady(packet);
169  }
170} // namespace xios
Note: See TracBrowser for help on using the repository browser.