source: XIOS/dev/XIOS_DEV_CMIP6/src/filter/source_filter.cpp @ 1242

Last change on this file since 1242 was 1242, checked in by ymipsl, 7 years ago

Bug fix from previous commit. Pb of memory allocation.

YM

File size: 4.4 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6
7namespace xios
8{
9  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid, bool compression, 
10                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
11                               bool hasMissingValue /*= false*/,
12                               double defaultValue /*= 0.0*/)
13    : COutputPin(gc, manualTrigger)
14    , grid(grid)
15    , compression(compression)
16    , offset(offset)
17    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
18  {
19    if (!grid)
20      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
21            "Impossible to construct a source filter without providing a grid.");
22  }
23
24  template <int N>
25  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
26  {
27    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
28
29    CDataPacketPtr packet(new CDataPacket);
30    packet->date = date;
31    packet->timestamp = date;
32    packet->status = CDataPacket::NO_ERROR;
33
34    packet->data.resize(grid->storeIndex_client.numElements());
35    if (compression) grid->inputField(data, packet->data) ;
36    else
37    {
38      // just make a flat copy
39      CArray<double, N> data_tmp(data.copy()) ; // supress const attribute
40      CArray<double,1> dataTmp2(data_tmp.dataFirst(),shape(data.numElements()),neverDeleteData) ;
41      packet->data = dataTmp2 ;
42    }
43    // Convert missing values to NaN
44    if (hasMissingValue)
45    {
46      const double nanValue = std::numeric_limits<double>::quiet_NaN();
47      const size_t nbData = packet->data.numElements();
48      for (size_t idx = 0; idx < nbData; ++idx)
49      {
50        if (defaultValue == packet->data(idx))
51          packet->data(idx) = nanValue;
52      }
53    }
54
55    onOutputReady(packet);
56  }
57
58  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
59  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
60  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
61  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
62  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
63  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
64  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
65
66  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
67  {
68    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
69
70    CDataPacketPtr packet(new CDataPacket);
71    packet->date = date;
72    packet->timestamp = date;
73    packet->status = CDataPacket::NO_ERROR;
74
75    // if (data.size() != grid->storeIndex_toSrv.size())
76    if (data.size() != grid->storeIndex_fromSrv.size())
77      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
78            << "Incoherent data received from servers,"
79            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
80
81    packet->data.resize(grid->storeIndex_client.numElements());
82    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
83    for (it = data.begin(); it != itEnd; it++)
84    {
85      // CArray<int,1>& index = grid->storeIndex_toSrv[it->first];
86      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
87      for (int n = 0; n < index.numElements(); n++)
88        packet->data(index(n)) = it->second(n);
89    }
90
91    // Convert missing values to NaN
92    if (hasMissingValue)
93    {
94      const double nanValue = std::numeric_limits<double>::quiet_NaN();
95      const size_t nbData = packet->data.numElements();
96      for (size_t idx = 0; idx < nbData; ++idx)
97      {
98        if (defaultValue == packet->data(idx))
99          packet->data(idx) = nanValue;
100      }
101    }
102
103    onOutputReady(packet);
104  }
105
106  void CSourceFilter::signalEndOfStream(CDate date)
107  {
108    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
109
110    CDataPacketPtr packet(new CDataPacket);
111    packet->date = date;
112    packet->timestamp = date;
113    packet->status = CDataPacket::END_OF_STREAM;
114    onOutputReady(packet);
115  }
116} // namespace xios
Note: See TracBrowser for help on using the repository browser.