source: XIOS/dev/dev_olga/src/filter/source_filter.cpp @ 1653

Last change on this file since 1653 was 1653, checked in by oabramkina, 5 years ago

Developments for visualization of XIOS workflow.

Branch is spawned from trunk r1649.

The Boost library is used to produce Graphviz DOT files. Current result: a DOT file representing a static workflow. For a complete proof of concept, a DOT file should be generated for each timestamp. The necessary information is already collected by XIOS; it only needs to be rearranged for graphing (changes in classes CWorkflowGraph and CGraphviz).

File size: 4.4 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6#include "workflow_graph.hpp"
7
8namespace xios
9{
10  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
11                               bool compression /*= true*/, bool mask /*= false*/,
12                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
13                               bool hasMissingValue /*= false*/,
14                               double defaultValue /*= 0.0*/,
15                               bool buildWorkflowGraph /*= false*/)
16    : COutputPin(gc, manualTrigger, buildWorkflowGraph)
17    , grid(grid)
18    , compression(compression)
19    , mask(mask)
20    , offset(offset)
21    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
22  {
23    if (!grid)
24      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
25            "Impossible to construct a source filter without providing a grid.");
26  }
27
28  template <int N>
29  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
30  {
31    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
32
33    CDataPacketPtr packet(new CDataPacket);
34    packet->date = date;
35    packet->timestamp = date;
36    packet->status = CDataPacket::NO_ERROR;
37
38    packet->data.resize(grid->storeIndex_client.numElements());   
39   
40    if (compression)
41    {
42      packet->data = defaultValue;
43      grid->uncompressField(data, packet->data);   
44    }
45    else
46    {
47      if (mask)
48        grid->maskField(data, packet->data);
49      else
50        grid->inputField(data, packet->data);
51    }
52    // Convert missing values to NaN
53    if (hasMissingValue)
54    {
55      const double nanValue = std::numeric_limits<double>::quiet_NaN();
56      const size_t nbData = packet->data.numElements();
57      for (size_t idx = 0; idx < nbData; ++idx)
58      {
59        if (defaultValue == packet->data(idx))
60          packet->data(idx) = nanValue;
61      }
62    }
63
64    onOutputReady(packet);
65  }
66
67  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
68  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
69  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
70  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
71  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
72  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
73  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
74
75  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
76  {
77    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
78
79    CDataPacketPtr packet(new CDataPacket);
80    packet->date = date;
81    packet->timestamp = date;
82    packet->status = CDataPacket::NO_ERROR;
83   
84    if (data.size() != grid->storeIndex_fromSrv.size())
85      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
86            << "Incoherent data received from servers,"
87            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
88
89    packet->data.resize(grid->storeIndex_client.numElements());
90    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
91    for (it = data.begin(); it != itEnd; it++)
92    {     
93      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
94      for (int n = 0; n < index.numElements(); n++)
95        packet->data(index(n)) = it->second(n);
96    }
97
98    // Convert missing values to NaN
99    if (hasMissingValue)
100    {
101      const double nanValue = std::numeric_limits<double>::quiet_NaN();
102      const size_t nbData = packet->data.numElements();
103      for (size_t idx = 0; idx < nbData; ++idx)
104      {
105        if (defaultValue == packet->data(idx))
106          packet->data(idx) = nanValue;
107      }
108    }
109
110    onOutputReady(packet);
111  }
112
113  void CSourceFilter::signalEndOfStream(CDate date)
114  {
115    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
116
117    CDataPacketPtr packet(new CDataPacket);
118    packet->date = date;
119    packet->timestamp = date;
120    packet->status = CDataPacket::END_OF_STREAM;
121    onOutputReady(packet);
122  }
123} // namespace xios
Note: See TracBrowser for help on using the repository browser.