source: XIOS/trunk/src/node/axis.cpp @ 906

Last change on this file since 906 was 906, checked in by mhnguyen, 8 years ago

Fixing bug: Ticket 98

+) Correct the way to distribute index of domain (axis) in case there are more
clients than number of index

Test
+) On Curie
+) Small grid (2x2x1) and 6 clients - 1 server: Pass

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 31.3 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
27      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
28      , transformationMap_(), hasValue(false)
29   {
30   }
31
32   CAxis::CAxis(const StdString & id)
33      : CObjectTemplate<CAxis>(id)
34      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
35      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
36      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
37      , transformationMap_(), hasValue(false)
38   {
39   }
40
41   CAxis::~CAxis(void)
42   { /* Ne rien faire de plus */ }
43
44   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
45   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
46   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
47   {
48     m["zoom_axis"] = TRANS_ZOOM_AXIS;
49     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
50     m["inverse_axis"] = TRANS_INVERSE_AXIS;
51     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
52     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
53   }
54
55   ///---------------------------------------------------------------
56
57   const std::set<StdString> & CAxis::getRelFiles(void) const
58   {
59      return (this->relFiles);
60   }
61
62   bool CAxis::IsWritten(const StdString & filename) const
63   {
64      return (this->relFiles.find(filename) != this->relFiles.end());
65   }
66
67   bool CAxis::isWrittenCompressed(const StdString& filename) const
68   {
69      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
70   }
71
72   bool CAxis::isDistributed(void) const
73   {
74      return isDistributed_;
75   }
76
77   /*!
78    * Test whether the data defined on the axis can be outputted in a compressed way.
79    *
80    * \return true if and only if a mask was defined for this axis
81    */
82   bool CAxis::isCompressible(void) const
83   {
84      return isCompressible_;
85   }
86
87   void CAxis::addRelFile(const StdString & filename)
88   {
89      this->relFiles.insert(filename);
90   }
91
92   void CAxis::addRelFileCompressed(const StdString& filename)
93   {
94      this->relFilesCompressed.insert(filename);
95   }
96
97   //----------------------------------------------------------------
98
99   const std::vector<int>& CAxis::getIndexesToWrite(void) const
100   {
101     return indexesToWrite;
102   }
103
104   /*!
105     Returns the number of indexes written by each server.
106     \return the number of indexes written by each server
107   */
108   int CAxis::getNumberWrittenIndexes() const
109   {
110     return numberWrittenIndexes_;
111   }
112
113   /*!
114     Returns the total number of indexes written by the servers.
115     \return the total number of indexes written by the servers
116   */
117   int CAxis::getTotalNumberWrittenIndexes() const
118   {
119     return totalNumberWrittenIndexes_;
120   }
121
122   /*!
123     Returns the offset of indexes written by each server.
124     \return the offset of indexes written by each server
125   */
126   int CAxis::getOffsetWrittenIndexes() const
127   {
128     return offsetWrittenIndexes_;
129   }
130
131   //----------------------------------------------------------------
132
133   /*!
134    * Compute the minimum buffer size required to send the attributes to the server(s).
135    *
136    * \return A map associating the server rank with its minimum buffer size.
137    */
138   std::map<int, StdSize> CAxis::getAttributesBufferSize()
139   {
140     CContextClient* client = CContext::getCurrent()->client;
141
142     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
143
144     bool isNonDistributed = (n == n_glo);
145
146     if (client->isServerLeader())
147     {
148       // size estimation for sendServerAttribut
149       size_t size = 6 * sizeof(size_t);
150       // size estimation for sendNonDistributedValue
151       if (isNonDistributed)
152         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
153       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
154
155       const std::list<int>& ranks = client->getRanksServerLeader();
156       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
157       {
158         if (size > attributesSizes[*itRank])
159           attributesSizes[*itRank] = size;
160       }
161     }
162
163     if (!isNonDistributed)
164     {
165       // size estimation for sendDistributedValue
166       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
167       for (it = indSrv_.begin(); it != ite; ++it)
168       {
169         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
170         if (isCompressible_)
171           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
172
173         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
174         if (hasBounds_)
175           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
176
177         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
178         if (size > attributesSizes[it->first])
179           attributesSizes[it->first] = size;
180       }
181     }
182
183     return attributesSizes;
184   }
185
186   //----------------------------------------------------------------
187
188   StdString CAxis::GetName(void)   { return (StdString("axis")); }
189   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
190   ENodeType CAxis::GetType(void)   { return (eAxis); }
191
192   //----------------------------------------------------------------
193
194   CAxis* CAxis::createAxis()
195   {
196     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
197     return axis;
198   }
199
200   void CAxis::fillInValues(const CArray<double,1>& values)
201   {
202     this->value = values;
203   }
204
205   void CAxis::checkAttributes(void)
206   {
207      if (this->n_glo.isEmpty())
208        ERROR("CAxis::checkAttributes(void)",
209              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
210              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
211      StdSize size = this->n_glo.getValue();
212
213      isDistributed_ = !this->begin.isEmpty() || !this->n.isEmpty();
214
215      if (!this->begin.isEmpty())
216      {
217        if (begin < 0 || begin > size - 1)
218          ERROR("CAxis::checkAttributes(void)",
219                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
220                << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
221      }
222      else this->begin.setValue(0);
223
224      if (!this->n.isEmpty())
225      {
226        if (n < 0 || n > size)
227          ERROR("CAxis::checkAttributes(void)",
228                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
229                << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
230      }
231      else this->n.setValue(size);
232
233      if (!this->value.isEmpty())
234      {
235        StdSize true_size = value.numElements();
236        if (this->n.getValue() != true_size)
237          ERROR("CAxis::checkAttributes(void)",
238                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
239                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
240        this->hasValue = true;
241      }
242
243      if (this->index.isEmpty())
244      {
245        index.resize(n);
246        for (int i = 0; i < n; ++i) index(i) = i+begin;
247      }
248
249      this->checkData();
250      this->checkZoom();
251      this->checkMask();
252      this->checkBounds();
253   }
254
255   void CAxis::checkData()
256   {
257      if (data_begin.isEmpty()) data_begin.setValue(0);
258
259      if (data_n.isEmpty())
260      {
261        data_n.setValue(n);
262      }
263      else if (data_n.getValue() < 0)
264      {
265        ERROR("CAxis::checkData(void)",
266              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
267              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
268      }
269
270      if (data_index.isEmpty())
271      {
272        data_index.resize(data_n);
273        for (int i = 0; i < data_n; ++i) data_index(i) = i;
274      }
275   }
276
277   void CAxis::checkZoom(void)
278   {
279     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
280     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
281   }
282
283   void CAxis::checkMask()
284   {
285      if (!mask.isEmpty())
286      {
287         if (mask.extent(0) != n)
288           ERROR("CAxis::checkMask(void)",
289                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
290                 << "The mask does not have the same size as the local domain." << std::endl
291                 << "Local size is " << n.getValue() << "." << std::endl
292                 << "Mask size is " << mask.extent(0) << ".");
293      }
294      else // (mask.isEmpty())
295      { // If no mask was defined, we create a default one without any masked point.
296         mask.resize(n);
297         for (int i = 0; i < n; ++i)
298         {
299           mask(i) = true;
300         }
301      }
302   }
303
304  void CAxis::checkBounds()
305  {
306    if (!bounds.isEmpty())
307    {
308      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
309        ERROR("CAxis::checkAttributes(void)",
310              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
311              << "Axis size is " << n.getValue() << "." << std::endl
312              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
313      hasBounds_ = true;
314    }
315    else hasBounds_ = false;
316  }
317
318  void CAxis::checkEligibilityForCompressedOutput()
319  {
320    // We don't check if the mask is valid here, just if a mask has been defined at this point.
321    isCompressible_ = !mask.isEmpty();
322  }
323
324  bool CAxis::dispatchEvent(CEventServer& event)
325   {
326      if (SuperClass::dispatchEvent(event)) return true;
327      else
328      {
329        switch(event.type)
330        {
331           case EVENT_ID_SERVER_ATTRIBUT :
332             recvServerAttribut(event);
333             return true;
334             break;
335           case EVENT_ID_INDEX:
336            recvIndex(event);
337            return true;
338            break;
339          case EVENT_ID_DISTRIBUTED_VALUE:
340            recvDistributedValue(event);
341            return true;
342            break;
343          case EVENT_ID_NON_DISTRIBUTED_VALUE:
344            recvNonDistributedValue(event);
345            return true;
346            break;
347           default :
348             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
349                    << "Unknown Event");
350           return false;
351         }
352      }
353   }
354
355   void CAxis::checkAttributesOnClient()
356   {
357     if (this->areClientAttributesChecked_) return;
358
359     this->checkAttributes();
360
361     this->areClientAttributesChecked_ = true;
362   }
363
364   // Send all checked attributes to server
365   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
366                                     CServerDistributionDescription::ServerDistributionType distType)
367   {
368     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
369     CContext* context = CContext::getCurrent();
370
371     if (this->isChecked) return;
372     if (context->hasClient)
373     {
374       sendServerAttribut(globalDim, orderPositionInGrid, distType);
375       if (hasValue) sendValue(globalDim, orderPositionInGrid, distType);
376     }
377
378     this->isChecked = true;
379   }
380
381  void CAxis::sendValue(const std::vector<int>& globalDim, int orderPositionInGrid,
382                        CServerDistributionDescription::ServerDistributionType distType)
383  {
384     if (n.getValue() == n_glo.getValue())
385     {
386       sendNonDistributedValue();
387     }
388     else
389     {
390       computeConnectedServer(globalDim, orderPositionInGrid, distType);
391       sendDistributedValue();
392     }
393  }
394
395  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
396                                     CServerDistributionDescription::ServerDistributionType distType)
397  {
398    CContext* context = CContext::getCurrent();
399    CContextClient* client = context->client;
400    int nbServer = client->serverSize;
401    int range, clientSize = client->clientSize;
402    int rank = client->clientRank;
403
404    size_t ni = this->n.getValue();
405    size_t ibegin = this->begin.getValue();
406    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
407    size_t nZoomCount = 0;
408    size_t nbIndex = index.numElements();
409    for (size_t idx = 0; idx < nbIndex; ++idx)
410    {
411      size_t globalIndex = index(idx);
412      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
413    }
414
415    CArray<size_t,1> globalIndexAxis(nbIndex);
416    std::vector<size_t> globalAxisZoom(nZoomCount);
417    nZoomCount = 0;
418    for (size_t idx = 0; idx < nbIndex; ++idx)
419    {
420      size_t globalIndex = index(idx);
421      globalIndexAxis(idx) = globalIndex;
422      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
423      {
424        globalAxisZoom[nZoomCount] = globalIndex;
425        ++nZoomCount;
426      }
427    }
428
429    std::set<int> writtenInd;
430    if (isCompressible_)
431    {
432      for (int idx = 0; idx < data_index.numElements(); ++idx)
433      {
434        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
435
436        if (ind >= 0 && ind < ni && mask(ind))
437        {
438          ind += ibegin;
439          if (ind >= global_zoom_begin && ind <= zoom_end)
440            writtenInd.insert(ind);
441        }
442      }
443    }
444
445    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
446    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
447    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
448    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
449    {
450      std::vector<int> nGlobAxis(1);
451      nGlobAxis[0] = n_glo.getValue();
452
453      size_t globalSizeIndex = 1, indexBegin, indexEnd;
454      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
455      indexBegin = 0;
456      if (globalSizeIndex <= clientSize)
457      {
458        indexBegin = rank%globalSizeIndex;
459        indexEnd = indexBegin;
460      }
461      else
462      {
463        for (int i = 0; i < clientSize; ++i)
464        {
465          range = globalSizeIndex / clientSize;
466          if (i < (globalSizeIndex%clientSize)) ++range;
467          if (i == client->clientRank) break;
468          indexBegin += range;
469        }
470        indexEnd = indexBegin + range - 1;
471      }
472
473      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
474      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
475      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
476      clientServerMap->computeServerIndexMapping(globalIndexAxis);
477      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
478      delete clientServerMap;
479    }
480    else
481    {
482      std::vector<size_t> globalIndexServer(n_glo.getValue());
483      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
484      {
485        globalIndexServer[idx] = idx;
486      }
487
488      for (int idx = 0; idx < nbServer; ++idx)
489      {
490        globalIndexAxisOnServer[idx] = globalIndexServer;
491      }
492    }
493
494    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
495                                                         ite = globalIndexAxisOnServer.end();
496    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
497                                        iteVec = (globalAxisZoom).end();
498    indSrv_.clear();
499    indWrittenSrv_.clear();
500    for (; it != ite; ++it)
501    {
502      int rank = it->first;
503      const std::vector<size_t>& globalIndexTmp = it->second;
504      int nb = globalIndexTmp.size();
505
506      for (int i = 0; i < nb; ++i)
507      {
508        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
509        {
510          indSrv_[rank].push_back(globalIndexTmp[i]);
511        }
512
513        if (writtenInd.count(globalIndexTmp[i]))
514        {
515          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
516        }
517      }
518    }
519
520    connectedServerRank_.clear();
521    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
522      connectedServerRank_.push_back(it->first);
523    }
524
525    if (!indSrv_.empty())
526    {
527      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
528                                                     iteIndSrv = indSrv_.end();
529      connectedServerRank_.clear();
530      for (; itIndSrv != iteIndSrv; ++itIndSrv)
531        connectedServerRank_.push_back(itIndSrv->first);
532    }
533    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
534  }
535
536  void CAxis::sendNonDistributedValue()
537  {
538    CContext* context = CContext::getCurrent();
539    CContextClient* client = context->client;
540    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
541
542    int zoom_end = global_zoom_begin + global_zoom_n - 1;
543    int nb = 0;
544    for (size_t idx = 0; idx < n; ++idx)
545    {
546      size_t globalIndex = begin + idx;
547      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
548    }
549
550    int nbWritten = 0;
551    if (isCompressible_)
552    {
553      for (int idx = 0; idx < data_index.numElements(); ++idx)
554      {
555        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
556
557        if (ind >= 0 && ind < n && mask(ind))
558        {
559          ind += begin;
560          if (ind >= global_zoom_begin && ind <= zoom_end)
561            ++nbWritten;
562        }
563      }
564    }
565
566    CArray<double,1> val(nb);
567    nb = 0;
568    for (size_t idx = 0; idx < n; ++idx)
569    {
570      size_t globalIndex = begin + idx;
571      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
572      {
573        val(nb) = value(idx);
574        ++nb;
575      }
576    }
577
578    CArray<int, 1> writtenInd(nbWritten);
579    nbWritten = 0;
580    if (isCompressible_)
581    {
582      for (int idx = 0; idx < data_index.numElements(); ++idx)
583      {
584        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
585
586        if (ind >= 0 && ind < n && mask(ind))
587        {
588          ind += begin;
589          if (ind >= global_zoom_begin && ind <= zoom_end)
590          {
591            writtenInd(nbWritten) = ind;
592            ++nbWritten;
593          }
594        }
595      }
596    }
597
598    if (client->isServerLeader())
599    {
600      std::list<CMessage> msgs;
601
602      const std::list<int>& ranks = client->getRanksServerLeader();
603      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
604      {
605        msgs.push_back(CMessage());
606        CMessage& msg = msgs.back();
607        msg << this->getId();
608        msg << val;
609        if (isCompressible_)
610          msg << writtenInd;
611        event.push(*itRank, 1, msg);
612      }
613      client->sendEvent(event);
614    }
615    else client->sendEvent(event);
616  }
617
618  void CAxis::sendDistributedValue(void)
619  {
620    int ns, n, i, j, ind, nv, idx;
621    CContext* context = CContext::getCurrent();
622    CContextClient* client=context->client;
623
624    // send value for each connected server
625    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
626    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
627
628    list<CMessage> list_msgsIndex, list_msgsVal;
629    list<CArray<int,1> > list_indi;
630    list<CArray<int,1> > list_writtenInd;
631    list<CArray<double,1> > list_val;
632    list<CArray<double,2> > list_bounds;
633
634    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
635    iteMap = indSrv_.end();
636    for (int k = 0; k < connectedServerRank_.size(); ++k)
637    {
638      int nbData = 0;
639      int rank = connectedServerRank_[k];
640      it = indSrv_.find(rank);
641      if (iteMap != it)
642        nbData = it->second.size();
643
644      list_indi.push_back(CArray<int,1>(nbData));
645      list_val.push_back(CArray<double,1>(nbData));
646
647      if (hasBounds_)
648      {
649        list_bounds.push_back(CArray<double,2>(2,nbData));
650      }
651
652      CArray<int,1>& indi = list_indi.back();
653      CArray<double,1>& val = list_val.back();
654
655      for (n = 0; n < nbData; ++n)
656      {
657        idx = static_cast<int>(it->second[n]);
658        ind = idx - begin;
659
660        val(n) = value(ind);
661        indi(n) = idx;
662
663        if (hasBounds_)
664        {
665          CArray<double,2>& boundsVal = list_bounds.back();
666          boundsVal(0, n) = bounds(0,n);
667          boundsVal(1, n) = bounds(1,n);
668        }
669      }
670
671      list_msgsIndex.push_back(CMessage());
672      list_msgsIndex.back() << this->getId() << list_indi.back();
673
674      if (isCompressible_)
675      {
676        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
677        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
678        CArray<int,1>& writtenInd = list_writtenInd.back();
679
680        for (n = 0; n < writtenInd.numElements(); ++n)
681          writtenInd(n) = writtenIndSrc[n];
682
683        list_msgsIndex.back() << writtenInd;
684      }
685
686      list_msgsVal.push_back(CMessage());
687      list_msgsVal.back() << this->getId() << list_val.back();
688
689      if (hasBounds_)
690      {
691        list_msgsVal.back() << list_bounds.back();
692      }
693
694      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
695      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
696    }
697
698    client->sendEvent(eventIndex);
699    client->sendEvent(eventVal);
700  }
701
702  void CAxis::recvIndex(CEventServer& event)
703  {
704    CAxis* axis;
705
706    list<CEventServer::SSubEvent>::iterator it;
707    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
708    {
709      CBufferIn* buffer = it->buffer;
710      string axisId;
711      *buffer >> axisId;
712      axis = get(axisId);
713      axis->recvIndex(it->rank, *buffer);
714    }
715
716    if (axis->isCompressible_)
717    {
718      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
719
720      CContextServer* server = CContext::getCurrent()->server;
721      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
722      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
723      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
724      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
725    }
726  }
727
728  void CAxis::recvIndex(int rank, CBufferIn& buffer)
729  {
730    buffer >> indiSrv_[rank];
731
732    if (isCompressible_)
733    {
734      CArray<int, 1> writtenIndexes;
735      buffer >> writtenIndexes;
736      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
737      for (int i = 0; i < writtenIndexes.numElements(); ++i)
738        indexesToWrite.push_back(writtenIndexes(i));
739    }
740  }
741
742  void CAxis::recvDistributedValue(CEventServer& event)
743  {
744    list<CEventServer::SSubEvent>::iterator it;
745    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
746    {
747      CBufferIn* buffer = it->buffer;
748      string axisId;
749      *buffer >> axisId;
750      get(axisId)->recvDistributedValue(it->rank, *buffer);
751    }
752  }
753
754  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
755  {
756    CArray<int,1> &indi = indiSrv_[rank];
757    CArray<double,1> val;
758    CArray<double,2> boundsVal;
759
760    buffer >> val;
761    if (hasBounds_) buffer >> boundsVal;
762
763    int i, j, ind_srv;
764    for (int ind = 0; ind < indi.numElements(); ++ind)
765    {
766      i = indi(ind);
767      ind_srv = i - zoom_begin_srv;
768      value_srv(ind_srv) = val(ind);
769      if (hasBounds_)
770      {
771        bound_srv(0,ind_srv) = boundsVal(0, ind);
772        bound_srv(1,ind_srv) = boundsVal(1, ind);
773      }
774    }
775  }
776
777   void CAxis::recvNonDistributedValue(CEventServer& event)
778  {
779    CAxis* axis;
780
781    list<CEventServer::SSubEvent>::iterator it;
782    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
783    {
784      CBufferIn* buffer = it->buffer;
785      string axisId;
786      *buffer >> axisId;
787      axis = get(axisId);
788      axis->recvNonDistributedValue(it->rank, *buffer);
789    }
790
791    if (axis->isCompressible_)
792    {
793      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
794
795      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
796      axis->offsetWrittenIndexes_ = 0;
797    }
798  }
799
800  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
801  {
802    CArray<double,1> val;
803    buffer >> val;
804
805    for (int ind = 0; ind < val.numElements(); ++ind)
806    {
807      value_srv(ind) = val(ind);
808      if (hasBounds_)
809      {
810        bound_srv(0,ind) = bounds(0,ind);
811        bound_srv(1,ind) = bounds(1,ind);
812      }
813    }
814
815    if (isCompressible_)
816    {
817      CArray<int, 1> writtenIndexes;
818      buffer >> writtenIndexes;
819      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
820      for (int i = 0; i < writtenIndexes.numElements(); ++i)
821        indexesToWrite.push_back(writtenIndexes(i));
822    }
823  }
824
825  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
826                                 CServerDistributionDescription::ServerDistributionType distType)
827  {
828    CContext* context = CContext::getCurrent();
829    CContextClient* client = context->client;
830    int nbServer = client->serverSize;
831
832    CServerDistributionDescription serverDescription(globalDim, nbServer);
833    serverDescription.computeServerDistribution();
834
835    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
836    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
837
838    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
839    if (client->isServerLeader())
840    {
841      std::list<CMessage> msgs;
842
843      const std::list<int>& ranks = client->getRanksServerLeader();
844      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
845      {
846        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
847        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
848        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
849        const int end   = begin + ni - 1;
850
851        msgs.push_back(CMessage());
852        CMessage& msg = msgs.back();
853        msg << this->getId();
854        msg << ni << begin << end;
855        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
856        msg << isCompressible_;
857
858        event.push(*itRank,1,msg);
859      }
860      client->sendEvent(event);
861    }
862    else client->sendEvent(event);
863  }
864
865  void CAxis::recvServerAttribut(CEventServer& event)
866  {
867    CBufferIn* buffer = event.subEvents.begin()->buffer;
868    string axisId;
869    *buffer >> axisId;
870    get(axisId)->recvServerAttribut(*buffer);
871  }
872
873  void CAxis::recvServerAttribut(CBufferIn& buffer)
874  {
875    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
876
877    buffer >> ni_srv >> begin_srv >> end_srv;
878    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
879    buffer >> isCompressible_;
880    global_zoom_begin = global_zoom_begin_tmp;
881    global_zoom_n  = global_zoom_n_tmp;
882    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
883
884    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
885    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
886    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
887
888    if (zoom_size_srv<=0)
889    {
890      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
891    }
892
893    if (n_glo == n)
894    {
895      zoom_begin_srv = global_zoom_begin;
896      zoom_end_srv   = global_zoom_end; //zoom_end;
897      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
898    }
899    if (hasValue)
900    {
901      value_srv.resize(zoom_size_srv);
902      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
903    }
904  }
905
906  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
907  {
908    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
909    return transformationMap_.back().second;
910  }
911
912  bool CAxis::hasTransformation()
913  {
914    return (!transformationMap_.empty());
915  }
916
917  void CAxis::setTransformations(const TransMapTypes& axisTrans)
918  {
919    transformationMap_ = axisTrans;
920  }
921
922  CAxis::TransMapTypes CAxis::getAllTransformations(void)
923  {
924    return transformationMap_;
925  }
926
927  /*!
928    Check the validity of all transformations applied on axis
929  This functions is called AFTER all inherited attributes are solved
930  */
931  void CAxis::checkTransformations()
932  {
933    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
934                                  ite = transformationMap_.end();
935//    for (it = itb; it != ite; ++it)
936//    {
937//      (it->second)->checkValid(this);
938//    }
939  }
940
941  void CAxis::duplicateTransformation(CAxis* src)
942  {
943    if (src->hasTransformation())
944    {
945      this->setTransformations(src->getAllTransformations());
946    }
947  }
948
949  /*!
950   * Go through the hierarchy to find the axis from which the transformations must be inherited
951   */
952  void CAxis::solveInheritanceTransformation()
953  {
954    if (hasTransformation() || !hasDirectAxisReference())
955      return;
956
957    CAxis* axis = this;
958    std::vector<CAxis*> refAxis;
959    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
960    {
961      refAxis.push_back(axis);
962      axis = axis->getDirectAxisReference();
963    }
964
965    if (axis->hasTransformation())
966      for (size_t i = 0; i < refAxis.size(); ++i)
967        refAxis[i]->setTransformations(axis->getAllTransformations());
968  }
969
970  void CAxis::parse(xml::CXMLNode & node)
971  {
972    SuperClass::parse(node);
973
974    if (node.goToChildElement())
975    {
976      StdString nodeElementName;
977      do
978      {
979        StdString nodeId("");
980        if (node.getAttributes().end() != node.getAttributes().find("id"))
981        { nodeId = node.getAttributes()["id"]; }
982
983        nodeElementName = node.getElementName();
984        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
985        it = transformationMapList_.find(nodeElementName);
986        if (ite != it)
987        {
988          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
989                                                                                                               nodeId,
990                                                                                                               &node)));
991        }
992      } while (node.goToNextElement()) ;
993      node.goToParentElement();
994    }
995  }
996
997  DEFINE_REF_FUNC(Axis,axis)
998
999   ///---------------------------------------------------------------
1000
1001} // namespace xios
Note: See TracBrowser for help on using the repository browser.