source: XIOS/dev/dev_trunk_omp/src/filter/spatial_transform_filter.cpp @ 1669

Last change on this file since 1669 was 1669, checked in by yushan, 5 years ago

MARK: branch merged with trunk @1663. static graph OK with EP

File size: 16.0 KB
RevLine 
[1601]1#include "mpi.hpp"
[644]2#include "spatial_transform_filter.hpp"
3#include "grid_transformation.hpp"
4#include "context.hpp"
5#include "context_client.hpp"
[1412]6#include "timer.hpp"
[1646]7#ifdef _usingEP
[1601]8using namespace ep_lib;
[1646]9#endif
[1668]10#include "workflow_graph.hpp"
[644]11namespace xios
12{
[1668]13  CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine,
14                                                   double outputValue, size_t inputSlotsCount, bool buildWorkflowGraph /*= false*/)
15    : CFilter(gc, inputSlotsCount, engine, buildWorkflowGraph), outputDefaultValue(outputValue)
[644]16  { /* Nothing to do */ }
17
[1542]18  std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >
[1668]19  CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue,
20                                            bool buildWorkflowGraph)
[644]21  {
22    if (!srcGrid || !destGrid)
[1542]23      ERROR("std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >"
[644]24            "buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid)",
25            "Impossible to build the filter graph if either the source or the destination grid are null.");
26
[1542]27    std::shared_ptr<CSpatialTransformFilter> firstFilter, lastFilter;
[790]28    // Note that this loop goes from the last transformation to the first transformation
29    do
30    {
31      CGridTransformation* gridTransformation = destGrid->getTransformations();
32      CSpatialTransformFilterEngine* engine = CSpatialTransformFilterEngine::get(destGrid->getTransformations());
[827]33      const std::vector<StdString>& auxInputs = gridTransformation->getAuxInputs();
34      size_t inputCount = 1 + (auxInputs.empty() ? 0 : auxInputs.size());
[1158]35      double defaultValue  = (hasMissingValue) ? std::numeric_limits<double>::quiet_NaN() : 0.0;
[644]36
[1275]37
38      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
39      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
40
41      bool isSpatialTemporal=false ;
42      for (it=algoList.begin();it!=algoList.end();++it)  if (it->second.first == TRANS_TEMPORAL_SPLITTING) isSpatialTemporal=true ;
43
[1542]44      std::shared_ptr<CSpatialTransformFilter> filter ;
[1668]45      if( isSpatialTemporal)
46        filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTemporalFilter(gc, engine, gridTransformation, defaultValue, inputCount, buildWorkflowGraph));
47      else
48        filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount, buildWorkflowGraph));
[1275]49
50     
[790]51      if (!lastFilter)
52        lastFilter = filter;
53      else
[1668]54      {
[790]55        filter->connectOutput(firstFilter, 0);
[1668]56        if (buildWorkflowGraph)
57        {
58          if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>;
59          if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >;
60          int filterOut = (std::static_pointer_cast<COutputPin>(filter))->getFilterId();
61          int filterIn = (std::static_pointer_cast<COutputPin>(firstFilter))->getFilterId();
62          // PASS field's id here
63          (*CWorkflowGraph::mapFieldToFilters_ptr)["XXX"].push_back(filterOut);
64          (*CWorkflowGraph::mapFieldToFilters_ptr)["XXX"].push_back(filterIn);
65          (*CWorkflowGraph::mapFilters_ptr)[filterOut] = "Spatial transform filter";
66          (*CWorkflowGraph::mapFilters_ptr)[filterIn] = "Spatial transform filter";
[1669]67          std::cout<<"CSpatialTransformFilter::CSpatialTransformFilter CWorkflowGraph::mapFieldToFilters_ptr->size = "<<CWorkflowGraph::mapFieldToFilters_ptr->size()<<std::endl;
[1668]68        }
69      }
[790]70
71      firstFilter = filter;
[827]72      for (size_t idx = 0; idx < auxInputs.size(); ++idx)
73      {
74        CField* fieldAuxInput = CField::get(auxInputs[idx]);
75        fieldAuxInput->buildFilterGraph(gc, false);
76        fieldAuxInput->getInstantDataFilter()->connectOutput(firstFilter,idx+1);
77      }
78
[790]79      destGrid = gridTransformation->getGridSource();
80    }
81    while (destGrid != srcGrid);
82
83    return std::make_pair(firstFilter, lastFilter);
[644]84  }
85
[873]86  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data)
87  {
88    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
89    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
90    if (outputPacket)
[1021]91      onOutputReady(outputPacket);
[873]92  }
93
[1668]94  CSpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine,
95                                                  CGridTransformation* gridTransformation, double outputValue,
96                                                  size_t inputSlotsCount, bool buildWorkflowGraph)
97    : CSpatialTransformFilter(gc, engine, outputValue, inputSlotsCount, buildWorkflowGraph), record(0)
[1275]98  {
99      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
100      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
101
102      int pos ;
103      for (it=algoList.begin();it!=algoList.end();++it) 
104        if (it->second.first == TRANS_TEMPORAL_SPLITTING)
105        {
106          pos=it->first ;
107          if (pos < algoList.size()-1)
108            ERROR("SpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount))",
109                  "temporal splitting operation must be the last of whole transformation on same grid") ;
110        }
111         
112      CGrid* grid=gridTransformation->getGridDestination() ;
113
114      CAxis* axis = grid->getAxis(gridTransformation->getElementPositionInGridDst2AxisPosition().find(pos)->second) ;
115
116      nrecords = axis->index.numElements() ;
117  }
118
119
120  void CSpatialTemporalFilter::onInputReady(std::vector<CDataPacketPtr> data)
121  {
122    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
123    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
124
125    if (outputPacket)
126    {
127      size_t nelements=outputPacket->data.numElements() ;
128      if (!tmpData.numElements())
129      {
130        tmpData.resize(nelements);
131        tmpData=outputDefaultValue ;
132      }
133
134      nelements/=nrecords ;
135      size_t offset=nelements*record ;
136      for(size_t i=0;i<nelements;++i)  tmpData(i+offset) = outputPacket->data(i) ;
137   
138      record ++ ;
139      if (record==nrecords)
140      {
141        record=0 ;
142        CDataPacketPtr packet = CDataPacketPtr(new CDataPacket);
143        packet->date = data[0]->date;
144        packet->timestamp = data[0]->timestamp;
145        packet->status = data[0]->status;
146        packet->data.resize(tmpData.numElements());
147        packet->data = tmpData;
148        onOutputReady(packet);
149        tmpData.resize(0) ;
150      }
151    }
152  }
153
154
[644]155  CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)
156    : gridTransformation(gridTransformation)
157  {
158    if (!gridTransformation)
159      ERROR("CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)",
160            "Impossible to construct a spatial transform filter engine without a valid grid transformation.");
161  }
162
[1601]163  std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> > *CSpatialTransformFilterEngine::engines_ptr = 0;
[644]164
165  CSpatialTransformFilterEngine* CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)
166  {
167    if (!gridTransformation)
168      ERROR("CSpatialTransformFilterEngine& CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)",
169            "Impossible to get the requested engine, the grid transformation is invalid.");
[1601]170   
171    if(engines_ptr == NULL) engines_ptr = new std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >;
[644]172
[1601]173
174    std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines_ptr->find(gridTransformation);
175    if (it == engines_ptr->end())
[644]176    {
[1542]177      std::shared_ptr<CSpatialTransformFilterEngine> engine(new CSpatialTransformFilterEngine(gridTransformation));
[1601]178      it = engines_ptr->insert(std::make_pair(gridTransformation, engine)).first;
[644]179    }
180
181    return it->second.get();
182  }
183
184  CDataPacketPtr CSpatialTransformFilterEngine::apply(std::vector<CDataPacketPtr> data)
185  {
[873]186    /* Nothing to do */
187  }
188
189  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue)
190  {
[644]191    CDataPacketPtr packet(new CDataPacket);
192    packet->date = data[0]->date;
193    packet->timestamp = data[0]->timestamp;
194    packet->status = data[0]->status;
195
196    if (packet->status == CDataPacket::NO_ERROR)
197    {
[827]198      if (1 < data.size())  // Dynamical transformations
199      {
200        std::vector<CArray<double,1>* > dataAuxInputs(data.size()-1);
201        for (size_t idx = 0; idx < dataAuxInputs.size(); ++idx) dataAuxInputs[idx] = &(data[idx+1]->data);
[832]202        gridTransformation->computeAll(dataAuxInputs, packet->timestamp);
[827]203      }
[644]204      packet->data.resize(gridTransformation->getGridDestination()->storeIndex_client.numElements());
[1158]205      if (0 != packet->data.numElements())
206        (packet->data)(0) = defaultValue;
[644]207      apply(data[0]->data, packet->data);
208    }
209
210    return packet;
211  }
212
213  void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)
214  {
[1414]215    CTimer::get("CSpatialTransformFilterEngine::apply").resume(); 
216   
[644]217    CContextClient* client = CContext::getCurrent()->client;
[1646]218    int rank;
219    MPI_Comm_rank (client->intraComm, &rank);
[644]220
[873]221    // Get default value for output data
[1158]222    bool ignoreMissingValue = false; 
223    double defaultValue = std::numeric_limits<double>::quiet_NaN();
[1474]224    if (0 != dataDest.numElements()) ignoreMissingValue = NumTraits<double>::isNan(dataDest(0));
[873]225
[841]226    const std::list<CGridTransformation::SendingIndexGridSourceMap>& listLocalIndexSend = gridTransformation->getLocalIndexToSendFromGridSource();
227    const std::list<CGridTransformation::RecvIndexGridDestinationMap>& listLocalIndexToReceive = gridTransformation->getLocalIndexToReceiveOnGridDest();
228    const std::list<size_t>& listNbLocalIndexToReceive = gridTransformation->getNbLocalIndexToReceiveOnGridDest();
[888]229    const std::vector<CGenericAlgorithmTransformation*>& listAlgos = gridTransformation->getAlgos();
[644]230
[841]231    CArray<double,1> dataCurrentDest(dataSrc.copy());
[644]232
[841]233    std::list<CGridTransformation::SendingIndexGridSourceMap>::const_iterator itListSend  = listLocalIndexSend.begin(),
234                                                                              iteListSend = listLocalIndexSend.end();
235    std::list<CGridTransformation::RecvIndexGridDestinationMap>::const_iterator itListRecv = listLocalIndexToReceive.begin();
236    std::list<size_t>::const_iterator itNbListRecv = listNbLocalIndexToReceive.begin();
[888]237    std::vector<CGenericAlgorithmTransformation*>::const_iterator itAlgo = listAlgos.begin();
[841]238
[1646]239    for (; itListSend != iteListSend; ++itListSend, ++itListRecv, ++itNbListRecv, ++itAlgo)
[709]240    {
[841]241      CArray<double,1> dataCurrentSrc(dataCurrentDest);
242      const CGridTransformation::SendingIndexGridSourceMap& localIndexToSend = *itListSend;
[709]243
[841]244      // Sending data from field sources to do transformations
245      std::map<int, CArray<int,1> >::const_iterator itbSend = localIndexToSend.begin(), itSend,
246                                                    iteSend = localIndexToSend.end();
247      int idxSendBuff = 0;
248      std::vector<double*> sendBuff(localIndexToSend.size());
[1646]249      double* sendBuffRank;
[841]250      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
[644]251      {
[1646]252        int destRank = itSend->first;
[841]253        if (0 != itSend->second.numElements())
[1646]254        {
255          if (rank != itSend->first)
256            sendBuff[idxSendBuff] = new double[itSend->second.numElements()];
257          else
258            sendBuffRank = new double[itSend->second.numElements()];
259        }
[644]260      }
261
[841]262      idxSendBuff = 0;
[1601]263      std::vector<MPI_Request> sendRecvRequest(localIndexToSend.size() + itListRecv->size());
264      int position = 0;
[841]265      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
[644]266      {
[841]267        int destRank = itSend->first;
268        const CArray<int,1>& localIndex_p = itSend->second;
269        int countSize = localIndex_p.numElements();
[1646]270        if (destRank != rank)
[644]271        {
[1646]272          for (int idx = 0; idx < countSize; ++idx)
273          {
274            sendBuff[idxSendBuff][idx] = dataCurrentSrc(localIndex_p(idx));
275          } 
276          MPI_Isend(sendBuff[idxSendBuff], countSize, MPI_DOUBLE, destRank, 12, client->intraComm, &sendRecvRequest[position++]);
277         
[644]278        }
[1646]279        else
280        {
281          for (int idx = 0; idx < countSize; ++idx)
282          {
283            sendBuffRank[idx] = dataCurrentSrc(localIndex_p(idx));
284          }
285        }
[644]286      }
287
[841]288      // Receiving data on destination fields
289      const CGridTransformation::RecvIndexGridDestinationMap& localIndexToReceive = *itListRecv;
290      CGridTransformation::RecvIndexGridDestinationMap::const_iterator itbRecv = localIndexToReceive.begin(), itRecv,
291                                                                       iteRecv = localIndexToReceive.end();
292      int recvBuffSize = 0;
[1646]293      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
294      {
295        if (itRecv->first != rank )
296          recvBuffSize += itRecv->second.size();
297      }
298      //(recvBuffSize < itRecv->second.size()) ? itRecv->second.size() : recvBuffSize;
[841]299      double* recvBuff;
[1646]300
[841]301      if (0 != recvBuffSize) recvBuff = new double[recvBuffSize];
302      int currentBuff = 0;
303      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
304      {
305        int srcRank = itRecv->first;
[1646]306        if (srcRank != rank)
307        {
308          int countSize = itRecv->second.size();
309          MPI_Irecv(recvBuff + currentBuff, countSize, MPI_DOUBLE, srcRank, 12, client->intraComm, &sendRecvRequest[position++]);
310          currentBuff += countSize;
311        }
[841]312      }
313      std::vector<MPI_Status> status(sendRecvRequest.size());
[1646]314      MPI_Waitall(position, &sendRecvRequest[0], &status[0]);
[709]315
[1646]316
317
[841]318      dataCurrentDest.resize(*itNbListRecv);
[1646]319      dataCurrentDest = 0.0;
[873]320
[1158]321      std::vector<bool> localInitFlag(dataCurrentDest.numElements(), true);
[841]322      currentBuff = 0;
[1260]323      bool firstPass=true; 
[841]324      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
325      {
[842]326        const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second;
[1646]327        int srcRank = itRecv->first;
328        if (srcRank != rank)
329        {
330          int countSize = itRecv->second.size();
331          (*itAlgo)->apply(localIndex_p,
332                           recvBuff+currentBuff,
333                           dataCurrentDest,
334                           localInitFlag,
335                           ignoreMissingValue,firstPass);
336          currentBuff += countSize;
337        }
338        else
339        {
340          (*itAlgo)->apply(localIndex_p,
341                           sendBuffRank,
342                           dataCurrentDest,
343                           localInitFlag,
344                           ignoreMissingValue,firstPass);
345        }
[888]346
[1260]347        firstPass=false ;
[841]348      }
349
[979]350      (*itAlgo)->updateData(dataCurrentDest);
351
[841]352      idxSendBuff = 0;
353      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
354      {
355        if (0 != itSend->second.numElements())
[1646]356        {
357          if (rank != itSend->first)
358            delete [] sendBuff[idxSendBuff];
359          else
360            delete [] sendBuffRank;
361        }
[841]362      }
363      if (0 != recvBuffSize) delete [] recvBuff;
[709]364    }
[841]365    if (dataCurrentDest.numElements() != dataDest.numElements())
366    ERROR("CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)",
[1021]367          "Incoherent between the received size and expected size. " << std::endl
368          << "Expected size: " << dataDest.numElements() << std::endl
369          << "Received size: " << dataCurrentDest.numElements());
[841]370
371    dataDest = dataCurrentDest;
[1412]372
[1414]373    CTimer::get("CSpatialTransformFilterEngine::apply").suspend() ;
[644]374  }
375} // namespace xios
Note: See TracBrowser for help on using the repository browser.