Ignore:
Timestamp:
06/25/19 16:14:54 (5 years ago)
Author:
yushan
Message:

MARK: Dynamic workflow graph developement. Branch up to date with trunk @1663.

Location:
XIOS/dev/dev_trunk_omp/src/filter
Files:
26 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_trunk_omp/src/filter/binary_arithmetic_filter.cpp

    r804 r1677  
    11#include "binary_arithmetic_filter.hpp" 
     2#include "workflow_graph.hpp" 
    23 
    34namespace xios 
     
    78    , op(operatorExpr.getOpScalarField(op)) 
    89    , value(value) 
    9   { /* Nothing to do */ }; 
     10  {  
     11    StdString input_op_expression=op; 
     12    if(input_op_expression == "add") 
     13      op_expression = "+"; 
     14    else if(input_op_expression == "minus") 
     15      op_expression = "-"; 
     16    else if(input_op_expression == "mult") 
     17      op_expression = "x"; 
     18    else if(input_op_expression == "div") 
     19      op_expression = "/"; 
     20    else if(input_op_expression == "eq") 
     21      op_expression = "="; 
     22    else if(input_op_expression == "lt") 
     23      op_expression = "<"; 
     24    else if(input_op_expression == "gt") 
     25      op_expression = ">"; 
     26    else if(input_op_expression == "le") 
     27      op_expression = "<="; 
     28    else if(input_op_expression == "ge") 
     29      op_expression = ">="; 
     30    else if(input_op_expression == "ne") 
     31      op_expression = "!="; 
     32    else 
     33      op_expression = " "; 
     34  }; 
    1035 
    1136  CDataPacketPtr CScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    1237  { 
     38    if(this->tag) 
     39    { 
     40      this->filterID = InvalidableObject::filterIdGenerator++;     
     41 
     42      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     43      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     44 
     45      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "S "+op_expression +" F Filter" ; 
     46      std::cout<<"CScalarFieldArithmeticFilter::apply filter tag = "<<this->tag<<std::endl; 
     47 
     48      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     49      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     50      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     51    } 
     52 
    1353    CDataPacketPtr packet(new CDataPacket); 
    1454    packet->date = data[0]->date; 
    1555    packet->timestamp = data[0]->timestamp; 
    1656    packet->status = data[0]->status; 
     57    if(this->tag) packet->src_filterID = this->filterID; 
     58    packet->fieldID = this->output_field_id; 
    1759 
    1860    if (packet->status == CDataPacket::NO_ERROR) 
     
    2668    , op(operatorExpr.getOpFieldScalar(op)) 
    2769    , value(value) 
    28   { /* Nothing to do */ }; 
     70  {  
     71    StdString input_op_expression=op; 
     72    if(input_op_expression == "add") 
     73      op_expression = "+"; 
     74    else if(input_op_expression == "minus") 
     75      op_expression = "-"; 
     76    else if(input_op_expression == "mult") 
     77      op_expression = "x"; 
     78    else if(input_op_expression == "div") 
     79      op_expression = "/"; 
     80    else if(input_op_expression == "pow") 
     81      op_expression = "^"; 
     82    else if(input_op_expression == "eq") 
     83      op_expression = "="; 
     84    else if(input_op_expression == "lt") 
     85      op_expression = "<"; 
     86    else if(input_op_expression == "gt") 
     87      op_expression = ">"; 
     88    else if(input_op_expression == "le") 
     89      op_expression = "<="; 
     90    else if(input_op_expression == "ge") 
     91      op_expression = ">="; 
     92    else if(input_op_expression == "ne") 
     93      op_expression = "!="; 
     94    else 
     95      op_expression = " "; 
     96  }; 
    2997 
    3098  CDataPacketPtr CFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    3199  { 
     100    if(this->tag) 
     101    { 
     102      this->filterID = InvalidableObject::filterIdGenerator++; 
     103  
     104      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     105      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     106 
     107      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "F "+op_expression +" S Filter" ; 
     108      std::cout<<"CFieldScalarArithmeticFilter::apply filter tag = "<<this->tag<<std::endl; 
     109 
     110      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     111      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     112      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     113    } 
     114 
    32115    CDataPacketPtr packet(new CDataPacket); 
    33116    packet->date = data[0]->date; 
    34117    packet->timestamp = data[0]->timestamp; 
    35118    packet->status = data[0]->status; 
     119    if(this->tag) packet->src_filterID = this->filterID; 
     120    packet->fieldID = this->output_field_id; 
     121     
    36122 
    37123    if (packet->status == CDataPacket::NO_ERROR) 
     
    44130    : CFilter(gc, 2, this) 
    45131    , op(operatorExpr.getOpFieldField(op)) 
    46   { /* Nothing to do */ }; 
     132  {  
     133    StdString input_op_expression=op; 
     134    if(input_op_expression == "add") 
     135      op_expression = "+"; 
     136    else if(input_op_expression == "minus") 
     137      op_expression = "-"; 
     138    else if(input_op_expression == "mult") 
     139      op_expression = "x"; 
     140    else if(input_op_expression == "div") 
     141      op_expression = "/"; 
     142    else if(input_op_expression == "pow") 
     143      op_expression = "^"; 
     144    else if(input_op_expression == "eq") 
     145      op_expression = "="; 
     146    else if(input_op_expression == "lt") 
     147      op_expression = "<"; 
     148    else if(input_op_expression == "gt") 
     149      op_expression = ">"; 
     150    else if(input_op_expression == "le") 
     151      op_expression = "<="; 
     152    else if(input_op_expression == "ge") 
     153      op_expression = ">="; 
     154    else if(input_op_expression == "ne") 
     155      op_expression = "!="; 
     156    else 
     157      op_expression = " "; 
     158  }; 
    47159 
    48160  CDataPacketPtr CFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    49161  { 
     162    if(this->tag) 
     163    { 
     164 
     165      this->filterID = InvalidableObject::filterIdGenerator++; 
     166    
     167      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     168      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     169 
     170      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "F "+op_expression +" F Filter" ; 
     171      std::cout<<"CFieldFieldArithmeticFilter::apply filter tag = "<<this->tag<<std::endl; 
     172 
     173      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     174      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     175      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     176 
     177      str = data[1]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     178 
     179      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[1]->src_filterID); 
     180      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     181    } 
     182 
     183 
    50184    CDataPacketPtr packet(new CDataPacket); 
    51185    packet->date = data[0]->date; 
    52186    packet->timestamp = data[0]->timestamp; 
     187    if(this->tag) packet->src_filterID = this->filterID; 
     188    packet->fieldID = this->output_field_id; 
     189     
    53190 
    54191    if (data[0]->status != CDataPacket::NO_ERROR) 
     
    64201    return packet; 
    65202  } 
     203 
     204  StdString CScalarFieldArithmeticFilter::GetName(void)    { return StdString("CScalarFieldArithmeticFilter"); } 
     205  StdString CFieldScalarArithmeticFilter::GetName(void)    { return StdString("CFieldScalarArithmeticFilter"); } 
     206  StdString CFieldFieldArithmeticFilter::GetName(void)     { return StdString("CFieldFieldArithmeticFilter"); } 
     207 
     208 
    66209} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/binary_arithmetic_filter.hpp

    r1542 r1677  
    2323       */ 
    2424      CScalarFieldArithmeticFilter(CGarbageCollector& gc, const std::string& op, double value); 
     25      StdString virtual GetName(void); 
    2526 
    2627    protected: 
     
    5152       */ 
    5253      CFieldScalarArithmeticFilter(CGarbageCollector& gc, const std::string& op, double value); 
     54      StdString virtual GetName(void); 
    5355 
    5456    protected: 
     
    7880       */ 
    7981      CFieldFieldArithmeticFilter(CGarbageCollector& gc, const std::string& op); 
     82      StdString virtual GetName(void); 
    8083 
    8184    protected: 
  • XIOS/dev/dev_trunk_omp/src/filter/data_packet.hpp

    r1542 r1677  
    2626    Time timestamp;         //!< Timestamp of the data 
    2727    StatusCode status;      //!< Status of the packet 
    28  
     28    int src_filterID; 
     29    std::vector<int> filterIDoutputs; 
     30    StdString fieldID; 
     31     
    2932    /*! 
    3033     * Creates a deep copy of the packet. 
  • XIOS/dev/dev_trunk_omp/src/filter/file_server_writer_filter.cpp

    r1668 r1677  
    1616  void CFileServerWriterFilter::onInputReady(std::vector<CDataPacketPtr> data) 
    1717  { 
     18    if(CXios::isClient) 
     19    std::cout<<"CFileServerWriterFilter::onInputReady"<<std::endl; 
    1820    field->writeUpdateData(data[0]->data); 
    1921  } 
     
    2931  } 
    3032 
    31   int CFileServerWriterFilter::getFilterId(void) 
    32   { 
    33     return filterId; 
    34   } 
    3533 
    3634} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/file_server_writer_filter.hpp

    r1668 r1677  
    3737      bool virtual isDataExpected(const CDate& date) const; 
    3838 
    39       /*! 
    40        * Returns filter's id needed in case of building workflow graph 
    41        */ 
    42       int getFilterId(); 
    4339 
    4440    protected: 
  • XIOS/dev/dev_trunk_omp/src/filter/file_writer_filter.cpp

    r1671 r1677  
    44#include "utils.hpp" 
    55#include "workflow_graph.hpp" 
     6#include "graphviz.hpp" 
     7 
     8using namespace ep_lib; 
    69 
    710namespace xios 
     
    1417      ERROR("CFileWriterFilter::CFileWriterFilter(CField* field)", 
    1518            "The field cannot be null."); 
    16     if (buildWorkflowGraph) 
    17     { 
    18       filterId = InvalidableObject::count; 
    19       InvalidableObject::count++; 
    20     } 
    2119  } 
    2220 
    2321  void CFileWriterFilter::onInputReady(std::vector<CDataPacketPtr> data) 
    2422  { 
     23    if(this->tag) 
     24    { 
     25      this->filterID = InvalidableObject::filterIdGenerator++; 
     26 
     27      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     28      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     29 
     30      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "File Writer Filter"; 
     31 
     32      if(CXios::isClient) std::cout<<"CFileWriterFilter::apply filter tag = "<<this->tag<<std::endl; 
     33 
     34      if(CXios::isClient)  
     35      { 
     36        StdString str = this->output_field_id +" ts=" + to_string(data[0]->timestamp); 
     37        // StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     38        (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     39        (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID);   
     40      } 
     41    } 
     42     
    2543    const bool detectMissingValue = ( !field->default_value.isEmpty() && 
    2644                               ( (!field->detect_missing_value.isEmpty() || field->detect_missing_value == true) 
     
    4159 
    4260    field->sendUpdateData(dataArray); 
     61     
     62     
     63     
    4364  } 
    4465 
     
    5374  } 
    5475 
    55   int CFileWriterFilter::getFilterId(void) 
    56   { 
    57     return filterId; 
    58   } 
     76 
    5977 
    6078} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/file_writer_filter.hpp

    r1671 r1677  
    1515  { 
    1616    public: 
     17      int tag; 
     18      StdString output_field_id; 
    1719      /*! 
    1820       * Constructs the filter (with one input slot) associated to the specified field 
     
    4345      bool virtual isDataExpected(const CDate& date) const; 
    4446 
    45       /*! 
    46        * Returns filter's id needed in case of building workflow graph 
    47        */ 
    48       int getFilterId(); 
     47 
     48      int filterID; 
     49 
    4950 
    5051    protected: 
     
    5960      CField* field; //<! The associated field 
    6061      std::map<Time, CDataPacketPtr> packets; //<! The stored packets 
    61       int filterId;                           //<! Filter's id needed in case of building a workflow 
    6262 
    6363  }; // class CFileWriterFilter 
  • XIOS/dev/dev_trunk_omp/src/filter/filter.cpp

    r1668 r1677  
    1414  void CFilter::onInputReady(std::vector<CDataPacketPtr> data) 
    1515  { 
     16    // std::cout<<"CFilter::onInputReady"<<std::endl; 
    1617    CDataPacketPtr outputPacket = engine->apply(data); 
    1718    if (outputPacket) 
     19    { 
     20      // std::cout<<"Filter onOutputReady"<<std::endl; 
    1821      onOutputReady(outputPacket); 
     22    }  
    1923  } 
    2024 
     
    5256    return COutputPin::isDataExpected(date); 
    5357  } 
     58 
     59   
    5460} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/filter.hpp

    r1668 r1677  
    6565      bool virtual isDataExpected(const CDate& date) const; 
    6666 
     67       
     68       
     69 
     70      int filterID; 
     71      StdString op_expression; 
     72 
    6773    protected: 
    6874      IFilterEngine* engine; //!< The filter engine, might be the filter itself 
  • XIOS/dev/dev_trunk_omp/src/filter/garbage_collector.cpp

    r1668 r1677  
    33namespace xios 
    44{ 
    5  
    6   int InvalidableObject::count = 0; 
    7   #pragma omp threadprivate(InvalidableObject::count) 
     5  int InvalidableObject::filterIdGenerator = 0; 
     6  #pragma omp threadprivate(InvalidableObject::filterIdGenerator) 
    87   
    98  void CGarbageCollector::registerObject(InvalidableObject* Object, Time timestamp) 
  • XIOS/dev/dev_trunk_omp/src/filter/garbage_collector.hpp

    r1671 r1677  
    2121    void virtual invalidate(Time timestamp) = 0; 
    2222   
    23     static int count; //!< Counter used to identify a filter in case building workflow graph 
    24     #pragma omp threadprivate(count) 
     23     
     24 
     25    static int filterIdGenerator; 
     26    #pragma omp threadprivate(filterIdGenerator) 
     27 
     28     
    2529     
    2630  }; // struct InvalidableObject 
  • XIOS/dev/dev_trunk_omp/src/filter/input_pin.cpp

    r1668 r1677  
    1212    , triggers(slotsCount) 
    1313    , hasTriggers(false) 
    14   {   } 
     14  { 
     15    // parent_filters = new std::list< std::shared_ptr<COutputPin> >; 
     16  } 
    1517 
    1618  StdString CInputPin::GetName(void) 
     
    4244      gc.unregisterObject(this, packet->timestamp); 
    4345      onInputReady(it->second.packets); 
     46      //if(CXios::isClient) std::cout<<"setInput timestamp = "<<packet->timestamp<<std::endl; 
    4447      inputs.erase(it); 
    4548    } 
     
    8487  } 
    8588 
    86   int CInputPin::getFilterId(void) 
    87   { 
    88     return -1; 
    89   } 
     89 
     90   
     91 
    9092 
    9193} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/input_pin.hpp

    r1668 r1677  
    1818  { 
    1919    public: 
     20 
    2021      /*! 
    2122       * Constructs an input pin with the specified number of slots 
     
    8283      void virtual invalidate(Time timestamp); 
    8384       
    84       /*! 
    85        * Returns filter's id needed in case of building workflow graph 
    86        * This function should never be called from this class, instead functions defined in derived classes or in class COutputPin should be used 
    87        */ 
    88       int virtual getFilterId(); 
    8985 
    9086    protected: 
  • XIOS/dev/dev_trunk_omp/src/filter/output_pin.cpp

    r1671 r1677  
    55namespace xios 
    66{ 
     7 
     8 
    79  COutputPin::COutputPin(CGarbageCollector& gc, bool manualTrigger /*= false*/, bool buildWorkflowGraph /* =false */) 
    810    : gc(gc) 
     
    1012    , buildWorkflowGraph(buildWorkflowGraph) 
    1113  { 
    12     if (buildWorkflowGraph) 
    13     { 
    14       filterId = InvalidableObject::count; 
    15       InvalidableObject::count++; 
    16     } 
     14    // parent_filter = new std::vector< void >(0); 
    1715  } 
    1816 
     
    4038            "The packet cannot be null."); 
    4139 
    42     if (buildWorkflowGraph) 
    43     { 
    44         if(CWorkflowGraph::mapFilterTimestamps_ptr==0) CWorkflowGraph::mapFilterTimestamps_ptr = new std::unordered_map <int, vector<Time> >; 
    45       if(CWorkflowGraph::timestamps_ptr==0) CWorkflowGraph::timestamps_ptr = new set<Time>; 
    46       (*CWorkflowGraph::mapFilterTimestamps_ptr)[this->getFilterId()].push_back(packet->timestamp); 
    47         CWorkflowGraph::timestamps_ptr->insert(packet->timestamp); 
    48     } 
     40    //if(CXios::isClient) std::cout<<"onOutputReady"<<std::endl;  
     41   
    4942 
    5043    if (manualTrigger) // Don't use canBeTriggered here, this function is virtual and can be overriden 
     
    123116  } 
    124117 
    125   int COutputPin::getFilterId(void) 
     118  void COutputPin::setParentFiltersTag() 
    126119  { 
    127     return filterId; 
     120    for(int i=0; i<parent_filters.size(); i++) 
     121    { 
     122      parent_filters[i]->tag += tag; 
     123      parent_filters[i]->setParentFiltersTag(); 
     124    } 
    128125  } 
    129126 
     127 
    130128} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/output_pin.hpp

    r1668 r1677  
    77namespace xios 
    88{ 
     9  class CInputPin; 
     10  class CFilter; 
    911  /*! 
    1012   * An output pin handles the connections with downstream filters. 
     
    1315  { 
    1416    public: 
     17      StdString output_field_id; 
     18      int tag; 
     19 
     20      std::vector< std::shared_ptr<COutputPin> > parent_filters; 
     21 
    1522      /*! 
    1623       * Constructs an ouput pin with manual or automatic trigger 
     
    6976      void virtual invalidate(Time timestamp); 
    7077 
    71       /*! 
    72        * Returns filter's id needed in case of building workflow graph 
    73        */ 
    74       int getFilterId(); 
     78      void virtual setParentFiltersTag(); 
     79 
    7580 
    7681    protected: 
     
    109114      bool buildWorkflowGraph; 
    110115 
    111       //! Filter's id needed in case of building a workflow graph 
    112       int filterId; 
    113  
    114116 
    115117  }; // class COutputPin 
  • XIOS/dev/dev_trunk_omp/src/filter/pass_through_filter.cpp

    r1668 r1677  
    11#include "pass_through_filter.hpp" 
     2#include "workflow_graph.hpp" 
    23 
    34namespace xios 
     
    56  CPassThroughFilter::CPassThroughFilter(CGarbageCollector& gc, bool buildWorkflowGraph /*= false*/) 
    67    : CFilter(gc, 1, this, buildWorkflowGraph) 
    7   { /* Nothing to do */ } 
     8  {  
     9  } 
    810 
    911  CDataPacketPtr CPassThroughFilter::apply(std::vector<CDataPacketPtr> data) 
    1012  { 
     13        if(this->tag) 
     14    { 
     15      std::cout<<"CPassThroughFilter::apply tag = "<<this->tag<<std::endl; 
     16      this->filterID = InvalidableObject::filterIdGenerator++; 
     17      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     18 
     19      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "Pass Through Filter"; 
     20 
     21      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     22       
     23      StdString str = data[0]->fieldID + " ts=" + to_string(data[0]->timestamp); 
     24      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     25      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     26      data[0]->src_filterID=this->filterID; 
     27 
     28    } 
     29     
     30    data[0]->fieldID = this->output_field_id; 
     31     
     32 
    1133    return data[0]; 
    1234  } 
     35 
    1336} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/source_filter.cpp

    r1668 r1677  
    2626  } 
    2727 
     28   
    2829  template <int N> 
    29   void CSourceFilter::streamData(CDate date, const CArray<double, N>& data) 
     30  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data, const StdString field_id) 
    3031  { 
    3132    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter 
     
    6263    } 
    6364 
     65    packet->fieldID = field_id; 
     66    this->output_field_id = field_id; 
     67 
     68 
     69    if(this->tag) 
     70    { 
     71      this->filterID.first = InvalidableObject::filterIdGenerator++;   
     72      packet->src_filterID=this->filterID.first; 
     73       
     74       
     75      if(CXios::isClient) std::cout<<"source filter tag = "<<this->tag<<std::endl; 
     76     
     77      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     78 
     79      (*CWorkflowGraph::mapFilters_ptr)[this->filterID.first] = "Source Filter"; 
     80 
     81      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     82 
     83 
     84    } 
     85 
     86 
     87    onOutputReady(packet); 
     88  } 
     89 
     90  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data, const StdString field_id); 
     91  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data, const StdString field_id); 
     92  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data, const StdString field_id); 
     93  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data, const StdString field_id); 
     94  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data, const StdString field_id); 
     95  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data, const StdString field_id); 
     96  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data, const StdString field_id); 
     97 
     98 
     99  template <int N> 
     100  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data) 
     101  { 
     102    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter 
     103 
     104    CDataPacketPtr packet(new CDataPacket); 
     105    packet->date = date; 
     106    packet->timestamp = date; 
     107    packet->status = CDataPacket::NO_ERROR; 
     108 
     109    packet->data.resize(grid->storeIndex_client.numElements());     
     110     
     111    if (compression) 
     112    { 
     113      packet->data = defaultValue; 
     114      grid->uncompressField(data, packet->data);     
     115    } 
     116    else 
     117    { 
     118      if (mask) 
     119        grid->maskField(data, packet->data); 
     120      else 
     121        grid->inputField(data, packet->data); 
     122    } 
     123    // Convert missing values to NaN 
     124    if (hasMissingValue) 
     125    { 
     126      const double nanValue = std::numeric_limits<double>::quiet_NaN(); 
     127      const size_t nbData = packet->data.numElements(); 
     128      for (size_t idx = 0; idx < nbData; ++idx) 
     129      { 
     130        if (defaultValue == packet->data(idx)) 
     131          packet->data(idx) = nanValue; 
     132      } 
     133    } 
     134    this->filterID.first = InvalidableObject::filterIdGenerator++; 
     135    packet->src_filterID=this->filterID.first; 
     136 
     137    if(CXios::isClient) std::cout<<"source filter filter tag = "<<this->tag<<std::endl; 
     138     
     139    if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     140 
     141    (*CWorkflowGraph::mapFilters_ptr)[this->filterID.first] = "Source Filter"; 
     142 
     143 
     144 
    64145    onOutputReady(packet); 
    65146  } 
     
    73154  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data); 
    74155 
     156 
     157 
    75158  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data) 
    76159  { 
     
    121204    onOutputReady(packet); 
    122205  } 
     206 
     207   
    123208} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/source_filter.hpp

    r1668 r1677  
    5050      void streamData(CDate date, const CArray<double, N>& data); 
    5151 
     52      template <int N> 
     53      void streamData(CDate date, const CArray<double, N>& data, const StdString field_id); 
     54 
    5255      /*! 
    5356       * Transforms the data received from the server into a packet and send it 
     
    6669       */ 
    6770      void signalEndOfStream(CDate date); 
     71      std::pair<int, int> filterID; 
     72      std::vector<int> filterIDoutputs; 
    6873 
    6974    private: 
     
    7479      const bool compression ; //!< indicates if data need to be compressed : on client side : true, on server side : false 
    7580      const bool mask ;        //!< indicates whether grid mask should be applied (true for clients, false for servers) 
     81 
    7682  }; // class CSourceFilter 
    7783} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/spatial_transform_filter.cpp

    r1671 r1677  
    5454      { 
    5555        filter->connectOutput(firstFilter, 0); 
    56         if (buildWorkflowGraph) 
    57         { 
    58           if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
    59           if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
    60           int filterOut = (std::static_pointer_cast<COutputPin>(filter))->getFilterId(); 
    61           int filterIn = (std::static_pointer_cast<COutputPin>(firstFilter))->getFilterId(); 
    62           // PASS field's id here 
    63           (*CWorkflowGraph::mapFieldToFilters_ptr)["XXX"].push_back(filterOut); 
    64           (*CWorkflowGraph::mapFieldToFilters_ptr)["XXX"].push_back(filterIn); 
    65           (*CWorkflowGraph::mapFilters_ptr)[filterOut] = "Spatial transform filter"; 
    66           (*CWorkflowGraph::mapFilters_ptr)[filterIn] = "Spatial transform filter"; 
    67           std::cout<<"CSpatialTransformFilter::CSpatialTransformFilter CWorkflowGraph::mapFieldToFilters_ptr->size = "<<CWorkflowGraph::mapFieldToFilters_ptr->size()<<std::endl; 
    68         } 
    6956      } 
    7057 
     
    8673  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data) 
    8774  { 
     75    // if(CXios::isClient) std::cout<<"CSpatialTransformFilter onInputReady"<<std::endl; 
     76 
    8877    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine); 
    89     CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue); 
     78    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->output_field_id); 
    9079    if (outputPacket) 
     80    { 
     81      // std::cout<<"Spatial Transform Filter onOutputReady"<<std::endl; 
    9182      onOutputReady(outputPacket); 
     83    }  
    9284  } 
    9385 
     
    120112  void CSpatialTemporalFilter::onInputReady(std::vector<CDataPacketPtr> data) 
    121113  { 
     114    if(CXios::isClient) std::cout<<"CSpatialTemporalFilter onInputReady"<<std::endl; 
     115 
    122116    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine); 
    123     CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue); 
     117    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->output_field_id); 
    124118 
    125119    if (outputPacket) 
     
    146140        packet->data.resize(tmpData.numElements()); 
    147141        packet->data = tmpData; 
     142        std::cout<<"Spatial temporal filter onOutputReady"<<std::endl; 
    148143        onOutputReady(packet); 
    149144        tmpData.resize(0) ; 
     
    187182  } 
    188183 
    189   CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue) 
    190   { 
     184  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue, int tag, StdString fieldID) 
     185  { 
     186    if(tag) 
     187    { 
     188      this->filterID = InvalidableObject::filterIdGenerator++; 
     189     
     190      std::cout<<"CSpatialTransformFilter::apply filter tag = "<<tag<<std::endl; 
     191     
     192 
     193      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     194 
     195      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "Spatial Transform Filter"; 
     196 
     197      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     198 
     199      StdString str = data[0]->fieldID + " ts=" + to_string(data[0]->timestamp); 
     200      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     201      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     202    } 
     203 
     204 
    191205    CDataPacketPtr packet(new CDataPacket); 
    192206    packet->date = data[0]->date; 
     
    207221      apply(data[0]->data, packet->data); 
    208222    } 
     223 
     224    if(tag) packet->src_filterID=this->filterID; 
     225    packet->fieldID=fieldID; 
    209226 
    210227    return packet; 
  • XIOS/dev/dev_trunk_omp/src/filter/spatial_transform_filter.hpp

    r1668 r1677  
    101101  { 
    102102    public: 
     103 
     104      int filterID; 
     105      int tag; 
    103106      /*! 
    104107       * Returns the engine wrapping the specified grid transformation. 
     
    117120       * \return the result of the grid transformation 
    118121       */ 
    119       CDataPacketPtr applyFilter(std::vector<CDataPacketPtr> data, double defaultValue = 0); 
     122      CDataPacketPtr applyFilter(std::vector<CDataPacketPtr> data, double defaultValue = 0, int tag=0, StdString fieldID=0); 
    120123 
    121124       /*! 
  • XIOS/dev/dev_trunk_omp/src/filter/store_filter.cpp

    r1668 r1677  
    9090  void CStoreFilter::onInputReady(std::vector<CDataPacketPtr> data) 
    9191  { 
     92    if(CXios::isClient) std::cout<<"CStoreFilter onInputReady"<<std::endl; 
     93 
     94    if(this->tag) 
     95    { 
     96      this->filterID = InvalidableObject::filterIdGenerator++; 
     97 
     98      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     99      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     100 
     101      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "Store Filter"; 
     102 
     103      if(CXios::isClient) std::cout<<"CStoreFilter::apply filter tag = "<<this->tag<<std::endl; 
     104 
     105      if(CXios::isClient)  
     106      { 
     107        StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     108        (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     109        (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID);   
     110      } 
     111    } 
     112 
     113 
    92114 
    93115    CDataPacketPtr packet; 
     
    139161  } 
    140162 
    141   int CStoreFilter::getFilterId(void) 
    142   { 
    143     return filterId; 
    144   } 
    145163 
    146164} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/store_filter.hpp

    r1668 r1677  
    33 
    44#include "input_pin.hpp" 
     5#include "workflow_graph.hpp" 
     6 
    57 
    68namespace xios 
     
    7375      void virtual invalidate(Time timestamp); 
    7476 
    75       /*! 
    76        * Returns filter's id needed in case of building workflow graph 
    77        */ 
    78       int getFilterId(); 
     77      int filterID; 
     78      int tag; 
     79 
    7980 
    8081    protected: 
     
    9394      const double missingValue; //!< The value to use to replace missing values 
    9495      std::map<Time, CDataPacketPtr> packets; //<! The stored packets 
    95       int filterId;                     //!< Filter's id needed in case of building a workflow 
     96 
    9697 
    9798  }; // class CStoreFilter 
  • XIOS/dev/dev_trunk_omp/src/filter/temporal_filter.cpp

    r1668 r1677  
    22#include "functor_type.hpp" 
    33#include "calendar_util.hpp" 
     4#include "workflow_graph.hpp" 
    45 
    56namespace xios 
     
    3233  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data) 
    3334  { 
     35    
     36    if(this->tag) 
     37    { 
     38      if(this->filterIDoutputs.size()==0) this->filterID = InvalidableObject::filterIdGenerator++; 
     39       
     40 
     41      std::cout<<"CTemporalFilter::apply filter tag = "<<this->tag<<std::endl; 
     42 
     43      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     44 
     45      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     46      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     47      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     48        
     49     
     50      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     51 
     52      if(this->filterIDoutputs.size()==0) (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "Temporal Filter";   
     53 
     54      this->filterIDoutputs.push_back(data[0]->src_filterID);  
     55    } 
     56     
     57     
    3458    CDataPacketPtr packet; 
    3559 
     
    4266      { 
    4367        usePacket = (data[0]->date >= nextSamplingDate); 
    44 //        outputResult = (data[0]->date + samplingFreq > nextOperationDate); 
    4568        outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth); 
    4669        copyLess = (isInstantOperation && usePacket && outputResult); 
     
    79102 
    80103        isFirstOperation = false; 
    81 //        nextOperationDate = initDate + samplingFreq + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth; 
     104        if(this->tag) packet->src_filterID=this->filterID; 
     105        packet->fieldID = this->output_field_id; 
     106        if(this->tag) this->filterIDoutputs.clear(); 
    82107      } 
    83108    } 
  • XIOS/dev/dev_trunk_omp/src/filter/temporal_filter.hpp

    r1668 r1677  
    5454       */ 
    5555      bool virtual isDataExpected(const CDate& date) const; 
     56      std::vector<int> filterIDoutputs; 
    5657 
    5758    private: 
     
    7273//      CDate nextOperationDate; //!< The date of the next operation 
    7374      bool isFirstOperation; //!< True before the first operation was been computed 
     75 
    7476  }; // class CTemporalFilter 
    7577} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/ternary_arithmetic_filter.cpp

    r1162 r1677  
    11#include "ternary_arithmetic_filter.hpp" 
     2#include "workflow_graph.hpp" 
    23 
    34namespace xios 
     
    1213  CDataPacketPtr CScalarScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    1314  { 
    14     CDataPacketPtr packet(new CDataPacket); 
    15     packet->date = data[0]->date; 
    16     packet->timestamp = data[0]->timestamp; 
    17     packet->status = data[0]->status; 
     15    if(this->tag) 
     16    { 
     17      this->filterID = InvalidableObject::filterIdGenerator++; 
     18     
     19 
     20      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     21      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     22 
     23      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "SSF Filter" ; 
     24      std::cout<<"CScalarScalarFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     25 
     26      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     27      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     28      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     29    } 
     30 
     31    CDataPacketPtr packet(new CDataPacket); 
     32    packet->date = data[0]->date; 
     33    packet->timestamp = data[0]->timestamp; 
     34    packet->status = data[0]->status; 
     35    if(this->tag) packet->src_filterID = this->filterID; 
     36    packet->fieldID = this->output_field_id; 
    1837 
    1938    if (packet->status == CDataPacket::NO_ERROR) 
     
    3251  CDataPacketPtr CScalarFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    3352  { 
    34     CDataPacketPtr packet(new CDataPacket); 
    35     packet->date = data[0]->date; 
    36     packet->timestamp = data[0]->timestamp; 
    37     packet->status = data[0]->status; 
     53    if(this->tag) 
     54    { 
     55      this->filterID = InvalidableObject::filterIdGenerator++; 
     56     
     57 
     58      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     59      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     60 
     61      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "SFS Filter" ; 
     62      std::cout<<"CScalarFieldScalarArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     63 
     64      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     65      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     66      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     67    } 
     68 
     69    CDataPacketPtr packet(new CDataPacket); 
     70    packet->date = data[0]->date; 
     71    packet->timestamp = data[0]->timestamp; 
     72    packet->status = data[0]->status; 
     73    if(this->tag) packet->src_filterID = this->filterID; 
     74    packet->fieldID = this->output_field_id; 
    3875 
    3976    if (packet->status == CDataPacket::NO_ERROR) 
     
    5188  CDataPacketPtr CScalarFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    5289  { 
    53     CDataPacketPtr packet(new CDataPacket); 
    54     packet->date = data[0]->date; 
    55     packet->timestamp = data[0]->timestamp; 
    56     packet->status = data[0]->status; 
     90    if(this->tag) 
     91    { 
     92      this->filterID = InvalidableObject::filterIdGenerator++; 
     93     
     94 
     95      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     96      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     97 
     98      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "SFF Filter" ; 
     99      std::cout<<"CScalarFieldFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     100 
     101      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     102      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     103      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     104 
     105      str = data[1]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     106      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[1]->src_filterID); 
     107      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     108    } 
     109 
     110    CDataPacketPtr packet(new CDataPacket); 
     111    packet->date = data[0]->date; 
     112    packet->timestamp = data[0]->timestamp; 
     113    packet->status = data[0]->status; 
     114    if(this->tag) packet->src_filterID = this->filterID; 
     115    packet->fieldID = this->output_field_id; 
    57116 
    58117    if (data[0]->status != CDataPacket::NO_ERROR) 
     
    79138  CDataPacketPtr CFieldScalarScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    80139  { 
    81     CDataPacketPtr packet(new CDataPacket); 
    82     packet->date = data[0]->date; 
    83     packet->timestamp = data[0]->timestamp; 
    84     packet->status = data[0]->status; 
     140    if(this->tag) 
     141    { 
     142      this->filterID = InvalidableObject::filterIdGenerator++; 
     143     
     144 
     145      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     146      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     147 
     148      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "FSS Filter" ; 
     149      std::cout<<"CFieldScalarScalarArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     150 
     151      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     152      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     153      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     154    } 
     155 
     156    CDataPacketPtr packet(new CDataPacket); 
     157    packet->date = data[0]->date; 
     158    packet->timestamp = data[0]->timestamp; 
     159    packet->status = data[0]->status; 
     160    if(this->tag) packet->src_filterID = this->filterID; 
     161    packet->fieldID = this->output_field_id; 
    85162 
    86163    if (packet->status == CDataPacket::NO_ERROR) 
     
    99176  CDataPacketPtr CFieldScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    100177  { 
    101     CDataPacketPtr packet(new CDataPacket); 
    102     packet->date = data[0]->date; 
    103     packet->timestamp = data[0]->timestamp; 
    104     packet->status = data[0]->status; 
     178    if(this->tag) 
     179    { 
     180      this->filterID = InvalidableObject::filterIdGenerator++; 
     181     
     182 
     183      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     184      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     185 
     186      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "FSF Filter" ; 
     187      std::cout<<"CFieldScalarFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     188 
     189      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     190      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     191      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     192 
     193      str = data[1]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     194      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[1]->src_filterID); 
     195      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     196    } 
     197 
     198    CDataPacketPtr packet(new CDataPacket); 
     199    packet->date = data[0]->date; 
     200    packet->timestamp = data[0]->timestamp; 
     201    packet->status = data[0]->status; 
     202    if(this->tag) packet->src_filterID = this->filterID; 
     203    packet->fieldID = this->output_field_id; 
    105204 
    106205    if (data[0]->status != CDataPacket::NO_ERROR) 
     
    124223  CDataPacketPtr CFieldFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    125224  { 
    126     CDataPacketPtr packet(new CDataPacket); 
    127     packet->date = data[0]->date; 
    128     packet->timestamp = data[0]->timestamp; 
    129     packet->status = data[0]->status; 
     225    if(this->tag) 
     226    { 
     227      this->filterID = InvalidableObject::filterIdGenerator++; 
     228     
     229 
     230      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     231      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     232 
     233      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "FFS Filter" ; 
     234      std::cout<<"CFieldFieldScalarArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     235 
     236      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     237      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     238      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     239 
     240      str = data[1]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     241      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[1]->src_filterID); 
     242      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     243    } 
     244 
     245    CDataPacketPtr packet(new CDataPacket); 
     246    packet->date = data[0]->date; 
     247    packet->timestamp = data[0]->timestamp; 
     248    packet->status = data[0]->status; 
     249    if(this->tag) packet->src_filterID = this->filterID; 
     250    packet->fieldID = this->output_field_id; 
    130251 
    131252    if (data[0]->status != CDataPacket::NO_ERROR) 
     
    149270  CDataPacketPtr CFieldFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    150271  { 
    151     CDataPacketPtr packet(new CDataPacket); 
    152     packet->date = data[0]->date; 
    153     packet->timestamp = data[0]->timestamp; 
    154     packet->status = data[0]->status; 
     272    if(this->tag) 
     273    { 
     274      this->filterID = InvalidableObject::filterIdGenerator++; 
     275     
     276 
     277      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     278      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     279 
     280      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "FFF Filter" ; 
     281      std::cout<<"CFieldFieldFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     282 
     283      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     284      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     285      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     286 
     287      str = data[1]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     288      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[1]->src_filterID); 
     289      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     290 
     291      str = data[2]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     292      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[2]->src_filterID); 
     293      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     294    } 
     295 
     296 
     297    CDataPacketPtr packet(new CDataPacket); 
     298    packet->date = data[0]->date; 
     299    packet->timestamp = data[0]->timestamp; 
     300    packet->status = data[0]->status; 
     301    if(this->tag) packet->src_filterID = this->filterID; 
     302    packet->fieldID = this->output_field_id; 
    155303 
    156304    if (data[0]->status != CDataPacket::NO_ERROR) 
  • XIOS/dev/dev_trunk_omp/src/filter/unary_arithmetic_filter.cpp

    r643 r1677  
    11#include "unary_arithmetic_filter.hpp" 
    2  
     2#include "workflow_graph.hpp" 
    33namespace xios 
    44{ 
     
    66    : CFilter(gc, 1, this) 
    77    , op(operatorExpr.getOpField(op)) 
    8   { /* Nothing to do */ }; 
     8  {  
     9    StdString input_op_expression=op; 
     10    if(input_op_expression == "neg") 
     11      op_expression = "-"; 
     12    else if(input_op_expression == "sin") 
     13      op_expression = "sin"; 
     14    else if(input_op_expression == "cos") 
     15      op_expression = "cos"; 
     16    else if(input_op_expression == "tan") 
     17      op_expression = "tan"; 
     18    else if(input_op_expression == "exp") 
     19      op_expression = "exp"; 
     20    else if(input_op_expression == "log") 
     21      op_expression = "log"; 
     22    else if(input_op_expression == "log10") 
     23      op_expression = "log10"; 
     24    else if(input_op_expression == "sqrt") 
     25      op_expression = "sqrt"; 
     26    else 
     27      op_expression = " "; 
     28  }; 
    929 
    1030  CDataPacketPtr CUnaryArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    1131  { 
     32    if(this->tag) 
     33    { 
     34      this->filterID = InvalidableObject::filterIdGenerator++; 
     35       
     36      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     37      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     38 
     39 
     40      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = op_expression +" F Filter" ; 
     41      std::cout<<"CunaryArithmeticFilter::apply filter tag = "<<this->tag<<std::endl; 
     42 
     43      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     44      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     45      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID);    
     46   
     47    } 
     48 
    1249    CDataPacketPtr packet(new CDataPacket); 
    1350    packet->date = data[0]->date; 
    1451    packet->timestamp = data[0]->timestamp; 
    1552    packet->status = data[0]->status; 
     53    packet->fieldID = this->output_field_id; 
    1654 
    1755    if (packet->status == CDataPacket::NO_ERROR) 
    1856      packet->data.reference(op(data[0]->data)); 
    1957 
     58    if(this->tag) packet->src_filterID = this->filterID; 
     59 
    2060    return packet; 
    2161  } 
Note: See TracChangeset for help on using the changeset viewer.