source: XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/source_filter.cpp @ 1918

Last change on this file since 1918 was 1918, checked in by ymipsl, 4 years ago

Big update on ongoing work related to data distribution and transfer between clients and servers.

  • move all related files into the distribution directory
  • implement the concept of data "View"
  • implement the concept of "connector", which performs the data transfer between two different "Views"

YM

File size: 4.3 KB
RevLine 
[638]1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
[756]4#include "calendar_util.hpp"
[1158]5#include <limits>
[638]6
7namespace xios
8{
[1637]9  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
10                               bool compression /*= true*/, bool mask /*= false*/,
[1158]11                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
12                               bool hasMissingValue /*= false*/,
13                               double defaultValue /*= 0.0*/)
[1021]14    : COutputPin(gc, manualTrigger)
15    , grid(grid)
[1241]16    , compression(compression)
[1637]17    , mask(mask)
[756]18    , offset(offset)
[1158]19    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
[638]20  {
21    if (!grid)
22      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
23            "Impossible to construct a source filter without providing a grid.");
24  }
25
26  template <int N>
[643]27  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
[638]28  {
[756]29    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
30
[638]31    CDataPacketPtr packet(new CDataPacket);
[643]32    packet->date = date;
33    packet->timestamp = date;
[638]34    packet->status = CDataPacket::NO_ERROR;
35
[1869]36    packet->data.resize(grid->getStoreIndex_client().numElements());   
[1250]37   
38    if (compression)
39    {
40      packet->data = defaultValue;
41      grid->uncompressField(data, packet->data);   
42    }
43    else
[1637]44    {
[1918]45      if (mask) grid->maskField(data, packet->data);
46      else grid->inputField(data, packet->data);
[1637]47    }
[1158]48    // Convert missing values to NaN
49    if (hasMissingValue)
50    {
[1201]51      const double nanValue = std::numeric_limits<double>::quiet_NaN();
52      const size_t nbData = packet->data.numElements();
[1158]53      for (size_t idx = 0; idx < nbData; ++idx)
54      {
55        if (defaultValue == packet->data(idx))
56          packet->data(idx) = nanValue;
57      }
58    }
59
[1021]60    onOutputReady(packet);
[638]61  }
62
[643]63  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
64  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
65  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
[932]66  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
67  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
68  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
69  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
[638]70
[643]71  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
[638]72  {
[756]73    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
74
[638]75    CDataPacketPtr packet(new CDataPacket);
[643]76    packet->date = date;
77    packet->timestamp = date;
[638]78    packet->status = CDataPacket::NO_ERROR;
[1249]79   
[1794]80    if (data.size() != grid->storeIndex_fromSrv_.size())
[643]81      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
[638]82            << "Incoherent data received from servers,"
[1794]83            << " expected " << grid->storeIndex_fromSrv_.size() << " chunks but " << data.size() << " were given.");
[638]84
[1869]85    packet->data.resize(grid->getStoreIndex_client().numElements());
[638]86    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
87    for (it = data.begin(); it != itEnd; it++)
[1249]88    {     
[1794]89      CArray<int,1>& index = grid->storeIndex_fromSrv_[it->first];
[638]90      for (int n = 0; n < index.numElements(); n++)
91        packet->data(index(n)) = it->second(n);
92    }
93
[1201]94    // Convert missing values to NaN
95    if (hasMissingValue)
96    {
97      const double nanValue = std::numeric_limits<double>::quiet_NaN();
98      const size_t nbData = packet->data.numElements();
99      for (size_t idx = 0; idx < nbData; ++idx)
100      {
101        if (defaultValue == packet->data(idx))
102          packet->data(idx) = nanValue;
103      }
104    }
105
[1021]106    onOutputReady(packet);
[638]107  }
108
[643]109  void CSourceFilter::signalEndOfStream(CDate date)
[638]110  {
[1210]111    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
112
[638]113    CDataPacketPtr packet(new CDataPacket);
[643]114    packet->date = date;
115    packet->timestamp = date;
[638]116    packet->status = CDataPacket::END_OF_STREAM;
[1021]117    onOutputReady(packet);
[638]118  }
119} // namespace xios
Note: See TracBrowser for help on using the repository browser.