Ignore:
Timestamp:
06/25/19 16:14:54 (5 years ago)
Author:
yushan
Message:

MARK: Dynamic workflow graph developement. Branch up to date with trunk @1663.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_trunk_omp/src/filter/ternary_arithmetic_filter.cpp

    r1162 r1677  
    11#include "ternary_arithmetic_filter.hpp" 
     2#include "workflow_graph.hpp" 
    23 
    34namespace xios 
     
    1213  CDataPacketPtr CScalarScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    1314  { 
    14     CDataPacketPtr packet(new CDataPacket); 
    15     packet->date = data[0]->date; 
    16     packet->timestamp = data[0]->timestamp; 
    17     packet->status = data[0]->status; 
     15    if(this->tag) 
     16    { 
     17      this->filterID = InvalidableObject::filterIdGenerator++; 
     18     
     19 
     20      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     21      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     22 
     23      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "SSF Filter" ; 
     24      std::cout<<"CScalarScalarFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     25 
     26      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     27      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     28      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     29    } 
     30 
     31    CDataPacketPtr packet(new CDataPacket); 
     32    packet->date = data[0]->date; 
     33    packet->timestamp = data[0]->timestamp; 
     34    packet->status = data[0]->status; 
     35    if(this->tag) packet->src_filterID = this->filterID; 
     36    packet->fieldID = this->output_field_id; 
    1837 
    1938    if (packet->status == CDataPacket::NO_ERROR) 
     
    3251  CDataPacketPtr CScalarFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    3352  { 
    34     CDataPacketPtr packet(new CDataPacket); 
    35     packet->date = data[0]->date; 
    36     packet->timestamp = data[0]->timestamp; 
    37     packet->status = data[0]->status; 
     53    if(this->tag) 
     54    { 
     55      this->filterID = InvalidableObject::filterIdGenerator++; 
     56     
     57 
     58      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     59      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     60 
     61      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "SFS Filter" ; 
     62      std::cout<<"CScalarFieldScalarArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     63 
     64      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     65      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     66      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     67    } 
     68 
     69    CDataPacketPtr packet(new CDataPacket); 
     70    packet->date = data[0]->date; 
     71    packet->timestamp = data[0]->timestamp; 
     72    packet->status = data[0]->status; 
     73    if(this->tag) packet->src_filterID = this->filterID; 
     74    packet->fieldID = this->output_field_id; 
    3875 
    3976    if (packet->status == CDataPacket::NO_ERROR) 
     
    5188  CDataPacketPtr CScalarFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    5289  { 
    53     CDataPacketPtr packet(new CDataPacket); 
    54     packet->date = data[0]->date; 
    55     packet->timestamp = data[0]->timestamp; 
    56     packet->status = data[0]->status; 
     90    if(this->tag) 
     91    { 
     92      this->filterID = InvalidableObject::filterIdGenerator++; 
     93     
     94 
     95      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     96      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     97 
     98      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "SFF Filter" ; 
     99      std::cout<<"CScalarFieldFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     100 
     101      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     102      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     103      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     104 
     105      str = data[1]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     106      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[1]->src_filterID); 
     107      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     108    } 
     109 
     110    CDataPacketPtr packet(new CDataPacket); 
     111    packet->date = data[0]->date; 
     112    packet->timestamp = data[0]->timestamp; 
     113    packet->status = data[0]->status; 
     114    if(this->tag) packet->src_filterID = this->filterID; 
     115    packet->fieldID = this->output_field_id; 
    57116 
    58117    if (data[0]->status != CDataPacket::NO_ERROR) 
     
    79138  CDataPacketPtr CFieldScalarScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    80139  { 
    81     CDataPacketPtr packet(new CDataPacket); 
    82     packet->date = data[0]->date; 
    83     packet->timestamp = data[0]->timestamp; 
    84     packet->status = data[0]->status; 
     140    if(this->tag) 
     141    { 
     142      this->filterID = InvalidableObject::filterIdGenerator++; 
     143     
     144 
     145      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     146      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     147 
     148      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "FSS Filter" ; 
     149      std::cout<<"CFieldScalarScalarArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     150 
     151      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     152      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     153      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     154    } 
     155 
     156    CDataPacketPtr packet(new CDataPacket); 
     157    packet->date = data[0]->date; 
     158    packet->timestamp = data[0]->timestamp; 
     159    packet->status = data[0]->status; 
     160    if(this->tag) packet->src_filterID = this->filterID; 
     161    packet->fieldID = this->output_field_id; 
    85162 
    86163    if (packet->status == CDataPacket::NO_ERROR) 
     
    99176  CDataPacketPtr CFieldScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    100177  { 
    101     CDataPacketPtr packet(new CDataPacket); 
    102     packet->date = data[0]->date; 
    103     packet->timestamp = data[0]->timestamp; 
    104     packet->status = data[0]->status; 
     178    if(this->tag) 
     179    { 
     180      this->filterID = InvalidableObject::filterIdGenerator++; 
     181     
     182 
     183      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     184      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     185 
     186      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "FSF Filter" ; 
     187      std::cout<<"CFieldScalarFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     188 
     189      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     190      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     191      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     192 
     193      str = data[1]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     194      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[1]->src_filterID); 
     195      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     196    } 
     197 
     198    CDataPacketPtr packet(new CDataPacket); 
     199    packet->date = data[0]->date; 
     200    packet->timestamp = data[0]->timestamp; 
     201    packet->status = data[0]->status; 
     202    if(this->tag) packet->src_filterID = this->filterID; 
     203    packet->fieldID = this->output_field_id; 
    105204 
    106205    if (data[0]->status != CDataPacket::NO_ERROR) 
     
    124223  CDataPacketPtr CFieldFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    125224  { 
    126     CDataPacketPtr packet(new CDataPacket); 
    127     packet->date = data[0]->date; 
    128     packet->timestamp = data[0]->timestamp; 
    129     packet->status = data[0]->status; 
     225    if(this->tag) 
     226    { 
     227      this->filterID = InvalidableObject::filterIdGenerator++; 
     228     
     229 
     230      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     231      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     232 
     233      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "FFS Filter" ; 
     234      std::cout<<"CFieldFieldScalarArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     235 
     236      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     237      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     238      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     239 
     240      str = data[1]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     241      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[1]->src_filterID); 
     242      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     243    } 
     244 
     245    CDataPacketPtr packet(new CDataPacket); 
     246    packet->date = data[0]->date; 
     247    packet->timestamp = data[0]->timestamp; 
     248    packet->status = data[0]->status; 
     249    if(this->tag) packet->src_filterID = this->filterID; 
     250    packet->fieldID = this->output_field_id; 
    130251 
    131252    if (data[0]->status != CDataPacket::NO_ERROR) 
     
    149270  CDataPacketPtr CFieldFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    150271  { 
    151     CDataPacketPtr packet(new CDataPacket); 
    152     packet->date = data[0]->date; 
    153     packet->timestamp = data[0]->timestamp; 
    154     packet->status = data[0]->status; 
     272    if(this->tag) 
     273    { 
     274      this->filterID = InvalidableObject::filterIdGenerator++; 
     275     
     276 
     277      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >; 
     278      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>; 
     279 
     280      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "FFF Filter" ; 
     281      std::cout<<"CFieldFieldFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
     282 
     283      StdString str = data[0]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     284      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID); 
     285      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     286 
     287      str = data[1]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     288      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[1]->src_filterID); 
     289      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     290 
     291      str = data[2]->fieldID +" ts=" + to_string(data[0]->timestamp); 
     292      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[2]->src_filterID); 
     293      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID); 
     294    } 
     295 
     296 
     297    CDataPacketPtr packet(new CDataPacket); 
     298    packet->date = data[0]->date; 
     299    packet->timestamp = data[0]->timestamp; 
     300    packet->status = data[0]->status; 
     301    if(this->tag) packet->src_filterID = this->filterID; 
     302    packet->fieldID = this->output_field_id; 
    155303 
    156304    if (data[0]->status != CDataPacket::NO_ERROR) 
Note: See TracChangeset for help on using the changeset viewer.