source: XIOS/dev/dev_olga/src/node/axis.cpp @ 1158

Last change on this file since 1158 was 1158, checked in by oabramkina, 7 years ago

Two server levels: merging with trunk r1137.
There are bugs.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 41.4 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), doZoomByIndex_(false), hasLabel(false)
30      , computedWrittenIndex_(false)
31   {
32   }
33
34   CAxis::CAxis(const StdString & id)
35      : CObjectTemplate<CAxis>(id)
36      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
37      , isClientAfterTransformationChecked(false)
38      , hasBounds_(false), isCompressible_(false)
39      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
40      , transformationMap_(), hasValue(false), doZoomByIndex_(false), hasLabel(false)
41      , computedWrittenIndex_(false)
42   {
43   }
44
45   CAxis::~CAxis(void)
46   { /* Ne rien faire de plus */ }
47
48   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
49   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
50   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
51   {
52     m["zoom_axis"] = TRANS_ZOOM_AXIS;
53     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
54     m["inverse_axis"] = TRANS_INVERSE_AXIS;
55     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
56     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
57   }
58
59   ///---------------------------------------------------------------
60
61   const std::set<StdString> & CAxis::getRelFiles(void) const
62   {
63      return (this->relFiles);
64   }
65
66   bool CAxis::IsWritten(const StdString & filename) const
67   {
68      return (this->relFiles.find(filename) != this->relFiles.end());
69   }
70
71   bool CAxis::isWrittenCompressed(const StdString& filename) const
72   {
73      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
74   }
75
76   bool CAxis::isDistributed(void) const
77   {
78      bool distributed = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
79             (!this->n.isEmpty() && (this->n != this->n_glo));
80      // A same stupid condition to make sure that if there is only one client, axis
81      // should be considered to be distributed. This should be a temporary solution     
82      distributed |= (1 == CContext::getCurrent()->client->clientSize);
83      return distributed;
84   }
85
86   /*!
87    * Test whether the data defined on the axis can be outputted in a compressed way.
88    *
89    * \return true if and only if a mask was defined for this axis
90    */
91   bool CAxis::isCompressible(void) const
92   {
93      return isCompressible_;
94   }
95
96   void CAxis::addRelFile(const StdString & filename)
97   {
98      this->relFiles.insert(filename);
99   }
100
101   void CAxis::addRelFileCompressed(const StdString& filename)
102   {
103      this->relFilesCompressed.insert(filename);
104   }
105
106   //----------------------------------------------------------------
107
108   /*!
109     Returns the number of indexes written by each server.
110     \return the number of indexes written by each server
111   */
112   int CAxis::getNumberWrittenIndexes() const
113   {
114     return numberWrittenIndexes_;
115   }
116
117   /*!
118     Returns the total number of indexes written by the servers.
119     \return the total number of indexes written by the servers
120   */
121   int CAxis::getTotalNumberWrittenIndexes() const
122   {
123     return totalNumberWrittenIndexes_;
124   }
125
126   /*!
127     Returns the offset of indexes written by each server.
128     \return the offset of indexes written by each server
129   */
130   int CAxis::getOffsetWrittenIndexes() const
131   {
132     return offsetWrittenIndexes_;
133   }
134
135   //----------------------------------------------------------------
136
137   /*!
138    * Compute the minimum buffer size required to send the attributes to the server(s).
139    *
140    * \return A map associating the server rank with its minimum buffer size.
141    */
142   std::map<int, StdSize> CAxis::getAttributesBufferSize()
143   {
144     // For now the assumption is that secondary server pools consist of the same number of procs.
145     // CHANGE the line below if the assumption changes.
146     CContext* context = CContext::getCurrent();
147     CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[0] : context->client;
148
149     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
150
151     bool isNonDistributed = (n == n_glo);
152
153     if (client->isServerLeader())
154     {
155       // size estimation for sendServerAttribut
156       size_t size = 6 * sizeof(size_t);
157       // size estimation for sendNonDistributedValue
158       if (isNonDistributed)
159         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
160       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
161
162       const std::list<int>& ranks = client->getRanksServerLeader();
163       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
164       {
165         if (size > attributesSizes[*itRank])
166           attributesSizes[*itRank] = size;
167       }
168     }
169
170     if (!isNonDistributed)
171     {
172       // size estimation for sendDistributedValue
173       boost::unordered_map<int, vector<size_t> >::const_iterator it, ite = indSrv_.end();
174       for (it = indSrv_.begin(); it != ite; ++it)
175       {
176         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
177         if (isCompressible_)
178           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
179
180         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
181         if (hasBounds_)
182           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
183 
184         if (hasLabel)
185           sizeValEvent += CArray<StdString,1>::size(it->second.size());
186
187         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
188         if (size > attributesSizes[it->first])
189           attributesSizes[it->first] = size;
190       }
191     }
192
193     return attributesSizes;
194   }
195
196   //----------------------------------------------------------------
197
198   StdString CAxis::GetName(void)   { return (StdString("axis")); }
199   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
200   ENodeType CAxis::GetType(void)   { return (eAxis); }
201
202   //----------------------------------------------------------------
203
204   CAxis* CAxis::createAxis()
205   {
206     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
207     return axis;
208   }
209
210   /*!
211     Check common attributes of an axis.
212     This check should be done in the very beginning of work flow
213   */
214   void CAxis::checkAttributes(void)
215   {
216      if (this->n_glo.isEmpty())
217        ERROR("CAxis::checkAttributes(void)",
218              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
219              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
220      StdSize size = this->n_glo.getValue();
221
222      if (!this->index.isEmpty())
223      {
224        if (n.isEmpty()) n = index.numElements();
225
226        // It's not so correct but if begin is not the first value of index
227        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
228        if (begin.isEmpty()) begin = index(0);         
229      }
230      else 
231      {
232        if (!this->begin.isEmpty())
233        {
234          if (begin < 0 || begin > size - 1)
235            ERROR("CAxis::checkAttributes(void)",
236                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
237                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
238        }
239        else this->begin.setValue(0);
240
241        if (!this->n.isEmpty())
242        {
243          if (n < 0 || n > size)
244            ERROR("CAxis::checkAttributes(void)",
245                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
246                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
247        }
248        else this->n.setValue(size);
249
250        {
251          index.resize(n);
252          for (int i = 0; i < n; ++i) index(i) = i+begin;
253        }
254      }
255
256      if (!this->value.isEmpty())
257      {
258        StdSize true_size = value.numElements();
259        if (this->n.getValue() != true_size)
260          ERROR("CAxis::checkAttributes(void)",
261                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
262                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
263        this->hasValue = true;
264      }
265
266      this->checkData();
267      this->checkZoom();
268      this->checkMask();
269      this->checkBounds();
270      this->checkLabel();
271   }
272
273   /*!
274      Check the validity of data and fill in values if any.
275   */
276   void CAxis::checkData()
277   {
278      if (data_begin.isEmpty()) data_begin.setValue(0);
279
280      if (data_n.isEmpty())
281      {
282        data_n.setValue(n);
283      }
284      else if (data_n.getValue() < 0)
285      {
286        ERROR("CAxis::checkData(void)",
287              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
288              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
289      }
290
291      if (data_index.isEmpty())
292      {
293        data_index.resize(data_n);
294        for (int i = 0; i < data_n; ++i) data_index(i) = i;
295      }
296   }
297
298   /*!
299     Check validity of zoom info and fill in values if any.
300   */
301   void CAxis::checkZoom(void)
302   {
303     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
304     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
305     if (zoom_index.isEmpty())
306     {
307       zoom_index.setValue(index.getValue());
308     }
309     if (zoom_n.isEmpty()) zoom_n.setValue(n);
310     if (zoom_begin.isEmpty()) zoom_begin.setValue(begin);
311   }
312
313   /*!
314     Check validity of mask info and fill in values if any.
315   */
316   void CAxis::checkMask()
317   {
318      if (!mask.isEmpty())
319      {
320         if (mask.extent(0) != n)
321           ERROR("CAxis::checkMask(void)",
322                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
323                 << "The mask does not have the same size as the local domain." << std::endl
324                 << "Local size is " << n.getValue() << "." << std::endl
325                 << "Mask size is " << mask.extent(0) << ".");
326      }
327      else // (mask.isEmpty())
328      { // If no mask was defined, we create a default one without any masked point.
329         mask.resize(n);
330         for (int i = 0; i < n; ++i)
331         {
332           mask(i) = true;
333         }
334      }
335   }
336
337   /*!
338     Check validity of bounds info and fill in values if any.
339   */
340   void CAxis::checkBounds()
341   {
342     if (!bounds.isEmpty())
343     {
344       if (bounds.extent(0) != 2 || bounds.extent(1) != n)
345         ERROR("CAxis::checkAttributes(void)",
346               << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
347               << "Axis size is " << n.getValue() << "." << std::endl
348               << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
349       hasBounds_ = true;
350     }
351     else hasBounds_ = false;
352   }
353
354  void CAxis::checkLabel()
355  {
356    if (!label.isEmpty())
357    {
358      if (label.extent(0) != n)
359        ERROR("CAxis::checkLabel(void)",
360              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
361              << "Axis size is " << n.getValue() << "." << std::endl
362              << "label size is "<< label.extent(0)<<  " .");
363      hasLabel = true;
364    }
365    else hasLabel = false;
366  }
367  void CAxis::checkEligibilityForCompressedOutput()
368  {
369    // We don't check if the mask is valid here, just if a mask has been defined at this point.
370    isCompressible_ = !mask.isEmpty();
371  }
372
373   bool CAxis::dispatchEvent(CEventServer& event)
374   {
375      if (SuperClass::dispatchEvent(event)) return true;
376      else
377      {
378        switch(event.type)
379        {
380           case EVENT_ID_DISTRIBUTION_ATTRIBUTE :
381             recvDistributionAttribute(event);
382             return true;
383             break;
384          case EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES:
385            recvNonDistributedAttributes(event);
386            return true;
387            break;
388          case EVENT_ID_DISTRIBUTED_ATTRIBUTES:
389            recvDistributedAttributes(event);
390            return true;
391            break;
392           default :
393             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
394                    << "Unknown Event");
395           return false;
396         }
397      }
398   }
399
400   /*!
401     Check attributes on client side (This name is still adequate???)
402   */
403   void CAxis::checkAttributesOnClient()
404   {
405     if (this->areClientAttributesChecked_) return;
406
407     this->checkAttributes();
408
409     this->areClientAttributesChecked_ = true;
410   }
411
412   /*
413     The (spatial) transformation sometimes can change attributes of an axis. Therefore, we should recheck them.
414   */
415   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
416                                                          CServerDistributionDescription::ServerDistributionType distType)
417   {
418     CContext* context=CContext::getCurrent() ;
419
420     if (this->isClientAfterTransformationChecked) return;
421     if (context->hasClient)
422     {
423       if (index.numElements() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
424     }
425
426     this->isClientAfterTransformationChecked = true;
427   }
428
429   // Send all checked attributes to server
430   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
431                                     CServerDistributionDescription::ServerDistributionType distType)
432   {
433     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
434     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
435     CContext* context = CContext::getCurrent();
436
437     if (this->isChecked) return;
438     if (context->hasClient) sendAttributes(globalDim, orderPositionInGrid, distType);   
439
440     this->isChecked = true;
441   }
442
443  /*!
444    Send attributes from one client to other clients
445    \param[in] globalDim global dimension of grid which contains this axis
446    \param[in] order
447  */
448  void CAxis::sendAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
449                             CServerDistributionDescription::ServerDistributionType distType)
450  {
451     if (index.numElements() == n_glo.getValue())
452       sendNonDistributedAttributes();
453     else
454     {
455       sendDistributedAttributes();       
456     }
457     sendDistributionAttribute(globalDim, orderPositionInGrid, distType);
458  }
459
460  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
461                                     CServerDistributionDescription::ServerDistributionType distType)
462  {
463    CContext* context = CContext::getCurrent();
464
465    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
466    for (int p = 0; p < nbSrvPools; ++p)
467    {
468      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
469      int nbServer = client->serverSize;
470      int range, clientSize = client->clientSize;
471      int rank = client->clientRank;
472
473      size_t ni = this->n.getValue();
474      size_t ibegin = this->begin.getValue();
475      size_t global_zoom_end = global_zoom_begin+global_zoom_n-1;
476      size_t nZoomCount = 0;
477      size_t nbIndex = index.numElements();
478
479      if (doZoomByIndex_) 
480      {
481        nZoomCount = zoom_index.numElements();
482      }
483      else
484      {
485        for (size_t idx = 0; idx < nbIndex; ++idx)
486        {
487          globalLocalIndexMap_[index(idx)] = idx;
488          size_t globalIndex = index(idx);
489          if (globalIndex >= global_zoom_begin && globalIndex <= global_zoom_end) ++nZoomCount;
490        }
491      }
492
493
494      CArray<size_t,1> globalIndexAxis(nbIndex);
495      std::vector<size_t> globalAxisZoom(nZoomCount);
496      nZoomCount = 0;
497      if (doZoomByIndex_) 
498      {
499        int nbIndexZoom = zoom_index.numElements();       
500        for (int i = 0; i < nbIndexZoom; ++i)
501        {   
502          globalIndexAxis(i) = zoom_index(i);
503        }
504      }
505      else 
506      {
507        for (size_t idx = 0; idx < nbIndex; ++idx)
508        {
509          size_t globalIndex = index(idx);
510          globalIndexAxis(idx) = globalIndex;
511          if (globalIndex >= global_zoom_begin && globalIndex <= global_zoom_end)
512          {
513            globalAxisZoom[nZoomCount] = globalIndex;
514            ++nZoomCount;
515          }
516        }
517
518        int end       = begin + n -1;       
519        zoom_begin    = global_zoom_begin > begin ? global_zoom_begin : begin;
520        int zoom_end  = global_zoom_end < end ? zoom_end : end;
521        zoom_n        = zoom_end-zoom_begin+1;
522      }
523
524      std::set<int> writtenInd;
525      if (isCompressible_)
526      {
527        for (int idx = 0; idx < data_index.numElements(); ++idx)
528        {
529          int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
530
531          if (ind >= 0 && ind < ni && mask(ind))
532          {
533            ind += ibegin;
534            if (ind >= global_zoom_begin && ind <= global_zoom_end)
535              writtenInd.insert(ind);
536          }
537        }
538      }
539
540      CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer, distType);
541      int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
542      CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
543      if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
544      {
545        std::vector<int> nGlobAxis(1);
546        nGlobAxis[0] = n_glo.getValue();
547
548        size_t globalSizeIndex = 1, indexBegin, indexEnd;
549        for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
550        indexBegin = 0;
551        if (globalSizeIndex <= clientSize)
552        {
553          indexBegin = rank%globalSizeIndex;
554          indexEnd = indexBegin;
555        }
556        else
557        {
558          for (int i = 0; i < clientSize; ++i)
559          {
560            range = globalSizeIndex / clientSize;
561            if (i < (globalSizeIndex%clientSize)) ++range;
562            if (i == client->clientRank) break;
563            indexBegin += range;
564          }
565          indexEnd = indexBegin + range - 1;
566        }
567
568        CArray<size_t,1> globalIndex(index.numElements());
569        for (size_t idx = 0; idx < globalIndex.numElements(); ++idx)
570          globalIndex(idx) = index(idx);
571
572        CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
573        serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
574        CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
575        clientServerMap->computeServerIndexMapping(globalIndex);
576        globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
577        delete clientServerMap;
578      }
579      else
580      {
581        std::vector<size_t> globalIndexServer(n_glo.getValue());
582        for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
583        {
584          globalIndexServer[idx] = idx;
585        }
586
587        for (int idx = 0; idx < nbServer; ++idx)
588        {
589          globalIndexAxisOnServer[idx] = globalIndexServer;
590        }
591      }
592
593      indSrv_.swap(globalIndexAxisOnServer);
594
595      CClientServerMapping::GlobalIndexMap::const_iterator it = indSrv_.begin(),
596                                                           ite = indSrv_.end();
597
598      connectedServerRank_.clear();
599      for (it = indSrv_.begin(); it != ite; ++it) {
600        connectedServerRank_.push_back(it->first);
601      }
602
603      nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
604    }
605  }
606
607   void CAxis::computeWrittenIndex()
608   { 
609      if (computedWrittenIndex_) return;
610      computedWrittenIndex_ = true;
611
612      CContext* context=CContext::getCurrent();     
613      CContextServer* server = context->server; 
614
615      std::vector<int> nBegin(1), nSize(1), nBeginGlobal(1), nGlob(1);
616      nBegin[0]       = zoom_begin;
617      nSize[0]        = zoom_n;   
618      nBeginGlobal[0] = 0; 
619      nGlob[0]        = n_glo;
620      CDistributionServer srvDist(server->intraCommSize, nBegin, nSize, nBeginGlobal, nGlob); 
621      const CArray<size_t,1>& writtenGlobalIndex  = srvDist.getGlobalIndex();
622
623      size_t nbWritten = 0, indGlo;     
624      boost::unordered_map<size_t,size_t>::const_iterator itb = globalLocalIndexMap_.begin(),
625                                                          ite = globalLocalIndexMap_.end(), it;         
626      CArray<size_t,1>::const_iterator itSrvb = writtenGlobalIndex.begin(),
627                                       itSrve = writtenGlobalIndex.end(), itSrv;     
628
629      for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
630      {
631        indGlo = *itSrv;
632        if (ite != globalLocalIndexMap_.find(indGlo))
633        {         
634          ++nbWritten;
635        }                 
636      }
637
638      localIndexToWriteOnServer.resize(nbWritten);
639
640      nbWritten = 0;
641      for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
642      {
643        indGlo = *itSrv;
644        if (ite != globalLocalIndexMap_.find(indGlo))
645        {
646          localIndexToWriteOnServer(nbWritten) = globalLocalIndexMap_[indGlo];
647          ++nbWritten;
648        }                 
649      }
650
651      if (isCompressible())
652      {
653        nbWritten = 0;
654        boost::unordered_map<size_t,size_t> localGlobalIndexMap;
655        for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
656        {
657          indGlo = *itSrv;
658          if (ite != globalLocalIndexMap_.find(indGlo))
659          {
660            localGlobalIndexMap[localIndexToWriteOnServer(nbWritten)] = indGlo;
661            ++nbWritten;
662          }                 
663        }
664
665        nbWritten = 0;
666        for (int idx = 0; idx < data_index.numElements(); ++idx)
667        {
668          if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
669          {
670            ++nbWritten;
671          }
672        }
673
674        compressedIndexToWriteOnServer.resize(nbWritten);
675        nbWritten = 0;
676        for (int idx = 0; idx < data_index.numElements(); ++idx)
677        {
678          if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
679          {
680            compressedIndexToWriteOnServer(nbWritten) = localGlobalIndexMap[data_index(idx)];
681            ++nbWritten;
682          }
683        }
684
685        numberWrittenIndexes_ = nbWritten;
686        if (isDistributed())
687        {
688               
689          MPI_Allreduce(&numberWrittenIndexes_, &totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
690          MPI_Scan(&numberWrittenIndexes_, &offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
691          offsetWrittenIndexes_ -= numberWrittenIndexes_;
692        }
693        else
694          totalNumberWrittenIndexes_ = numberWrittenIndexes_;
695      }
696
697   }
698
699
700
701  void CAxis::sendDistributionAttribute(const std::vector<int>& globalDim, int orderPositionInGrid,
702                                        CServerDistributionDescription::ServerDistributionType distType)
703  {
704    CContext* context = CContext::getCurrent();
705
706    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 0) : 1;
707    for (int i = 0; i < nbSrvPools; ++i)
708    {
709      CContextClient* contextClientTmp = (context->hasServer) ? context->clientPrimServer[i]
710                                                                         : context->client;
711      int nbServer = contextClientTmp->serverSize;
712
713      CServerDistributionDescription serverDescription(globalDim, nbServer);
714      serverDescription.computeServerDistribution();
715
716      std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
717      std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
718
719      globalDimGrid.resize(globalDim.size());
720      for (int idx = 0; idx < globalDim.size(); ++idx) globalDimGrid(idx) = globalDim[idx];
721
722      CEventClient event(getType(),EVENT_ID_DISTRIBUTION_ATTRIBUTE);
723      if (contextClientTmp->isServerLeader())
724      {
725        std::list<CMessage> msgs;
726
727        const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
728        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
729        {
730          // Use const int to ensure CMessage holds a copy of the value instead of just a reference
731          const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
732          const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
733          const int end   = begin + ni - 1;
734
735          msgs.push_back(CMessage());
736          CMessage& msg = msgs.back();
737          msg << this->getId();
738          msg << ni << begin << end;
739          msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
740          msg << isCompressible_;
741          msg << orderPositionInGrid;
742          msg << globalDimGrid;
743
744          event.push(*itRank,1,msg);
745        }
746        contextClientTmp->sendEvent(event);
747      }
748      else contextClientTmp->sendEvent(event);
749    }
750  }
751
752  void CAxis::sendNonDistributedAttributes()
753  {
754    CContext* context = CContext::getCurrent();
755
756    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
757    for (int p = 0; p < nbSrvPools; ++p)
758    {
759      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
760
761      CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES);
762      size_t nbIndex = index.numElements();
763      size_t nbDataIndex = 0;
764
765      for (int idx = 0; idx < data_index.numElements(); ++idx)
766      {
767        int ind = data_index(idx);
768        if (ind >= 0 && ind < nbIndex) ++nbDataIndex;
769      }
770
771      CArray<int,1> dataIndex(nbDataIndex);
772      nbDataIndex = 0;
773      for (int idx = 0; idx < data_index.numElements(); ++idx)
774      {
775        int ind = data_index(idx);
776        if (ind >= 0 && ind < nbIndex)
777        {
778          dataIndex(nbDataIndex) = ind;
779          ++nbDataIndex;
780        }
781      }
782
783      if (client->isServerLeader())
784      {
785        std::list<CMessage> msgs;
786
787        const std::list<int>& ranks = client->getRanksServerLeader();
788        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
789        {
790          msgs.push_back(CMessage());
791          CMessage& msg = msgs.back();
792          msg << this->getId();
793          msg << index.getValue() << dataIndex << mask.getValue();
794
795          msg << doZoomByIndex_;
796          if (doZoomByIndex_) msg << zoom_index.getValue();
797          msg << hasValue;
798          if (hasValue) msg << value.getValue();
799          msg << hasBounds_;
800          if (hasBounds_) msg << bounds.getValue();
801
802          event.push(*itRank, 1, msg);
803        }
804        client->sendEvent(event);
805      }
806      else client->sendEvent(event);
807    }
808  }
809
810  void CAxis::recvNonDistributedAttributes(CEventServer& event)
811  {
812    list<CEventServer::SSubEvent>::iterator it;
813    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
814    {
815      CBufferIn* buffer = it->buffer;
816      string axisId;
817      *buffer >> axisId;
818      get(axisId)->recvNonDistributedAttributes(it->rank, *buffer);
819    }
820  }
821
822  void CAxis::recvNonDistributedAttributes(int rank, CBufferIn& buffer)
823  { 
824    CArray<int,1> tmp_index, tmp_data_index, tmp_zoom_index;
825    CArray<bool,1> tmp_mask;
826    CArray<double,1> tmp_val;
827    CArray<double,2> tmp_bnds;
828
829    buffer >> tmp_index;
830    index.reference(tmp_index);
831    buffer >> tmp_data_index;
832    data_index.reference(tmp_data_index);
833    buffer >> tmp_mask;
834    mask.reference(tmp_mask);
835
836    buffer >> doZoomByIndex_;
837    if (doZoomByIndex_)
838    {
839      buffer >> tmp_zoom_index;
840      zoom_index.reference(tmp_zoom_index);
841    }
842
843    buffer >> hasValue;
844    if (hasValue)
845    {
846      buffer >> tmp_val;
847      value.reference(tmp_val);
848    }
849
850    buffer >> hasBounds_;
851    if (hasBounds_)
852    {
853      buffer >> tmp_bnds;
854      bounds.reference(tmp_bnds);
855    }
856
857    data_begin.setValue(0);
858    globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor()));
859    for (int idx = 0; idx < index.numElements(); ++idx) globalLocalIndexMap_[idx] = index(idx);
860  }
861
862  void CAxis::sendDistributedAttributes(void)
863  {
864    int ns, n, i, j, ind, nv, idx;
865    CContext* context = CContext::getCurrent();
866
867    //int nbSrvPools = (context->hasServer) ? context->clientPrimServer.size() : 1;
868    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
869    for (int p = 0; p < nbSrvPools; ++p)
870    {
871      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
872
873      CEventClient eventData(getType(), EVENT_ID_DISTRIBUTED_ATTRIBUTES);
874
875      list<CMessage> listData;
876      list<CArray<int,1> > list_indi, list_dataInd, list_zoomInd;
877      list<CArray<bool,1> > list_mask;
878      list<CArray<double,1> > list_val;
879      list<CArray<double,2> > list_bounds;
880
881      int nbIndex = index.numElements();
882      CArray<int,1> dataIndex(nbIndex);
883      dataIndex = -1;
884      for (int inx = 0; inx < data_index.numElements(); ++inx)
885      {
886        if (0 <= data_index(inx) && data_index(inx) < nbIndex)
887          dataIndex(inx) = data_index(inx);
888      }
889
890      boost::unordered_map<int, std::vector<size_t> >::const_iterator it, iteMap;
891      iteMap = indSrv_.end();
892      for (int k = 0; k < connectedServerRank_.size(); ++k)
893      {
894        int nbData = 0;
895        int rank = connectedServerRank_[k];
896        int nbSendingClient = nbConnectedClients_[rank];
897        it = indSrv_.find(rank);
898        if (iteMap != it)
899          nbData = it->second.size();
900
901        list_indi.push_back(CArray<int,1>(nbData));
902        list_dataInd.push_back(CArray<int,1>(nbData));       
903        list_mask.push_back(CArray<bool,1>(nbData));
904
905        if (doZoomByIndex_)
906          list_zoomInd.push_back(CArray<int,1>(nbData));
907
908        if (hasValue)
909          list_val.push_back(CArray<double,1>(nbData));
910
911        if (hasBounds_)
912        {
913          list_bounds.push_back(CArray<double,2>(2,nbData));
914        }
915
916        CArray<int,1>& indi = list_indi.back();
917        CArray<int,1>& dataIndi = list_dataInd.back();       
918        CArray<bool,1>& maskIndi = list_mask.back();
919
920        for (n = 0; n < nbData; ++n)
921        {
922          idx = static_cast<int>(it->second[n]);
923          indi(n) = idx;
924
925          ind = globalLocalIndexMap_[idx];
926          dataIndi(n) = dataIndex(ind);
927          maskIndi(n) = mask(ind);
928
929          if (doZoomByIndex_)
930          {
931            CArray<int,1>& zoomIndi = list_zoomInd.back();
932            zoomIndi(n) = zoom_index(ind);
933          }
934
935          if (hasValue)
936          {
937            CArray<double,1>& val = list_val.back();
938            val(n) = value(ind);
939          }
940
941          if (hasBounds_)
942          {
943            CArray<double,2>& boundsVal = list_bounds.back();
944            boundsVal(0, n) = bounds(0,n);
945            boundsVal(1, n) = bounds(1,n);
946          }
947        }
948
949        listData.push_back(CMessage());
950        listData.back() << this->getId()
951                        << list_indi.back() << list_dataInd.back() << list_mask.back();
952
953        listData.back() << doZoomByIndex_;           
954        if (doZoomByIndex_)
955          listData.back() << list_zoomInd.back();
956
957        listData.back() << hasValue;
958        if (hasValue)
959          listData.back() << list_val.back();
960
961        listData.back() << hasBounds_;
962        if (hasBounds_)
963          listData.back() << list_bounds.back();
964
965        eventData.push(rank, nbConnectedClients_[rank], listData.back());
966      }
967
968      client->sendEvent(eventData);
969    }
970  }
971
972  void CAxis::recvDistributedAttributes(CEventServer& event)
973  {
974    string axisId;
975    vector<int> ranks;
976    vector<CBufferIn*> buffers;
977
978    list<CEventServer::SSubEvent>::iterator it;
979    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
980    {
981      ranks.push_back(it->rank);
982      CBufferIn* buffer = it->buffer;
983      *buffer >> axisId;
984      buffers.push_back(buffer);
985    }
986    get(axisId)->recvDistributedAttributes(ranks, buffers);
987  }
988
989
990  void CAxis::recvDistributedAttributes(vector<int>& ranks, vector<CBufferIn*> buffers)
991  {
992    int nbReceived = ranks.size();
993    vector<CArray<int,1> > vec_indi(nbReceived), vec_dataInd(nbReceived), vec_zoomInd(nbReceived);   
994    vector<CArray<bool,1> > vec_mask(nbReceived);
995    vector<CArray<double,1> > vec_val(nbReceived);
996    vector<CArray<double,2> > vec_bounds(nbReceived);
997   
998    for (int idx = 0; idx < nbReceived; ++idx)
999    {     
1000      CBufferIn& buffer = *buffers[idx];
1001      buffer >> vec_indi[idx];
1002      buffer >> vec_dataInd[idx];     
1003      buffer >> vec_mask[idx];
1004
1005      buffer >> doZoomByIndex_;
1006      if (doZoomByIndex_)
1007        buffer >> vec_zoomInd[idx];
1008
1009      buffer >> hasValue;
1010      if (hasValue)
1011        buffer >> vec_val[idx];
1012
1013      buffer >> hasBounds_;
1014      if (hasBounds_)
1015        buffer >> vec_bounds[idx];
1016    }
1017
1018    int nbData = 0;
1019    for (int idx = 0; idx < nbReceived; ++idx)
1020    {
1021      nbData += vec_indi[idx].numElements();
1022    }
1023
1024    index.resize(nbData);
1025    globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor()));
1026    CArray<int,1> nonCompressedData(nbData);   
1027    mask.resize(nbData);
1028    if (hasValue)
1029      value.resize(nbData);
1030    if (hasBounds_)
1031      bounds.resize(2,nbData);
1032
1033    nbData = 0;
1034    for (int idx = 0; idx < nbReceived; ++idx)
1035    {
1036      CArray<int,1>& indi = vec_indi[idx];
1037      CArray<int,1>& dataIndi = vec_dataInd[idx];
1038      CArray<bool,1>& maskIndi = vec_mask[idx];
1039      int nb = indi.numElements();
1040      for (int n = 0; n < nb; ++n)
1041      {
1042        index(nbData) = indi(n);
1043        globalLocalIndexMap_[indi(n)] = nbData;
1044        nonCompressedData(nbData) = (0 <= dataIndi(n)) ? nbData : -1;
1045        mask(nbData) = maskIndi(n);
1046        if (hasValue)
1047          value(nbData) = vec_val[idx](n);
1048        if (hasBounds_)
1049        {
1050          bounds(0,nbData) = vec_bounds[idx](0,n);
1051          bounds(1,nbData) = vec_bounds[idx](1,n);
1052        }
1053        ++nbData;
1054      }
1055    }
1056
1057    int nbIndex = index.numElements();
1058    int nbCompressedData = 0; 
1059    for (int idx = 0; idx < nonCompressedData.numElements(); ++idx)
1060    {
1061      if (0 <= nonCompressedData(idx) && nonCompressedData(idx) < nbIndex)
1062        ++nbCompressedData;       
1063    }
1064
1065    data_index.resize(nbCompressedData);
1066    nbCompressedData = 0;
1067    for (int idx = 0; idx < nonCompressedData.numElements(); ++idx)
1068    {
1069      if (0 <= nonCompressedData(idx) && nonCompressedData(idx) < nbIndex)
1070      {
1071        data_index(nbCompressedData) = nonCompressedData(idx);
1072        ++nbCompressedData;       
1073      }
1074    }
1075    data_begin.setValue(0);
1076
1077    if (doZoomByIndex_)
1078    {
1079      int nbZoomIndex = 0;
1080      for (int idx = 0; idx < nbReceived; ++idx)
1081      {
1082        nbZoomIndex += vec_zoomInd[idx].numElements();
1083      }
1084
1085      zoom_index.resize(nbZoomIndex);
1086      nbZoomIndex = 0;     
1087      for (int idx = 0; idx < nbReceived; ++idx)
1088      {     
1089        CArray<int,1>& tmp = vec_zoomInd[idx];
1090        for (int i = 0; i < tmp.size(); ++i)
1091        {
1092          zoom_index(nbZoomIndex) = tmp(i);
1093          ++nbZoomIndex;
1094        }       
1095      }
1096    }
1097
1098    if (hasLabel)
1099    {
1100      //label_srv(ind_srv) = labelVal( ind);
1101    }
1102  }
1103
1104  void CAxis::recvDistributionAttribute(CEventServer& event)
1105  {
1106    CBufferIn* buffer = event.subEvents.begin()->buffer;
1107    string axisId;
1108    *buffer >> axisId;
1109    get(axisId)->recvDistributionAttribute(*buffer);
1110  }
1111
1112  void CAxis::recvDistributionAttribute(CBufferIn& buffer)
1113  {
1114    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
1115
1116    buffer >> ni_srv >> begin_srv >> end_srv;
1117    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
1118    buffer >> isCompressible_;
1119    buffer >> orderPosInGrid;
1120    buffer >> globalDimGrid;
1121
1122    n.setValue(ni_srv);
1123    begin.setValue(begin_srv);
1124    global_zoom_begin = global_zoom_begin_tmp;
1125    global_zoom_n  = global_zoom_n_tmp;
1126    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
1127
1128    zoom_begin = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
1129    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
1130    zoom_n  = zoom_end_srv - zoom_begin_srv + 1;
1131
1132    if (zoom_n<=0)
1133    {
1134      zoom_begin = 0; zoom_end_srv = 0; zoom_n = 0;
1135    }
1136
1137    if (n_glo == n)
1138    {
1139      zoom_begin = global_zoom_begin;
1140      zoom_end_srv   = global_zoom_end; //zoom_end;
1141      zoom_n     = zoom_end_srv - zoom_begin + 1;
1142    }
1143  }
1144
1145  /*!
1146    Compare two axis objects.
1147    They are equal if only if they have identical attributes as well as their values.
1148    Moreover, they must have the same transformations.
1149  \param [in] axis Compared axis
1150  \return result of the comparison
1151  */
1152  bool CAxis::isEqual(CAxis* obj)
1153  {
1154    vector<StdString> excludedAttr;
1155    excludedAttr.push_back("axis_ref");
1156
1157    bool objEqual = SuperClass::isEqual(obj, excludedAttr);   
1158    if (!objEqual) return objEqual;
1159
1160    TransMapTypes thisTrans = this->getAllTransformations();
1161    TransMapTypes objTrans  = obj->getAllTransformations();
1162
1163    TransMapTypes::const_iterator it, itb, ite;
1164    std::vector<ETranformationType> thisTransType, objTransType;
1165    for (it = thisTrans.begin(); it != thisTrans.end(); ++it)
1166      thisTransType.push_back(it->first);
1167    for (it = objTrans.begin(); it != objTrans.end(); ++it)
1168      objTransType.push_back(it->first);
1169
1170    if (thisTransType.size() != objTransType.size()) return false;
1171    for (int idx = 0; idx < thisTransType.size(); ++idx)
1172      objEqual &= (thisTransType[idx] == objTransType[idx]);
1173
1174    return objEqual;
1175  }
1176
1177  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1178  {
1179    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1180    return transformationMap_.back().second;
1181  }
1182
1183  bool CAxis::hasTransformation()
1184  {
1185    return (!transformationMap_.empty());
1186  }
1187
1188  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1189  {
1190    transformationMap_ = axisTrans;
1191  }
1192
1193  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1194  {
1195    return transformationMap_;
1196  }
1197
1198  void CAxis::duplicateTransformation(CAxis* src)
1199  {
1200    if (src->hasTransformation())
1201    {
1202      this->setTransformations(src->getAllTransformations());
1203    }
1204  }
1205
1206  /*!
1207   * Go through the hierarchy to find the axis from which the transformations must be inherited
1208   */
1209  void CAxis::solveInheritanceTransformation()
1210  {
1211    if (hasTransformation() || !hasDirectAxisReference())
1212      return;
1213
1214    CAxis* axis = this;
1215    std::vector<CAxis*> refAxis;
1216    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1217    {
1218      refAxis.push_back(axis);
1219      axis = axis->getDirectAxisReference();
1220    }
1221
1222    if (axis->hasTransformation())
1223      for (size_t i = 0; i < refAxis.size(); ++i)
1224        refAxis[i]->setTransformations(axis->getAllTransformations());
1225  }
1226
1227  void CAxis::parse(xml::CXMLNode & node)
1228  {
1229    SuperClass::parse(node);
1230
1231    if (node.goToChildElement())
1232    {
1233      StdString nodeElementName;
1234      do
1235      {
1236        StdString nodeId("");
1237        if (node.getAttributes().end() != node.getAttributes().find("id"))
1238        { nodeId = node.getAttributes()["id"]; }
1239
1240        nodeElementName = node.getElementName();
1241        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1242        it = transformationMapList_.find(nodeElementName);
1243        if (ite != it)
1244        {
1245          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1246                                                                                                               nodeId,
1247                                                                                                               &node)));
1248        }
1249        else
1250        {
1251          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1252                << "The transformation " << nodeElementName << " has not been supported yet.");
1253        }
1254      } while (node.goToNextElement()) ;
1255      node.goToParentElement();
1256    }
1257  }
1258
1259  DEFINE_REF_FUNC(Axis,axis)
1260
1261   ///---------------------------------------------------------------
1262
1263} // namespace xios
Note: See TracBrowser for help on using the repository browser.