source: XIOS/dev/branch_openmp/src/filter/spatial_transform_filter.cpp @ 1360

Last change on this file since 1360 was 1328, checked in by yushan, 7 years ago

dev_omp

File size: 11.4 KB
RevLine 
[1328]1#include "mpi.hpp"
[644]2#include "spatial_transform_filter.hpp"
3#include "grid_transformation.hpp"
4#include "context.hpp"
5#include "context_client.hpp"
[1328]6using namespace ep_lib;
[644]7
8namespace xios
9{
[873]10  CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, double outputValue, size_t inputSlotsCount)
11    : CFilter(gc, inputSlotsCount, engine), outputDefaultValue(outputValue)
[644]12  { /* Nothing to do */ }
13
14  std::pair<boost::shared_ptr<CSpatialTransformFilter>, boost::shared_ptr<CSpatialTransformFilter> >
[1018]15  CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue)
[644]16  {
17    if (!srcGrid || !destGrid)
18      ERROR("std::pair<boost::shared_ptr<CSpatialTransformFilter>, boost::shared_ptr<CSpatialTransformFilter> >"
19            "buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid)",
20            "Impossible to build the filter graph if either the source or the destination grid are null.");
21
[790]22    boost::shared_ptr<CSpatialTransformFilter> firstFilter, lastFilter;
23    // Note that this loop goes from the last transformation to the first transformation
24    do
25    {
26      CGridTransformation* gridTransformation = destGrid->getTransformations();
27      CSpatialTransformFilterEngine* engine = CSpatialTransformFilterEngine::get(destGrid->getTransformations());
[827]28      const std::vector<StdString>& auxInputs = gridTransformation->getAuxInputs();
29      size_t inputCount = 1 + (auxInputs.empty() ? 0 : auxInputs.size());
[1076]30      double defaultValue  = (hasMissingValue) ? std::numeric_limits<double>::quiet_NaN() : 0.0;
[873]31      boost::shared_ptr<CSpatialTransformFilter> filter(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount));
[644]32
[790]33      if (!lastFilter)
34        lastFilter = filter;
35      else
36        filter->connectOutput(firstFilter, 0);
37
38      firstFilter = filter;
[827]39      for (size_t idx = 0; idx < auxInputs.size(); ++idx)
40      {
41        CField* fieldAuxInput = CField::get(auxInputs[idx]);
42        fieldAuxInput->buildFilterGraph(gc, false);
43        fieldAuxInput->getInstantDataFilter()->connectOutput(firstFilter,idx+1);
44      }
45
[790]46      destGrid = gridTransformation->getGridSource();
47    }
48    while (destGrid != srcGrid);
49
50    return std::make_pair(firstFilter, lastFilter);
[644]51  }
52
[873]53  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data)
54  {
55    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
56    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
57    if (outputPacket)
[1006]58      onOutputReady(outputPacket);
[873]59  }
60
[644]61  CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)
62    : gridTransformation(gridTransformation)
63  {
64    if (!gridTransformation)
65      ERROR("CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)",
66            "Impossible to construct a spatial transform filter engine without a valid grid transformation.");
67  }
68
[1328]69  //std::map<CGridTransformation*, boost::shared_ptr<CSpatialTransformFilterEngine> > CSpatialTransformFilterEngine::engines;
[1134]70  std::map<CGridTransformation*, boost::shared_ptr<CSpatialTransformFilterEngine> > *CSpatialTransformFilterEngine::engines_ptr = 0;
[644]71
72  CSpatialTransformFilterEngine* CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)
73  {
74    if (!gridTransformation)
75      ERROR("CSpatialTransformFilterEngine& CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)",
76            "Impossible to get the requested engine, the grid transformation is invalid.");
[1328]77   
[1134]78    if(engines_ptr == NULL) engines_ptr = new std::map<CGridTransformation*, boost::shared_ptr<CSpatialTransformFilterEngine> >;
79
[1328]80    //std::map<CGridTransformation*, boost::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines.find(gridTransformation);
[1134]81    std::map<CGridTransformation*, boost::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines_ptr->find(gridTransformation);
[1328]82    //if (it == engines.end())
[1134]83    if (it == engines_ptr->end())
[644]84    {
85      boost::shared_ptr<CSpatialTransformFilterEngine> engine(new CSpatialTransformFilterEngine(gridTransformation));
[1328]86      //it = engines.insert(std::make_pair(gridTransformation, engine)).first;
[1134]87      it = engines_ptr->insert(std::make_pair(gridTransformation, engine)).first;
[644]88    }
89
90    return it->second.get();
91  }
92
93  CDataPacketPtr CSpatialTransformFilterEngine::apply(std::vector<CDataPacketPtr> data)
94  {
[873]95    /* Nothing to do */
96  }
97
98  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue)
99  {
[644]100    CDataPacketPtr packet(new CDataPacket);
101    packet->date = data[0]->date;
102    packet->timestamp = data[0]->timestamp;
103    packet->status = data[0]->status;
104
105    if (packet->status == CDataPacket::NO_ERROR)
106    {
[827]107      if (1 < data.size())  // Dynamical transformations
108      {
109        std::vector<CArray<double,1>* > dataAuxInputs(data.size()-1);
110        for (size_t idx = 0; idx < dataAuxInputs.size(); ++idx) dataAuxInputs[idx] = &(data[idx+1]->data);
[832]111        gridTransformation->computeAll(dataAuxInputs, packet->timestamp);
[827]112      }
[644]113      packet->data.resize(gridTransformation->getGridDestination()->storeIndex_client.numElements());
[1018]114      if (0 != packet->data.numElements())
115        (packet->data)(0) = defaultValue;
[644]116      apply(data[0]->data, packet->data);
117    }
118
119    return packet;
120  }
121
122  void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)
123  {
124    CContextClient* client = CContext::getCurrent()->client;
125
[873]126    // Get default value for output data
[1076]127    bool ignoreMissingValue = false; 
128    double defaultValue = std::numeric_limits<double>::quiet_NaN();
129    if (0 != dataDest.numElements()) ignoreMissingValue = NumTraits<double>::isnan(dataDest(0));
[1328]130
[841]131    const std::list<CGridTransformation::SendingIndexGridSourceMap>& listLocalIndexSend = gridTransformation->getLocalIndexToSendFromGridSource();
132    const std::list<CGridTransformation::RecvIndexGridDestinationMap>& listLocalIndexToReceive = gridTransformation->getLocalIndexToReceiveOnGridDest();
133    const std::list<size_t>& listNbLocalIndexToReceive = gridTransformation->getNbLocalIndexToReceiveOnGridDest();
[873]134    const std::list<std::vector<bool> >& listLocalIndexMaskOnDest = gridTransformation->getLocalMaskIndexOnGridDest();
[888]135    const std::vector<CGenericAlgorithmTransformation*>& listAlgos = gridTransformation->getAlgos();
[644]136
[841]137    CArray<double,1> dataCurrentDest(dataSrc.copy());
[644]138
[1328]139    std::list<CGridTransformation::SendingIndexGridSourceMap>::const_iterator itListSend  = listLocalIndexSend.begin(),
140                                                                              iteListSend = listLocalIndexSend.end();
[841]141    std::list<CGridTransformation::RecvIndexGridDestinationMap>::const_iterator itListRecv = listLocalIndexToReceive.begin();
142    std::list<size_t>::const_iterator itNbListRecv = listNbLocalIndexToReceive.begin();
[873]143    std::list<std::vector<bool> >::const_iterator itLocalMaskIndexOnDest = listLocalIndexMaskOnDest.begin();
[888]144    std::vector<CGenericAlgorithmTransformation*>::const_iterator itAlgo = listAlgos.begin();
[841]145
[888]146    for (; itListSend != iteListSend; ++itListSend, ++itListRecv, ++itNbListRecv, ++itLocalMaskIndexOnDest, ++itAlgo)
[709]147    {
[841]148      CArray<double,1> dataCurrentSrc(dataCurrentDest);
149      const CGridTransformation::SendingIndexGridSourceMap& localIndexToSend = *itListSend;
[709]150
[841]151      // Sending data from field sources to do transformations
152      std::map<int, CArray<int,1> >::const_iterator itbSend = localIndexToSend.begin(), itSend,
153                                                    iteSend = localIndexToSend.end();
154      int idxSendBuff = 0;
155      std::vector<double*> sendBuff(localIndexToSend.size());
156      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
[644]157      {
[841]158        if (0 != itSend->second.numElements())
159          sendBuff[idxSendBuff] = new double[itSend->second.numElements()];
[644]160      }
161
[841]162      idxSendBuff = 0;
[1328]163      std::vector<MPI_Request> sendRecvRequest(localIndexToSend.size() + itListRecv->size());
[1203]164      int position = 0;
[841]165      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
[644]166      {
[841]167        int destRank = itSend->first;
168        const CArray<int,1>& localIndex_p = itSend->second;
169        int countSize = localIndex_p.numElements();
170        for (int idx = 0; idx < countSize; ++idx)
[644]171        {
[841]172          sendBuff[idxSendBuff][idx] = dataCurrentSrc(localIndex_p(idx));
[644]173        }
[1328]174        MPI_Isend(sendBuff[idxSendBuff], countSize, MPI_DOUBLE, destRank, 12, client->intraComm, &sendRecvRequest[position++]);
[644]175      }
176
[841]177      // Receiving data on destination fields
[1328]178      const CGridTransformation::RecvIndexGridDestinationMap& localIndexToReceive = *itListRecv;
179      CGridTransformation::RecvIndexGridDestinationMap::const_iterator itbRecv = localIndexToReceive.begin(), itRecv,
180                                                                       iteRecv = localIndexToReceive.end();
[841]181      int recvBuffSize = 0;
182      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv) recvBuffSize += itRecv->second.size(); //(recvBuffSize < itRecv->second.size())
183                                                                       //? itRecv->second.size() : recvBuffSize;
184      double* recvBuff;
185      if (0 != recvBuffSize) recvBuff = new double[recvBuffSize];
186      int currentBuff = 0;
187      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
188      {
189        int srcRank = itRecv->first;
190        int countSize = itRecv->second.size();
[1328]191        MPI_Irecv(recvBuff + currentBuff, countSize, MPI_DOUBLE, srcRank, 12, client->intraComm, &sendRecvRequest[position++]);
[841]192        currentBuff += countSize;
193      }
[1328]194      std::vector<MPI_Status> status(sendRecvRequest.size());
[841]195      MPI_Waitall(sendRecvRequest.size(), &sendRecvRequest[0], &status[0]);
[709]196
[841]197      dataCurrentDest.resize(*itNbListRecv);
[873]198      const std::vector<bool>& localMaskDest = *itLocalMaskIndexOnDest;
199      for (int i = 0; i < localMaskDest.size(); ++i)
200        if (localMaskDest[i]) dataCurrentDest(i) = 0.0;
201        else dataCurrentDest(i) = defaultValue;
202
[1042]203      std::vector<bool> localInitFlag(dataCurrentDest.numElements(), true);
[841]204      currentBuff = 0;
[1328]205      bool firstPass=true; 
[841]206      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
207      {
208        int countSize = itRecv->second.size();
[842]209        const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second;
[888]210        (*itAlgo)->apply(localIndex_p,
211                         recvBuff+currentBuff,
212                         dataCurrentDest,
[918]213                         localInitFlag,
[1328]214                         ignoreMissingValue,firstPass);
[888]215
[841]216        currentBuff += countSize;
[1328]217        firstPass=false ;
[841]218      }
219
[979]220      (*itAlgo)->updateData(dataCurrentDest);
221
[841]222      idxSendBuff = 0;
223      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
224      {
225        if (0 != itSend->second.numElements())
226          delete [] sendBuff[idxSendBuff];
227      }
228      if (0 != recvBuffSize) delete [] recvBuff;
[709]229    }
[841]230    if (dataCurrentDest.numElements() != dataDest.numElements())
231    ERROR("CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)",
[1003]232          "Incoherent between the received size and expected size. " << std::endl
233          << "Expected size: " << dataDest.numElements() << std::endl
234          << "Received size: " << dataCurrentDest.numElements());
[841]235
236    dataDest = dataCurrentDest;
[644]237  }
[1328]238} // namespace xios
Note: See TracBrowser for help on using the repository browser.