source: XIOS3/trunk/src/distribution/gatherer_connector.hpp @ 2396

Last change on this file since 2396 was 2294, checked in by jderouillat, 2 years ago

Fix for the average reduction in the new implementation of the GatherConnector::transfer

  • Property svn:executable set to *
File size: 13.4 KB
Line 
1#ifndef __GATHERER_CONNECTOR_HPP__
2#define __GATHERER_CONNECTOR_HPP__
3
4#include "xios_spl.hpp"
5#include "array_new.hpp"
6#include "distributed_view.hpp"
7#include "mpi.hpp"
8#include "local_view.hpp"
9#include "distributed_view.hpp"
10#include "context_client.hpp"
11#include "reduction_types.hpp"
12
13
14namespace xios
15{
16 
17  class CGathererConnector
18  {
19    private:
20      shared_ptr<CDistributedView> srcView_;
21      shared_ptr<CLocalView> dstView_;
22      map<int, vector<int>> connector_ ;
23      map<int, vector<bool>> mask_ ;  // mask is on src view
24      int dstSize_ ; 
25      map<int,int> srcSize_ ;
26
27    public:
28      CGathererConnector(shared_ptr<CDistributedView> srcView, shared_ptr<CLocalView> dstView) : srcView_(srcView), dstView_(dstView) {} ;
29      void computeConnector(void) ;
30     
31      template<typename T>
32      void transfer(int repeat, int sizeT, map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, EReduction op = EReduction::none)
33      {
34        // for future, make a specific transfer function for sizeT=1 to avoid multiplication (increasing performance)
35       
36        size_t dstSlice = dstSize_*sizeT ;
37        dataOut.resize(repeat* dstSlice) ; 
38       
39       
40        if (op == EReduction::none) // tranfer without reduction
41        {
42          for(auto& data : dataIn)
43          {
44            T* output = dataOut.dataFirst() ;
45            int rank=data.first ;
46            auto input  = data.second.dataFirst() ;
47            auto& connector=connector_[rank] ;
48            auto& mask=mask_[rank] ;
49            int size=mask.size() ;
50            size_t srcSlice = size * sizeT ;
51            for(int l=0; l<repeat; l++)
52            {   
53              for(int i=0, j=0 ;i<size;i++)
54              {
55                if (mask[i]) 
56                {
57                  int cj = connector[j]*sizeT ;
58                  int ci = i*sizeT ;
59                  for (int k=0;k<sizeT;k++) output[cj+k] = input[ci+k] ;
60                  j++ ;
61                }
62              }
63              input+=srcSlice ;
64              output+=dstSlice ;
65           }
66          }
67        }
68        else // with reduction, to be optimized, see how to vectorize
69        {
70          vector<int> vcount(dataOut.size(),0) ;
71          int* count = vcount.data() ;
72          T defaultValue = std::numeric_limits<T>::quiet_NaN();
73          for(auto& data : dataIn)
74          {
75            T* output = dataOut.dataFirst() ;
76            int rank=data.first ;
77            auto input  = data.second.dataFirst() ;
78            auto& connector=connector_[rank] ;
79            auto& mask=mask_[rank] ;
80            int size=mask.size() ;
81            size_t srcSlice = size * sizeT ;
82            for(int l=0; l<repeat; l++)
83            {   
84              for(int i=0, j=0 ;i<size;i++)
85              {
86                if (mask[i]) 
87                {
88                  int cj = connector[j]*sizeT ;
89                  int ci = i*sizeT ;
90                  for (int k=0;k<sizeT;k++)
91                  { 
92                    if (!std::isnan(input[ci+k])) // manage missing value
93                    {
94                      if (count[cj+k]==0)  output[cj+k] = input[ci+k] ;
95                      else
96                      {
97                        switch(op)
98                        {
99                          case EReduction::sum :
100                            output[cj+k] += input[ci+k] ;
101                            break ;
102                          case EReduction::min :
103                            output[cj+k]= std::min(output[cj+k],input[ci+k]) ;
104                            break ;
105                          case EReduction::max :
106                            output[cj+k]= std::max(output[cj+k],input[ci+k]) ;
107                            break ;
108                          case EReduction::average :
109                            output[cj+k] += input[ci+k] ;
110                            break ;
111                          default :
112                            ERROR("CGathererConnector::transfer",
113                               <<"reduction operator "<<(int)op<<" is not defined for this operation") ;
114                          break ;
115                        }
116                      }
117                      count[cj+k]++ ; 
118                    }
119                  }
120                  j++ ;
121                }
122              }
123              input+=srcSlice ;
124              output+=dstSlice ;
125              count+=dstSlice ;
126            }
127          }
128
129          T* output = dataOut.dataFirst() ;
130          if (op==EReduction::average)
131            for(int i=0;  i < dataOut.size() ; i++) 
132            {
133              if (count[i]>0) dataOut[i]/=count[i] ;
134              else dataOut[i] = defaultValue ;
135            }
136          else for(int i=0;  i < dataOut.size() ; i++) if (count[i]==0) dataOut[i] = defaultValue ;
137        }
138      }
139
140      template<typename T>
141      void transfer(int sizeT, map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, EReduction op = EReduction::none)
142      {
143        transfer(1, sizeT, dataIn, dataOut, op) ;
144      }
145   
146      template<typename T>
147      void transfer(map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, EReduction op = EReduction::none)
148      {
149        transfer(1,dataIn,dataOut, op) ;
150      }
151
152      template<typename T>
153      void transfer(int rank,  shared_ptr<CGathererConnector>* connectors, int nConnectors, const T* input, T* output, EReduction op = EReduction::none, int* count=nullptr)
154      {
155        auto& connector = connector_[rank] ; // probably costly, find a better way to avoid the map
156        auto& mask = mask_[rank] ; 
157        int srcSize = mask.size() ;
158     
159        if (nConnectors==0)
160        {
161          if (op == EReduction::none)
162          {
163            for(int i=0, j=0; i<srcSize; i++)
164              if (mask[i]) 
165              {
166                *(output+connector[j]) = *(input + i) ;
167                j++ ;
168              }
169          }
170          else
171          {
172            switch(op)
173            {
174              case EReduction::sum :
175                for(int i=0, j=0; i<srcSize; i++)
176                  if (mask[i]) 
177                  {
178                    if (!std::isnan(*(input + i)))
179                    {
180                      if (*(count+connector[j])==0) *(output+connector[j]) = *(input + i) ;
181                      else *(output+connector[j]) += *(input + i) ;
182                      (*(count+connector[j]))++ ;
183                    } 
184                    j++ ;
185                  }
186                break ;
187              case EReduction::min :
188                for(int i=0, j=0; i<srcSize; i++)
189                  if (mask[i]) 
190                  {
191                    if (!std::isnan(*(input + i)))
192                    {
193                      if (*(count+connector[j])==0) *(output+connector[j]) = *(input + i) ;
194                      else *(output+connector[j]) = std::min(*(output+connector[j]),*(input + i)) ;
195                      (*(count+connector[j]))++ ;
196                    } 
197                    j++ ;
198                  }
199                break ;
200              case EReduction::max :
201                for(int i=0, j=0; i<srcSize; i++)
202                  if (mask[i]) 
203                  {
204                    if (!std::isnan(*(input + i)))
205                    {
206                      if (*(count+connector[j])==0) *(output+connector[j]) = *(input + i) ;
207                      else *(output+connector[j]) = std::max(*(output+connector[j]),*(input + i)) ;
208                      (*(count+connector[j]))++ ;
209                    } 
210                    j++ ;
211                  }
212                break ;
213              case EReduction::average :
214                for(int i=0, j=0; i<srcSize; i++)
215                  if (mask[i]) 
216                  {
217                    if (!std::isnan(*(input + i)))
218                    {
219                      if (*(count+connector[j])==0) *(output+connector[j]) = *(input + i) ;
220                      else *(output+connector[j]) = *(output+connector[j])* (*(count+connector[j])) + *(input + i) ;
221                      (*(count+connector[j]))++ ;
222                    } 
223                    j++ ;
224                  }
225                for(int i=0, j=0; i<srcSize; i++)
226                    if (mask[i]) 
227                    {
228                      if (!std::isnan(*(input + i)))
229                        *(output+connector[j]) /= (*(count+connector[j]));
230                      j++ ;
231                    }
232                break ;
233              default :
234                ERROR("CGathererConnector::transfer",
235                     <<"reduction operator "<<(int)op<<" is not defined for this operation") ;
236                break ;
237            }
238          }
239
240        }
241        else
242       {
243          int srcSliceSize = (*(connectors-1))->getSrcSliceSize(rank, connectors-1, nConnectors-1) ;
244          int dstSliceSize = (*(connectors-1))->getDstSliceSize(connectors-1, nConnectors-1) ;
245
246          const T* in = input ; 
247          for(int i=0,j=0;i<srcSize;i++) 
248          {
249            if (mask[i]) 
250            {
251              (*(connectors-1))->transfer(rank, connectors-1, nConnectors-1, in, output+connector[j]*dstSliceSize, op, count+connector[j]*dstSliceSize) ; // the multiplication must be avoid in further optimization
252              j++ ;
253            }
254            in += srcSliceSize ;
255          }
256        }
257
258      }
259
260      // hook for transfering mask in grid connector, maybe find an other way to doing that...
261      void transfer_or(int rank,  shared_ptr<CGathererConnector>* connectors, int nConnectors, const bool* input, bool* output)
262      {
263        auto& connector = connector_[rank] ; // probably costly, find a better way to avoid the map
264        auto& mask = mask_[rank] ; 
265        int srcSize = mask.size() ;
266     
267        if (nConnectors==0)
268        {
269          for(int i=0, j=0; i<srcSize; i++)
270            if (mask[i]) 
271            {
272              *(output+connector[j]) |= *(input + i) ;
273              j++ ;
274            }
275
276        }
277        else
278       {
279          int srcSliceSize = (*(connectors-1))->getSrcSliceSize(rank, connectors-1, nConnectors-1) ;
280          int dstSliceSize = (*(connectors-1))->getDstSliceSize(connectors-1, nConnectors-1) ;
281
282          const bool* in = input ; 
283          for(int i=0,j=0;i<srcSize;i++) 
284          {
285            if (mask[i]) 
286            {
287              (*(connectors-1))->transfer_or(rank, connectors-1, nConnectors-1, in, output+connector[j]*dstSliceSize) ; // the multiplication must be avoid in further optimization
288              j++ ;
289            }
290            in += srcSliceSize ;
291          }
292        }
293
294      }
295
296
297
298      template<typename T>
299      void transfer(map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, T missingValue, EReduction op = EReduction::none)
300      {
301        transfer(1, 1, dataIn, dataOut, missingValue, op);
302      }
303     
304      template<typename T>
305      void transfer(int sizeT, map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, T missingValue, EReduction op = EReduction::none)
306      {
307         transfer(1, sizeT, dataIn, dataOut, missingValue, op) ;
308      }
309
310      template<typename T>
311      void transfer(int repeat , int sizeT, map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, T missingValue, EReduction op = EReduction::none)
312      {
313        dataOut.resize(repeat*dstSize_*sizeT) ;
314        dataOut=missingValue ;
315        transfer(repeat, sizeT, dataIn, dataOut, op) ;
316      }
317
318      template<typename T>
319      void transfer(CEventServer& event, int sizeT, CArray<T,1>& dataOut, EReduction op = EReduction::none)
320      {
321        map<int, CArray<T,1>> dataIn ;
322        for (auto& subEvent : event.subEvents) 
323        {
324          auto& data = dataIn[subEvent.rank]; 
325          (*subEvent.buffer) >> data ;
326        }
327        transfer(1, sizeT, dataIn, dataOut, op) ;
328      }
329     
330      template<typename T>
331      void transfer(CEventServer& event, CArray<T,1>& dataOut, EReduction op = EReduction::none)
332      {
333        transfer(event, 1, dataOut, op) ;
334      }
335
336      template<typename T>
337      void transfer(CEventServer& event, int sizeT, CArray<T,1>& dataOut, T missingValue, EReduction op = EReduction::none)
338      {
339        map<int, CArray<T,1>> dataIn ;
340        for (auto& subEvent : event.subEvents) 
341        {
342          auto& data = dataIn[subEvent.rank]; 
343          (*subEvent.buffer) >> data ;
344        }
345        transfer(1, sizeT, dataIn, dataOut, missingValue, op) ;
346      }
347
348      template<typename T>
349      void transfer(CEventServer& event, CArray<T,1>& dataOut, T missingValue, EReduction op = EReduction::none)
350      {
351        map<int, CArray<T,1>> dataIn ;
352        for (auto& subEvent : event.subEvents) 
353        {
354          auto& data = dataIn[subEvent.rank]; 
355          (*subEvent.buffer) >> data ;
356        }
357        transfer(1, 1, dataIn, dataOut, missingValue, op) ;
358      }
359
360    int getSrcSliceSize(int rank, shared_ptr<CGathererConnector>* connectors, int nConnectors) 
361    { if (nConnectors==0) return srcSize_[rank] ; else return srcSize_[rank] * (*(connectors-1))->getSrcSliceSize(rank, connectors-1,nConnectors-1) ; }
362
363    int getDstSliceSize(shared_ptr<CGathererConnector>* connectors, int nConnectors) 
364    { if (nConnectors==0) return dstSize_ ; else return dstSize_ * (*(connectors-1))->getDstSliceSize(connectors-1,nConnectors-1) ; }
365 
366    int getDstSize(void) {return dstSize_ ;}
367  } ;
368
369}
370
371#endif
Note: See TracBrowser for help on using the repository browser.