Ignore:
Timestamp:
07/19/19 15:28:33 (5 years ago)
Author:
yushan
Message:

MARK: Dynamic workflow graph developement. Branch up to date with trunk @1676. Bug fixed

Location:
XIOS/dev/dev_trunk_omp/src/filter
Files:
22 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_trunk_omp/src/filter/binary_arithmetic_filter.cpp

    r1680 r1681  
    1515  }; 
    1616 
    17   CDataPacketPtr CScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
     17  std::tuple<int, int, int> CScalarFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
    1818  { 
    1919    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    2020    // bool building_graph = true; 
    2121    int unique_filter_id; 
     22    bool firstround; 
    2223 
    2324    if(building_graph) 
     
    3031      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
    3132      { 
     33        firstround = true; 
    3234        this->filterID = InvalidableObject::filterIdGenerator++; 
    3335        int edgeID = InvalidableObject::edgeIdGenerator++; 
     
    3537        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
    3638        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
    37  
    38         (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes(); 
    39         if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->recordXiosAttributes(); 
     39        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     40 
     41 
     42        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     43        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
    4044       
    4145 
     
    5559      else  
    5660      { 
     61        firstround=false; 
    5762        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
    5863        if(data[0]->src_filterID != unique_filter_id) 
     
    6368          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
    6469        } 
    65        
    66          
    6770      }   
    6871    } 
    6972 
     73    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     74  } 
     75 
     76 
     77  CDataPacketPtr CScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
     78  { 
     79     
    7080    CDataPacketPtr packet(new CDataPacket); 
    7181    packet->date = data[0]->date; 
    7282    packet->timestamp = data[0]->timestamp; 
    7383    packet->status = data[0]->status; 
    74     if(building_graph) packet->src_filterID = unique_filter_id; 
     84 
     85    std::tuple<int, int, int> graph = buildGraph(data); 
     86 
     87    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     88    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     89    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     90 
    7591    packet->field = this->field; 
    7692 
     
    89105  }; 
    90106 
    91   CDataPacketPtr CFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
     107  std::tuple<int, int, int> CFieldScalarArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
    92108  { 
    93109    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    94110    // bool building_graph = true; 
    95111    int unique_filter_id; 
     112    bool firstround; 
    96113 
    97114    if(building_graph) 
     
    104121      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
    105122      { 
     123        firstround = true; 
    106124        this->filterID = InvalidableObject::filterIdGenerator++; 
    107125        int edgeID = InvalidableObject::edgeIdGenerator++; 
     
    109127        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
    110128        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
    111  
    112         (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes(); 
    113         if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->recordXiosAttributes(); 
     129        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     130 
     131 
     132        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     133        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
    114134       
    115135 
     
    129149      else  
    130150      { 
     151        firstround=false; 
    131152        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
    132153        if(data[0]->src_filterID != unique_filter_id) 
     
    137158          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
    138159        } 
    139        
    140          
    141160      }   
    142161    } 
     162 
     163    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     164  } 
     165 
     166  CDataPacketPtr CFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
     167  { 
    143168 
    144169    CDataPacketPtr packet(new CDataPacket); 
     
    146171    packet->timestamp = data[0]->timestamp; 
    147172    packet->status = data[0]->status; 
    148     if(building_graph) packet->src_filterID = unique_filter_id; 
     173 
     174    std::tuple<int, int, int> graph = buildGraph(data); 
     175 
     176    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     177    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     178    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     179 
    149180    packet->field = this->field; 
    150      
    151181 
    152182    if (packet->status == CDataPacket::NO_ERROR) 
     
    163193  }; 
    164194 
    165   CDataPacketPtr CFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    166   { 
    167     bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    168     // bool building_graph = true; 
     195  std::tuple<int, int, int> CFieldFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
     196  { 
     197    bool building_graph = this->tag ? ((data[0]->timestamp >= this->field->field_graph_start && data[0]->timestamp <= this->field->field_graph_end) && (data[0]->timestamp == data[1]->timestamp)) : false; 
     198 
    169199    int unique_filter_id; 
     200 
     201    bool firstround; 
    170202 
    171203    if(building_graph) 
     
    173205      CWorkflowGraph::allocNodeEdge(); 
    174206 
    175       std::cout<<"CFieldFieldArithmeticFilter::apply filter tag = "<<this->tag<<std::endl; 
     207      // std::cout<<"CFieldFieldArithmeticFilter::apply filter tag = "<<this->tag<<std::endl; 
    176208 
    177209      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 
     
    180212      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
    181213      { 
     214        firstround = true; 
    182215        this->filterID = InvalidableObject::filterIdGenerator++; 
    183216        int edgeID = InvalidableObject::edgeIdGenerator++; 
    184217     
    185218        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
    186         (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes(); 
    187         if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->recordXiosAttributes(); 
     219        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     220        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     221 
     222        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
    188223     
    189224        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
     
    211246      else  
    212247      { 
     248        firstround = false; 
    213249        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
    214250        if(data[0]->src_filterID != unique_filter_id) 
     
    230266    } 
    231267 
     268    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     269  } 
     270 
     271  CDataPacketPtr CFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
     272  { 
    232273 
    233274    CDataPacketPtr packet(new CDataPacket); 
    234275    packet->date = data[0]->date; 
    235276    packet->timestamp = data[0]->timestamp; 
    236     if(building_graph) packet->src_filterID = unique_filter_id; 
     277 
     278    std::tuple<int, int, int> graph = buildGraph(data); 
     279 
     280    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     281    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     282    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     283     
    237284    packet->field = this->field; 
    238285     
  • XIOS/dev/dev_trunk_omp/src/filter/binary_arithmetic_filter.hpp

    r1680 r1681  
    66#include "operator_expr.hpp" 
    77#include <unordered_map> 
     8#include <tuple> 
    89 
    910namespace xios 
     
    3738       */ 
    3839      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     40      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
     41       
     42      // CDataPacketPtr virtual buildGraph_post(CDataPacketPtr packet, std::vector<CDataPacketPtr> data); 
    3943  }; // class CScalarFieldArithmeticFilter 
    4044 
     
    6771       */ 
    6872      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     73      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
    6974  }; // class CFieldScalarArithmeticFilter 
    7075 
     
    95100       */ 
    96101      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     102      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
    97103  }; // class CFieldFieldArithmeticFilter 
    98104} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/data_packet.hpp

    r1679 r1681  
    3030    std::vector<int> filterIDoutputs; 
    3131    CField *field; 
     32    int distance; 
    3233     
    3334    /*! 
  • XIOS/dev/dev_trunk_omp/src/filter/file_server_writer_filter.cpp

    r1677 r1681  
    1616  void CFileServerWriterFilter::onInputReady(std::vector<CDataPacketPtr> data) 
    1717  { 
    18     if(CXios::isClient) 
    19     std::cout<<"CFileServerWriterFilter::onInputReady"<<std::endl; 
    2018    field->writeUpdateData(data[0]->data); 
    2119  } 
  • XIOS/dev/dev_trunk_omp/src/filter/file_writer_filter.cpp

    r1680 r1681  
    1919  } 
    2020 
    21   void CFileWriterFilter::onInputReady(std::vector<CDataPacketPtr> data) 
     21  void CFileWriterFilter::buildGraph(std::vector<CDataPacketPtr> data) 
    2222  { 
    2323    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph: false; 
     
    2929 
    3030      CWorkflowGraph::allocNodeEdge(); 
     31      StdString namestring = to_string(this->field->name); 
     32      namestring.erase(0, 6); 
     33      namestring.erase(namestring.length()-1, 1); 
    3134 
    32       CWorkflowGraph::addNode(this->filterID, "File Writer Filter \\n("+this->field->file->getId()+".nc)", 6, 0, 1, data[0]); 
     35      CWorkflowGraph::addNode(this->filterID, namestring + "\\n("+this->field->file->getId()+".nc)", 6, 0, 1, data[0]); 
    3336 
    34       // (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->file->recordXiosAttributes(); 
    35       (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes(); 
    36       (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->recordXiosAttributes(); 
    37  
    38       // if(CXios::isClient) std::cout<<"CFileWriterFilter::apply filter tag = "<<this->tag<<" start = "<<start_graph<<" end = "<<end_graph<<std::endl; 
     37      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     38      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
     39      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].clusterID =1; 
     40      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = ++(data[0]->distance); 
    3941 
    4042      if(CXios::isClient && CWorkflowGraph::build_begin)  
     
    4749      else CWorkflowGraph::build_begin=true; 
    4850    } 
     51  } 
     52 
     53  void CFileWriterFilter::onInputReady(std::vector<CDataPacketPtr> data) 
     54  { 
     55    buildGraph(data); 
    4956     
    5057    const bool detectMissingValue = ( !field->default_value.isEmpty() && 
  • XIOS/dev/dev_trunk_omp/src/filter/file_writer_filter.hpp

    r1679 r1681  
    2121      CField* field; //<! The associated field 
    2222      int filterID; 
     23      int distance; 
    2324 
    2425      /*! 
     
    4748       */ 
    4849      bool virtual isDataExpected(const CDate& date) const; 
    49  
    50  
    5150       
    5251 
     
    5958       */ 
    6059      void virtual onInputReady(std::vector<CDataPacketPtr> data); 
     60      void virtual buildGraph(std::vector<CDataPacketPtr> data); 
    6161 
    6262    private: 
  • XIOS/dev/dev_trunk_omp/src/filter/output_pin.cpp

    r1680 r1681  
    129129  } 
    130130 
     131  int COutputPin::getDistance() 
     132  { 
     133    int distance = 0; 
     134    // for(int i=0; i<parent_filters.size(); i++) 
     135    // { 
     136    //   distance = max(distance, parent_filters[i]->getDistance()+1); 
     137    // } 
     138    return distance; 
     139  } 
     140 
    131141 
    132142} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/output_pin.hpp

    r1679 r1681  
    2323      Time end_graph; 
    2424      CField *field; 
     25      int distance; 
    2526 
    2627 
     
    8586 
    8687      void virtual setParentFiltersTag(); 
     88      int virtual getDistance(); 
    8789 
    8890 
  • XIOS/dev/dev_trunk_omp/src/filter/pass_through_filter.cpp

    r1680 r1681  
    1111  } 
    1212 
    13   CDataPacketPtr CPassThroughFilter::apply(std::vector<CDataPacketPtr> data) 
     13  void CPassThroughFilter::buildGraph(std::vector<CDataPacketPtr> data) 
    1414  { 
    15         bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     15    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     16    // bool building_graph = this->tag ? data[0]->timestamp >= this->field->field_graph_start && data[0]->timestamp <= this->field->field_graph_end : false; 
    1617 
    1718    if(building_graph) 
    1819    { 
    19       // std::cout<<"CPassThroughFilter::apply tag = "<<this->tag<<" start = "<<start_graph<<" end = "<<end_graph<<std::endl; 
     20      // std::cout<<"CPassThroughFilter::apply field_id = "<<this->field->getId()<<" start = "<<start_graph<<" end = "<<end_graph<<std::endl; 
    2021      this->filterID = InvalidableObject::filterIdGenerator++; 
    2122      int edgeID = InvalidableObject::edgeIdGenerator++; 
     
    2324      CWorkflowGraph::allocNodeEdge(); 
    2425 
    25       CWorkflowGraph::addNode(this->filterID, "Pass Through Filter", 2, 1, 1, data[0]); 
     26      CWorkflowGraph::addNode(this->filterID, "Pass Through Filter\\n("+data[0]->field->getId()+")", 2, 1, 1, data[0]); 
     27      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = ++(data[0]->distance); 
    2628 
    27       (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes(); 
    28       if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->recordXiosAttributes(); 
     29      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     30      if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
    2931 
    3032       
     
    3941 
    4042    } 
     43 
    4144    data[0]->field = this->field; 
    42      
     45  } 
    4346 
     47  CDataPacketPtr CPassThroughFilter::apply(std::vector<CDataPacketPtr> data) 
     48  { 
     49    if(CXios::isClient) buildGraph(data); 
    4450    return data[0]; 
    4551  } 
  • XIOS/dev/dev_trunk_omp/src/filter/pass_through_filter.hpp

    r1679 r1681  
    3030       */ 
    3131      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     32      void virtual buildGraph(std::vector<CDataPacketPtr> data); 
    3233  }; // class CPassThroughFilter 
    3334} // namespace xios 
  • XIOS/dev/dev_trunk_omp/src/filter/source_filter.cpp

    r1680 r1681  
    2525  } 
    2626 
    27    
    28   template <int N> 
    29   void CSourceFilter::streamData(CDate date, const CArray<double, N>& data, const StdString field_id) 
     27  void CSourceFilter::buildGraph(CDataPacketPtr packet) 
    3028  { 
    31     date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter 
    32  
    33     CDataPacketPtr packet(new CDataPacket); 
    34     packet->date = date; 
    35     packet->timestamp = date; 
    36     packet->status = CDataPacket::NO_ERROR; 
    37  
    38     packet->data.resize(grid->storeIndex_client.numElements());     
     29    bool building_graph = this->tag ? packet->timestamp >= this->field->field_graph_start && packet->timestamp <= this->field->field_graph_end : false; 
     30    // bool building_graph = this->tag ? packet->timestamp >= this->start_graph && packet->timestamp <= this->end_graph : false; 
    3931     
    40     if (compression) 
    41     { 
    42       packet->data = defaultValue; 
    43       grid->uncompressField(data, packet->data);     
    44     } 
    45     else 
    46     { 
    47       if (mask) 
    48         grid->maskField(data, packet->data); 
    49       else 
    50         grid->inputField(data, packet->data); 
    51     } 
    52     // Convert missing values to NaN 
    53     if (hasMissingValue) 
    54     { 
    55       const double nanValue = std::numeric_limits<double>::quiet_NaN(); 
    56       const size_t nbData = packet->data.numElements(); 
    57       for (size_t idx = 0; idx < nbData; ++idx) 
    58       { 
    59         if (defaultValue == packet->data(idx)) 
    60           packet->data(idx) = nanValue; 
    61       } 
    62     } 
    63  
    64      
    65     bool building_graph = this->tag ? packet->timestamp >= this->start_graph && packet->timestamp <= this->end_graph : false; 
     32    // std::cout<<"************************************************source filter 1 : field_id = "<<this->field->getId()<<" this->start_graph = "<<this->start_graph<<" this->end_graph = "<<this->end_graph<<std::endl; 
     33    // std::cout<<"************************************************source filter 2 : field_id = "<<this->field->getId()<<" this->field->field_graph_start = "<<this->field->field_graph_start<<" this->field->field_graph_end = "<<this->field->field_graph_end<<std::endl; 
    6634 
    6735    if(building_graph) 
     
    7038      packet->src_filterID=this->filterID; 
    7139      packet->field = this->field; 
     40      packet->distance = 1; 
    7241       
    73       if(CXios::isClient) std::cout<<"source filter tag = "<<this->tag<<" start = "<<start_graph<<" end = "<<end_graph<<std::endl; 
     42      // if(CXios::isClient) std::cout<<"source filter tag = "<<this->tag<<" start = "<<start_graph<<" end = "<<end_graph<<std::endl; 
    7443     
    7544      CWorkflowGraph::allocNodeEdge(); 
    7645 
    7746      CWorkflowGraph::addNode(this->filterID, "Source Filter ", 1, 1, 0, packet); 
    78       (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes(); 
     47      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
    7948      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].field_id = this->field->getId(); 
     49      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = 1; 
    8050 
    8151      CWorkflowGraph::build_begin = true; 
    8252    } 
    8353 
    84  
    85     onOutputReady(packet); 
     54    // if(building_graph) std::cout<<packet->timestamp<<"************************************************ source filter : field_id = "<<this->field->getId()<<" start = "<<this->field->field_graph_start<<" end_graph = "<<this->field->field_graph_end<<std::endl; 
    8655  } 
    87  
    88   template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data, const StdString field_id); 
    89   template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data, const StdString field_id); 
    90   template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data, const StdString field_id); 
    91   template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data, const StdString field_id); 
    92   template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data, const StdString field_id); 
    93   template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data, const StdString field_id); 
    94   template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data, const StdString field_id); 
    9556 
    9657 
     
    13091      } 
    13192    } 
    132     this->filterID = InvalidableObject::filterIdGenerator++; 
    133     packet->src_filterID=this->filterID; 
    13493 
    135     if(CXios::isClient) std::cout<<"source filter filter tag = "<<this->tag<<std::endl; 
     94    if(CXios::isClient) buildGraph(packet); 
     95     
     96 
    13697 
    13798    onOutputReady(packet); 
  • XIOS/dev/dev_trunk_omp/src/filter/source_filter.hpp

    r1679 r1681  
    4848      void streamData(CDate date, const CArray<double, N>& data); 
    4949 
    50       template <int N> 
    51       void streamData(CDate date, const CArray<double, N>& data, const StdString field_id); 
     50      void virtual buildGraph(CDataPacketPtr packet); 
    5251 
    5352      /*! 
  • XIOS/dev/dev_trunk_omp/src/filter/spatial_transform_filter.cpp

    r1680 r1681  
    7575 
    7676    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine); 
    77     CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->start_graph, this->end_graph, this->field); 
     77    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->start_graph, this->end_graph, this->field, this->getDistance()); 
    7878    if (outputPacket) 
    7979    { 
     
    112112 
    113113    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine); 
    114     CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->start_graph, this->end_graph, this->field); 
     114    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->start_graph, this->end_graph, this->field, this->getDistance()); 
    115115 
    116116    if (outputPacket) 
     
    179179  } 
    180180 
    181   CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue, int tag, Time start_graph, Time end_graph, CField *field) 
     181  bool CSpatialTransformFilterEngine::buildGraph(std::vector<CDataPacketPtr> data, int tag, Time start_graph, Time end_graph, CField *field, int distance) 
    182182  { 
    183183    bool building_graph = tag ? data[0]->timestamp >= start_graph && data[0]->timestamp <= end_graph : false; 
     
    190190 
    191191      CWorkflowGraph::addNode(this->filterID, "Spatial Transform Filter", 4, 1, 1, data[0]); 
    192       (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = field->recordXiosAttributes(); 
    193       if(field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +field->file->recordXiosAttributes(); 
     192      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     193      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = field->record4graphXiosAttributes(); 
     194      if(field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +field->file->record4graphXiosAttributes(); 
    194195 
    195196 
     
    203204    } 
    204205 
     206    return building_graph; 
     207  } 
     208 
     209  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue, int tag, Time start_graph, Time end_graph, CField *field, int distance) 
     210  { 
     211     
     212    bool BG = buildGraph(data, tag, start_graph, end_graph, field, distance); 
    205213 
    206214    CDataPacketPtr packet(new CDataPacket); 
     
    221229      if (0 != packet->data.numElements()) 
    222230        (packet->data)(0) = defaultValue; 
    223       if(building_graph) apply(data[0]->data, packet->data, this->filterID); 
     231      if(BG) apply(data[0]->data, packet->data, this->filterID); 
    224232      else apply(data[0]->data, packet->data); 
    225233    } 
    226234 
    227     if(building_graph) packet->src_filterID=this->filterID; 
     235    if(BG) packet->src_filterID=this->filterID; 
     236    if(BG) packet->distance=data[0]->distance+1; 
    228237    packet->field = field; 
    229238 
     
    347356        int srcRank = itRecv->first; 
    348357 
    349         if(filterID >=0) 
     358        if(filterID >=0) // building_graph 
    350359        { 
    351360           (*CWorkflowGraph::mapFilters_ptr_with_info)[filterID].filter_name = (*itAlgo)->getName(); 
  • XIOS/dev/dev_trunk_omp/src/filter/spatial_transform_filter.hpp

    r1679 r1681  
    121121       * \return the result of the grid transformation 
    122122       */ 
    123       CDataPacketPtr applyFilter(std::vector<CDataPacketPtr> data, double defaultValue = 0, int tag=0, Time start_graph=0, Time end_graph=-1, CField *field=0); 
     123      CDataPacketPtr applyFilter(std::vector<CDataPacketPtr> data, double defaultValue = 0, int tag=0, Time start_graph=0, Time end_graph=-1, CField *field=0, int distance=1); 
     124      bool buildGraph(std::vector<CDataPacketPtr> data, int tag=0, Time start_graph=0, Time end_graph=-1, CField *field=0, int distance=1); 
    124125 
    125126       /*! 
  • XIOS/dev/dev_trunk_omp/src/filter/store_filter.cpp

    r1680 r1681  
    8989  template CDataPacket::StatusCode CStoreFilter::getData<7>(Time timestamp, CArray<double, 7>& data); 
    9090 
    91   void CStoreFilter::onInputReady(std::vector<CDataPacketPtr> data) 
     91  void CStoreFilter::buildGraph(std::vector<CDataPacketPtr> data) 
    9292  { 
    93  
    9493    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    9594 
     
    101100      CWorkflowGraph::allocNodeEdge(); 
    102101  
    103       CWorkflowGraph::addNode(this->filterID, "Store Filter", 4, 1, 1, data[0]); 
    104       (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes(); 
    105       if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->recordXiosAttributes(); 
     102      CWorkflowGraph::addNode(this->filterID, "Store Filter", 7, 1, 1, data[0]); 
     103      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = ++(data[0]->distance); 
     104      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     105      if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
    106106 
    107107      // if(CXios::isClient) std::cout<<"CStoreFilter::apply filter tag = "<<this->tag<<std::endl; 
     
    114114      else CWorkflowGraph::build_begin = true; 
    115115    } 
     116  } 
    116117 
    117  
     118  void CStoreFilter::onInputReady(std::vector<CDataPacketPtr> data) 
     119  { 
     120    buildGraph(data); 
    118121 
    119122    CDataPacketPtr packet; 
  • XIOS/dev/dev_trunk_omp/src/filter/store_filter.hpp

    r1679 r1681  
    8181      Time end_graph; 
    8282      CField *field; 
     83      int distance; 
    8384 
    8485 
     
    9091       */ 
    9192      void virtual onInputReady(std::vector<CDataPacketPtr> data); 
     93      void virtual buildGraph(std::vector<CDataPacketPtr> data); 
    9294 
    9395    private: 
  • XIOS/dev/dev_trunk_omp/src/filter/temporal_filter.cpp

    r1680 r1681  
    3333  } 
    3434 
    35   CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data) 
    36   { 
    37     bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    38  
    39     if(building_graph) 
    40     { 
    41       int edgeID = InvalidableObject::edgeIdGenerator++; 
    42  
    43       CWorkflowGraph::allocNodeEdge(); 
    44  
    45       if(CWorkflowGraph::build_begin) 
    46       { 
    47         CWorkflowGraph::addEdge(edgeID, -1, data[0]);         
    48       } 
    49       this->filterIDoutputs_pair.push_back(make_pair(edgeID, data[0]->src_filterID));  
    50     } 
    51      
    52      
    53     CDataPacketPtr packet; 
    54  
    55     if (data[0]->status != CDataPacket::END_OF_STREAM) 
    56     { 
    57       bool usePacket, outputResult, copyLess; 
    58       if (isOnceOperation) 
    59         usePacket = outputResult = copyLess = isFirstOperation; 
    60       else 
    61       { 
    62         usePacket = (data[0]->date >= nextSamplingDate); 
    63         outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth); 
    64         copyLess = (isInstantOperation && usePacket && outputResult); 
    65       } 
    66  
    67       if (usePacket) 
    68       { 
    69         nbSamplingDates ++; 
    70         if (!copyLess) 
    71         { 
    72           if (!tmpData.numElements()) 
    73             tmpData.resize(data[0]->data.numElements()); 
    74  
    75           (*functor)(data[0]->data); 
    76         } 
    77  
    78         nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep(); 
    79       } 
    80  
    81       if (outputResult) 
    82       { 
    83         if(building_graph) 
    84         { 
    85           this->filterID = InvalidableObject::filterIdGenerator++; 
    86           CWorkflowGraph::addNode(this->filterID, "Temporal Filter\\n("+this->temp_op+")", 5, 1, 0, data[0]);    
    87           (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].transform_type = this->temp_op;    
    88           (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ; 
    89  
    90           (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes(); 
    91           if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->recordXiosAttributes(); 
    92  
    93  
    94           for(int i=0; i<this->filterIDoutputs_pair.size(); i++) 
    95           { 
    96             (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterIDoutputs_pair[i].second].filter_filled = 0 ;   
    97             (*CWorkflowGraph::mapFieldToFilters_ptr_with_info)[this->filterIDoutputs_pair[i].first].to = this->filterID ;   
    98           } 
    99  
    100           (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb = this->filterIDoutputs_pair.size() ; 
    101         } 
     35   
    10236 
    10337 
    10438 
    105         nbOperationDates ++; 
    106         if (!copyLess) 
    107         { 
    108           functor->final(); 
    109  
    110           packet = CDataPacketPtr(new CDataPacket); 
    111           packet->date = data[0]->date; 
    112           packet->timestamp = data[0]->timestamp; 
    113           packet->status = data[0]->status; 
    114           packet->data.resize(tmpData.numElements()); 
    115           packet->data = tmpData; 
    116         } 
    117         else 
    118           packet = data[0]; 
    119  
    120         isFirstOperation = false; 
    121          
    122         packet->field = this->field; 
    123         if(building_graph) packet->src_filterID=this->filterID; 
    124         if(building_graph) this->filterIDoutputs_pair.clear(); 
    125         if(building_graph) CWorkflowGraph::build_begin=true; 
    126       } 
    127     } 
    128  
    129     return packet; 
    130   } 
    131  
    132  
    133  
    134   CDataPacketPtr CTemporalFilter::apply_old(std::vector<CDataPacketPtr> data) 
     39  bool CTemporalFilter::buildGraph(std::vector<CDataPacketPtr> data) 
    13540  { 
    13641    bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     
    15055        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].transform_type = this->temp_op;    
    15156        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = false ; 
     57        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].clusterID = 1 ; 
     58        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = (data[0]->distance); 
     59 
     60 
     61        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     62        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
    15263      } 
    15364 
     
    15970        (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
    16071        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb += 1 ; 
     72        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = max(data[0]->distance+1, (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance); 
    16173      } 
    16274 
     
    16476      this->filterIDoutputs.push_back(data[0]->src_filterID);  
    16577    } 
    166      
    167      
     78 
     79    return building_graph; 
     80  } 
     81 
     82 
     83  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data) 
     84  { 
     85    bool BG = buildGraph(data); 
     86 
    16887    CDataPacketPtr packet; 
    16988 
     
    207126          packet->data.resize(tmpData.numElements()); 
    208127          packet->data = tmpData; 
     128           
    209129        } 
    210130        else 
     
    214134         
    215135        packet->field = this->field; 
    216         if(building_graph) packet->src_filterID=this->filterID; 
    217         if(building_graph) this->filterIDoutputs.clear(); 
    218         if(building_graph) CWorkflowGraph::build_begin=true; 
    219         if(building_graph) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ; 
     136         
     137        if(BG) 
     138        { 
     139          packet->src_filterID=this->filterID; 
     140          packet->distance = data[0]->distance+1; 
     141          this->filterIDoutputs.clear(); 
     142          CWorkflowGraph::build_begin=true; 
     143          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ; 
     144        } 
    220145      } 
    221146    } 
  • XIOS/dev/dev_trunk_omp/src/filter/temporal_filter.hpp

    r1680 r1681  
    4040       */ 
    4141      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
    42       CDataPacketPtr virtual apply_old(std::vector<CDataPacketPtr> data); 
     42      bool virtual buildGraph(std::vector<CDataPacketPtr> data); 
    4343 
    4444      /*! 
  • XIOS/dev/dev_trunk_omp/src/filter/ternary_arithmetic_filter.cpp

    r1680 r1681  
    22#include "workflow_graph.hpp" 
    33#include "yacc_var.hpp" 
     4#include "file.hpp" 
    45 
    56namespace xios 
     
    1516  }; 
    1617 
    17   CDataPacketPtr CScalarScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    18   { 
    19     bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    20     
     18  std::tuple<int, int, int> CScalarScalarFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
     19  { 
     20    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     21    int unique_filter_id; 
     22    bool firstround; 
     23 
    2124    if(building_graph) 
    2225    { 
    23       this->filterID = InvalidableObject::filterIdGenerator++; 
    24        int edgeID = InvalidableObject::edgeIdGenerator++; 
    25      
    2626      CWorkflowGraph::allocNodeEdge(); 
    2727 
    28       std::cout<<"CScalarScalarFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
    29  
    30  
    31  
    32       CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 1, data[0]); 
    33  
    34  
    35       if(CWorkflowGraph::build_begin) 
    36       { 
    37  
    38         CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
    39  
    40         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
     28      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 
     29 
     30      // first round 
     31      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
     32      { 
     33        firstround = true; 
     34        this->filterID = InvalidableObject::filterIdGenerator++; 
     35        int edgeID = InvalidableObject::edgeIdGenerator++; 
     36 
     37        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
     38        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
     39        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     40 
     41 
     42        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     43        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
     44       
     45 
     46        if(CWorkflowGraph::build_begin) 
     47        { 
     48          CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
     49          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     50 
     51          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
     52        } 
     53        else CWorkflowGraph::build_begin = true; 
     54 
     55        (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID;  
     56        unique_filter_id = this->filterID; 
    4157      } 
    42       else CWorkflowGraph::build_begin = true; 
    43     } 
    44  
     58      // not first round 
     59      else  
     60      { 
     61        firstround=false; 
     62        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
     63        if(data[0]->src_filterID != unique_filter_id) 
     64        { 
     65          int edgeID = InvalidableObject::edgeIdGenerator++; 
     66          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]);   
     67          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;  
     68          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     69        } 
     70      }   
     71    } 
     72 
     73    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     74  } 
     75 
     76  CDataPacketPtr CScalarScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
     77  { 
     78   
    4579    CDataPacketPtr packet(new CDataPacket); 
    4680    packet->date = data[0]->date; 
    4781    packet->timestamp = data[0]->timestamp; 
    4882    packet->status = data[0]->status; 
    49     if(building_graph) packet->src_filterID = this->filterID; 
     83     
     84    std::tuple<int, int, int> graph = buildGraph(data); 
     85 
     86    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     87    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     88    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     89 
    5090    packet->field = this->field; 
    5191 
     
    66106  }; 
    67107 
    68   CDataPacketPtr CScalarFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    69   { 
    70     bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    71     
     108  std::tuple<int, int, int> CScalarFieldScalarArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
     109  { 
     110    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     111    int unique_filter_id; 
     112    bool firstround; 
     113 
    72114    if(building_graph) 
    73115    { 
    74       this->filterID = InvalidableObject::filterIdGenerator++; 
    75        int edgeID = InvalidableObject::edgeIdGenerator++; 
    76      
    77  
    78116      CWorkflowGraph::allocNodeEdge(); 
    79117 
    80       std::cout<<"CScalarFieldScalarArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
    81  
    82  
    83  
    84       CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 1, data[0]); 
    85  
    86       if(CWorkflowGraph::build_begin) 
    87       { 
    88  
    89         CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
    90         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
     118      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 
     119 
     120      // first round 
     121      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
     122      { 
     123        firstround = true; 
     124        this->filterID = InvalidableObject::filterIdGenerator++; 
     125        int edgeID = InvalidableObject::edgeIdGenerator++; 
     126 
     127        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
     128        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
     129        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     130 
     131 
     132        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     133        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
     134       
     135 
     136        if(CWorkflowGraph::build_begin) 
     137        { 
     138          CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
     139          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     140 
     141          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
     142        } 
     143        else CWorkflowGraph::build_begin = true; 
     144 
     145        (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID;  
     146        unique_filter_id = this->filterID; 
    91147      } 
    92       else CWorkflowGraph::build_begin = true; 
    93     } 
     148      // not first round 
     149      else  
     150      { 
     151        firstround=false; 
     152        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
     153        if(data[0]->src_filterID != unique_filter_id) 
     154        { 
     155          int edgeID = InvalidableObject::edgeIdGenerator++; 
     156          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]);   
     157          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;  
     158          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     159        } 
     160      }   
     161    } 
     162 
     163    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     164  } 
     165 
     166  CDataPacketPtr CScalarFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
     167  { 
    94168 
    95169    CDataPacketPtr packet(new CDataPacket); 
     
    97171    packet->timestamp = data[0]->timestamp; 
    98172    packet->status = data[0]->status; 
    99     if(building_graph) packet->src_filterID = this->filterID; 
     173     
     174    std::tuple<int, int, int> graph = buildGraph(data); 
     175 
     176    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     177    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     178    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     179 
    100180    packet->field = this->field; 
    101181 
     
    115195  }; 
    116196 
     197  std::tuple<int, int, int> CScalarFieldFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
     198  { 
     199    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     200    int unique_filter_id; 
     201 
     202    bool firstround; 
     203 
     204    if(building_graph) 
     205    {   
     206      CWorkflowGraph::allocNodeEdge(); 
     207 
     208      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 
     209 
     210      // first round 
     211      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
     212      { 
     213        firstround = true; 
     214        this->filterID = InvalidableObject::filterIdGenerator++; 
     215        int edgeID = InvalidableObject::edgeIdGenerator++; 
     216     
     217        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
     218        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     219        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     220 
     221        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
     222     
     223        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
     224        if(CWorkflowGraph::build_begin) 
     225        { 
     226 
     227          CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
     228          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     229 
     230          edgeID = InvalidableObject::edgeIdGenerator++; 
     231 
     232          CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 
     233          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     234 
     235          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
     236          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
     237        } 
     238        CWorkflowGraph::build_begin = true; 
     239 
     240        (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID;  
     241        unique_filter_id = this->filterID; 
     242  
     243      } 
     244      // not first round 
     245      else  
     246      { 
     247        firstround = false; 
     248        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
     249        if(data[0]->src_filterID != unique_filter_id) 
     250        { 
     251          int edgeID = InvalidableObject::edgeIdGenerator++; 
     252          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]);  
     253          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;  
     254          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     255        } 
     256        if(data[1]->src_filterID != unique_filter_id) 
     257        {  
     258          int edgeID = InvalidableObject::edgeIdGenerator++; 
     259          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[1]);   
     260          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
     261          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     262        } 
     263         
     264      }   
     265    } 
     266 
     267    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     268  } 
     269 
    117270  CDataPacketPtr CScalarFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    118271  { 
    119     bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    120     
    121     if(building_graph) 
    122     { 
    123       this->filterID = InvalidableObject::filterIdGenerator++; 
    124       int edgeID = InvalidableObject::edgeIdGenerator++; 
    125      
    126  
    127       CWorkflowGraph::allocNodeEdge(); 
    128  
    129       std::cout<<"CScalarFieldFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
    130  
    131  
    132       CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 2, data[0]); 
    133  
    134       if(CWorkflowGraph::build_begin) 
    135       { 
    136  
    137         CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
    138  
    139  
    140         edgeID = InvalidableObject::edgeIdGenerator++; 
    141  
    142         CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 
    143  
    144         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
    145         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
    146       } 
    147       else CWorkflowGraph::build_begin = true; 
    148     } 
    149272 
    150273    CDataPacketPtr packet(new CDataPacket); 
     
    152275    packet->timestamp = data[0]->timestamp; 
    153276    packet->status = data[0]->status; 
    154     if(building_graph) packet->src_filterID = this->filterID; 
     277     
     278    std::tuple<int, int, int> graph = buildGraph(data); 
     279 
     280    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     281    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     282    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     283 
    155284    packet->field = this->field; 
    156285 
     
    179308  }; 
    180309 
    181   CDataPacketPtr CFieldScalarScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    182   { 
    183     bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    184     
     310  std::tuple<int, int, int> CFieldScalarScalarArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
     311  { 
     312    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     313    int unique_filter_id; 
     314    bool firstround; 
     315 
    185316    if(building_graph) 
    186317    { 
    187       this->filterID = InvalidableObject::filterIdGenerator++; 
    188       int edgeID = InvalidableObject::edgeIdGenerator++; 
    189      
    190  
    191318      CWorkflowGraph::allocNodeEdge(); 
    192319 
    193       std::cout<<"CFieldScalarScalarArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
    194  
    195       CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 1, data[0]); 
    196  
    197       if(CWorkflowGraph::build_begin) 
    198       { 
    199  
    200         CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
    201  
    202         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
     320      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 
     321 
     322      // first round 
     323      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
     324      { 
     325        firstround = true; 
     326        this->filterID = InvalidableObject::filterIdGenerator++; 
     327        int edgeID = InvalidableObject::edgeIdGenerator++; 
     328 
     329        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
     330        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
     331        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     332 
     333 
     334        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     335        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
     336       
     337 
     338        if(CWorkflowGraph::build_begin) 
     339        { 
     340          CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
     341          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     342 
     343          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
     344        } 
     345        else CWorkflowGraph::build_begin = true; 
     346 
     347        (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID;  
     348        unique_filter_id = this->filterID; 
    203349      } 
    204       else CWorkflowGraph::build_begin = true; 
    205     } 
     350      // not first round 
     351      else  
     352      { 
     353        firstround=false; 
     354        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
     355        if(data[0]->src_filterID != unique_filter_id) 
     356        { 
     357          int edgeID = InvalidableObject::edgeIdGenerator++; 
     358          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]);   
     359          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;  
     360          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     361        } 
     362      }   
     363    } 
     364 
     365    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     366  } 
     367 
     368  CDataPacketPtr CFieldScalarScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
     369  { 
    206370 
    207371    CDataPacketPtr packet(new CDataPacket); 
     
    209373    packet->timestamp = data[0]->timestamp; 
    210374    packet->status = data[0]->status; 
    211     if(building_graph) packet->src_filterID = this->filterID; 
     375 
     376    std::tuple<int, int, int> graph = buildGraph(data); 
     377 
     378    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     379    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     380    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     381 
    212382    packet->field = this->field; 
    213383 
     
    228398  }; 
    229399 
     400  std::tuple<int, int, int> CFieldScalarFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
     401  { 
     402    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     403    int unique_filter_id; 
     404 
     405    bool firstround; 
     406 
     407    if(building_graph) 
     408    {   
     409      CWorkflowGraph::allocNodeEdge(); 
     410 
     411      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 
     412 
     413      // first round 
     414      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
     415      { 
     416        firstround = true; 
     417        this->filterID = InvalidableObject::filterIdGenerator++; 
     418        int edgeID = InvalidableObject::edgeIdGenerator++; 
     419     
     420        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
     421        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     422        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     423 
     424        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
     425     
     426        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
     427        if(CWorkflowGraph::build_begin) 
     428        { 
     429 
     430          CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
     431          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     432 
     433          edgeID = InvalidableObject::edgeIdGenerator++; 
     434 
     435          CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 
     436          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     437 
     438          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
     439          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
     440        } 
     441        CWorkflowGraph::build_begin = true; 
     442 
     443        (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID;  
     444        unique_filter_id = this->filterID; 
     445  
     446      } 
     447      // not first round 
     448      else  
     449      { 
     450        firstround = false; 
     451        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
     452        if(data[0]->src_filterID != unique_filter_id) 
     453        { 
     454          int edgeID = InvalidableObject::edgeIdGenerator++; 
     455          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]);  
     456          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;  
     457          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     458        } 
     459        if(data[1]->src_filterID != unique_filter_id) 
     460        {  
     461          int edgeID = InvalidableObject::edgeIdGenerator++; 
     462          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[1]);   
     463          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
     464          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     465        } 
     466         
     467      }   
     468    } 
     469 
     470    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     471  } 
     472 
    230473  CDataPacketPtr CFieldScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    231474  { 
    232     bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    233     
    234     if(building_graph) 
    235     { 
    236       this->filterID = InvalidableObject::filterIdGenerator++; 
    237       int edgeID = InvalidableObject::edgeIdGenerator++; 
    238      
    239  
    240       CWorkflowGraph::allocNodeEdge(); 
    241  
    242       std::cout<<"CFieldScalarFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
    243  
    244       CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 2, data[0]); 
    245  
    246       if(CWorkflowGraph::build_begin) 
    247       { 
    248  
    249         CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
    250  
    251         edgeID = InvalidableObject::edgeIdGenerator++; 
    252  
    253         CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 
    254  
    255         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
    256         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
    257       } 
    258       else CWorkflowGraph::build_begin = true; 
    259     } 
    260475 
    261476    CDataPacketPtr packet(new CDataPacket); 
     
    263478    packet->timestamp = data[0]->timestamp; 
    264479    packet->status = data[0]->status; 
    265     if(building_graph) packet->src_filterID = this->filterID; 
     480     
     481    std::tuple<int, int, int> graph = buildGraph(data); 
     482 
     483    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     484    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     485    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     486 
    266487    packet->field = this->field; 
    267488 
     
    288509  }; 
    289510 
     511  std::tuple<int, int, int> CFieldFieldScalarArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
     512  { 
     513    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     514    int unique_filter_id; 
     515 
     516    bool firstround; 
     517 
     518    if(building_graph) 
     519    {   
     520      CWorkflowGraph::allocNodeEdge(); 
     521 
     522      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 
     523 
     524      // first round 
     525      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
     526      { 
     527        firstround = true; 
     528        this->filterID = InvalidableObject::filterIdGenerator++; 
     529        int edgeID = InvalidableObject::edgeIdGenerator++; 
     530     
     531        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
     532        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     533        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     534 
     535        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
     536     
     537        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
     538        if(CWorkflowGraph::build_begin) 
     539        { 
     540 
     541          CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
     542          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     543 
     544          edgeID = InvalidableObject::edgeIdGenerator++; 
     545 
     546          CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 
     547          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     548 
     549          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
     550          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
     551        } 
     552        CWorkflowGraph::build_begin = true; 
     553 
     554        (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID;  
     555        unique_filter_id = this->filterID; 
     556  
     557      } 
     558      // not first round 
     559      else  
     560      { 
     561        firstround = false; 
     562        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
     563        if(data[0]->src_filterID != unique_filter_id) 
     564        { 
     565          int edgeID = InvalidableObject::edgeIdGenerator++; 
     566          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]);  
     567          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;  
     568          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     569        } 
     570        if(data[1]->src_filterID != unique_filter_id) 
     571        {  
     572          int edgeID = InvalidableObject::edgeIdGenerator++; 
     573          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[1]);   
     574          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
     575          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     576        } 
     577         
     578      }   
     579    } 
     580 
     581    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     582  } 
     583 
    290584  CDataPacketPtr CFieldFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    291585  { 
    292     bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    293     
    294     if(building_graph) 
    295     { 
    296       this->filterID = InvalidableObject::filterIdGenerator++; 
    297       int edgeID = InvalidableObject::edgeIdGenerator++; 
    298      
    299       CWorkflowGraph::allocNodeEdge(); 
    300  
    301       std::cout<<"CFieldFieldScalarArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
    302  
    303  
    304       CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 2, data[0]); 
    305  
    306       if(CWorkflowGraph::build_begin) 
    307       { 
    308  
    309         CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
    310  
    311         int edgeID = InvalidableObject::edgeIdGenerator++; 
    312  
    313         CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 
    314  
    315         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
    316         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
    317       } 
    318       else CWorkflowGraph::build_begin = true; 
    319     } 
    320586 
    321587    CDataPacketPtr packet(new CDataPacket); 
     
    323589    packet->timestamp = data[0]->timestamp; 
    324590    packet->status = data[0]->status; 
    325     if(building_graph) packet->src_filterID = this->filterID; 
     591     
     592    std::tuple<int, int, int> graph = buildGraph(data); 
     593 
     594    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     595    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     596    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     597 
    326598    packet->field = this->field; 
    327599 
     
    347619  }; 
    348620 
     621  std::tuple<int, int, int> CFieldFieldFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
     622  { 
     623    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     624    int unique_filter_id; 
     625 
     626    bool firstround; 
     627 
     628    if(building_graph) 
     629    {   
     630      CWorkflowGraph::allocNodeEdge(); 
     631 
     632      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId()); 
     633 
     634      // first round 
     635      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
     636      { 
     637        firstround = true; 
     638        this->filterID = InvalidableObject::filterIdGenerator++; 
     639        int edgeID = InvalidableObject::edgeIdGenerator++; 
     640     
     641        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
     642        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     643        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
     644 
     645        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
     646     
     647        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
     648        if(CWorkflowGraph::build_begin) 
     649        { 
     650 
     651          CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
     652          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     653 
     654          edgeID = InvalidableObject::edgeIdGenerator++; 
     655 
     656          CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 
     657          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     658 
     659          edgeID = InvalidableObject::edgeIdGenerator++; 
     660 
     661          CWorkflowGraph::addEdge(edgeID, this->filterID, data[2]); 
     662          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++; 
     663 
     664          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
     665          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
     666          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[2]->src_filterID].filter_filled = 0 ; 
     667        } 
     668        CWorkflowGraph::build_begin = true; 
     669 
     670        (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID;  
     671        unique_filter_id = this->filterID; 
     672  
     673      } 
     674      // not first round 
     675      else  
     676      { 
     677        firstround = false; 
     678        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
     679        if(data[0]->src_filterID != unique_filter_id) 
     680        { 
     681          int edgeID = InvalidableObject::edgeIdGenerator++; 
     682          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]);  
     683          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;  
     684          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     685        } 
     686        if(data[1]->src_filterID != unique_filter_id) 
     687        {  
     688          int edgeID = InvalidableObject::edgeIdGenerator++; 
     689          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[1]);   
     690          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
     691          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     692        } 
     693        if(data[2]->src_filterID != unique_filter_id) 
     694        {  
     695          int edgeID = InvalidableObject::edgeIdGenerator++; 
     696          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[2]);   
     697          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[2]->src_filterID].filter_filled = 0 ; 
     698          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
     699        } 
     700         
     701      }   
     702    } 
     703 
     704    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     705  } 
     706 
    349707  CDataPacketPtr CFieldFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
    350708  { 
    351     bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    352     
    353     if(building_graph) 
    354     { 
    355       this->filterID = InvalidableObject::filterIdGenerator++; 
    356       int edgeID = InvalidableObject::edgeIdGenerator++; 
    357      
    358  
    359       CWorkflowGraph::allocNodeEdge(); 
    360  
    361       std::cout<<"CFieldFieldFieldArithmeticFilter::apply connection = "<<data[0]->src_filterID<<" <-> "<<this->filterID<<std::endl; 
    362  
    363  
    364       CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 3, data[0]); 
    365  
    366       if(CWorkflowGraph::build_begin) 
    367       { 
    368  
    369         CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); 
    370  
    371  
    372         edgeID = InvalidableObject::edgeIdGenerator++; 
    373  
    374         CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]); 
    375  
    376  
    377         edgeID = InvalidableObject::edgeIdGenerator++; 
    378  
    379         CWorkflowGraph::addEdge(edgeID, this->filterID, data[2]); 
    380  
    381         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
    382         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ; 
    383         (*CWorkflowGraph::mapFilters_ptr_with_info)[data[2]->src_filterID].filter_filled = 0 ; 
    384       } 
    385       else CWorkflowGraph::build_begin = true; 
    386     } 
    387  
    388709 
    389710    CDataPacketPtr packet(new CDataPacket); 
     
    391712    packet->timestamp = data[0]->timestamp; 
    392713    packet->status = data[0]->status; 
    393     if(building_graph) packet->src_filterID = this->filterID; 
     714     
     715    std::tuple<int, int, int> graph = buildGraph(data); 
     716 
     717    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     718    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     719    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     720 
    394721    packet->field = this->field; 
    395722 
  • XIOS/dev/dev_trunk_omp/src/filter/ternary_arithmetic_filter.hpp

    r1162 r1681  
    55#include <string> 
    66#include "operator_expr.hpp" 
     7#include <tuple> 
    78 
    89namespace xios 
     
    3637       */ 
    3738      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     39      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
    3840  }; // class CScalarScalarFieldArithmeticFilter 
    3941 
     
    6769       */ 
    6870      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     71      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
    6972  }; // class CScalarScalarFieldArithmeticFilter 
    7073 
     
    9598       */ 
    9699      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     100      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
    97101  }; // class CScalarScalarFieldArithmeticFilter 
    98102 
     
    127131       */ 
    128132      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     133      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
    129134  }; // class CFieldScalarScalarArithmeticFilter 
    130135 
     
    156161       */ 
    157162      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     163      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
    158164  }; // class CFieldScalarFieldArithmeticFilter 
    159165 
     
    185191       */ 
    186192      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     193      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
    187194  }; // class CFieldFielScalardArithmeticFilter 
    188195 
     
    212219       */ 
    213220      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     221      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
    214222  }; // class CFieldFielFieldArithmeticFilter 
    215223 
  • XIOS/dev/dev_trunk_omp/src/filter/unary_arithmetic_filter.cpp

    r1680 r1681  
    1313  }; 
    1414 
    15   CDataPacketPtr CUnaryArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
     15  std::tuple<int, int, int> CUnaryArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data) 
    1616  { 
    1717    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    1818    int unique_filter_id; 
     19    bool firstround; 
    1920     
    2021    if(building_graph) 
     
    2627      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end()) 
    2728      { 
     29        firstround=true; 
    2830        this->filterID = InvalidableObject::filterIdGenerator++; 
    2931        int edgeID = InvalidableObject::edgeIdGenerator++; 
     
    3133        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]); 
    3234        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag; 
     35        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1; 
    3336 
    34         (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes(); 
    35         if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->recordXiosAttributes(); 
     37        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     38        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
    3639       
    3740        if(CWorkflowGraph::build_begin) 
     
    5053      else  
    5154      { 
     55        firstround=false; 
    5256        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash]; 
    5357        if(data[0]->src_filterID != unique_filter_id) 
     
    5761          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;  
    5862          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++; 
    59         } 
    60        
    61          
     63        }    
    6264      }  
    6365   
    6466    } 
     67 
     68    return std::make_tuple(building_graph, firstround, unique_filter_id); 
     69  } 
     70 
     71  CDataPacketPtr CUnaryArithmeticFilter::apply(std::vector<CDataPacketPtr> data) 
     72  { 
    6573 
    6674    CDataPacketPtr packet(new CDataPacket); 
     
    6876    packet->timestamp = data[0]->timestamp; 
    6977    packet->status = data[0]->status; 
     78 
     79    std::tuple<int, int, int> graph = buildGraph(data); 
     80 
     81    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph); 
     82    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1; 
     83    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance; 
     84 
    7085    packet->field = this->field; 
    7186 
     
    7388      packet->data.reference(op(data[0]->data)); 
    7489 
    75     if(building_graph) packet->src_filterID = unique_filter_id; 
    76  
    7790    return packet; 
    7891  } 
  • XIOS/dev/dev_trunk_omp/src/filter/unary_arithmetic_filter.hpp

    r642 r1681  
    3434       */ 
    3535      CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 
     36      std::tuple<int, int, int> virtual buildGraph(std::vector<CDataPacketPtr> data); 
    3637  }; // class CUnaryArithmeticFilter 
    3738} // namespace xios 
Note: See TracChangeset for help on using the changeset viewer.