source: XIOS/trunk/src/filter/invert_algorithm.cpp @ 619

Last change on this file since 619 was 619, checked in by mhnguyen, 9 years ago

Implementing the first prototype of filter

+) Create new class filter
+) Implement class for specific algorithm
+) Implement inversing algorithm

Test
+) On Curie
+) Grid with one axis: passed

File size: 9.8 KB
Line 
1#include "invert_algorithm.hpp"
2#include "axis.hpp"
3#include "grid.hpp"
4#include <vector>
5#include <map>
6#include <list>
7#include <algorithm>
8#include "mpi.hpp"
9#include "context.hpp"
10#include "context_client.hpp"
11#include "field.hpp"
12#include "array_new.hpp"
13
14namespace xios {
15CInvertAlgorithm::CInvertAlgorithm()
16{
17}
18
19void CInvertAlgorithm::operate(CAxisTransformation& axisTransformation)
20{
21  std::vector<CAxis*> axisInputs = axisTransformation.getInputs();
22  std::vector<CAxis*> axisOutputs = axisTransformation.getOutputs();
23
24  // For now, only consider simple case one input - one output
25  CAxis* axisInput = axisInputs[0], *axisOutput = axisOutputs[0];
26  // Check correct size
27  if (axisInput->size.getValue() != axisOutput->size.getValue())
28  {
29    ERROR("CInvertAlgorithm::operate(CAxisTransformation& axisTransformation)",
30           << "Two axis have different size"
31           << "Size of axis source " <<axisInput->getId() << " is " << axisInput->size.getValue()  << std::endl
32           << "Size of axis destionation " <<axisOutput->getId() << " is " << axisOutput->size.getValue());
33  }
34
35  if ((axisInput->ni.isEmpty()) || (axisInput->ni.getValue() == axisInput->size.getValue()))
36  {
37    // axis non-distributed
38    size_t ssize = axisInput->size.getValue();
39    axisOutput->value.resize(ssize);
40    for (size_t idx = 0; idx < ssize; ++idx)
41    {
42      (axisOutput->value)(idx) = (axisInput->value)(ssize-idx-1);
43    }
44  }
45  else
46  {
47    CContext* context = CContext::getCurrent();
48    CContextClient* client = context->client;
49    int nbClient = client->clientSize;
50
51    // Ok, now axis is distributed
52    // First of all, each of client needs to have the whole axis data
53    int localAxisSize = axisInput->ni.getValue();
54    int* recvAxisSize = new int[nbClient];
55    MPI_Allgather(&localAxisSize, 1, MPI_INT,
56                  recvAxisSize, 1, MPI_INT, client->intraComm);
57
58
59    int* displ=new int[nbClient];
60    displ[0]=0 ;
61    for(int n=1;n<nbClient;n++) displ[n]=displ[n-1]+recvAxisSize[n-1];
62    int recvSize=displ[nbClient-1]+recvAxisSize[nbClient-1];
63    double* recvBuff=new double[recvSize];
64
65    double* sendBuff = new double[localAxisSize];
66    for (int idx = 0; idx < localAxisSize; ++idx) sendBuff[idx] = (axisInput->value)(idx);
67
68    MPI_Allgatherv(sendBuff, localAxisSize, MPI_DOUBLE,
69                   recvBuff, recvAxisSize, displ, MPI_DOUBLE,
70                   client->intraComm);
71
72    int ibegin = axisInput->ibegin.getValue();
73    size_t ssize = axisInput->size.getValue();
74    axisOutput->value.resize(localAxisSize);
75    for (int idx = 0; idx < localAxisSize; ++idx)
76        (axisOutput->value)(idx) = recvBuff[ssize-ibegin-idx-1];
77
78    std::cout << "axisOutput value " << (axisOutput->value) << std::endl;
79
80    delete [] displ;
81    delete [] sendBuff;
82    delete [] recvBuff;
83    delete [] recvAxisSize;
84  }
85}
86
87
88void CInvertAlgorithm::operate(CAxisFilter& axisFilter)
89{
90  CContext* context = CContext::getCurrent();
91  CContextClient* client = context->client;
92  MPI_Comm clientIntraComm = client->intraComm;
93
94  CGrid* gridInput = axisFilter.gridInput;
95  CGrid* gridOutput = axisFilter.gridOutput;
96
97
98  bool isDataDistributed = (gridInput->doGridHaveDataDistributed());
99  isDataDistributed = false;
100
101  // Data distributed
102  const CArray<size_t,1>& globalDataIndexInput = axisFilter.getGlobalDataIndexInput();
103  if (sendingIndexMap.empty() && receivingIndexMap.empty() && indexMap.empty())
104    inverseGlobalDataIndex(globalDataIndexInput);
105
106
107  const std::vector<CField*> fieldInputs = axisFilter.getInputs();
108  std::vector<CField*> fieldOutputs = axisFilter.getOutputs();
109
110  CField* input  = fieldInputs[0];
111  CField* output = fieldOutputs[0];
112
113  CArray<double, 1>& dataInput  = input->data;
114  CArray<double, 1>& dataOutput = output->filteredData;
115
116  if (dataOutput.numElements() != dataInput.numElements())
117    dataOutput.resize(dataInput.numElements());
118
119  std::map<size_t,size_t>::iterator itb = indexMap.begin(), ite = indexMap.end(), it;
120  for (it = itb; it != ite; ++it)
121  {
122    dataOutput(it->first) = dataInput(it->second);
123  }
124
125   std::cout << "dataOutput 1 "<<  dataOutput <<  std::endl;
126  std::map<int, std::vector<size_t> >::iterator itbMap, itMap, iteMap;
127  int idx = 0, k = 0;
128  int sendBuffSize = 0;
129  int nbSend = sendingIndexMap.size();
130  std::vector<int> sendDispl(nbSend,0);
131  itbMap = sendingIndexMap.begin();
132  iteMap = sendingIndexMap.end();
133  for (itMap = itbMap; itMap != iteMap; ++itMap) sendBuffSize += (itMap->second).size();
134  itMap = itbMap;
135  for (idx = 1; idx < nbSend; ++itMap, ++idx) sendDispl[idx] = sendDispl[idx-1] + (itMap->second).size();
136
137  double* sendBuff, *ptr;
138  std::vector<MPI_Request> sendRequest(nbSend);
139  if (0 != sendBuffSize) sendBuff = new double[sendBuffSize];
140  for (k = 0, itMap = itbMap; itMap != iteMap; ++itMap, ++k)
141  {
142    int sendDataSize = (itMap->second).size();
143    for (int i = 0; i < sendDataSize; ++i)
144    {
145      sendBuff[sendDispl[k]+i] = dataInput((itMap->second)[i]);
146    }
147    std::cout <<  "itMap first " << itMap->first << std::endl;
148    ptr = sendBuff + sendDispl[k];
149    MPI_Isend(ptr, sendDataSize, MPI_DOUBLE,
150              itMap->first, 24, clientIntraComm, &sendRequest[k]);
151  }
152
153  MPI_Status status;
154  for (idx = 0; idx < nbSend; ++idx) MPI_Wait(&sendRequest[idx], &status);
155
156  int recvBuffSize = 0, recvBuffPos = 0;
157  int nbRecv = receivingIndexMap.size();
158  std::map<int,int> recvDispl;
159  itbMap = receivingIndexMap.begin();
160  iteMap = receivingIndexMap.end();
161  for (itMap = itbMap; itMap != iteMap; ++itMap)
162  {
163    recvBuffSize += (itMap->second).size();
164    recvDispl[itMap->first] = recvBuffPos;
165    recvBuffPos += (itMap->second).size();
166  }
167
168  double* recvBuff;
169  std::map<int, MPI_Request> recvRequest;
170  std::map<int, MPI_Request>::iterator itRequest;
171  std::list<int> processedRequest;
172  int nbRemainingRecv = nbRecv;
173  if (0 != sendBuffSize) recvBuff = new double[recvBuffSize];
174
175  for (itMap = itbMap; itMap != iteMap; ++itMap)
176  {
177    int rankSource = itMap->first;
178    int recvDataSize = (itMap->second).size();
179    MPI_Recv(recvBuff+recvDispl[rankSource], recvDataSize, MPI_DOUBLE,
180             rankSource, 24, clientIntraComm, &status);
181    for (int i = 0; i < recvDataSize; ++i)
182    {
183      dataOutput(receivingIndexMap[rankSource][i]) = recvBuff[recvDispl[rankSource]+recvDataSize-i-1];
184    }
185  }
186  std::cout << "dataOutput 2 "<< dataOutput <<  std::endl;
187
188//  while (0 < nbRemainingRecv)
189//  {
190//    MPI_Status status;
191//    int flag, count;
192//    MPI_Iprobe(MPI_ANY_SOURCE, 24, clientIntraComm, &flag, &status);
193//    if ((true == flag))
194//    {
195//      MPI_Get_count(&status, MPI_DOUBLE, &count);
196//      MPI_Irecv(recvBuff+recvDispl[status.MPI_SOURCE], count, MPI_DOUBLE,
197//                status.MPI_SOURCE, 24, clientIntraComm,
198//                &recvRequest[status.MPI_SOURCE]);
199//
200//    }
201//    for (itRequest = recvRequest.begin(); itRequest != recvRequest.end(); ++itRequest)
202//    {
203//      int rank = itRequest->first;
204//      processedRequest.push_back(rank);
205//      int recvDataSize = (receivingIndexMap[rank]).size();
206//      for (int i = 0; i < recvDataSize; ++i)
207//      {
208//        dataOutput(receivingIndexMap[rank][i]) = recvBuff[recvDispl[rank]+recvDataSize-i];
209//      }
210//      --nbRemainingRecv;
211//    }
212//
213//    while (!processedRequest.empty())
214//    {
215//      recvRequest.erase(processedRequest.back());
216//      processedRequest.pop_back();
217//    }
218//  }
219
220  if (0 != sendBuffSize) delete [] sendBuff;
221  if (0 != recvBuffSize) delete [] recvBuff;
222}
223
224
225void CInvertAlgorithm::inverseGlobalDataIndex(const CArray<size_t,1>& globalDataIndexInput)
226{
227  CContext* context = CContext::getCurrent();
228  CContextClient* client = context->client;
229  int nbClient = client->clientSize;
230  int clientRank = client->clientRank;
231  int idx = 0;
232
233  int dataIndexSize = globalDataIndexInput.numElements();
234  int* recvAxisSize = new int[nbClient];
235  MPI_Allgather(&dataIndexSize, 1, MPI_INT,
236                recvAxisSize, 1, MPI_INT, client->intraComm);
237
238  std::vector<int> displ(nbClient);
239  displ[0]=0 ;
240  for(int n=1;n<nbClient;n++) displ[n]=displ[n-1]+recvAxisSize[n-1];
241  int recvSize=displ[nbClient-1]+recvAxisSize[nbClient-1];
242  unsigned long* recvBuff=new unsigned long[recvSize];
243
244  unsigned long* sendBuff = new unsigned long[dataIndexSize];
245  for (int idx = 0; idx < dataIndexSize; ++idx) sendBuff[idx] = (globalDataIndexInput)(idx);
246
247  MPI_Allgatherv(sendBuff, dataIndexSize, MPI_UNSIGNED_LONG,
248                 recvBuff, recvAxisSize, &displ[0], MPI_UNSIGNED_LONG,
249                 client->intraComm);
250
251  unsigned long* ptr;
252  int receivingRank, sendingRank;
253  std::vector<int>::iterator upper, itbDispl = displ.begin(), iteDispl = displ.end();
254
255  //
256//  std::map<int, std::vector<size_t> > sendingIndexMap;
257
258  // map between client rank and (local) received data index
259//  std::map<int, std::vector<size_t> > receivingIndexMap;
260  std::vector<size_t> tmpIndexMap;
261  for (idx = 0; idx < dataIndexSize; ++idx)
262  {
263    // First of all, need to know the inversed index
264    size_t inversedIndex = recvSize - globalDataIndexInput(idx)-1;
265
266    // Find position of the inversed index in the whole received array index
267    ptr = std::find(recvBuff, recvBuff+recvSize, inversedIndex);
268    if (ptr != recvBuff+recvSize)
269    {
270      int indexPosition = std::distance(recvBuff, ptr);
271      // Determine a client from which the current client will receive data
272      upper = std::upper_bound(itbDispl, iteDispl, indexPosition);
273      sendingRank = receivingRank = std::distance(itbDispl, upper) - 1;
274      if (clientRank != receivingRank)
275      {
276        (receivingIndexMap[receivingRank]).push_back(idx);
277        (sendingIndexMap[sendingRank]).push_back(idx);
278      }
279      else
280      {
281        tmpIndexMap.push_back(idx);
282      }
283    }
284  }
285  if (!tmpIndexMap.empty())
286  {
287    int tmpSize = tmpIndexMap.size();
288    for (int idx = 0; idx < tmpSize; ++idx)
289    {
290      indexMap[tmpIndexMap[idx]] = tmpIndexMap[tmpSize - idx -1];
291    }
292  }
293
294
295  delete [] sendBuff;
296  delete [] recvBuff;
297  delete [] recvAxisSize;
298}
299}
Note: See TracBrowser for help on using the repository browser.