source: XIOS/dev/dev_trunk_omp/src/filter/store_filter.cpp @ 1689

Last change on this file since 1689 was 1689, checked in by yushan, 5 years ago

dev for graph. up to date with trunk at r1684

File size: 5.6 KB
Line 
1#include "store_filter.hpp"
2#include "context.hpp"
3#include "grid.hpp"
4#include "timer.hpp"
5#include "file.hpp"
6
7namespace xios
8{
9  CStoreFilter::CStoreFilter(CGarbageCollector& gc, CContext* context, CGrid* grid,
10                             bool detectMissingValues /*= false*/, double missingValue /*= 0.0*/)
11    : CInputPin(gc, 1)
12    , gc(gc)
13    , context(context)
14    , grid(grid)
15    , detectMissingValues(detectMissingValues)
16    , missingValue(missingValue)
17  {
18    if (!context)
19      ERROR("CStoreFilter::CStoreFilter(CContext* context, CGrid* grid)",
20            "Impossible to construct a store filter without providing a context.");
21    if (!grid)
22      ERROR("CStoreFilter::CStoreFilter(CContext* context, CGrid* grid)",
23            "Impossible to construct a store filter without providing a grid.");
24  }
25
26  CConstDataPacketPtr CStoreFilter::getPacket(Time timestamp)
27  {
28    CTimer timer("CStoreFilter::getPacket");
29    CConstDataPacketPtr packet;
30    const double timeout = CXios::recvFieldTimeout;
31
32    do
33    {
34      if (canBeTriggered())
35        trigger(timestamp);
36
37      timer.resume();
38
39      std::map<Time, CDataPacketPtr>::const_iterator it = packets.find(timestamp);
40      if (it != packets.end())
41        packet = it->second;
42      else // if the packet is not available yet, check if it can be received
43        context->checkBuffersAndListen();
44
45      timer.suspend();
46    } while (!packet && timer.getCumulatedTime() < timeout);
47
48    if (!packet)
49    {
50      std::map<Time, CDataPacketPtr>::const_iterator it ;
51      #pragma omp critical (_output)
52      {
53        info(0)<<"Impossible to get the packet with timestamp = " << timestamp<<std::endl<<"Available timestamp are : "<<std::endl ;
54      }
55      for(it=packets.begin();it!=packets.end();++it)
56      {
57        #pragma omp critical (_output)
58        {
59          info(0)<<it->first<<"  ";
60        }
61      }
62      #pragma omp critical (_output)
63      { 
64        info(0)<<std::endl ;
65      }
66      ERROR("CConstDataPacketPtr CStoreFilter::getPacket(Time timestamp) const",
67            << "Impossible to get the packet with timestamp = " << timestamp);
68    }
69    return packet;
70  }
71
72  template <int N>
73  CDataPacket::StatusCode CStoreFilter::getData(Time timestamp, CArray<double, N>& data)
74  {
75    CConstDataPacketPtr packet = getPacket(timestamp);
76
77    if (packet->status == CDataPacket::NO_ERROR)
78      grid->outputField(packet->data, data);
79
80    return packet->status;
81  }
82
83  template CDataPacket::StatusCode CStoreFilter::getData<1>(Time timestamp, CArray<double, 1>& data);
84  template CDataPacket::StatusCode CStoreFilter::getData<2>(Time timestamp, CArray<double, 2>& data);
85  template CDataPacket::StatusCode CStoreFilter::getData<3>(Time timestamp, CArray<double, 3>& data);
86  template CDataPacket::StatusCode CStoreFilter::getData<4>(Time timestamp, CArray<double, 4>& data);
87  template CDataPacket::StatusCode CStoreFilter::getData<5>(Time timestamp, CArray<double, 5>& data);
88  template CDataPacket::StatusCode CStoreFilter::getData<6>(Time timestamp, CArray<double, 6>& data);
89  template CDataPacket::StatusCode CStoreFilter::getData<7>(Time timestamp, CArray<double, 7>& data);
90
91  void CStoreFilter::buildGraph(std::vector<CDataPacketPtr> data)
92  {
93    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false;
94
95    if(building_graph)
96    {
97      this->filterID = InvalidableObject::filterIdGenerator++;
98      int edgeID = InvalidableObject::edgeIdGenerator++;
99
100      CWorkflowGraph::allocNodeEdge();
101 
102      CWorkflowGraph::addNode(this->filterID, "Store Filter", 7, 0, 1, data[0]);
103      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = ++(data[0]->distance);
104      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
105      if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes();
106
107      // if(CXios::isClient) std::cout<<"CStoreFilter::apply filter tag = "<<this->tag<<std::endl;
108
109      if(CXios::isClient && CWorkflowGraph::build_begin) 
110      {
111        CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);;
112        (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0;
113      }
114      else CWorkflowGraph::build_begin = true;
115    }
116  }
117
118  void CStoreFilter::onInputReady(std::vector<CDataPacketPtr> data)
119  {
120    buildGraph(data);
121
122    CDataPacketPtr packet;
123    if (detectMissingValues)
124    {
125      const size_t nbData = data[0]->data.numElements();
126
127      packet = CDataPacketPtr(new CDataPacket);
128      packet->date = data[0]->date;
129      packet->timestamp = data[0]->timestamp;
130      packet->status = data[0]->status;
131      packet->data.resize(nbData);
132      packet->data = data[0]->data;
133
134      for (size_t idx = 0; idx < nbData; ++idx)
135      {
136        if (NumTraits<double>::isNan(packet->data(idx)))
137          packet->data(idx) = missingValue;
138      }
139
140    }
141
142    else
143    {
144      packet = data[0];
145    }
146
147    packets.insert(std::make_pair(packet->timestamp, packet));
148    // The packet is always destroyed by the garbage collector
149    // so we register but never unregister
150    gc.registerObject(this, packet->timestamp);
151
152  }
153
154  bool CStoreFilter::mustAutoTrigger() const
155  {
156    return false;
157  }
158
159  bool CStoreFilter::isDataExpected(const CDate& date) const
160  {
161    return true;
162  }
163
164  void CStoreFilter::invalidate(Time timestamp)
165  {
166    CInputPin::invalidate(timestamp);
167    packets.erase(packets.begin(), packets.lower_bound(timestamp));
168  }
169} // namespace xios
Note: See TracBrowser for help on using the repository browser.