Ignore:
Timestamp:
07/19/19 15:28:33 (5 years ago)
Author:
yushan
Message:

MARK: Dynamic workflow graph developement. Branch up to date with trunk @1676. Bug fixed

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_trunk_omp/src/filter/temporal_filter.cpp

    r1680 r1681  
    3333  } 
    3434 
    35   CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data) 
    36   { 
    37     bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
    38  
    39     if(building_graph) 
    40     { 
    41       int edgeID = InvalidableObject::edgeIdGenerator++; 
    42  
    43       CWorkflowGraph::allocNodeEdge(); 
    44  
    45       if(CWorkflowGraph::build_begin) 
    46       { 
    47         CWorkflowGraph::addEdge(edgeID, -1, data[0]);         
    48       } 
    49       this->filterIDoutputs_pair.push_back(make_pair(edgeID, data[0]->src_filterID));  
    50     } 
    51      
    52      
    53     CDataPacketPtr packet; 
    54  
    55     if (data[0]->status != CDataPacket::END_OF_STREAM) 
    56     { 
    57       bool usePacket, outputResult, copyLess; 
    58       if (isOnceOperation) 
    59         usePacket = outputResult = copyLess = isFirstOperation; 
    60       else 
    61       { 
    62         usePacket = (data[0]->date >= nextSamplingDate); 
    63         outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth); 
    64         copyLess = (isInstantOperation && usePacket && outputResult); 
    65       } 
    66  
    67       if (usePacket) 
    68       { 
    69         nbSamplingDates ++; 
    70         if (!copyLess) 
    71         { 
    72           if (!tmpData.numElements()) 
    73             tmpData.resize(data[0]->data.numElements()); 
    74  
    75           (*functor)(data[0]->data); 
    76         } 
    77  
    78         nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep(); 
    79       } 
    80  
    81       if (outputResult) 
    82       { 
    83         if(building_graph) 
    84         { 
    85           this->filterID = InvalidableObject::filterIdGenerator++; 
    86           CWorkflowGraph::addNode(this->filterID, "Temporal Filter\\n("+this->temp_op+")", 5, 1, 0, data[0]);    
    87           (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].transform_type = this->temp_op;    
    88           (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ; 
    89  
    90           (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes(); 
    91           if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->recordXiosAttributes(); 
    92  
    93  
    94           for(int i=0; i<this->filterIDoutputs_pair.size(); i++) 
    95           { 
    96             (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterIDoutputs_pair[i].second].filter_filled = 0 ;   
    97             (*CWorkflowGraph::mapFieldToFilters_ptr_with_info)[this->filterIDoutputs_pair[i].first].to = this->filterID ;   
    98           } 
    99  
    100           (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb = this->filterIDoutputs_pair.size() ; 
    101         } 
     35   
    10236 
    10337 
    10438 
    105         nbOperationDates ++; 
    106         if (!copyLess) 
    107         { 
    108           functor->final(); 
    109  
    110           packet = CDataPacketPtr(new CDataPacket); 
    111           packet->date = data[0]->date; 
    112           packet->timestamp = data[0]->timestamp; 
    113           packet->status = data[0]->status; 
    114           packet->data.resize(tmpData.numElements()); 
    115           packet->data = tmpData; 
    116         } 
    117         else 
    118           packet = data[0]; 
    119  
    120         isFirstOperation = false; 
    121          
    122         packet->field = this->field; 
    123         if(building_graph) packet->src_filterID=this->filterID; 
    124         if(building_graph) this->filterIDoutputs_pair.clear(); 
    125         if(building_graph) CWorkflowGraph::build_begin=true; 
    126       } 
    127     } 
    128  
    129     return packet; 
    130   } 
    131  
    132  
    133  
    134   CDataPacketPtr CTemporalFilter::apply_old(std::vector<CDataPacketPtr> data) 
     39  bool CTemporalFilter::buildGraph(std::vector<CDataPacketPtr> data) 
    13540  { 
    13641    bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; 
     
    15055        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].transform_type = this->temp_op;    
    15156        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = false ; 
     57        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].clusterID = 1 ; 
     58        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = (data[0]->distance); 
     59 
     60 
     61        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); 
     62        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes(); 
    15263      } 
    15364 
     
    15970        (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
    16071        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb += 1 ; 
     72        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = max(data[0]->distance+1, (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance); 
    16173      } 
    16274 
     
    16476      this->filterIDoutputs.push_back(data[0]->src_filterID);  
    16577    } 
    166      
    167      
     78 
     79    return building_graph; 
     80  } 
     81 
     82 
     83  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data) 
     84  { 
     85    bool BG = buildGraph(data); 
     86 
    16887    CDataPacketPtr packet; 
    16988 
     
    207126          packet->data.resize(tmpData.numElements()); 
    208127          packet->data = tmpData; 
     128           
    209129        } 
    210130        else 
     
    214134         
    215135        packet->field = this->field; 
    216         if(building_graph) packet->src_filterID=this->filterID; 
    217         if(building_graph) this->filterIDoutputs.clear(); 
    218         if(building_graph) CWorkflowGraph::build_begin=true; 
    219         if(building_graph) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ; 
     136         
     137        if(BG) 
     138        { 
     139          packet->src_filterID=this->filterID; 
     140          packet->distance = data[0]->distance+1; 
     141          this->filterIDoutputs.clear(); 
     142          CWorkflowGraph::build_begin=true; 
     143          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ; 
     144        } 
    220145      } 
    221146    } 
Note: See TracChangeset for help on using the changeset viewer.