Changeset 1686 for XIOS/dev/dev_olga/src/filter
- Timestamp:
- 07/31/19 13:51:01 (5 years ago)
- Location:
- XIOS/dev/dev_olga/src/filter
- Files:
-
- 29 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_olga/src/filter/binary_arithmetic_filter.cpp
r804 r1686 1 1 #include "binary_arithmetic_filter.hpp" 2 #include "workflow_graph.hpp" 3 #include "yacc_var.hpp" 4 #include "file.hpp" 5 2 6 3 7 namespace xios … … 7 11 , op(operatorExpr.getOpScalarField(op)) 8 12 , value(value) 9 { /* Nothing to do */ }; 13 { 14 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 15 }; 16 17 std::tuple<int, int, int> CScalarFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 18 { 19 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 20 // bool building_graph = true; 21 int unique_filter_id; 22 bool firstround; 23 24 if(building_graph) 25 { 26 CWorkflowGraph::allocNodeEdge(); 27 28 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 29 30 // first round 31 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 32 { 33 firstround = true; 34 this->filterID = InvalidableObject::filterIdGenerator++; 35 int edgeID = InvalidableObject::edgeIdGenerator++; 36 37 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 38 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 39 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 40 41 42 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 43 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 44 45 46 if(CWorkflowGraph::build_begin) 47 { 48 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 49 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 50 51 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 52 } 53 else CWorkflowGraph::build_begin = true; 54 55 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 56 unique_filter_id = this->filterID; 57 } 58 // not first round 59 else 60 { 61 firstround=false; 62 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 63 if(data[0]->src_filterID != unique_filter_id) 64 { 65 int edgeID = InvalidableObject::edgeIdGenerator++; 66 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 67 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 68 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 69 } 70 } 71 } 72 73 return std::make_tuple(building_graph, firstround, unique_filter_id); 74 } 75 10 76 11 77 CDataPacketPtr CScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 16 82 packet->status = data[0]->status; 17 83 84 std::tuple<int, int, int> graph = buildGraph(data); 85 86 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 87 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 88 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 89 90 packet->field = this->field; 91 18 92 if (packet->status == CDataPacket::NO_ERROR) 19 93 packet->data.reference(op(value, data[0]->data)); … … 26 100 , op(operatorExpr.getOpFieldScalar(op)) 27 101 , value(value) 28 { /* Nothing to do */ }; 102 { 103 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 104 }; 105 106 std::tuple<int, int, int> CFieldScalarArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 107 { 108 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 109 // bool building_graph = true; 110 int unique_filter_id; 111 bool firstround; 112 113 if(building_graph) 114 { 115 CWorkflowGraph::allocNodeEdge(); 116 117 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 118 119 // first round 120 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 121 { 122 firstround = true; 123 this->filterID = InvalidableObject::filterIdGenerator++; 124 int edgeID = InvalidableObject::edgeIdGenerator++; 125 126 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 127 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 128 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 129 130 131 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 132 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 133 134 135 if(CWorkflowGraph::build_begin) 136 { 137 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 138 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 139 140 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 141 } 142 else CWorkflowGraph::build_begin = true; 143 144 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 145 unique_filter_id = this->filterID; 146 } 147 // not first round 148 else 149 { 150 firstround=false; 151 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 152 if(data[0]->src_filterID != unique_filter_id) 153 { 154 int edgeID = InvalidableObject::edgeIdGenerator++; 155 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 156 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 157 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 158 } 159 } 160 } 161 162 return std::make_tuple(building_graph, firstround, unique_filter_id); 163 } 29 164 30 165 CDataPacketPtr CFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 35 170 packet->status = data[0]->status; 36 171 172 std::tuple<int, int, int> graph = buildGraph(data); 173 174 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 175 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 176 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 177 178 packet->field = this->field; 179 37 180 if (packet->status == CDataPacket::NO_ERROR) 38 181 packet->data.reference(op(data[0]->data, value)); … … 44 187 : CFilter(gc, 2, this) 45 188 , op(operatorExpr.getOpFieldField(op)) 46 { /* Nothing to do */ }; 189 { 190 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 191 }; 192 193 std::tuple<int, int, int> CFieldFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 194 { 195 bool building_graph = this->tag ? ((data[0]->timestamp >= this->field->field_graph_start && data[0]->timestamp <= this->field->field_graph_end) && (data[0]->timestamp == data[1]->timestamp)) : false; 196 197 int unique_filter_id; 198 199 bool firstround; 200 201 if(building_graph) 202 { 203 CWorkflowGraph::allocNodeEdge(); 204 205 // std::cout<<"CFieldFieldArithmeticFilter::apply filter tag = "<<this->tag<<std::endl; 206 207 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 208 209 // first round 210 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 211 { 212 firstround = true; 213 this->filterID = InvalidableObject::filterIdGenerator++; 214 int edgeID = InvalidableObject::edgeIdGenerator++; 215 216 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 217 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 218 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 219 220 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 221 222 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 223 if(CWorkflowGraph::build_begin) 224 { 225 226 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 227 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 228 229 edgeID = InvalidableObject::edgeIdGenerator++; 230 231 CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 232 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 233 234 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 235 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 236 } 237 CWorkflowGraph::build_begin = true; 238 239 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 240 unique_filter_id = this->filterID; 241 242 } 243 // not first round 244 else 245 { 246 firstround = false; 247 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 248 if(data[0]->src_filterID != unique_filter_id) 249 { 250 int edgeID = InvalidableObject::edgeIdGenerator++; 251 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 252 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 253 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 254 } 255 if(data[1]->src_filterID != unique_filter_id) 256 { 257 int edgeID = InvalidableObject::edgeIdGenerator++; 258 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[1]); 259 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 260 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 261 } 262 263 } 264 } 265 266 return std::make_tuple(building_graph, firstround, unique_filter_id); 267 } 47 268 48 269 CDataPacketPtr CFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 51 272 packet->date = data[0]->date; 52 273 packet->timestamp = data[0]->timestamp; 274 275 std::tuple<int, int, int> graph = buildGraph(data); 276 277 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 278 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 279 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 280 281 packet->field = this->field; 282 53 283 54 284 if (data[0]->status != CDataPacket::NO_ERROR) … … 64 294 return packet; 65 295 } 296 297 StdString CScalarFieldArithmeticFilter::GetName(void) { return StdString("CScalarFieldArithmeticFilter"); } 298 StdString CFieldScalarArithmeticFilter::GetName(void) { return StdString("CFieldScalarArithmeticFilter"); } 299 StdString CFieldFieldArithmeticFilter::GetName(void) { return StdString("CFieldFieldArithmeticFilter"); } 300 301 66 302 } // namespace xios -
XIOS/dev/dev_olga/src/filter/binary_arithmetic_filter.hpp
r1542 r1686 6 6 #include "operator_expr.hpp" 7 7 #include <unordered_map> 8 #include <tuple> 8 9 9 10 namespace xios … … 23 24 */ 24 25 CScalarFieldArithmeticFilter(CGarbageCollector& gc, const std::string& op, double value); 26 StdString virtual GetName(void); 27 25 28 26 29 protected: … … 35 38 */ 36 39 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 40 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 41 42 37 43 }; // class CScalarFieldArithmeticFilter 38 44 … … 51 57 */ 52 58 CFieldScalarArithmeticFilter(CGarbageCollector& gc, const std::string& op, double value); 59 StdString virtual GetName(void); 60 53 61 54 62 protected: … … 63 71 */ 64 72 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 73 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 65 74 }; // class CFieldScalarArithmeticFilter 66 75 … … 78 87 */ 79 88 CFieldFieldArithmeticFilter(CGarbageCollector& gc, const std::string& op); 89 StdString virtual GetName(void); 90 80 91 81 92 protected: … … 89 100 */ 90 101 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 102 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 91 103 }; // class CFieldFieldArithmeticFilter 92 104 } // namespace xios -
XIOS/dev/dev_olga/src/filter/data_packet.hpp
r1542 r1686 9 9 namespace xios 10 10 { 11 class CField; 11 12 /*! 12 13 * A packet corresponds to a timestamped array of data. … … 26 27 Time timestamp; //!< Timestamp of the data 27 28 StatusCode status; //!< Status of the packet 28 29 int src_filterID; 30 std::vector<int> filterIDoutputs; 31 CField *field; 32 int distance; 33 29 34 /*! 30 35 * Creates a deep copy of the packet. -
XIOS/dev/dev_olga/src/filter/file_server_writer_filter.cpp
r1654 r1686 16 16 void CFileServerWriterFilter::onInputReady(std::vector<CDataPacketPtr> data) 17 17 { 18 field->writeUpdateData(data[0]->data); 18 field->writeUpdateData(data[0]->data); 19 19 } 20 20 … … 28 28 return true; 29 29 } 30 31 int CFileServerWriterFilter::getFilterId(void)32 {33 return filterId;34 }35 36 30 } // namespace xios -
XIOS/dev/dev_olga/src/filter/file_server_writer_filter.hpp
r1653 r1686 37 37 bool virtual isDataExpected(const CDate& date) const; 38 38 39 /*!40 * Returns filter's id needed in case of building workflow graph41 */42 int getFilterId();43 44 39 protected: 45 40 /*! … … 51 46 52 47 private: 53 CField* field; 48 CField* field; //<! The associated field 54 49 std::map<Time, CDataPacketPtr> packets; //<! The stored packets 55 int filterId; //<! Filter's id needed in case of building a workflow56 57 50 }; // class CFileServerWriterFilter 58 51 } // namespace xios -
XIOS/dev/dev_olga/src/filter/file_writer_filter.cpp
r1654 r1686 4 4 #include "utils.hpp" 5 5 #include "workflow_graph.hpp" 6 #include "graphviz.hpp" 6 7 7 8 namespace xios 8 9 { 9 CFileWriterFilter::CFileWriterFilter(CGarbageCollector& gc, CField* field , bool buildWorkflowGraph /* =false */)10 CFileWriterFilter::CFileWriterFilter(CGarbageCollector& gc, CField* field) 10 11 : CInputPin(gc, 1) 11 12 , field(field) … … 14 15 ERROR("CFileWriterFilter::CFileWriterFilter(CField* field)", 15 16 "The field cannot be null."); 16 if (buildWorkflowGraph) 17 } 18 19 void CFileWriterFilter::buildGraph(std::vector<CDataPacketPtr> data) 20 { 21 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph: false; 22 23 if(building_graph) 17 24 { 18 filterId = InvalidableObject::count; 19 InvalidableObject::count++; 25 this->filterID = InvalidableObject::filterIdGenerator++; 26 int edgeID = InvalidableObject::edgeIdGenerator++; 27 28 CWorkflowGraph::allocNodeEdge(); 29 StdString namestring = to_string(this->field->name); 30 namestring.erase(0, 6); 31 namestring.erase(namestring.length()-1, 1); 32 33 CWorkflowGraph::addNode(this->filterID, namestring + "\\n("+this->field->file->getId()+".nc)", 6, 0, 1, data[0]); 34 35 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 36 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 37 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].clusterID =1; 38 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = ++(data[0]->distance); 39 40 if(CXios::isClient && CWorkflowGraph::build_begin) 41 { 42 43 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 44 45 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 46 } 47 else CWorkflowGraph::build_begin=true; 20 48 } 21 49 } … … 23 51 void CFileWriterFilter::onInputReady(std::vector<CDataPacketPtr> data) 24 52 { 53 buildGraph(data); 54 25 55 const bool detectMissingValue = ( !field->default_value.isEmpty() && 26 56 ( (!field->detect_missing_value.isEmpty() || field->detect_missing_value == true) … … 41 71 42 72 field->sendUpdateData(dataArray); 43 44 73 } 45 74 … … 53 82 return true; 54 83 } 55 56 int CFileWriterFilter::getFilterId(void)57 {58 return filterId;59 }60 61 84 } // namespace xios -
XIOS/dev/dev_olga/src/filter/file_writer_filter.hpp
r1654 r1686 3 3 4 4 #include "input_pin.hpp" 5 #include "file.hpp" 6 #include "duration.hpp" 5 7 6 8 namespace xios … … 14 16 { 15 17 public: 18 int tag; 19 Time start_graph; 20 Time end_graph; 21 CField* field; //<! The associated field 22 int filterID; 23 int distance; 24 16 25 /*! 17 26 * Constructs the filter (with one input slot) associated to the specified field … … 20 29 * \param gc the associated garbage collector 21 30 * \param field the associated field 22 * \param[in] buildWorkflowGraph indicates whether the workflow will be visualized23 31 */ 24 CFileWriterFilter(CGarbageCollector& gc, CField* field , bool buildWorkflowGraph = false);32 CFileWriterFilter(CGarbageCollector& gc, CField* field); 25 33 26 inline StdString GetName(void) {return StdString("File writer filter");}; 34 inline StdString GetName(void) {return "File writer filter";}; 35 27 36 28 37 /*! … … 40 49 bool virtual isDataExpected(const CDate& date) const; 41 50 42 /*!43 * Returns filter's id needed in case of building workflow graph44 */45 int getFilterId();46 47 51 protected: 48 52 /*! … … 52 56 */ 53 57 void virtual onInputReady(std::vector<CDataPacketPtr> data); 58 void virtual buildGraph(std::vector<CDataPacketPtr> data); 54 59 55 60 private: 56 CField* field; //<! The associated field57 61 std::map<Time, CDataPacketPtr> packets; //<! The stored packets 58 int filterId; //<! Filter's id needed in case of building a workflow59 60 62 }; // class CFileWriterFilter 61 63 } // namespace xios -
XIOS/dev/dev_olga/src/filter/filter.cpp
r1653 r1686 3 3 namespace xios 4 4 { 5 CFilter::CFilter(CGarbageCollector& gc, size_t inputSlotsCount, IFilterEngine* engine , bool buildWorkflowGraph /*= false*/)5 CFilter::CFilter(CGarbageCollector& gc, size_t inputSlotsCount, IFilterEngine* engine) 6 6 : CInputPin(gc, inputSlotsCount) 7 , COutputPin(gc, false , buildWorkflowGraph)7 , COutputPin(gc, false) 8 8 , engine(engine) 9 9 , inputSlotCount(inputSlotCount) -
XIOS/dev/dev_olga/src/filter/filter.hpp
r1653 r1686 23 23 * \param inputSlotsCount the number of input slots 24 24 * \param engine the filter engine 25 * \param buildWorkflowGraph indicates whether data will be visualized26 25 */ 27 CFilter(CGarbageCollector& gc, size_t inputSlotsCount, IFilterEngine* engine , bool buildWorkflowGraph = false);26 CFilter(CGarbageCollector& gc, size_t inputSlotsCount, IFilterEngine* engine); 28 27 29 28 StdString virtual GetName(void); … … 65 64 bool virtual isDataExpected(const CDate& date) const; 66 65 66 67 68 69 int filterID; 70 StdString expression; 71 67 72 protected: 68 73 IFilterEngine* engine; //!< The filter engine, might be the filter itself -
XIOS/dev/dev_olga/src/filter/garbage_collector.cpp
r1653 r1686 3 3 namespace xios 4 4 { 5 int InvalidableObject::filterIdGenerator = 0; 5 6 6 int InvalidableObject::count = 0; 7 int InvalidableObject::edgeIdGenerator = 0; 8 9 int InvalidableObject::clusterIdGenerator = 0; 7 10 8 11 void CGarbageCollector::registerObject(InvalidableObject* Object, Time timestamp) -
XIOS/dev/dev_olga/src/filter/garbage_collector.hpp
r1653 r1686 21 21 void virtual invalidate(Time timestamp) = 0; 22 22 23 static int count; //!< Counter used to identify a filter in case building workflow graph 23 24 25 static int filterIdGenerator; 26 27 static int edgeIdGenerator; 28 29 static int clusterIdGenerator; 30 31 24 32 25 33 }; // struct InvalidableObject -
XIOS/dev/dev_olga/src/filter/input_pin.cpp
r1653 r1686 12 12 , triggers(slotsCount) 13 13 , hasTriggers(false) 14 { 14 { } 15 15 16 16 StdString CInputPin::GetName(void) … … 83 83 inputs.erase(inputs.begin(), inputs.lower_bound(timestamp)); 84 84 } 85 86 int CInputPin::getFilterId(void)87 {88 return -1;89 }90 91 85 } // namespace xios -
XIOS/dev/dev_olga/src/filter/input_pin.hpp
r1654 r1686 81 81 */ 82 82 void virtual invalidate(Time timestamp); 83 84 /*!85 * Returns filter's id needed in case of building workflow graph86 * This function should never be called from this class, instead functions defined in derived classes or in class COutputPin should be used87 */88 int virtual getFilterId();89 83 90 84 protected: … … 130 124 //! Whether some triggers have been set 131 125 bool hasTriggers; 132 133 126 }; // class CInputPin 134 127 } // namespace xios -
XIOS/dev/dev_olga/src/filter/output_pin.cpp
r1654 r1686 5 5 namespace xios 6 6 { 7 COutputPin::COutputPin(CGarbageCollector& gc, bool manualTrigger /*= false*/ , bool buildWorkflowGraph /* =false */)7 COutputPin::COutputPin(CGarbageCollector& gc, bool manualTrigger /*= false*/) 8 8 : gc(gc) 9 9 , manualTrigger(manualTrigger) 10 , buildWorkflowGraph(buildWorkflowGraph) 11 { 12 if (buildWorkflowGraph) 13 { 14 filterId = InvalidableObject::count; 15 InvalidableObject::count++; 16 } 17 } 10 { } 18 11 19 12 StdString COutputPin::GetName(void) … … 40 33 "The packet cannot be null."); 41 34 42 if (buildWorkflowGraph)43 {44 CWorkflowGraph::mapFilterTimestamps[this->getFilterId()].push_back(packet->timestamp);45 CWorkflowGraph::timestamps.insert(packet->timestamp);46 }47 48 35 if (manualTrigger) // Don't use canBeTriggered here, this function is virtual and can be overriden 49 36 { … … 52 39 } 53 40 else 54 {55 41 deliverOuput(packet); 56 }57 42 } 58 43 … … 123 108 } 124 109 125 int COutputPin::getFilterId(void)110 void COutputPin::setParentFiltersTag() 126 111 { 127 return filterId; 112 for(int i=0; i<parent_filters.size(); i++) 113 { 114 115 if(parent_filters[i]->start_graph<0) parent_filters[i]->start_graph = start_graph; 116 else parent_filters[i]->start_graph = min(parent_filters[i]->start_graph, start_graph); 117 118 119 if(parent_filters[i]->end_graph<0) parent_filters[i]->end_graph = end_graph; 120 else parent_filters[i]->end_graph = max(parent_filters[i]->end_graph, end_graph); 121 122 123 parent_filters[i]->tag += tag; 124 parent_filters[i]->setParentFiltersTag(); 125 } 128 126 } 129 127 128 129 130 130 131 } // namespace xios -
XIOS/dev/dev_olga/src/filter/output_pin.hpp
r1654 r1686 4 4 #include "garbage_collector.hpp" 5 5 #include "input_pin.hpp" 6 #include "duration.hpp" 6 7 7 8 namespace xios 8 9 { 10 class CField; 11 class CInputPin; 12 class CFilter; 13 class CDuration; 9 14 /*! 10 15 * An output pin handles the connections with downstream filters. … … 13 18 { 14 19 public: 20 int tag; 21 Time start_graph; 22 Time end_graph; 23 CField *field; 24 int distance; 25 26 27 28 std::vector< std::shared_ptr<COutputPin> > parent_filters; 29 15 30 /*! 16 31 * Constructs an ouput pin with manual or automatic trigger … … 19 34 * \param gc the garbage collector associated with this ouput pin 20 35 * \param slotsCount the number of slots 21 * \param buildWorkflowGraph indicates whether data will be visualized22 36 */ 23 COutputPin(CGarbageCollector& gc, bool manualTrigger = false , bool buildWorkflowGraph = false);37 COutputPin(CGarbageCollector& gc, bool manualTrigger = false); 24 38 25 39 StdString virtual GetName(void); … … 69 83 void virtual invalidate(Time timestamp); 70 84 71 /*! 72 * Returns filter's id needed in case of building workflow graph 73 */ 74 int getFilterId(); 85 void virtual setParentFiltersTag(); 86 75 87 76 88 protected: … … 97 109 CGarbageCollector& gc; //!< The garbage collector associated to the output pin 98 110 99 //! Whether the ouput should be triggered manually111 //!< Whether the ouput should be triggered manually 100 112 bool manualTrigger; 101 113 102 //! The list of connected filters and the corresponding slot numbers114 //!< The list of connected filters and the corresponding slot numbers 103 115 std::vector<std::pair<std::shared_ptr<CInputPin>, size_t> > outputs; 104 116 105 117 //! Output buffer, store the packets until the output is triggered 106 118 std::map<Time, CDataPacketPtr> outputPackets; 107 108 //! Indicates whether the workflow will be visualized109 bool buildWorkflowGraph;110 111 //! Filter's id needed in case of building a workflow graph112 int filterId;113 114 115 119 }; // class COutputPin 116 120 } // namespace xios -
XIOS/dev/dev_olga/src/filter/pass_through_filter.cpp
r1653 r1686 1 1 #include "pass_through_filter.hpp" 2 #include "workflow_graph.hpp" 3 #include "field.hpp" 4 #include "file.hpp" 2 5 3 6 namespace xios 4 7 { 5 CPassThroughFilter::CPassThroughFilter(CGarbageCollector& gc, bool buildWorkflowGraph /*= false*/) 6 : CFilter(gc, 1, this, buildWorkflowGraph) 7 { /* Nothing to do */ } 8 CPassThroughFilter::CPassThroughFilter(CGarbageCollector& gc) 9 : CFilter(gc, 1, this) 10 { 11 } 12 13 void CPassThroughFilter::buildGraph(std::vector<CDataPacketPtr> data) 14 { 15 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 16 // bool building_graph = this->tag ? data[0]->timestamp >= this->field->field_graph_start && data[0]->timestamp <= this->field->field_graph_end : false; 17 18 if(building_graph) 19 { 20 // std::cout<<"CPassThroughFilter::apply field_id = "<<this->field->getId()<<" start = "<<start_graph<<" end = "<<end_graph<<std::endl; 21 this->filterID = InvalidableObject::filterIdGenerator++; 22 int edgeID = InvalidableObject::edgeIdGenerator++; 23 24 CWorkflowGraph::allocNodeEdge(); 25 26 CWorkflowGraph::addNode(this->filterID, "Pass Through Filter\\n("+data[0]->field->getId()+")", 2, 1, 1, data[0]); 27 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = ++(data[0]->distance); 28 29 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 30 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 31 32 33 if(CWorkflowGraph::build_begin) 34 { 35 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 36 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0; 37 } 38 else CWorkflowGraph::build_begin = true; 39 40 data[0]->src_filterID=this->filterID; 41 42 } 43 44 data[0]->field = this->field; 45 } 8 46 9 47 CDataPacketPtr CPassThroughFilter::apply(std::vector<CDataPacketPtr> data) 10 48 { 49 if(CXios::isClient) buildGraph(data); 11 50 return data[0]; 12 51 } -
XIOS/dev/dev_olga/src/filter/pass_through_filter.hpp
r1653 r1686 17 17 * 18 18 * \param gc the associated garbage collector 19 * \param buildWorkflowGraph indicates whether data will be visualized20 19 */ 21 CPassThroughFilter(CGarbageCollector& gc , bool buildWorkflowGraph = false);20 CPassThroughFilter(CGarbageCollector& gc); 22 21 23 22 inline StdString GetName(void) {return StdString("Pass through filter");}; … … 31 30 */ 32 31 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 32 void virtual buildGraph(std::vector<CDataPacketPtr> data); 33 33 }; // class CPassThroughFilter 34 34 } // namespace xios -
XIOS/dev/dev_olga/src/filter/source_filter.cpp
r1654 r1686 12 12 const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/, 13 13 bool hasMissingValue /*= false*/, 14 double defaultValue /*= 0.0*/, 15 bool buildWorkflowGraph /*= false*/) 16 : COutputPin(gc, manualTrigger, buildWorkflowGraph) 14 double defaultValue /*= 0.0*/) 15 : COutputPin(gc, manualTrigger) 17 16 , grid(grid) 18 17 , compression(compression) … … 25 24 "Impossible to construct a source filter without providing a grid."); 26 25 } 27 26 27 void CSourceFilter::buildGraph(CDataPacketPtr packet) 28 { 29 bool building_graph = this->tag ? packet->timestamp >= this->field->field_graph_start && packet->timestamp <= this->field->field_graph_end : false; 30 31 if(building_graph) 32 { 33 this->filterID = InvalidableObject::filterIdGenerator++; 34 packet->src_filterID=this->filterID; 35 packet->field = this->field; 36 packet->distance = 1; 37 38 39 CWorkflowGraph::allocNodeEdge(); 40 41 CWorkflowGraph::addNode(this->filterID, "Source Filter ", 1, 1, 0, packet); 42 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 43 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].field_id = this->field->getId(); 44 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = 1; 45 46 CWorkflowGraph::build_begin = true; 47 } 48 49 } 50 51 28 52 template <int N> 29 53 void CSourceFilter::streamData(CDate date, const CArray<double, N>& data) … … 61 85 } 62 86 } 87 88 if(CXios::isClient) buildGraph(packet); 89 90 63 91 64 92 onOutputReady(packet); -
XIOS/dev/dev_olga/src/filter/source_filter.hpp
r1654 r1686 27 27 * \param hasMissingValue whether data has missing value 28 28 * \param defaultValue missing value to detect 29 * \param[in] buildWorkflowGraph indicates whether the workflow will be visualized30 29 */ 31 30 CSourceFilter(CGarbageCollector& gc, CGrid* grid, … … 34 33 const CDuration offset = NoneDu, bool manualTrigger = false, 35 34 bool hasMissingValue = false, 36 double defaultValue = 0.0, 37 bool buildWorkflowGraph = false); 35 double defaultValue = 0.0); 38 36 39 37 inline StdString GetName(void) {return StdString("Source filter");}; … … 49 47 template <int N> 50 48 void streamData(CDate date, const CArray<double, N>& data); 49 50 void virtual buildGraph(CDataPacketPtr packet); 51 51 52 52 /*! … … 66 66 */ 67 67 void signalEndOfStream(CDate date); 68 int filterID; 68 69 69 70 private: … … 74 75 const bool compression ; //!< indicates if data need to be compressed : on client side : true, on server side : false 75 76 const bool mask ; //!< indicates whether grid mask should be applied (true for clients, false for servers) 76 77 77 }; // class CSourceFilter 78 78 } // namespace xios -
XIOS/dev/dev_olga/src/filter/spatial_transform_filter.cpp
r1653 r1686 5 5 #include "timer.hpp" 6 6 #include "workflow_graph.hpp" 7 #include "file.hpp" 7 8 8 9 namespace xios 9 10 { 10 CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, 11 double outputValue, size_t inputSlotsCount, bool buildWorkflowGraph /*= false*/) 12 : CFilter(gc, inputSlotsCount, engine, buildWorkflowGraph), outputDefaultValue(outputValue) 11 CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, double outputValue, size_t inputSlotsCount) 12 : CFilter(gc, inputSlotsCount, engine), outputDefaultValue(outputValue) 13 13 { /* Nothing to do */ } 14 14 15 15 std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> > 16 CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue, 17 bool buildWorkflowGraph) 16 CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue) 18 17 { 19 18 if (!srcGrid || !destGrid) … … 32 31 double defaultValue = (hasMissingValue) ? std::numeric_limits<double>::quiet_NaN() : 0.0; 33 32 33 34 34 const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ; 35 35 CGridTransformationSelector::ListAlgoType::const_iterator it ; … … 39 39 40 40 std::shared_ptr<CSpatialTransformFilter> filter ; 41 if( isSpatialTemporal) 42 filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTemporalFilter(gc, engine, gridTransformation, defaultValue, inputCount, buildWorkflowGraph)); 43 else 44 filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount, buildWorkflowGraph)); 45 41 if( isSpatialTemporal) filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTemporalFilter(gc, engine, gridTransformation, defaultValue, inputCount)); 42 else filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount)); 43 44 46 45 if (!lastFilter) 47 46 lastFilter = filter; 48 47 else 49 {50 48 filter->connectOutput(firstFilter, 0); 51 if (buildWorkflowGraph)52 {53 int filterOut = (std::static_pointer_cast<COutputPin>(filter))->getFilterId();54 int filterIn = (std::static_pointer_cast<COutputPin>(firstFilter))->getFilterId();55 // PASS field's id here56 CWorkflowGraph::mapFieldToFilters["XXX"].push_back(filterOut);57 CWorkflowGraph::mapFieldToFilters["XXX"].push_back(filterIn);58 CWorkflowGraph::mapFilters[filterOut] = "Spatial transform filter";59 CWorkflowGraph::mapFilters[filterIn] = "Spatial transform filter";60 }61 }62 49 63 50 firstFilter = filter; … … 79 66 { 80 67 CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine); 81 CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue );68 CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->start_graph, this->end_graph, this->field); 82 69 if (outputPacket) 83 70 onOutputReady(outputPacket); 84 71 } 85 72 86 CSpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, 87 CGridTransformation* gridTransformation, double outputValue, 88 size_t inputSlotsCount, bool buildWorkflowGraph) 89 : CSpatialTransformFilter(gc, engine, outputValue, inputSlotsCount, buildWorkflowGraph), record(0) 73 CSpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount) 74 : CSpatialTransformFilter(gc, engine, outputValue, inputSlotsCount), record(0) 90 75 { 91 76 const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ; … … 113 98 { 114 99 CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine); 115 CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue );100 CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->start_graph, this->end_graph, this->field); 116 101 117 102 if (outputPacket) … … 138 123 packet->data.resize(tmpData.numElements()); 139 124 packet->data = tmpData; 125 packet->field = this->field; 140 126 onOutputReady(packet); 141 127 tmpData.resize(0) ; … … 176 162 } 177 163 178 CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue) 179 { 164 bool CSpatialTransformFilterEngine::buildGraph(std::vector<CDataPacketPtr> data, int tag, Time start_graph, Time end_graph, CField *field) 165 { 166 bool building_graph = tag ? data[0]->timestamp >= start_graph && data[0]->timestamp <= end_graph : false; 167 if(building_graph) 168 { 169 this->filterID = InvalidableObject::filterIdGenerator++; 170 int edgeID = InvalidableObject::edgeIdGenerator++; 171 172 CWorkflowGraph::allocNodeEdge(); 173 174 CWorkflowGraph::addNode(this->filterID, "Spatial Transform Filter", 4, 1, 1, data[0]); 175 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 176 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = field->record4graphXiosAttributes(); 177 if(field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +field->file->record4graphXiosAttributes(); 178 179 180 if(CWorkflowGraph::build_begin) 181 { 182 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 183 184 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0; 185 } 186 else CWorkflowGraph::build_begin = true; 187 } 188 189 return building_graph; 190 } 191 192 CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue, int tag, Time start_graph, Time end_graph, CField *field) 193 { 194 195 bool BG = buildGraph(data, tag, start_graph, end_graph, field); 196 180 197 CDataPacketPtr packet(new CDataPacket); 181 198 packet->date = data[0]->date; … … 194 211 if (0 != packet->data.numElements()) 195 212 (packet->data)(0) = defaultValue; 196 apply(data[0]->data, packet->data); 213 if(BG) apply(data[0]->data, packet->data, this->filterID); 214 else apply(data[0]->data, packet->data); 197 215 } 198 216 … … 200 218 } 201 219 202 void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest )220 void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest, int filterID) 203 221 { 204 222 CTimer::get("CSpatialTransformFilterEngine::apply").resume(); … … 313 331 const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second; 314 332 int srcRank = itRecv->first; 333 334 if(filterID >=0) // building_graph 335 { 336 (*CWorkflowGraph::mapFilters_ptr_with_info)[filterID].filter_name = (*itAlgo)->getName(); 337 } 315 338 if (srcRank != rank) 316 339 { -
XIOS/dev/dev_olga/src/filter/spatial_transform_filter.hpp
r1653 r1686 3 3 4 4 #include "filter.hpp" 5 #include "field.hpp" 5 6 6 7 namespace xios … … 9 10 class CGridTransformation; 10 11 class CSpatialTransformFilterEngine; 12 13 class CField; 11 14 12 15 /*! … … 23 26 * \param outputValue default value of output pin 24 27 * \param [in] inputSlotsCount number of input, by default there is only one for field src 25 * \param buildWorkflowGraph indicates whether data will be visualized26 28 */ 27 29 CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, 28 double outputValue, size_t inputSlotsCount = 1 , bool buildWorkflowGraph = false);30 double outputValue, size_t inputSlotsCount = 1); 29 31 30 32 inline StdString GetName(void) {return StdString("Spatial transform filter");}; … … 38 40 * \param hasMissingValue whether field source has missing value 39 41 * \param defaultValue default value 40 * \param buildWorkflowGraph indicates whether data will be visualized41 42 * \return the first and the last filters of the filter graph 42 43 */ 43 44 static std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> > 44 buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double defaultValue , bool buildWorkflowGraph = false);45 buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double defaultValue); 45 46 46 47 protected: … … 72 73 * \param outputValue default value of output pin 73 74 * \param [in] inputSlotsCount number of input, by default there is only one for field src 74 * \param buildWorkflowGraph indicates whether data will be visualized75 *76 75 */ 77 CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, 78 double outputValue, size_t inputSlotsCount = 1, bool buildWorkflowGraph = false); 76 CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount = 1); 79 77 80 78 … … 101 99 { 102 100 public: 101 102 int filterID; 103 int tag; 104 CField *field; 103 105 /*! 104 106 * Returns the engine wrapping the specified grid transformation. … … 117 119 * \return the result of the grid transformation 118 120 */ 119 CDataPacketPtr applyFilter(std::vector<CDataPacketPtr> data, double defaultValue = 0); 121 CDataPacketPtr applyFilter(std::vector<CDataPacketPtr> data, double defaultValue = 0, int tag=0, Time start_graph=0, Time end_graph=-1, CField *field=0); 122 bool buildGraph(std::vector<CDataPacketPtr> data, int tag=0, Time start_graph=0, Time end_graph=-1, CField *field=0); 120 123 121 124 /*! … … 144 147 * \param dataDest the resulting transformed data 145 148 */ 146 void apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest );149 void apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest, int filterID=-1); 147 150 148 151 CGridTransformation* gridTransformation; //!< The grid transformation used by the engine -
XIOS/dev/dev_olga/src/filter/store_filter.cpp
r1654 r1686 3 3 #include "grid.hpp" 4 4 #include "timer.hpp" 5 #include "file.hpp" 5 6 6 7 namespace xios … … 21 22 ERROR("CStoreFilter::CStoreFilter(CContext* context, CGrid* grid)", 22 23 "Impossible to construct a store filter without providing a grid."); 23 // filterId = InvalidableObject::count;24 // InvalidableObject::count++;25 24 } 26 25 … … 78 77 template CDataPacket::StatusCode CStoreFilter::getData<7>(Time timestamp, CArray<double, 7>& data); 79 78 79 void CStoreFilter::buildGraph(std::vector<CDataPacketPtr> data) 80 { 81 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 82 83 if(building_graph) 84 { 85 this->filterID = InvalidableObject::filterIdGenerator++; 86 int edgeID = InvalidableObject::edgeIdGenerator++; 87 88 CWorkflowGraph::allocNodeEdge(); 89 90 CWorkflowGraph::addNode(this->filterID, "Store Filter", 7, 0, 1, data[0]); 91 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = ++(data[0]->distance); 92 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 93 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 94 95 // if(CXios::isClient) std::cout<<"CStoreFilter::apply filter tag = "<<this->tag<<std::endl; 96 97 if(CXios::isClient && CWorkflowGraph::build_begin) 98 { 99 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);; 100 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0; 101 } 102 else CWorkflowGraph::build_begin = true; 103 } 104 } 105 80 106 void CStoreFilter::onInputReady(std::vector<CDataPacketPtr> data) 81 107 { 108 buildGraph(data); 82 109 83 110 CDataPacketPtr packet; … … 128 155 packets.erase(packets.begin(), packets.lower_bound(timestamp)); 129 156 } 130 131 int CStoreFilter::getFilterId(void)132 {133 return filterId;134 }135 136 157 } // namespace xios -
XIOS/dev/dev_olga/src/filter/store_filter.hpp
r1653 r1686 3 3 4 4 #include "input_pin.hpp" 5 #include "workflow_graph.hpp" 6 5 7 6 8 namespace xios … … 8 10 class CContext; 9 11 class CGrid; 12 class CField; 10 13 11 14 /*! … … 73 76 void virtual invalidate(Time timestamp); 74 77 75 /*! 76 * Returns filter's id needed in case of building workflow graph 77 */ 78 int getFilterId(); 78 int filterID; 79 int tag; 80 Time start_graph; 81 Time end_graph; 82 CField *field; 83 int distance; 84 79 85 80 86 protected: … … 85 91 */ 86 92 void virtual onInputReady(std::vector<CDataPacketPtr> data); 93 void virtual buildGraph(std::vector<CDataPacketPtr> data); 87 94 88 95 private: 89 CGarbageCollector& gc; 90 CContext* context; 91 CGrid* grid; 92 const bool detectMissingValues; 93 const double missingValue; 96 CGarbageCollector& gc; //!< The garbage collector associated to the filter 97 CContext* context; //!< The context to which the data belongs 98 CGrid* grid; //!< The grid attached to the data the filter can accept 99 const bool detectMissingValues; //!< Whether missing values should be detected 100 const double missingValue; //!< The value to use to replace missing values 94 101 std::map<Time, CDataPacketPtr> packets; //<! The stored packets 95 int filterId; //!< Filter's id needed in case of building a workflow96 97 102 }; // class CStoreFilter 98 103 } // namespace xios -
XIOS/dev/dev_olga/src/filter/temporal_filter.cpp
r1653 r1686 2 2 #include "functor_type.hpp" 3 3 #include "calendar_util.hpp" 4 #include "workflow_graph.hpp" 5 #include "file.hpp" 4 6 5 7 namespace xios … … 9 11 CTemporalFilter::CTemporalFilter(CGarbageCollector& gc, const std::string& opId, 10 12 const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq, 11 bool ignoreMissingValue /*= false*/ , bool buildWorkflowGraph /*= false*/)12 : CFilter(gc, 1, this , buildWorkflowGraph)13 bool ignoreMissingValue /*= false*/) 14 : CFilter(gc, 1, this) 13 15 , functor(createFunctor(opId, ignoreMissingValue, tmpData)) 14 16 , isOnceOperation(functor->timeType() == func::CFunctor::once) … … 25 27 , nbOperationDates(1) 26 28 , nbSamplingDates(0) 29 // , nextOperationDate(initDate + opFreq + this->samplingOffset) 27 30 , isFirstOperation(true) 31 , temp_op(opId) 28 32 { 29 33 } 30 34 35 36 37 38 39 bool CTemporalFilter::buildGraph(std::vector<CDataPacketPtr> data) 40 { 41 bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 42 43 if(building_graph) 44 { 45 if(this->filterIDoutputs.size()==0) this->filterID = InvalidableObject::filterIdGenerator++; 46 int edgeID = InvalidableObject::edgeIdGenerator++; 47 48 // std::cout<<"CTemporalFilter::apply filter tag = "<<this->tag<<" start = "<<this->start_graph<<" end = "<<this->end_graph<<std::endl; 49 50 CWorkflowGraph::allocNodeEdge(); 51 52 if(this->filterIDoutputs.size()==0) 53 { 54 CWorkflowGraph::addNode(this->filterID, "Temporal Filter\\n("+this->temp_op+")", 5, 1, 0, data[0]); 55 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].transform_type = this->temp_op; 56 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = false ; 57 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].clusterID = 1 ; 58 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = (data[0]->distance); 59 60 61 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 62 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 63 } 64 65 if(CWorkflowGraph::build_begin) 66 { 67 68 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 69 70 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 71 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb += 1 ; 72 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = max(data[0]->distance+1, (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance); 73 } 74 75 76 this->filterIDoutputs.push_back(data[0]->src_filterID); 77 } 78 79 return building_graph; 80 } 81 82 31 83 CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data) 32 84 { 85 bool BG = buildGraph(data); 86 33 87 CDataPacketPtr packet; 34 88 … … 77 131 78 132 isFirstOperation = false; 133 134 packet->field = this->field; 135 136 if(BG) 137 { 138 packet->src_filterID=this->filterID; 139 packet->distance = data[0]->distance+1; 140 this->filterIDoutputs.clear(); 141 CWorkflowGraph::build_begin=true; 142 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ; 143 } 79 144 } 80 145 } … … 90 155 bool CTemporalFilter::isDataExpected(const CDate& date) const 91 156 { 157 // return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date + samplingFreq > nextOperationDate); 92 158 return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth); 93 159 } -
XIOS/dev/dev_olga/src/filter/temporal_filter.hpp
r1653 r1686 29 29 CTemporalFilter(CGarbageCollector& gc, const std::string& opId, 30 30 const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq, 31 bool ignoreMissingValue = false , bool buildWorkflowGraph = false);31 bool ignoreMissingValue = false); 32 32 33 33 inline StdString GetName(void) {return StdString("Temporal filter");}; … … 40 40 */ 41 41 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 42 bool virtual buildGraph(std::vector<CDataPacketPtr> data); 42 43 43 44 /*! … … 54 55 */ 55 56 bool virtual isDataExpected(const CDate& date) const; 57 std::vector<int > filterIDoutputs; 58 std::vector<std::pair<int, int> > filterIDoutputs_pair; 59 60 StdString temp_op; 56 61 57 62 private: -
XIOS/dev/dev_olga/src/filter/ternary_arithmetic_filter.cpp
r1162 r1686 1 1 #include "ternary_arithmetic_filter.hpp" 2 #include "workflow_graph.hpp" 3 #include "yacc_var.hpp" 4 #include "file.hpp" 2 5 3 6 namespace xios … … 8 11 , value1(value1) 9 12 , value2(value2) 10 { /* Nothing to do */ }; 13 { 14 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 15 /* Nothing to do */ 16 }; 17 18 std::tuple<int, int, int> CScalarScalarFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 19 { 20 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 21 int unique_filter_id; 22 bool firstround; 23 24 if(building_graph) 25 { 26 CWorkflowGraph::allocNodeEdge(); 27 28 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 29 30 // first round 31 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 32 { 33 firstround = true; 34 this->filterID = InvalidableObject::filterIdGenerator++; 35 int edgeID = InvalidableObject::edgeIdGenerator++; 36 37 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 38 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 39 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 40 41 42 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 43 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 44 45 46 if(CWorkflowGraph::build_begin) 47 { 48 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 49 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 50 51 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 52 } 53 else CWorkflowGraph::build_begin = true; 54 55 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 56 unique_filter_id = this->filterID; 57 } 58 // not first round 59 else 60 { 61 firstround=false; 62 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 63 if(data[0]->src_filterID != unique_filter_id) 64 { 65 int edgeID = InvalidableObject::edgeIdGenerator++; 66 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 67 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 68 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 69 } 70 } 71 } 72 73 return std::make_tuple(building_graph, firstround, unique_filter_id); 74 } 11 75 12 76 CDataPacketPtr CScalarScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 16 80 packet->timestamp = data[0]->timestamp; 17 81 packet->status = data[0]->status; 82 83 std::tuple<int, int, int> graph = buildGraph(data); 84 85 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 86 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 87 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 88 89 packet->field = this->field; 18 90 19 91 if (packet->status == CDataPacket::NO_ERROR) … … 28 100 , value1(value1) 29 101 , value2(value2) 30 { /* Nothing to do */ }; 102 { 103 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 104 /* Nothing to do */ 105 }; 106 107 std::tuple<int, int, int> CScalarFieldScalarArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 108 { 109 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 110 int unique_filter_id; 111 bool firstround; 112 113 if(building_graph) 114 { 115 CWorkflowGraph::allocNodeEdge(); 116 117 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 118 119 // first round 120 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 121 { 122 firstround = true; 123 this->filterID = InvalidableObject::filterIdGenerator++; 124 int edgeID = InvalidableObject::edgeIdGenerator++; 125 126 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 127 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 128 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 129 130 131 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 132 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 133 134 135 if(CWorkflowGraph::build_begin) 136 { 137 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 138 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 139 140 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 141 } 142 else CWorkflowGraph::build_begin = true; 143 144 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 145 unique_filter_id = this->filterID; 146 } 147 // not first round 148 else 149 { 150 firstround=false; 151 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 152 if(data[0]->src_filterID != unique_filter_id) 153 { 154 int edgeID = InvalidableObject::edgeIdGenerator++; 155 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 156 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 157 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 158 } 159 } 160 } 161 162 return std::make_tuple(building_graph, firstround, unique_filter_id); 163 } 31 164 32 165 CDataPacketPtr CScalarFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 36 169 packet->timestamp = data[0]->timestamp; 37 170 packet->status = data[0]->status; 171 172 std::tuple<int, int, int> graph = buildGraph(data); 173 174 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 175 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 176 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 177 178 packet->field = this->field; 38 179 39 180 if (packet->status == CDataPacket::NO_ERROR) … … 47 188 , op(operatorExpr.getOpScalarFieldField(op)) 48 189 , value(value) 49 { /* Nothing to do */ }; 190 { 191 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 192 /* Nothing to do */ 193 }; 194 195 std::tuple<int, int, int> CScalarFieldFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 196 { 197 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 198 int unique_filter_id; 199 200 bool firstround; 201 202 if(building_graph) 203 { 204 CWorkflowGraph::allocNodeEdge(); 205 206 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 207 208 // first round 209 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 210 { 211 firstround = true; 212 this->filterID = InvalidableObject::filterIdGenerator++; 213 int edgeID = InvalidableObject::edgeIdGenerator++; 214 215 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 216 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 217 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 218 219 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 220 221 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 222 if(CWorkflowGraph::build_begin) 223 { 224 225 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 226 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 227 228 edgeID = InvalidableObject::edgeIdGenerator++; 229 230 CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 231 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 232 233 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 234 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 235 } 236 CWorkflowGraph::build_begin = true; 237 238 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 239 unique_filter_id = this->filterID; 240 241 } 242 // not first round 243 else 244 { 245 firstround = false; 246 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 247 if(data[0]->src_filterID != unique_filter_id) 248 { 249 int edgeID = InvalidableObject::edgeIdGenerator++; 250 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 251 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 252 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 253 } 254 if(data[1]->src_filterID != unique_filter_id) 255 { 256 int edgeID = InvalidableObject::edgeIdGenerator++; 257 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[1]); 258 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 259 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 260 } 261 262 } 263 } 264 265 return std::make_tuple(building_graph, firstround, unique_filter_id); 266 } 50 267 51 268 CDataPacketPtr CScalarFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 55 272 packet->timestamp = data[0]->timestamp; 56 273 packet->status = data[0]->status; 274 275 std::tuple<int, int, int> graph = buildGraph(data); 276 277 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 278 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 279 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 280 281 packet->field = this->field; 57 282 58 283 if (data[0]->status != CDataPacket::NO_ERROR) … … 75 300 , value1(value1) 76 301 , value2(value2) 77 { /* Nothing to do */ }; 302 { 303 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 304 /* Nothing to do */ 305 }; 306 307 std::tuple<int, int, int> CFieldScalarScalarArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 308 { 309 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 310 int unique_filter_id; 311 bool firstround; 312 313 if(building_graph) 314 { 315 CWorkflowGraph::allocNodeEdge(); 316 317 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 318 319 // first round 320 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 321 { 322 firstround = true; 323 this->filterID = InvalidableObject::filterIdGenerator++; 324 int edgeID = InvalidableObject::edgeIdGenerator++; 325 326 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 327 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 328 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 329 330 331 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 332 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 333 334 335 if(CWorkflowGraph::build_begin) 336 { 337 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 338 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 339 340 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 341 } 342 else CWorkflowGraph::build_begin = true; 343 344 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 345 unique_filter_id = this->filterID; 346 } 347 // not first round 348 else 349 { 350 firstround=false; 351 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 352 if(data[0]->src_filterID != unique_filter_id) 353 { 354 int edgeID = InvalidableObject::edgeIdGenerator++; 355 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 356 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 357 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 358 } 359 } 360 } 361 362 return std::make_tuple(building_graph, firstround, unique_filter_id); 363 } 78 364 79 365 CDataPacketPtr CFieldScalarScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 84 370 packet->status = data[0]->status; 85 371 372 std::tuple<int, int, int> graph = buildGraph(data); 373 374 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 375 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 376 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 377 378 packet->field = this->field; 379 86 380 if (packet->status == CDataPacket::NO_ERROR) 87 381 packet->data.reference(op(data[0]->data, value1, value2)); … … 95 389 , op(operatorExpr.getOpFieldScalarField(op)) 96 390 , value(value) 97 { /* Nothing to do */ }; 391 { 392 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 393 /* Nothing to do */ 394 }; 395 396 std::tuple<int, int, int> CFieldScalarFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 397 { 398 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 399 int unique_filter_id; 400 401 bool firstround; 402 403 if(building_graph) 404 { 405 CWorkflowGraph::allocNodeEdge(); 406 407 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 408 409 // first round 410 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 411 { 412 firstround = true; 413 this->filterID = InvalidableObject::filterIdGenerator++; 414 int edgeID = InvalidableObject::edgeIdGenerator++; 415 416 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 417 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 418 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 419 420 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 421 422 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 423 if(CWorkflowGraph::build_begin) 424 { 425 426 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 427 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 428 429 edgeID = InvalidableObject::edgeIdGenerator++; 430 431 CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 432 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 433 434 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 435 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 436 } 437 CWorkflowGraph::build_begin = true; 438 439 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 440 unique_filter_id = this->filterID; 441 442 } 443 // not first round 444 else 445 { 446 firstround = false; 447 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 448 if(data[0]->src_filterID != unique_filter_id) 449 { 450 int edgeID = InvalidableObject::edgeIdGenerator++; 451 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 452 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 453 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 454 } 455 if(data[1]->src_filterID != unique_filter_id) 456 { 457 int edgeID = InvalidableObject::edgeIdGenerator++; 458 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[1]); 459 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 460 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 461 } 462 463 } 464 } 465 466 return std::make_tuple(building_graph, firstround, unique_filter_id); 467 } 98 468 99 469 CDataPacketPtr CFieldScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 103 473 packet->timestamp = data[0]->timestamp; 104 474 packet->status = data[0]->status; 475 476 std::tuple<int, int, int> graph = buildGraph(data); 477 478 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 479 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 480 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 481 482 packet->field = this->field; 105 483 106 484 if (data[0]->status != CDataPacket::NO_ERROR) … … 120 498 , op(operatorExpr.getOpFieldFieldScalar(op)) 121 499 , value(value) 122 { /* Nothing to do */ }; 500 { 501 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 502 std::cout<<"expression = "<<expression; 503 /* Nothing to do */ 504 }; 505 506 std::tuple<int, int, int> CFieldFieldScalarArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 507 { 508 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 509 int unique_filter_id; 510 511 bool firstround; 512 513 if(building_graph) 514 { 515 CWorkflowGraph::allocNodeEdge(); 516 517 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 518 519 // first round 520 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 521 { 522 firstround = true; 523 this->filterID = InvalidableObject::filterIdGenerator++; 524 int edgeID = InvalidableObject::edgeIdGenerator++; 525 526 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 527 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 528 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 529 530 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 531 532 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 533 if(CWorkflowGraph::build_begin) 534 { 535 536 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 537 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 538 539 edgeID = InvalidableObject::edgeIdGenerator++; 540 541 CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 542 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 543 544 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 545 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 546 } 547 CWorkflowGraph::build_begin = true; 548 549 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 550 unique_filter_id = this->filterID; 551 552 } 553 // not first round 554 else 555 { 556 firstround = false; 557 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 558 if(data[0]->src_filterID != unique_filter_id) 559 { 560 int edgeID = InvalidableObject::edgeIdGenerator++; 561 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 562 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 563 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 564 } 565 if(data[1]->src_filterID != unique_filter_id) 566 { 567 int edgeID = InvalidableObject::edgeIdGenerator++; 568 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[1]); 569 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 570 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 571 } 572 573 } 574 } 575 576 return std::make_tuple(building_graph, firstround, unique_filter_id); 577 } 123 578 124 579 CDataPacketPtr CFieldFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 128 583 packet->timestamp = data[0]->timestamp; 129 584 packet->status = data[0]->status; 585 586 std::tuple<int, int, int> graph = buildGraph(data); 587 588 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 589 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 590 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 591 592 packet->field = this->field; 130 593 131 594 if (data[0]->status != CDataPacket::NO_ERROR) … … 145 608 : CFilter(gc, 3, this) 146 609 , op(operatorExpr.getOpFieldFieldField(op)) 147 { /* Nothing to do */ }; 610 { 611 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 612 /* Nothing to do */ 613 }; 614 615 std::tuple<int, int, int> CFieldFieldFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 616 { 617 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 618 int unique_filter_id; 619 620 bool firstround; 621 622 if(building_graph) 623 { 624 CWorkflowGraph::allocNodeEdge(); 625 626 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 627 628 // first round 629 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 630 { 631 firstround = true; 632 this->filterID = InvalidableObject::filterIdGenerator++; 633 int edgeID = InvalidableObject::edgeIdGenerator++; 634 635 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 636 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 637 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 638 639 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 640 641 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 642 if(CWorkflowGraph::build_begin) 643 { 644 645 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 646 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 647 648 edgeID = InvalidableObject::edgeIdGenerator++; 649 650 CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 651 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 652 653 edgeID = InvalidableObject::edgeIdGenerator++; 654 655 CWorkflowGraph::addEdge(edgeID, this->filterID, data[2]); 656 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 657 658 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 659 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 660 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[2]->src_filterID].filter_filled = 0 ; 661 } 662 CWorkflowGraph::build_begin = true; 663 664 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 665 unique_filter_id = this->filterID; 666 667 } 668 // not first round 669 else 670 { 671 firstround = false; 672 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 673 if(data[0]->src_filterID != unique_filter_id) 674 { 675 int edgeID = InvalidableObject::edgeIdGenerator++; 676 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 677 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 678 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 679 } 680 if(data[1]->src_filterID != unique_filter_id) 681 { 682 int edgeID = InvalidableObject::edgeIdGenerator++; 683 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[1]); 684 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 685 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 686 } 687 if(data[2]->src_filterID != unique_filter_id) 688 { 689 int edgeID = InvalidableObject::edgeIdGenerator++; 690 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[2]); 691 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[2]->src_filterID].filter_filled = 0 ; 692 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 693 } 694 695 } 696 } 697 698 return std::make_tuple(building_graph, firstround, unique_filter_id); 699 } 148 700 149 701 CDataPacketPtr CFieldFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 153 705 packet->timestamp = data[0]->timestamp; 154 706 packet->status = data[0]->status; 707 708 std::tuple<int, int, int> graph = buildGraph(data); 709 710 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 711 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 712 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 713 714 packet->field = this->field; 155 715 156 716 if (data[0]->status != CDataPacket::NO_ERROR) -
XIOS/dev/dev_olga/src/filter/ternary_arithmetic_filter.hpp
r1162 r1686 5 5 #include <string> 6 6 #include "operator_expr.hpp" 7 #include <tuple> 7 8 8 9 namespace xios … … 36 37 */ 37 38 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 39 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 38 40 }; // class CScalarScalarFieldArithmeticFilter 39 41 … … 67 69 */ 68 70 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 71 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 69 72 }; // class CScalarScalarFieldArithmeticFilter 70 73 … … 95 98 */ 96 99 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 100 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 97 101 }; // class CScalarScalarFieldArithmeticFilter 98 102 … … 127 131 */ 128 132 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 133 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 129 134 }; // class CFieldScalarScalarArithmeticFilter 130 135 … … 156 161 */ 157 162 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 163 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 158 164 }; // class CFieldScalarFieldArithmeticFilter 159 165 … … 185 191 */ 186 192 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 193 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 187 194 }; // class CFieldFielScalardArithmeticFilter 188 195 … … 212 219 */ 213 220 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 221 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 214 222 }; // class CFieldFielFieldArithmeticFilter 215 223 -
XIOS/dev/dev_olga/src/filter/unary_arithmetic_filter.cpp
r643 r1686 1 1 #include "unary_arithmetic_filter.hpp" 2 #include "workflow_graph.hpp" 3 #include "yacc_var.hpp" 4 #include "file.hpp" 2 5 3 6 namespace xios … … 6 9 : CFilter(gc, 1, this) 7 10 , op(operatorExpr.getOpField(op)) 8 { /* Nothing to do */ }; 11 { 12 expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1); 13 }; 14 15 std::tuple<int, int, int> CUnaryArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 16 { 17 bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 18 int unique_filter_id; 19 bool firstround; 20 21 if(building_graph) 22 { 23 CWorkflowGraph::allocNodeEdge(); 24 size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 25 26 // first round 27 if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 28 { 29 firstround=true; 30 this->filterID = InvalidableObject::filterIdGenerator++; 31 int edgeID = InvalidableObject::edgeIdGenerator++; 32 33 CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 34 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 35 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 36 37 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 38 if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 39 40 if(CWorkflowGraph::build_begin) 41 { 42 43 CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 44 (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 45 46 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 47 } 48 else CWorkflowGraph::build_begin = true; 49 50 (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 51 unique_filter_id = this->filterID; 52 } 53 else 54 { 55 firstround=false; 56 unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 57 if(data[0]->src_filterID != unique_filter_id) 58 { 59 int edgeID = InvalidableObject::edgeIdGenerator++; 60 CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 61 (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 62 (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 63 } 64 } 65 66 } 67 68 return std::make_tuple(building_graph, firstround, unique_filter_id); 69 } 9 70 10 71 CDataPacketPtr CUnaryArithmeticFilter::apply(std::vector<CDataPacketPtr> data) … … 15 76 packet->status = data[0]->status; 16 77 78 std::tuple<int, int, int> graph = buildGraph(data); 79 80 if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 81 if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 82 if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 83 84 packet->field = this->field; 85 17 86 if (packet->status == CDataPacket::NO_ERROR) 18 87 packet->data.reference(op(data[0]->data)); -
XIOS/dev/dev_olga/src/filter/unary_arithmetic_filter.hpp
r642 r1686 34 34 */ 35 35 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 36 std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 36 37 }; // class CUnaryArithmeticFilter 37 38 } // namespace xios
Note: See TracChangeset
for help on using the changeset viewer.