source: XIOS/dev/branch_yushan/src/node/axis.cpp @ 1060

Last change on this file since 1060 was 1053, checked in by yushan, 7 years ago

ep_lib namespace specified when netcdf involved

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 32.5 KB
RevLine 
[219]1#include "axis.hpp"
2
[352]3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
[567]8#include "context.hpp"
9#include "context_client.hpp"
[676]10#include "context_server.hpp"
[591]11#include "xios_spl.hpp"
[630]12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
[633]15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
[676]17#include "distribution_client.hpp"
[219]18
[335]19namespace xios {
[540]20
[219]21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
[771]25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
[927]26      , isClientAfterTransformationChecked(false)
[676]27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
[821]29      , transformationMap_(), hasValue(false)
[621]30   {
31   }
[219]32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
[771]35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
[927]36      , isClientAfterTransformationChecked(false)
[676]37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
[821]39      , transformationMap_(), hasValue(false)
[621]40   {
41   }
[219]42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
[836]46   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
47   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
48   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
49   {
50     m["zoom_axis"] = TRANS_ZOOM_AXIS;
51     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
52     m["inverse_axis"] = TRANS_INVERSE_AXIS;
[895]53     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
54     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
[836]55   }
56
[219]57   ///---------------------------------------------------------------
58
59   const std::set<StdString> & CAxis::getRelFiles(void) const
60   {
61      return (this->relFiles);
62   }
63
64   bool CAxis::IsWritten(const StdString & filename) const
65   {
66      return (this->relFiles.find(filename) != this->relFiles.end());
67   }
68
[676]69   bool CAxis::isWrittenCompressed(const StdString& filename) const
70   {
71      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
72   }
73
[594]74   bool CAxis::isDistributed(void) const
75   {
76      return isDistributed_;
77   }
78
[676]79   /*!
80    * Test whether the data defined on the axis can be outputted in a compressed way.
[742]81    *
[676]82    * \return true if and only if a mask was defined for this axis
83    */
84   bool CAxis::isCompressible(void) const
85   {
86      return isCompressible_;
87   }
88
[219]89   void CAxis::addRelFile(const StdString & filename)
90   {
91      this->relFiles.insert(filename);
92   }
93
[676]94   void CAxis::addRelFileCompressed(const StdString& filename)
95   {
96      this->relFilesCompressed.insert(filename);
97   }
98
[219]99   //----------------------------------------------------------------
100
[676]101   const std::vector<int>& CAxis::getIndexesToWrite(void) const
102   {
103     return indexesToWrite;
104   }
105
106   /*!
107     Returns the number of indexes written by each server.
108     \return the number of indexes written by each server
109   */
110   int CAxis::getNumberWrittenIndexes() const
111   {
112     return numberWrittenIndexes_;
113   }
114
115   /*!
116     Returns the total number of indexes written by the servers.
117     \return the total number of indexes written by the servers
118   */
119   int CAxis::getTotalNumberWrittenIndexes() const
120   {
121     return totalNumberWrittenIndexes_;
122   }
123
124   /*!
125     Returns the offset of indexes written by each server.
126     \return the offset of indexes written by each server
127   */
128   int CAxis::getOffsetWrittenIndexes() const
129   {
130     return offsetWrittenIndexes_;
131   }
132
133   //----------------------------------------------------------------
134
[731]135   /*!
136    * Compute the minimum buffer size required to send the attributes to the server(s).
137    *
138    * \return A map associating the server rank with its minimum buffer size.
139    */
140   std::map<int, StdSize> CAxis::getAttributesBufferSize()
141   {
142     CContextClient* client = CContext::getCurrent()->client;
143
144     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
145
146     bool isNonDistributed = (n == n_glo);
147
148     if (client->isServerLeader())
149     {
150       // size estimation for sendServerAttribut
151       size_t size = 6 * sizeof(size_t);
152       // size estimation for sendNonDistributedValue
153       if (isNonDistributed)
154         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
155       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
156
157       const std::list<int>& ranks = client->getRanksServerLeader();
158       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
159       {
160         if (size > attributesSizes[*itRank])
161           attributesSizes[*itRank] = size;
162       }
163     }
164
165     if (!isNonDistributed)
166     {
167       // size estimation for sendDistributedValue
168       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
169       for (it = indSrv_.begin(); it != ite; ++it)
170       {
171         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
172         if (isCompressible_)
173           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
174
175         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
176         if (hasBounds_)
[754]177           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
[731]178
179         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
180         if (size > attributesSizes[it->first])
181           attributesSizes[it->first] = size;
182       }
183     }
184
185     return attributesSizes;
186   }
187
188   //----------------------------------------------------------------
189
[219]190   StdString CAxis::GetName(void)   { return (StdString("axis")); }
191   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
192   ENodeType CAxis::GetType(void)   { return (eAxis); }
193
194   //----------------------------------------------------------------
195
[622]196   CAxis* CAxis::createAxis()
197   {
198     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
199     return axis;
200   }
201
[775]202   void CAxis::fillInValues(const CArray<double,1>& values)
203   {
204     this->value = values;
205   }
206
[219]207   void CAxis::checkAttributes(void)
208   {
[666]209      if (this->n_glo.isEmpty())
[679]210        ERROR("CAxis::checkAttributes(void)",
211              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
212              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
[666]213      StdSize size = this->n_glo.getValue();
[540]214
[970]215      if (!this->index.isEmpty())
[551]216      {
[970]217        if (n.isEmpty()) n = index.numElements();
218
219        // It's not so correct but if begin is not the first value of index
220        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
221        if (begin.isEmpty()) begin = index(0);         
[551]222      }
[970]223      else 
224      {
225        if (!this->begin.isEmpty())
226        {
227          if (begin < 0 || begin > size - 1)
228            ERROR("CAxis::checkAttributes(void)",
229                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
230                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
231        }
232        else this->begin.setValue(0);
[551]233
[970]234        if (!this->n.isEmpty())
235        {
236          if (n < 0 || n > size)
237            ERROR("CAxis::checkAttributes(void)",
238                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
239                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
240        }
241        else this->n.setValue(size);
242
243        {
244          index.resize(n);
245          for (int i = 0; i < n; ++i) index(i) = i+begin;
246        }
[551]247      }
[624]248
[816]249      if (!this->value.isEmpty())
250      {
251        StdSize true_size = value.numElements();
252        if (this->n.getValue() != true_size)
253          ERROR("CAxis::checkAttributes(void)",
254                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
255                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
256        this->hasValue = true;
257      }
[219]258
[551]259      this->checkData();
[633]260      this->checkZoom();
[551]261      this->checkMask();
[633]262      this->checkBounds();
[970]263
264      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
265                       (!this->n.isEmpty() && (this->n != this->n_glo));
[219]266   }
267
[551]268   void CAxis::checkData()
269   {
270      if (data_begin.isEmpty()) data_begin.setValue(0);
[679]271
272      if (data_n.isEmpty())
[551]273      {
[679]274        data_n.setValue(n);
275      }
276      else if (data_n.getValue() < 0)
277      {
[551]278        ERROR("CAxis::checkData(void)",
[679]279              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
280              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
[551]281      }
282
283      if (data_index.isEmpty())
284      {
[679]285        data_index.resize(data_n);
286        for (int i = 0; i < data_n; ++i) data_index(i) = i;
[551]287      }
288   }
289
[631]290   void CAxis::checkZoom(void)
291   {
[821]292     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
293     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
[631]294   }
[567]295
[551]296   void CAxis::checkMask()
297   {
298      if (!mask.isEmpty())
299      {
[666]300         if (mask.extent(0) != n)
[679]301           ERROR("CAxis::checkMask(void)",
302                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
303                 << "The mask does not have the same size as the local domain." << std::endl
304                 << "Local size is " << n.getValue() << "." << std::endl
305                 << "Mask size is " << mask.extent(0) << ".");
[551]306      }
[679]307      else // (mask.isEmpty())
308      { // If no mask was defined, we create a default one without any masked point.
[666]309         mask.resize(n);
[679]310         for (int i = 0; i < n; ++i)
[551]311         {
[666]312           mask(i) = true;
[551]313         }
314      }
315   }
316
[633]317  void CAxis::checkBounds()
318  {
319    if (!bounds.isEmpty())
320    {
[713]321      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
322        ERROR("CAxis::checkAttributes(void)",
323              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
324              << "Axis size is " << n.getValue() << "." << std::endl
325              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
[633]326      hasBounds_ = true;
327    }
328    else hasBounds_ = false;
329  }
330
[676]331  void CAxis::checkEligibilityForCompressedOutput()
332  {
333    // We don't check if the mask is valid here, just if a mask has been defined at this point.
334    isCompressible_ = !mask.isEmpty();
335  }
[633]336
[567]337  bool CAxis::dispatchEvent(CEventServer& event)
338   {
[595]339      if (SuperClass::dispatchEvent(event)) return true;
[567]340      else
341      {
342        switch(event.type)
343        {
344           case EVENT_ID_SERVER_ATTRIBUT :
[595]345             recvServerAttribut(event);
346             return true;
347             break;
[633]348           case EVENT_ID_INDEX:
349            recvIndex(event);
350            return true;
351            break;
352          case EVENT_ID_DISTRIBUTED_VALUE:
353            recvDistributedValue(event);
354            return true;
355            break;
356          case EVENT_ID_NON_DISTRIBUTED_VALUE:
357            recvNonDistributedValue(event);
358            return true;
359            break;
[567]360           default :
[679]361             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
[595]362                    << "Unknown Event");
363           return false;
[567]364         }
365      }
366   }
367
[742]368   void CAxis::checkAttributesOnClient()
[567]369   {
370     if (this->areClientAttributesChecked_) return;
[595]371
[567]372     this->checkAttributes();
373
374     this->areClientAttributesChecked_ = true;
375   }
376
[927]377   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
378                                                          CServerDistributionDescription::ServerDistributionType distType)
379   {
380     CContext* context=CContext::getCurrent() ;
381
382     if (this->isClientAfterTransformationChecked) return;
383     if (context->hasClient)
384     {
385       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
386     }
387
388     this->isClientAfterTransformationChecked = true;
389   }
390
[567]391   // Send all checked attributes to server
392   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
393                                     CServerDistributionDescription::ServerDistributionType distType)
394   {
[742]395     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
[927]396     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
[595]397     CContext* context = CContext::getCurrent();
[567]398
399     if (this->isChecked) return;
400     if (context->hasClient)
401     {
[595]402       sendServerAttribut(globalDim, orderPositionInGrid, distType);
[927]403       if (hasValue) sendValue();
[567]404     }
405
406     this->isChecked = true;
407   }
408
[927]409  void CAxis::sendValue()
[633]410  {
[666]411     if (n.getValue() == n_glo.getValue())
[633]412       sendNonDistributedValue();
413     else
414       sendDistributedValue();
415  }
416
[815]417  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
418                                     CServerDistributionDescription::ServerDistributionType distType)
[633]419  {
420    CContext* context = CContext::getCurrent();
421    CContextClient* client = context->client;
422    int nbServer = client->serverSize;
423    int range, clientSize = client->clientSize;
[906]424    int rank = client->clientRank;
[667]425
[666]426    size_t ni = this->n.getValue();
427    size_t ibegin = this->begin.getValue();
[821]428    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
[667]429    size_t nZoomCount = 0;
[817]430    size_t nbIndex = index.numElements();
431    for (size_t idx = 0; idx < nbIndex; ++idx)
[667]432    {
[817]433      size_t globalIndex = index(idx);
[667]434      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
435    }
436
[817]437    CArray<size_t,1> globalIndexAxis(nbIndex);
[667]438    std::vector<size_t> globalAxisZoom(nZoomCount);
439    nZoomCount = 0;
[817]440    for (size_t idx = 0; idx < nbIndex; ++idx)
[633]441    {
[817]442      size_t globalIndex = index(idx);
[633]443      globalIndexAxis(idx) = globalIndex;
[667]444      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
445      {
[670]446        globalAxisZoom[nZoomCount] = globalIndex;
[667]447        ++nZoomCount;
448      }
[633]449    }
450
[676]451    std::set<int> writtenInd;
452    if (isCompressible_)
453    {
454      for (int idx = 0; idx < data_index.numElements(); ++idx)
455      {
456        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
457
458        if (ind >= 0 && ind < ni && mask(ind))
459        {
460          ind += ibegin;
461          if (ind >= global_zoom_begin && ind <= zoom_end)
462            writtenInd.insert(ind);
463        }
464      }
465    }
466
[815]467    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
468    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
[829]469    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
[815]470    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
471    {
472      std::vector<int> nGlobAxis(1);
473      nGlobAxis[0] = n_glo.getValue();
[633]474
[815]475      size_t globalSizeIndex = 1, indexBegin, indexEnd;
476      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
477      indexBegin = 0;
[906]478      if (globalSizeIndex <= clientSize)
[815]479      {
[906]480        indexBegin = rank%globalSizeIndex;
481        indexEnd = indexBegin;
[815]482      }
[906]483      else
484      {
485        for (int i = 0; i < clientSize; ++i)
486        {
487          range = globalSizeIndex / clientSize;
488          if (i < (globalSizeIndex%clientSize)) ++range;
489          if (i == client->clientRank) break;
490          indexBegin += range;
491        }
492        indexEnd = indexBegin + range - 1;
493      }
[815]494
495      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
496      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
497      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
498      clientServerMap->computeServerIndexMapping(globalIndexAxis);
499      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
500      delete clientServerMap;
501    }
502    else
[633]503    {
[815]504      std::vector<size_t> globalIndexServer(n_glo.getValue());
505      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
506      {
507        globalIndexServer[idx] = idx;
508      }
509
510      for (int idx = 0; idx < nbServer; ++idx)
511      {
512        globalIndexAxisOnServer[idx] = globalIndexServer;
513      }
[633]514    }
515
[829]516    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
517                                                         ite = globalIndexAxisOnServer.end();
[633]518    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
519                                        iteVec = (globalAxisZoom).end();
520    indSrv_.clear();
[676]521    indWrittenSrv_.clear();
[633]522    for (; it != ite; ++it)
523    {
524      int rank = it->first;
525      const std::vector<size_t>& globalIndexTmp = it->second;
526      int nb = globalIndexTmp.size();
527
528      for (int i = 0; i < nb; ++i)
529      {
530        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
531        {
532          indSrv_[rank].push_back(globalIndexTmp[i]);
533        }
[676]534
535        if (writtenInd.count(globalIndexTmp[i]))
536        {
537          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
538        }
[633]539      }
540    }
541
542    connectedServerRank_.clear();
543    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
544      connectedServerRank_.push_back(it->first);
545    }
546
547    if (!indSrv_.empty())
548    {
[829]549      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
550                                                     iteIndSrv = indSrv_.end();
[633]551      connectedServerRank_.clear();
[829]552      for (; itIndSrv != iteIndSrv; ++itIndSrv)
553        connectedServerRank_.push_back(itIndSrv->first);
[633]554    }
[815]555    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
[633]556  }
557
558  void CAxis::sendNonDistributedValue()
559  {
560    CContext* context = CContext::getCurrent();
561    CContextClient* client = context->client;
[676]562    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
[633]563
[821]564    int zoom_end = global_zoom_begin + global_zoom_n - 1;
[676]565    int nb = 0;
[666]566    for (size_t idx = 0; idx < n; ++idx)
[633]567    {
[666]568      size_t globalIndex = begin + idx;
[633]569      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
570    }
571
[676]572    int nbWritten = 0;
573    if (isCompressible_)
574    {
575      for (int idx = 0; idx < data_index.numElements(); ++idx)
576      {
577        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
578
579        if (ind >= 0 && ind < n && mask(ind))
580        {
581          ind += begin;
582          if (ind >= global_zoom_begin && ind <= zoom_end)
583            ++nbWritten;
584        }
585      }
586    }
587
[633]588    CArray<double,1> val(nb);
589    nb = 0;
[666]590    for (size_t idx = 0; idx < n; ++idx)
[633]591    {
[666]592      size_t globalIndex = begin + idx;
[633]593      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
594      {
595        val(nb) = value(idx);
596        ++nb;
597      }
598    }
599
[676]600    CArray<int, 1> writtenInd(nbWritten);
601    nbWritten = 0;
602    if (isCompressible_)
603    {
604      for (int idx = 0; idx < data_index.numElements(); ++idx)
605      {
606        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
607
608        if (ind >= 0 && ind < n && mask(ind))
609        {
610          ind += begin;
611          if (ind >= global_zoom_begin && ind <= zoom_end)
612          {
613            writtenInd(nbWritten) = ind;
614            ++nbWritten;
615          }
616        }
617      }
618    }
619
[633]620    if (client->isServerLeader())
621    {
622      std::list<CMessage> msgs;
623
624      const std::list<int>& ranks = client->getRanksServerLeader();
625      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
626      {
627        msgs.push_back(CMessage());
628        CMessage& msg = msgs.back();
629        msg << this->getId();
630        msg << val;
[676]631        if (isCompressible_)
632          msg << writtenInd;
633        event.push(*itRank, 1, msg);
[633]634      }
635      client->sendEvent(event);
636    }
637    else client->sendEvent(event);
638  }
639
640  void CAxis::sendDistributedValue(void)
641  {
642    int ns, n, i, j, ind, nv, idx;
643    CContext* context = CContext::getCurrent();
644    CContextClient* client=context->client;
645
646    // send value for each connected server
647    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
648    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
649
650    list<CMessage> list_msgsIndex, list_msgsVal;
651    list<CArray<int,1> > list_indi;
[676]652    list<CArray<int,1> > list_writtenInd;
[633]653    list<CArray<double,1> > list_val;
654    list<CArray<double,2> > list_bounds;
655
656    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
657    iteMap = indSrv_.end();
658    for (int k = 0; k < connectedServerRank_.size(); ++k)
659    {
660      int nbData = 0;
661      int rank = connectedServerRank_[k];
662      it = indSrv_.find(rank);
663      if (iteMap != it)
664        nbData = it->second.size();
665
666      list_indi.push_back(CArray<int,1>(nbData));
667      list_val.push_back(CArray<double,1>(nbData));
668
669      if (hasBounds_)
670      {
671        list_bounds.push_back(CArray<double,2>(2,nbData));
672      }
673
674      CArray<int,1>& indi = list_indi.back();
675      CArray<double,1>& val = list_val.back();
676
677      for (n = 0; n < nbData; ++n)
678      {
679        idx = static_cast<int>(it->second[n]);
[666]680        ind = idx - begin;
[633]681
682        val(n) = value(ind);
683        indi(n) = idx;
684
685        if (hasBounds_)
686        {
687          CArray<double,2>& boundsVal = list_bounds.back();
688          boundsVal(0, n) = bounds(0,n);
689          boundsVal(1, n) = bounds(1,n);
690        }
691      }
692
693      list_msgsIndex.push_back(CMessage());
694      list_msgsIndex.back() << this->getId() << list_indi.back();
695
[676]696      if (isCompressible_)
697      {
698        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
699        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
700        CArray<int,1>& writtenInd = list_writtenInd.back();
701
702        for (n = 0; n < writtenInd.numElements(); ++n)
703          writtenInd(n) = writtenIndSrc[n];
704
705        list_msgsIndex.back() << writtenInd;
706      }
707
[633]708      list_msgsVal.push_back(CMessage());
709      list_msgsVal.back() << this->getId() << list_val.back();
710
711      if (hasBounds_)
712      {
713        list_msgsVal.back() << list_bounds.back();
714      }
715
716      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
717      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
718    }
719
720    client->sendEvent(eventIndex);
721    client->sendEvent(eventVal);
722  }
723
724  void CAxis::recvIndex(CEventServer& event)
725  {
[676]726    CAxis* axis;
727
[633]728    list<CEventServer::SSubEvent>::iterator it;
729    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
730    {
731      CBufferIn* buffer = it->buffer;
[676]732      string axisId;
733      *buffer >> axisId;
734      axis = get(axisId);
735      axis->recvIndex(it->rank, *buffer);
[633]736    }
[676]737
738    if (axis->isCompressible_)
739    {
740      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
741
742      CContextServer* server = CContext::getCurrent()->server;
743      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
[1053]744      ep_lib::MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
745      ep_lib::MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
[676]746      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
747    }
[633]748  }
749
750  void CAxis::recvIndex(int rank, CBufferIn& buffer)
751  {
752    buffer >> indiSrv_[rank];
[676]753
754    if (isCompressible_)
755    {
756      CArray<int, 1> writtenIndexes;
757      buffer >> writtenIndexes;
758      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
759      for (int i = 0; i < writtenIndexes.numElements(); ++i)
760        indexesToWrite.push_back(writtenIndexes(i));
761    }
[633]762  }
763
764  void CAxis::recvDistributedValue(CEventServer& event)
765  {
766    list<CEventServer::SSubEvent>::iterator it;
767    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
768    {
769      CBufferIn* buffer = it->buffer;
[676]770      string axisId;
771      *buffer >> axisId;
772      get(axisId)->recvDistributedValue(it->rank, *buffer);
[633]773    }
774  }
775
776  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
777  {
778    CArray<int,1> &indi = indiSrv_[rank];
779    CArray<double,1> val;
780    CArray<double,2> boundsVal;
781
782    buffer >> val;
783    if (hasBounds_) buffer >> boundsVal;
784
785    int i, j, ind_srv;
786    for (int ind = 0; ind < indi.numElements(); ++ind)
787    {
788      i = indi(ind);
789      ind_srv = i - zoom_begin_srv;
790      value_srv(ind_srv) = val(ind);
791      if (hasBounds_)
792      {
793        bound_srv(0,ind_srv) = boundsVal(0, ind);
794        bound_srv(1,ind_srv) = boundsVal(1, ind);
795      }
796    }
797  }
798
799   void CAxis::recvNonDistributedValue(CEventServer& event)
800  {
[676]801    CAxis* axis;
802
[633]803    list<CEventServer::SSubEvent>::iterator it;
804    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
805    {
806      CBufferIn* buffer = it->buffer;
[676]807      string axisId;
808      *buffer >> axisId;
809      axis = get(axisId);
810      axis->recvNonDistributedValue(it->rank, *buffer);
[633]811    }
[676]812
813    if (axis->isCompressible_)
814    {
815      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
816
817      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
818      axis->offsetWrittenIndexes_ = 0;
819    }
[633]820  }
821
822  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
823  {
824    CArray<double,1> val;
825    buffer >> val;
826
827    for (int ind = 0; ind < val.numElements(); ++ind)
828    {
829      value_srv(ind) = val(ind);
830      if (hasBounds_)
831      {
832        bound_srv(0,ind) = bounds(0,ind);
833        bound_srv(1,ind) = bounds(1,ind);
834      }
835    }
[676]836
837    if (isCompressible_)
838    {
839      CArray<int, 1> writtenIndexes;
840      buffer >> writtenIndexes;
841      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
842      for (int i = 0; i < writtenIndexes.numElements(); ++i)
843        indexesToWrite.push_back(writtenIndexes(i));
844    }
[633]845  }
846
[595]847  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
848                                 CServerDistributionDescription::ServerDistributionType distType)
[567]849  {
[595]850    CContext* context = CContext::getCurrent();
851    CContextClient* client = context->client;
[815]852    int nbServer = client->serverSize;
[567]853
[815]854    CServerDistributionDescription serverDescription(globalDim, nbServer);
855    serverDescription.computeServerDistribution();
[595]856
857    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
858    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
859
860    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
[567]861    if (client->isServerLeader())
862    {
[595]863      std::list<CMessage> msgs;
864
865      const std::list<int>& ranks = client->getRanksServerLeader();
866      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
867      {
868        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
869        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
870        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
871        const int end   = begin + ni - 1;
872
873        msgs.push_back(CMessage());
874        CMessage& msg = msgs.back();
875        msg << this->getId();
876        msg << ni << begin << end;
[821]877        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
[676]878        msg << isCompressible_;
[595]879
880        event.push(*itRank,1,msg);
881      }
882      client->sendEvent(event);
[567]883    }
[595]884    else client->sendEvent(event);
[567]885  }
886
887  void CAxis::recvServerAttribut(CEventServer& event)
888  {
[595]889    CBufferIn* buffer = event.subEvents.begin()->buffer;
890    string axisId;
891    *buffer >> axisId;
892    get(axisId)->recvServerAttribut(*buffer);
[567]893  }
894
895  void CAxis::recvServerAttribut(CBufferIn& buffer)
896  {
[821]897    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
[567]898
[676]899    buffer >> ni_srv >> begin_srv >> end_srv;
[821]900    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
[676]901    buffer >> isCompressible_;
[623]902    global_zoom_begin = global_zoom_begin_tmp;
[821]903    global_zoom_n  = global_zoom_n_tmp;
904    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
[567]905
[623]906    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
907    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
[595]908    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
[567]909
910    if (zoom_size_srv<=0)
911    {
[595]912      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
[567]913    }
[586]914
[666]915    if (n_glo == n)
[586]916    {
[623]917      zoom_begin_srv = global_zoom_begin;
918      zoom_end_srv   = global_zoom_end; //zoom_end;
[595]919      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
[586]920    }
[816]921    if (hasValue)
922    {
923      value_srv.resize(zoom_size_srv);
924      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
925    }
[567]926  }
927
[836]928  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
929  {
930    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
931    return transformationMap_.back().second;
932  }
933
[619]934  bool CAxis::hasTransformation()
935  {
[621]936    return (!transformationMap_.empty());
[619]937  }
938
[621]939  void CAxis::setTransformations(const TransMapTypes& axisTrans)
[619]940  {
[621]941    transformationMap_ = axisTrans;
[619]942  }
943
[621]944  CAxis::TransMapTypes CAxis::getAllTransformations(void)
[620]945  {
[621]946    return transformationMap_;
947  }
[620]948
[621]949  /*!
950    Check the validity of all transformations applied on axis
951  This functions is called AFTER all inherited attributes are solved
952  */
953  void CAxis::checkTransformations()
954  {
955    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
956                                  ite = transformationMap_.end();
[895]957//    for (it = itb; it != ite; ++it)
958//    {
959//      (it->second)->checkValid(this);
960//    }
[620]961  }
962
[823]963  void CAxis::duplicateTransformation(CAxis* src)
964  {
965    if (src->hasTransformation())
966    {
967      this->setTransformations(src->getAllTransformations());
968    }
969  }
970
[747]971  /*!
972   * Go through the hierarchy to find the axis from which the transformations must be inherited
973   */
[619]974  void CAxis::solveInheritanceTransformation()
975  {
[747]976    if (hasTransformation() || !hasDirectAxisReference())
977      return;
[619]978
[747]979    CAxis* axis = this;
[619]980    std::vector<CAxis*> refAxis;
[747]981    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
[619]982    {
[747]983      refAxis.push_back(axis);
984      axis = axis->getDirectAxisReference();
[619]985    }
986
[747]987    if (axis->hasTransformation())
988      for (size_t i = 0; i < refAxis.size(); ++i)
989        refAxis[i]->setTransformations(axis->getAllTransformations());
[619]990  }
991
992  void CAxis::parse(xml::CXMLNode & node)
993  {
994    SuperClass::parse(node);
995
996    if (node.goToChildElement())
997    {
[836]998      StdString nodeElementName;
[619]999      do
1000      {
[784]1001        StdString nodeId("");
1002        if (node.getAttributes().end() != node.getAttributes().find("id"))
1003        { nodeId = node.getAttributes()["id"]; }
1004
[836]1005        nodeElementName = node.getElementName();
1006        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1007        it = transformationMapList_.find(nodeElementName);
1008        if (ite != it)
1009        {
1010          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1011                                                                                                               nodeId,
1012                                                                                                               &node)));
[786]1013        }
[968]1014        else
1015        {
1016          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1017                << "The transformation " << nodeElementName << " has not been supported yet.");
1018        }
[619]1019      } while (node.goToNextElement()) ;
1020      node.goToParentElement();
1021    }
1022  }
1023
[620]1024  DEFINE_REF_FUNC(Axis,axis)
[619]1025
[219]1026   ///---------------------------------------------------------------
1027
[335]1028} // namespace xios
Note: See TracBrowser for help on using the repository browser.