source: XIOS/dev/dev_ym/XIOS_COUPLING/src/distribution/gatherer_connector.hpp @ 2291

Last change on this file since 2291 was 2291, checked in by ymipsl, 2 years ago

Improve reduction transformation

  • make the difference between reduction over geometry or reduction between process.
  • geometrical reduction :

domain -> axis
axis -> scalar
domain -> scalar

  • reduction across processes for redondant geometrical cell :

axis -> axis
scalar -> scalar

Reduction can be local (only for the geometrical cell owned by current process) or global, using the "local" attribute (bool) over the reduction.

YM

  • Property svn:executable set to *
File size: 13.0 KB
Line 
1#ifndef __GATHERER_CONNECTOR_HPP__
2#define __GATHERER_CONNECTOR_HPP__
3
4#include "xios_spl.hpp"
5#include "array_new.hpp"
6#include "distributed_view.hpp"
7#include "mpi.hpp"
8#include "local_view.hpp"
9#include "distributed_view.hpp"
10#include "context_client.hpp"
11#include "reduction_types.hpp"
12
13
14namespace xios
15{
16 
17  class CGathererConnector
18  {
19    private:
20      shared_ptr<CDistributedView> srcView_;
21      shared_ptr<CLocalView> dstView_;
22      map<int, vector<int>> connector_ ;
23      map<int, vector<bool>> mask_ ;  // mask is on src view
24      int dstSize_ ; 
25      map<int,int> srcSize_ ;
26
27    public:
28      CGathererConnector(shared_ptr<CDistributedView> srcView, shared_ptr<CLocalView> dstView) : srcView_(srcView), dstView_(dstView) {} ;
29      void computeConnector(void) ;
30     
31      template<typename T>
32      void transfer(int repeat, int sizeT, map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, EReduction op = EReduction::none)
33      {
34        // for future, make a specific transfer function for sizeT=1 to avoid multiplication (increasing performance)
35       
36        size_t dstSlice = dstSize_*sizeT ;
37        dataOut.resize(repeat* dstSlice) ; 
38       
39       
40        if (op == EReduction::none) // tranfer without reduction
41        {
42          for(auto& data : dataIn)
43          {
44            T* output = dataOut.dataFirst() ;
45            int rank=data.first ;
46            auto input  = data.second.dataFirst() ;
47            auto& connector=connector_[rank] ;
48            auto& mask=mask_[rank] ;
49            int size=mask.size() ;
50            size_t srcSlice = size * sizeT ;
51            for(int l=0; l<repeat; l++)
52            {   
53              for(int i=0, j=0 ;i<size;i++)
54              {
55                if (mask[i]) 
56                {
57                  int cj = connector[j]*sizeT ;
58                  int ci = i*sizeT ;
59                  for (int k=0;k<sizeT;k++) output[cj+k] = input[ci+k] ;
60                  j++ ;
61                }
62              }
63              input+=srcSlice ;
64              output+=dstSlice ;
65           }
66          }
67        }
68        else // with reduction, to be optimized, see how to vectorize
69        {
70          vector<int> vcount(dataOut.size(),0) ;
71          int* count = vcount.data() ;
72          T defaultValue = std::numeric_limits<T>::quiet_NaN();
73          for(auto& data : dataIn)
74          {
75            T* output = dataOut.dataFirst() ;
76            int rank=data.first ;
77            auto input  = data.second.dataFirst() ;
78            auto& connector=connector_[rank] ;
79            auto& mask=mask_[rank] ;
80            int size=mask.size() ;
81            size_t srcSlice = size * sizeT ;
82            for(int l=0; l<repeat; l++)
83            {   
84              for(int i=0, j=0 ;i<size;i++)
85              {
86                if (mask[i]) 
87                {
88                  int cj = connector[j]*sizeT ;
89                  int ci = i*sizeT ;
90                  for (int k=0;k<sizeT;k++)
91                  { 
92                    if (!std::isnan(input[ci+k])) // manage missing value
93                    {
94                      if (count[cj+k]==0)  output[cj+k] = input[ci+k] ;
95                      else
96                      {
97                        switch(op)
98                        {
99                          case EReduction::sum :
100                            output[cj+k] += input[ci+k] ;
101                            break ;
102                          case EReduction::min :
103                            output[cj+k]= std::min(output[cj+k],input[ci+k]) ;
104                            break ;
105                          case EReduction::max :
106                            output[cj+k]= std::max(output[cj+k],input[ci+k]) ;
107                            break ;
108                          case EReduction::average :
109                            output[cj+k] += input[ci+k] ;
110                            break ;
111                          default :
112                            ERROR("CGathererConnector::transfer",
113                               <<"reduction operator "<<(int)op<<" is not defined for this operation") ;
114                          break ;
115                        }
116                      }
117                      count[cj+k]++ ; 
118                    }
119                  }
120                  j++ ;
121                }
122              }
123              input+=srcSlice ;
124              output+=dstSlice ;
125              count+=dstSlice ;
126            }
127          }
128
129          T* output = dataOut.dataFirst() ;
130          if (op==EReduction::average)
131            for(int i=0;  i < dataOut.size() ; i++) 
132            {
133              if (count[i]>0) dataOut[i]/=count[i] ;
134              else dataOut[i] = defaultValue ;
135            }
136          else for(int i=0;  i < dataOut.size() ; i++) if (count[i]==0) dataOut[i] = defaultValue ;
137        }
138      }
139
140      template<typename T>
141      void transfer(int sizeT, map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, EReduction op = EReduction::none)
142      {
143        transfer(1, sizeT, dataIn, dataOut, op) ;
144      }
145   
146      template<typename T>
147      void transfer(map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, EReduction op = EReduction::none)
148      {
149        transfer(1,dataIn,dataOut, op) ;
150      }
151
152      template<typename T>
153      void transfer(int rank,  shared_ptr<CGathererConnector>* connectors, int nConnectors, const T* input, T* output, EReduction op = EReduction::none, int* count=nullptr)
154      {
155        auto& connector = connector_[rank] ; // probably costly, find a better way to avoid the map
156        auto& mask = mask_[rank] ; 
157        int srcSize = mask.size() ;
158     
159        if (nConnectors==0)
160        {
161          if (op == EReduction::none)
162          {
163            for(int i=0, j=0; i<srcSize; i++)
164              if (mask[i]) 
165              {
166                *(output+connector[j]) = *(input + i) ;
167                j++ ;
168              }
169          }
170          else
171          {
172            switch(op)
173            {
174              case EReduction::sum :
175                for(int i=0, j=0; i<srcSize; i++)
176                  if (mask[i]) 
177                  {
178                    if (!std::isnan(*(input + i)))
179                    {
180                      if (*(count+connector[j])==0) *(output+connector[j]) = *(input + i) ;
181                      else *(output+connector[j]) += *(input + i) ;
182                      (*(count+connector[j]))++ ;
183                    } 
184                    j++ ;
185                  }
186                break ;
187              case EReduction::min :
188                for(int i=0, j=0; i<srcSize; i++)
189                  if (mask[i]) 
190                  {
191                    if (!std::isnan(*(input + i)))
192                    {
193                      if (*(count+connector[j])==0) *(output+connector[j]) = *(input + i) ;
194                      else *(output+connector[j]) = std::min(*(output+connector[j]),*(input + i)) ;
195                      (*(count+connector[j]))++ ;
196                    } 
197                    j++ ;
198                  }
199                break ;
200              case EReduction::max :
201                for(int i=0, j=0; i<srcSize; i++)
202                  if (mask[i]) 
203                  {
204                    if (!std::isnan(*(input + i)))
205                    {
206                      if (*(count+connector[j])==0) *(output+connector[j]) = *(input + i) ;
207                      else *(output+connector[j]) = std::max(*(output+connector[j]),*(input + i)) ;
208                      (*(count+connector[j]))++ ;
209                    } 
210                    j++ ;
211                  }
212                break ;
213              case EReduction::average :
214                for(int i=0, j=0; i<srcSize; i++)
215                  if (mask[i]) 
216                  {
217                    if (!std::isnan(*(input + i)))
218                    {
219                      if (*(count+connector[j])==0) *(output+connector[j]) = *(input + i) ;
220                      else *(output+connector[j]) += *(input + i) ;
221                      (*(count+connector[j]))++ ;
222                    } 
223                    j++ ;
224                  }
225                break ;
226              default :
227                ERROR("CGathererConnector::transfer",
228                     <<"reduction operator "<<(int)op<<" is not defined for this operation") ;
229                break ;
230            }
231          }
232
233        }
234        else
235       {
236          int srcSliceSize = (*(connectors-1))->getSrcSliceSize(rank, connectors-1, nConnectors-1) ;
237          int dstSliceSize = (*(connectors-1))->getDstSliceSize(connectors-1, nConnectors-1) ;
238
239          const T* in = input ; 
240          for(int i=0,j=0;i<srcSize;i++) 
241          {
242            if (mask[i]) 
243            {
244              (*(connectors-1))->transfer(rank, connectors-1, nConnectors-1, in, output+connector[j]*dstSliceSize, op, count+connector[j]*dstSliceSize) ; // the multiplication must be avoid in further optimization
245              j++ ;
246            }
247            in += srcSliceSize ;
248          }
249        }
250
251      }
252
253      // hook for transfering mask in grid connector, maybe find an other way to doing that...
254      void transfer_or(int rank,  shared_ptr<CGathererConnector>* connectors, int nConnectors, const bool* input, bool* output)
255      {
256        auto& connector = connector_[rank] ; // probably costly, find a better way to avoid the map
257        auto& mask = mask_[rank] ; 
258        int srcSize = mask.size() ;
259     
260        if (nConnectors==0)
261        {
262          for(int i=0, j=0; i<srcSize; i++)
263            if (mask[i]) 
264            {
265              *(output+connector[j]) |= *(input + i) ;
266              j++ ;
267            }
268
269        }
270        else
271       {
272          int srcSliceSize = (*(connectors-1))->getSrcSliceSize(rank, connectors-1, nConnectors-1) ;
273          int dstSliceSize = (*(connectors-1))->getDstSliceSize(connectors-1, nConnectors-1) ;
274
275          const bool* in = input ; 
276          for(int i=0,j=0;i<srcSize;i++) 
277          {
278            if (mask[i]) 
279            {
280              (*(connectors-1))->transfer_or(rank, connectors-1, nConnectors-1, in, output+connector[j]*dstSliceSize) ; // the multiplication must be avoid in further optimization
281              j++ ;
282            }
283            in += srcSliceSize ;
284          }
285        }
286
287      }
288
289
290
291      template<typename T>
292      void transfer(map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, T missingValue, EReduction op = EReduction::none)
293      {
294        transfer(1, 1, dataIn, dataOut, missingValue, op);
295      }
296     
297      template<typename T>
298      void transfer(int sizeT, map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, T missingValue, EReduction op = EReduction::none)
299      {
300         transfer(1, sizeT, dataIn, dataOut, missingValue, op) ;
301      }
302
303      template<typename T>
304      void transfer(int repeat , int sizeT, map<int, CArray<T,1>>& dataIn, CArray<T,1>& dataOut, T missingValue, EReduction op = EReduction::none)
305      {
306        dataOut.resize(repeat*dstSize_*sizeT) ;
307        dataOut=missingValue ;
308        transfer(repeat, sizeT, dataIn, dataOut, op) ;
309      }
310
311      template<typename T>
312      void transfer(CEventServer& event, int sizeT, CArray<T,1>& dataOut, EReduction op = EReduction::none)
313      {
314        map<int, CArray<T,1>> dataIn ;
315        for (auto& subEvent : event.subEvents) 
316        {
317          auto& data = dataIn[subEvent.rank]; 
318          (*subEvent.buffer) >> data ;
319        }
320        transfer(1, sizeT, dataIn, dataOut, op) ;
321      }
322     
323      template<typename T>
324      void transfer(CEventServer& event, CArray<T,1>& dataOut, EReduction op = EReduction::none)
325      {
326        transfer(event, 1, dataOut, op) ;
327      }
328
329      template<typename T>
330      void transfer(CEventServer& event, int sizeT, CArray<T,1>& dataOut, T missingValue, EReduction op = EReduction::none)
331      {
332        map<int, CArray<T,1>> dataIn ;
333        for (auto& subEvent : event.subEvents) 
334        {
335          auto& data = dataIn[subEvent.rank]; 
336          (*subEvent.buffer) >> data ;
337        }
338        transfer(1, sizeT, dataIn, dataOut, missingValue, op) ;
339      }
340
341      template<typename T>
342      void transfer(CEventServer& event, CArray<T,1>& dataOut, T missingValue, EReduction op = EReduction::none)
343      {
344        map<int, CArray<T,1>> dataIn ;
345        for (auto& subEvent : event.subEvents) 
346        {
347          auto& data = dataIn[subEvent.rank]; 
348          (*subEvent.buffer) >> data ;
349        }
350        transfer(1, 1, dataIn, dataOut, missingValue, op) ;
351      }
352
353    int getSrcSliceSize(int rank, shared_ptr<CGathererConnector>* connectors, int nConnectors) 
354    { if (nConnectors==0) return srcSize_[rank] ; else return srcSize_[rank] * (*(connectors-1))->getSrcSliceSize(rank, connectors-1,nConnectors-1) ; }
355
356    int getDstSliceSize(shared_ptr<CGathererConnector>* connectors, int nConnectors) 
357    { if (nConnectors==0) return dstSize_ ; else return dstSize_ * (*(connectors-1))->getDstSliceSize(connectors-1,nConnectors-1) ; }
358 
359    int getDstSize(void) {return dstSize_ ;}
360  } ;
361
362}
363
364#endif
Note: See TracBrowser for help on using the repository browser.