source: XIOS3/trunk/src/node/context.cpp @ 2407

Last change on this file since 2407 was 2407, checked in by ymipsl, 21 months ago

Implement separate "reader" and "writer" services. By default, the reader lives on the same resources as the "writer" or "gatherer" services.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 79.4 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "legacy_context_client.hpp"
10#include "legacy_context_server.hpp"
11#include "one_sided_context_client.hpp"
12#include "one_sided_context_server.hpp"
13#include "nc4_data_output.hpp"
14#include "node_type.hpp"
15#include "message.hpp"
16#include "type.hpp"
17#include "xios_spl.hpp"
18#include "timer.hpp"
19#include "memtrack.hpp"
20#include <limits>
21#include <fstream>
22#include "server.hpp"
23#include "distribute_file_server2.hpp"
24#include "services_manager.hpp"
25#include "contexts_manager.hpp"
26#include "cxios.hpp"
27#include "client.hpp"
28#include "coupler_in.hpp"
29#include "coupler_out.hpp"
30#include "servers_ressource.hpp"
31#include "pool_ressource.hpp"
32#include "services.hpp"
33#include "contexts_manager.hpp"
34#include <chrono>
35#include <random>
36
37namespace xios
38{
39
//! Root group of all contexts: allocated by getRoot() when still empty, reset by releaseStaticAllocation().
40  std::shared_ptr<CContextGroup> CContext::root;
41
42   /// ////////////////////// Définitions ////////////////////// ///
43
44   CContext::CContext(void)
45      : CObjectTemplate<CContext>(), CContextAttributes()
46      , calendar(), hasClient(false), hasServer(false)
47      , isPostProcessed(false), finalized(false)
48      , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false)
49
50   { /* Ne rien faire de plus */  }
51
52   CContext::CContext(const StdString & id)
53      : CObjectTemplate<CContext>(id), CContextAttributes()
54      , calendar(), hasClient(false), hasServer(false)
55      , isPostProcessed(false), finalized(false)
56      , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false)
57   { /* Ne rien faire de plus */ }
58
59   CContext::~CContext(void)
60   {
61     for(auto& client : writerClientOut_) delete client ;
62     for(auto& server : writerServerOut_) delete server ;
63
64     for(auto& client : writerClientIn_) delete client ;
65     for(auto& server : writerServerIn_) delete server ;
66
67     for(auto& client : readerClientOut_) delete client ;
68     for(auto& server : readerServerOut_) delete server ;
69
70     for(auto& client : readerClientIn_) delete client ;
71     for(auto& server : readerServerIn_) delete server ;
72
73
74     if (registryIn!=nullptr) delete registryIn ;
75     if (registryOut!=nullptr) delete registryOut ;
76   }
77
78   //----------------------------------------------------------------
79   //! Get name of context
80   StdString CContext::GetName(void)   { return (StdString("context")); }
81   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
82   ENodeType CContext::GetType(void)   { return (eContext); }
83
84   //----------------------------------------------------------------
85
86  void CContext::initEventScheduler(void)
87  {
88    SRegisterContextInfo contextInfo ;
89    CXios::getContextsManager()->getContextInfo(this->getId(), contextInfo, getIntraComm()) ;
90
91    eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
92 
93    // generate unique hash for server
94    auto time=chrono::system_clock::now().time_since_epoch().count() ;
95    std::default_random_engine rd(time); // not reproducible from a run to another
96    std::uniform_int_distribution<size_t> dist;
97    hashId_=dist(rd) ;
98    MPI_Bcast(&hashId_,1,MPI_SIZE_T,0,getIntraComm()) ; // Bcast to all server of the context
99  }
100   /*!
101   \brief Get context group (context root)
102   \return Context root
103   */
104   CContextGroup* CContext::getRoot(void)
105   TRY
106   {
107      if (root.get()==NULL) root=std::shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
108      return root.get();
109   }
110   CATCH
111
112   void CContext::releaseStaticAllocation(void)
113   TRY
114   {
115      CDomain::releaseStaticAllocation();
116      CAxis::releaseStaticAllocation();
117      CScalar::releaseStaticAllocation();
118      if (root) root.reset() ;
119   }
120   CATCH
121   
122   //----------------------------------------------------------------
123
124   /*!
125   \brief Get calendar of a context
126   \return Calendar
127   */
128   std::shared_ptr<CCalendar> CContext::getCalendar(void) const
129   TRY
130   {
131      return (this->calendar);
132   }
133   CATCH
134
135   //----------------------------------------------------------------
136
137   /*!
138   \brief Set a context with a calendar
139   \param[in] newCalendar new calendar
140   */
141   void CContext::setCalendar(std::shared_ptr<CCalendar> newCalendar)
142   TRY
143   {
144      this->calendar = newCalendar;
145   }
146   CATCH_DUMP_ATTR
147
148   //----------------------------------------------------------------
149   /*!
150   \brief Parse xml file and write information into context object
151   \param [in] node xmld node corresponding in xml file
152   */
153   void CContext::parse(xml::CXMLNode & node)
154   TRY
155   {
156      CContext::SuperClass::parse(node);
157
158      // PARSING POUR GESTION DES ENFANTS
159      xml::THashAttributes attributes = node.getAttributes();
160
161      if (attributes.end() != attributes.find("src"))
162      {
163         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
164         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
165            ERROR("void CContext::parse(xml::CXMLNode & node)",
166                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
167         if (!ifs.good())
168            ERROR("CContext::parse(xml::CXMLNode & node)",
169                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
170         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
171      }
172
173      if (node.getElementName().compare(CContext::GetName()))
174         DEBUG("Le noeud is wrong defined but will be considered as a context !");
175
176      if (!(node.goToChildElement()))
177      {
178         DEBUG("Le context ne contient pas d'enfant !");
179      }
180      else
181      {
182         do { // Parcours des contextes pour traitement.
183
184            StdString name = node.getElementName();
185            attributes.clear();
186            attributes = node.getAttributes();
187
188            if (attributes.end() != attributes.find("id"))
189            {
190              DEBUG(<< "Definition node has an id,"
191                    << "it will not be taking account !");
192            }
193
194#define DECLARE_NODE(Name_, name_)    \
195   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
196   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
197#define DECLARE_NODE_PAR(Name_, name_)
198#include "node_type.conf"
199
200            DEBUG(<< "The element \'"     << name
201                  << "\' in the context \'" << CContext::getCurrent()->getId()
202                  << "\' is not a definition !");
203
204         } while (node.goToNextElement());
205
206         node.goToParentElement(); // Retour au parent
207      }
208   }
209   CATCH_DUMP_ATTR
210
211   //----------------------------------------------------------------
212   //! Show tree structure of context
213   void CContext::ShowTree(StdOStream & out)
214   TRY
215   {
216      StdString currentContextId = CContext::getCurrent() -> getId();
217      std::vector<CContext*> def_vector =
218         CContext::getRoot()->getChildList();
219      std::vector<CContext*>::iterator
220         it = def_vector.begin(), end = def_vector.end();
221
222      out << "<? xml version=\"1.0\" ?>" << std::endl;
223      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
224
225      for (; it != end; it++)
226      {
227         CContext* context = *it;
228         CContext::setCurrent(context->getId());
229         out << *context << std::endl;
230      }
231
232      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
233      CContext::setCurrent(currentContextId);
234   }
235   CATCH
236
237   //----------------------------------------------------------------
238
239   //! Convert context object into string (to print)
240   StdString CContext::toString(void) const
241   TRY
242   {
243      StdOStringStream oss;
244      oss << "<" << CContext::GetName()
245          << " id=\"" << this->getId() << "\" "
246          << SuperClassAttribute::toString() << ">" << std::endl;
247      if (!this->hasChild())
248      {
249         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
250      }
251      else
252      {
253
254#define DECLARE_NODE(Name_, name_)    \
255   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
256   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
257#define DECLARE_NODE_PAR(Name_, name_)
258#include "node_type.conf"
259
260      }
261      oss << "</" << CContext::GetName() << " >";
262      return (oss.str());
263   }
264   CATCH
265
266   //----------------------------------------------------------------
267
268   /*!
269   \brief Find all inheritace among objects in a context.
270   \param [in] apply (true) write attributes of parent into ones of child if they are empty
271                     (false) write attributes of parent into a new container of child
272   \param [in] parent unused
273   */
274   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
275   TRY
276   {
277#define DECLARE_NODE(Name_, name_)    \
278   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
279     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
280#define DECLARE_NODE_PAR(Name_, name_)
281#include "node_type.conf"
282   }
283   CATCH_DUMP_ATTR
284
285   //----------------------------------------------------------------
286
287   //! Verify if all root definition in the context have child.
288   bool CContext::hasChild(void) const
289   TRY
290   {
291      return (
292#define DECLARE_NODE(Name_, name_)    \
293   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
294#define DECLARE_NODE_PAR(Name_, name_)
295#include "node_type.conf"
296      false);
297}
298   CATCH
299
300   //----------------------------------------------------------------
301
//! Call ClearAllAttributes() on the definition class of every node type declared
//! through DECLARE_NODE in node_type.conf; DECLARE_NODE_PAR entries expand to nothing.
302   void CContext::CleanTree(void)
303   TRY
304   {
// One ClearAllAttributes() call is generated per DECLARE_NODE entry.
305#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
306#define DECLARE_NODE_PAR(Name_, name_)
307#include "node_type.conf"
308   }
309   CATCH
310
//! Delete from the object factory everything registered under one context.
//! \param [in] contextId id passed to every CObjectFactory::deleteContext call
//! node_type.conf is included twice: the first pass targets the node classes,
//! the second pass targets their group classes.
//! NOTE(review): the macros are redefined without #undef, so node_type.conf
//! presumably undefines them itself -- confirm.
311void CContext::removeContext(const string& contextId)
312{
  // Pass 1: deleteContext<CName> for DECLARE_NODE and DECLARE_NODE_PAR entries alike
313  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteContext< C##Name_ >(contextId);
314  #define DECLARE_NODE_PAR(Name_, name_) CObjectFactory::deleteContext< C##Name_ >(contextId);
315  #include "node_type.conf"
  // Pass 2: deleteContext<CNameGroup> for DECLARE_NODE entries only
316  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteContext< C##Name_##Group >(contextId);
317  #define DECLARE_NODE_PAR(Name_, name_) 
318  #include "node_type.conf"
319
  // The block below is disabled code that dumps the objects of each group and node class.
320/*
321  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_##Group >();
322  #define DECLARE_NODE_PAR(Name_, name_)
323  #include "node_type.conf"
324
325  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_>();
326  #define DECLARE_NODE_PAR(Name_, name_)
327  #include "node_type.conf"
328*/
329}
330
//! Delete from the object factory everything registered under any context.
//! node_type.conf is included twice (node classes, then group classes); CContext
//! and CContextGroup are then handled explicitly and the current context id of
//! both the object factory and the group factory is cleared.
//! NOTE(review): the macros are redefined without #undef, so node_type.conf
//! presumably undefines them itself -- confirm.
331void CContext::removeAllContexts(void)
332{
  // Pass 1: deleteAllContexts<CName> for DECLARE_NODE and DECLARE_NODE_PAR entries alike
333  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteAllContexts< C##Name_ >();
334  #define DECLARE_NODE_PAR(Name_, name_) CObjectFactory::deleteAllContexts< C##Name_ >();
335  #include "node_type.conf"
  // Pass 2: deleteAllContexts<CNameGroup> for DECLARE_NODE entries only
336  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteAllContexts< C##Name_##Group >();
337  #define DECLARE_NODE_PAR(Name_, name_) 
338  #include "node_type.conf"
  // The block below is disabled code that dumps the objects of each group and node class.
339/*
340  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_##Group >();
341  #define DECLARE_NODE_PAR(Name_, name_)
342  #include "node_type.conf"
343
344  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_>();
345  #define DECLARE_NODE_PAR(Name_, name_)
346  #include "node_type.conf"
347*/
  // Contexts and the context group are deleted last, then the factories forget the current context id
348  CObjectFactory::deleteAllContexts<CContext>() ;
349  CObjectFactory::deleteAllContexts<CContextGroup>() ;
350  CObjectFactory::clearCurrentContextId();
351  CGroupFactory::clearCurrentContextId();
352}
353   ///---------------------------------------------------------------
354
355
356 /*!
357    * Compute the required buffer size to send the fields data.
358    * \param maxEventSize [in/out] the size of the bigger event for each connected server
359    * \param [in] contextClient
360    * \param [in] bufferForWriting True if buffers are used for sending data for writing
361      This flag is only true for client and server-1 for communication with server-2
362    */
363   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize,
364                                                      CContextClient* contextClient, bool bufferForWriting /*= "false"*/)
365   TRY
366   {
367     std::map<int, StdSize> dataSize;
368
369     // Find all reference domain and axis of all active fields
370     std::vector<CFile*>& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles;
371     size_t numEnabledFiles = fileList.size();
372     for (size_t i = 0; i < numEnabledFiles; ++i)
373     {
374       CFile* file = fileList[i];
375       if (file->getContextClient() == contextClient)
376       {
377         std::vector<CField*> enabledFields = file->getEnabledFields();
378         size_t numEnabledFields = enabledFields.size();
379         for (size_t j = 0; j < numEnabledFields; ++j)
380         {
381           // const std::vector<std::map<int, StdSize> > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient);
382           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting);
383           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
384           for (; it != itE; ++it)
385           {
386             // If dataSize[it->first] does not exist, it will be zero-initialized
387             // so we can use it safely without checking for its existance
388           if (CXios::isOptPerformance)
389               dataSize[it->first] += it->second;
390             else if (dataSize[it->first] < it->second)
391               dataSize[it->first] = it->second;
392
393           if (maxEventSize[it->first] < it->second)
394               maxEventSize[it->first] = it->second;
395           }
396         }
397       }
398     }
399     return dataSize;
400   }
401   CATCH_DUMP_ATTR
402
403/*!
404    * Compute the required buffer size to send the attributes (mostly those grid related).
405    * \param maxEventSize [in/out] the size of the bigger event for each connected server
406    * \param [in] contextClient
407    * \param [in] bufferForWriting True if buffers are used for sending data for writing
408      This flag is only true for client and server-1 for communication with server-2
409    */
410   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize,
411                                                           CContextClient* contextClient, bool bufferForWriting /*= "false"*/)
412   TRY
413   {
414   // As calendar attributes are sent even if there are no active files or fields, maps are initialized according the size of calendar attributes
415     std::map<int, StdSize> attributesSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient);
416     maxEventSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient);
417
418     std::vector<CFile*>& fileList = this->enabledFiles;
419     size_t numEnabledFiles = fileList.size();
420     for (size_t i = 0; i < numEnabledFiles; ++i)
421     {
422//         CFile* file = this->enabledWriteModeFiles[i];
423        CFile* file = fileList[i];
424        std::vector<CField*> enabledFields = file->getEnabledFields();
425        size_t numEnabledFields = enabledFields.size();
426        for (size_t j = 0; j < numEnabledFields; ++j)
427        {
428          const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting);
429          std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
430          for (; it != itE; ++it)
431          {
432         // If attributesSize[it->first] does not exist, it will be zero-initialized
433         // so we can use it safely without checking for its existence
434             if (attributesSize[it->first] < it->second)
435         attributesSize[it->first] = it->second;
436
437         if (maxEventSize[it->first] < it->second)
438         maxEventSize[it->first] = it->second;
439          }
440        }
441     }
442     return attributesSize;
443   }
444   CATCH_DUMP_ATTR
445
446
447
448   //! Verify whether a context is initialized
449   bool CContext::isInitialized(void)
450   TRY
451   {
452     return hasClient;
453   }
454   CATCH_DUMP_ATTR
455
456
457   void CContext::init(CServerContext* parentServerContext, MPI_Comm intraComm, int serviceType)
458   TRY
459   {
460     parentServerContext_ = parentServerContext ;
461     if (serviceType==CServicesManager::CLIENT) 
462       initClient(intraComm, serviceType) ;
463     else
464       initServer(intraComm, serviceType) ;
465     initEventScheduler() ;
466    }
467    CATCH_DUMP_ATTR
468
469
470
471//! Initialize client side
472   void CContext::initClient(MPI_Comm intraComm, int serviceType)
473   TRY
474   {
475      intraComm_=intraComm ;
476      MPI_Comm_rank(intraComm_, &intraCommRank_) ;
477      MPI_Comm_size(intraComm_, &intraCommSize_) ;
478
479      serviceType_ = CServicesManager::CLIENT ;
480      if (serviceType_==CServicesManager::CLIENT)
481      {
482        hasClient=true ;
483        hasServer=false ;
484      }
485      contextId_ = getId() ;
486     
487      attached_mode=true ;
488      if (!CXios::isUsingServer()) attached_mode=false ;
489
490
491      string contextRegistryId=getId() ;
492      registryIn=new CRegistry(CXios::getRegistryManager()->getRegistryIn());
493      registryIn->setPath(contextRegistryId) ;
494     
495      registryOut=new CRegistry(intraComm_) ;
496      registryOut->setPath(contextRegistryId) ;
497     
498   }
499   CATCH_DUMP_ATTR
500
501   
502   void CContext::initServer(MPI_Comm intraComm, int serviceType)
503   TRY
504   {
505     hasServer=true;
506     intraComm_=intraComm ;
507     MPI_Comm_rank(intraComm_, &intraCommRank_) ;
508     MPI_Comm_size(intraComm_, &intraCommSize_) ;
509
510     serviceType_=serviceType ;
511
512     if (serviceType_==CServicesManager::GATHERER)
513     {
514       hasClient=true ;
515       hasServer=true ;
516     }
517     else if (serviceType_==CServicesManager::WRITER || serviceType_==CServicesManager::READER)
518     {
519       hasClient=false ;
520       hasServer=true ;
521     }
522
523     CXios::getContextsManager()->getContextId(getId(), contextId_, intraComm) ;
524     
525     string contextRegistryId=getId() ;
526     registryIn=new CRegistry(CXios::getRegistryManager()->getRegistryIn());
527     registryIn->setPath(contextRegistryId) ;
528     
529     registryOut=new CRegistry(intraComm_) ;
530     registryOut->setPath(contextRegistryId) ;
531
532   }
533   CATCH_DUMP_ATTR
534
535
536  void CContext::createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) // for servers
537  TRY
538  {
539    MPI_Comm intraCommClient ;
540    MPI_Comm_dup(intraComm_, &intraCommClient);
541    comms.push_back(intraCommClient);
542    // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context
543
544    CContextServer* server ;
545    CContextClient* client ;
546   
547    server = CContextServer::getNew(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ?
548    client = CContextClient::getNew(this,intraCommClient,interCommClient);
549    client->setAssociatedServer(server) ; 
550    server->setAssociatedClient(client) ;
551   
552    if (serviceType_ == CServicesManager::GATHERER || serviceType_ == CServicesManager::WRITER)
553    { 
554      writerServerIn_.push_back(server) ; 
555      writerClientIn_.push_back(client) ; 
556    }
557    else if (serviceType_ == CServicesManager::READER)
558    {
559      readerServerIn_.push_back(server) ; 
560      readerClientIn_.push_back(client) ; 
561    }
562  }
563  CATCH_DUMP_ATTR
564
565 
566  void CContext::createServerInterComm(const string& poolId, const string& serverId, vector<pair<string, pair<CContextClient*,CContextServer*>>>& clientServers )
567  TRY
568  {
569    MPI_Comm interCommClient, interCommServer ;
570    int commRank ;
571    MPI_Comm_rank(intraComm_,&commRank) ;
572   
573    int nbPartitions ;
574    if (commRank==0)
575    {
576      CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions) ;
577      for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ;
578    }
579    setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble (attached ???)
580    MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ;
581     
582    MPI_Comm interComm ;
583    for(int i=0 ; i<nbPartitions; i++)
584    {
585      parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ;
586      int type ; 
587      if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type) ;
588      MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
589      string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ;
590
591      MPI_Comm intraCommClient, intraCommServer ;
592
593      intraCommClient=intraComm_ ;
594      MPI_Comm_dup(intraComm_, &intraCommServer) ;
595
596      CContextClient* client = CContextClient::getNew(this, intraCommClient, interCommClient) ;
597      CContextServer* server = CContextServer::getNew(this, intraCommServer, interCommServer) ;
598      client->setAssociatedServer(server) ;
599      server->setAssociatedClient(client) ;
600     
601      clientServers.push_back({fullServerId,{client,server}}) ;
602      clientsId_[client] = fullServerId ;
603      serversId_[server] = fullServerId ;
604    }
605  }
606  CATCH_DUMP_ATTR
607 
608  void CContext::createServerInterComm(void) 
609  TRY
610  {
611    vector<pair<string, pair<CContextClient*,CContextServer*>>> clientServers ;
612
613    if (serviceType_ == CServicesManager::CLIENT)
614    {
615      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultWriterId, clientServers) ;
616      else if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ;
617      else createServerInterComm(CXios::defaultPoolId, CXios::defaultWriterId, clientServers) ;
618     
619      writerClientOut_.push_back(clientServers[0].second.first) ; 
620      writerServerOut_.push_back(clientServers[0].second.second) ;
621
622      clientServers.clear() ;
623   
624      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultReaderId, clientServers) ;
625      else createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ;
626      readerClientOut_.push_back(clientServers[0].second.first) ; 
627      readerServerOut_.push_back(clientServers[0].second.second) ;
628
629
630    }
631    else if (serviceType_ == CServicesManager::GATHERER)
632    {
633      createServerInterComm(CXios::defaultPoolId, CXios::defaultWriterId, clientServers) ;
634      for(auto& clientServer : clientServers)
635      {
636        writerClientOut_.push_back(clientServer.second.first) ; 
637        writerServerOut_.push_back(clientServer.second.second) ;
638      }
639    }
640
641  }
642  CATCH_DUMP_ATTR
643
644  void CContext::globalEventLoop(void)
645  {
646    lockContext() ;
647    CXios::getDaemonsManager()->eventLoop() ;
648    unlockContext() ;
649    setCurrent(getId()) ;
650  }
651
652  bool CContext::scheduledEventLoop(bool enableEventsProcessing) 
653  {
654    bool out, finished; 
655    size_t timeLine=timeLine_ ;
656    if (serviceType_==CServicesManager::CLIENT)
657    {
658      timeLine_++ ;
659      eventScheduler_->registerEvent(timeLine, hashId_) ;
660    }
661
662    do
663    { 
664      finished=eventLoop(enableEventsProcessing) ;
665      if (serviceType_==CServicesManager::CLIENT) 
666      { 
667        out = eventScheduler_->queryEvent(timeLine,hashId_) ;
668        if (out) eventScheduler_->popEvent() ;
669      }
670
671      else out=true ;
672    }  while(!out) ;
673   
674    return finished ;
675  }
676
677  bool CContext::eventLoop(bool enableEventsProcessing)
678  {
679    bool  finished(true); 
680    if (isLockedContext()) enableEventsProcessing=false;
681   
682    setCurrent(getId()) ;
683
684    if (!finalized)
685    {
686      for(auto client : writerClientOut_) client->eventLoop();
687      for(auto server : writerServerOut_) finished &= server->eventLoop(enableEventsProcessing);
688      for(auto client : writerClientIn_) client->eventLoop();
689      for(auto server : writerServerIn_) finished &= server->eventLoop(enableEventsProcessing);
690      for(auto client : readerClientOut_) client->eventLoop();
691      for(auto server : readerServerOut_) finished &= server->eventLoop(enableEventsProcessing);
692      for(auto client : readerClientIn_) client->eventLoop();
693      for(auto server : readerServerIn_) finished &= server->eventLoop(enableEventsProcessing);
694      for(auto couplerOut : couplerOutClient_) couplerOut.second->eventLoop();
695      for(auto couplerIn : couplerInClient_) couplerIn.second->eventLoop();
696      for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing);
697      for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing);
698    }
699    setCurrent(getId()) ;
700    return finalized && finished ;
701  }
702
703  void CContext::addCouplingChanel(const std::string& fullContextId, bool out)
704  {
705     int contextLeader ;
706     
707     if (out)
708     { 
709       if (couplerOutClient_.find(fullContextId)==couplerOutClient_.end()) 
710       {
711         bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ;
712     
713         MPI_Comm interComm, interCommClient, interCommServer  ;
714         MPI_Comm intraCommClient, intraCommServer ;
715
716         if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;
717
718        MPI_Comm_dup(intraComm_, &intraCommClient) ;
719        MPI_Comm_dup(intraComm_, &intraCommServer) ;
720        MPI_Comm_dup(interComm, &interCommClient) ;
721        MPI_Comm_dup(interComm, &interCommServer) ;
722        CContextClient* client = CContextClient::getNew(this, intraCommClient, interCommClient);
723        CContextServer* server = CContextServer::getNew(this, intraCommServer, interCommServer);
724        client->setAssociatedServer(server) ;
725        server->setAssociatedClient(client) ;
726        MPI_Comm_free(&interComm) ;
727        couplerOutClient_[fullContextId] = client ;
728        couplerOutServer_[fullContextId] = server ;
729      }
730    }
731    else if (couplerInClient_.find(fullContextId)==couplerInClient_.end())
732    {
733      bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ;
734     
735       MPI_Comm interComm, interCommClient, interCommServer  ;
736       MPI_Comm intraCommClient, intraCommServer ;
737
738       if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;
739
740       MPI_Comm_dup(intraComm_, &intraCommClient) ;
741       MPI_Comm_dup(intraComm_, &intraCommServer) ;
742       MPI_Comm_dup(interComm, &interCommServer) ;
743       MPI_Comm_dup(interComm, &interCommClient) ;
744       CContextServer* server = CContextServer::getNew(this, intraCommServer, interCommServer);
745       CContextClient* client = CContextClient::getNew(this, intraCommClient, interCommClient);
746       client->setAssociatedServer(server) ;
747       server->setAssociatedClient(client) ;
748       MPI_Comm_free(&interComm) ;
749
750       couplerInClient_[fullContextId] = client ;
751       couplerInServer_[fullContextId] = server ;       
752    }
753  }
754 
755   void CContext::finalize(void)
756   TRY
757   {
758      registryOut->hierarchicalGatherRegistry() ;
759      if (intraCommRank_==0) CXios::getRegistryManager()->merge(*registryOut) ;
760
761      if (serviceType_==CServicesManager::CLIENT)
762      {
763//ym        doPreTimestepOperationsForEnabledReadModeFiles(); // For now we only use server level 1 to read data
764        triggerLateFields() ;
765
766        // inform couplerIn that I am finished
767        for(auto& couplerInClient : couplerInClient_) sendCouplerInContextFinalized(couplerInClient.second) ;
768
769        // wait until received message from couplerOut that they have finished
770        bool couplersInFinalized ;
771        do
772        {
773          couplersInFinalized=true ;
774          for(auto& couplerOutClient : couplerOutClient_) couplersInFinalized &= isCouplerInContextFinalized(couplerOutClient.second) ; 
775          globalEventLoop() ;
776        } while (!couplersInFinalized) ;
777
778        CContextClient* client ;
779        CContextServer* server ;
780
781        if (writerClientOut_.size()!=0)
782        {
783          client=writerClientOut_[0] ;
784          server=writerServerOut_[0] ;
785
786          info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to writer"<<endl ;
787          client->finalize();
788          info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to writer"<<endl ;
789          while (client->havePendingRequests()) client->eventLoop();
790          info(100)<<"DEBUG: context "<<getId()<<" no pending request on writer ok"<<endl ;
791          bool notifiedFinalized=false ;
792          do
793          {
794            notifiedFinalized = client->isNotifiedFinalized() ;
795          } while (!notifiedFinalized) ;
796
797          server->releaseBuffers();
798          client->releaseBuffers();
799          info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ;
800        }
801
802        if (readerClientOut_.size()!=0)
803        {
804          client=readerClientOut_[0] ;
805          server=readerServerOut_[0] ;
806
807          info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to reader"<<endl ;
808          client->finalize();
809          info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to reader"<<endl ;
810          while (client->havePendingRequests()) client->eventLoop();
811          info(100)<<"DEBUG: context "<<getId()<<" no pending request on reader ok"<<endl ;
812          bool notifiedFinalized=false ;
813         do
814         {
815            notifiedFinalized = client->isNotifiedFinalized() ;
816         }   while (!notifiedFinalized) ;
817
818          server->releaseBuffers();
819          client->releaseBuffers();
820          info(100)<<"DEBUG: context "<<getId()<<" release client reader ok"<<endl ;
821        }
822      }
823      else if (serviceType_==CServicesManager::GATHERER)
824      {
825         for(auto& client : writerClientOut_)
826         {
827           client->finalize();
828           bool bufferReleased;
829           do
830           {
831             client->eventLoop();
832             bufferReleased = !client->havePendingRequests();
833           } while (!bufferReleased);
834           
835           bool notifiedFinalized=false ;
836           do
837           {
838             notifiedFinalized=client->isNotifiedFinalized() ;
839           } while (!notifiedFinalized) ;
840           client->releaseBuffers();
841         }
842         closeAllFile();
843         //ym writerClientIn & writerServerIn not released here ==> to check !!
844      }
845      else if (serviceType_==CServicesManager::WRITER)
846      {
847        closeAllFile();
848        writerClientIn_[0]->releaseBuffers();
849        writerServerIn_[0]->releaseBuffers();
850      }
851      else if (serviceType_==CServicesManager::READER)
852      {
853        closeAllFile();
854        readerClientIn_[0]->releaseBuffers();
855        readerServerIn_[0]->releaseBuffers();
856      }
857
858      freeComms() ;
859       
860      parentServerContext_->freeComm() ;
861      finalized = true;
862      info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl;
863   }
864   CATCH_DUMP_ATTR
865
866   //! Free internally allocated communicators
867   void CContext::freeComms(void)
868   TRY
869   {
870     for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
871       MPI_Comm_free(&(*it));
872     comms.clear();
873   }
874   CATCH_DUMP_ATTR
875
876   
877   /*!
878   \brief Close all the context defintion and do processing data
879      After everything is well defined on client side, they will be processed and sent to server
880   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
881   all necessary information to server, from which each server can build its own database.
882   Because the role of server is to write out field data on a specific netcdf file,
883   the only information that it needs is the enabled files
884   and the active fields (fields will be written onto active files)
885   */
886  void CContext::closeDefinition(void)
887   TRY
888   {
889     CTimer::get("Context : close definition").resume() ;
890     
891     // create intercommunicator with servers.
892     // not sure it is the good place to be called here
893     createServerInterComm() ;
894
895
896     // After xml is parsed, there are some more works with post processing
897//     postProcessing();
898
899   
900    // Make sure the calendar was correctly created
901    if (serviceType_!=CServicesManager::CLIENT) CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
902    if (!calendar)
903      ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
904    else if (calendar->getTimeStep() == NoneDu)
905      ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
906    // Calendar first update to set the current date equals to the start date
907    calendar->update(0);
908
909    // Résolution des héritages descendants (càd des héritages de groupes)
910    // pour chacun des contextes.
911    solveDescInheritance(true);
912 
913    // Check if some axis, domains or grids are eligible to for compressed indexed output.
914    // Warning: This must be done after solving the inheritance and before the rest of post-processing
915    // --> later ????    checkAxisDomainsGridsEligibilityForCompressedOutput();     
916
917      // Check if some automatic time series should be generated
918      // Warning: This must be done after solving the inheritance and before the rest of post-processing     
919
920    // The timeseries should only be prepared in client
921    prepareTimeseries();
922
923    //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
924    findEnabledFiles();
925
926    // Solve inheritance for field to know if enabled or not.
927    for (auto field : CField::getAll()) field->solveRefInheritance();
928
929    findEnabledWriteModeFiles();
930    findEnabledReadModeFiles();
931    findEnabledCouplerIn();
932    findEnabledCouplerOut();
933    createCouplerInterCommunicator() ;
934
935    // Find all enabled fields of each file     
936    vector<CField*>&& fileOutField = findAllEnabledFieldsInFileOut(this->enabledWriteModeFiles);
937    vector<CField*>&& fileInField = findAllEnabledFieldsInFileIn(this->enabledReadModeFiles);
938    vector<CField*>&& couplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut);
939    vector<CField*>&& couplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn);
940    findFieldsWithReadAccess();
941    vector<CField*>& fieldWithReadAccess = fieldsWithReadAccess_ ;
942    vector<CField*> fieldModelIn ; // fields potentially from model
943
944    // define if files are on clientSied or serverSide
945    if (serviceType_==CServicesManager::CLIENT)
946    {
947      for (auto& file : enabledWriteModeFiles) file->setClientSide() ;
948      for (auto& file : enabledReadModeFiles) file->setClientSide() ;
949    }
950    else
951    {
952      for (auto& file : enabledWriteModeFiles) file->setServerSide() ;
953      for (auto& file : enabledReadModeFiles) file->setServerSide() ;
954    }
955
956   
957    for (auto& field : couplerInField)
958    {
959      field->unsetGridCompleted() ;
960    }
961// find all field potentially at workflow end
962    vector<CField*> endWorkflowFields ;
963    endWorkflowFields.reserve(fileOutField.size()+couplerOutField.size()+fieldWithReadAccess.size()) ;
964    endWorkflowFields.insert(endWorkflowFields.end(),fileOutField.begin(), fileOutField.end()) ;
965    endWorkflowFields.insert(endWorkflowFields.end(),couplerOutField.begin(), couplerOutField.end()) ;
966    endWorkflowFields.insert(endWorkflowFields.end(),fieldWithReadAccess.begin(), fieldWithReadAccess.end()) ;
967
968    bool workflowGraphIsCompleted ;
969   
970    bool first=true ;
971   
972    do
973    {
974      workflowGraphIsCompleted=true; 
975      for(auto endWorkflowField : endWorkflowFields) 
976      {
977        workflowGraphIsCompleted &= endWorkflowField->buildWorkflowGraph(garbageCollector) ;
978      }
979   
980      for(auto couplerIn : enabledCouplerIn) couplerIn->assignContext() ;
981      for(auto field : couplerInField) field->makeGridAliasForCoupling();
982      for(auto field : couplerInField) this->sendCouplerInReady(field->getContextClient()) ;
983   
984
985      // assign context to coupler out and related fields
986      for(auto couplerOut : enabledCouplerOut) couplerOut->assignContext() ;
987      // for now supose that all coupling out endpoint are succesfull. The difficultie is client/server buffer evaluation
988      for(auto field : couplerOutField) 
989      {
990        // connect to couplerOut -> to do
991      }
992
993      bool couplersReady ;
994      do 
995      {
996        couplersReady=true ;
997        for(auto field : couplerOutField)
998        {
999          bool ready = isCouplerInReady(field->getContextClient()) ; 
1000          if (ready) field->sendFieldToCouplerOut() ;
1001          couplersReady &= ready ;
1002        }
1003        this->scheduledEventLoop() ;
1004
1005      } while (!couplersReady) ;
1006     
1007      first=false ;
1008      this->scheduledEventLoop() ;
1009
1010    } while (!workflowGraphIsCompleted) ;
1011
1012
1013    for( auto field : couplerInField) couplerInFields_.push_back(field) ;
1014
1015    // get all field coming potentially from model
1016    for (auto field : CField::getAll() ) if (field->getModelIn()) fieldModelIn.push_back(field) ;
1017
1018    // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields
1019    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles);
1020    //else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ;
1021
1022    // client side, assign context for file reading
1023    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ;
1024   
1025    // server side, assign context where to send file data read
1026    if (serviceType_==CServicesManager::READER) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientIn_[0]) ;
1027   
1028
1029    // workflow startpoint => data from server on client side
1030    // important : sendFieldToInputFileServer must be done prior sendFieldToFileServer because for the first case the grid remoteConnectorIn
1031    //             and grid remoteConnectorOut will be computed, and in the second case only the remoteConnectorOut.
1032    if (serviceType_==CServicesManager::CLIENT)
1033    {
1034      for(auto field : fileInField) 
1035      {
1036        field->sendFieldToInputFileServer() ;
1037        field->connectToServerInput(garbageCollector) ; // connect the field to server filter
1038        fileInFields_.push_back(field) ;
1039      }
1040    }
1041
1042    // workflow endpoint => sent to IO/SERVER
1043    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER)
1044    {
1045      for(auto field : fileOutField) 
1046      {
1047        field->connectToFileServer(garbageCollector) ; // connect the field to server filter
1048      }
1049      for(auto field : fileOutField) field->sendFieldToFileServer() ;
1050    }
1051
1052    // workflow endpoint => write to file
1053    if (serviceType_==CServicesManager::WRITER)
1054    {
1055      for(auto field : fileOutField) 
1056      {
1057        field->connectToFileWriter(garbageCollector) ; // connect the field to server filter
1058      }
1059    }
1060   
1061    // workflow endpoint => Send data from server to client
1062    if (serviceType_==CServicesManager::READER || serviceType_==CServicesManager::GATHERER)
1063    {
1064      for(auto field : fileInField) 
1065      {
1066        field->connectToServerToClient(garbageCollector) ;
1067      }
1068    }
1069
1070    // workflow endpoint => sent to model on client side
1071    if (serviceType_==CServicesManager::CLIENT)
1072    {
1073      for(auto field : fieldWithReadAccess) field->connectToModelOutput(garbageCollector) ;
1074    }
1075
1076
1077    // workflow startpoint => data from model
1078    if (serviceType_==CServicesManager::CLIENT)
1079    {
1080      for(auto field : fieldModelIn) 
1081      {
1082        field->connectToModelInput(garbageCollector) ; // connect the field to server filter
1083        // grid index will be computed on the fly
1084      }
1085    }
1086   
1087    // workflow startpoint => data from client on server side
1088    if (serviceType_==CServicesManager::WRITER || serviceType_==CServicesManager::GATHERER)
1089    {
1090      for(auto field : fieldModelIn) 
1091      {
1092        field->connectToClientInput(garbageCollector) ; // connect the field to server filter
1093      }
1094    }
1095
1096   
1097    for(auto field : couplerInField) 
1098    {
1099      field->connectToCouplerIn(garbageCollector) ; // connect the field to server filter
1100    }
1101   
1102   
1103    for(auto field : couplerOutField) 
1104    {
1105      field->connectToCouplerOut(garbageCollector) ; // for now the same kind of filter that for file server
1106    }
1107
1108    // workflow startpoint => data read from file on server side
1109    if (serviceType_==CServicesManager::READER)
1110    {
1111      for(auto field : fileInField) 
1112      {
1113        field->connectToFileReader(garbageCollector) ;
1114      }
1115    }
1116   
1117    // construct slave server list
1118    map<string, CContextClient*> slaves ; // need an ordered list ;
1119    if (serviceType_==CServicesManager::CLIENT) 
1120    {
1121      for(auto field : fileOutField) slaves[clientsId_[field->getContextClient()]] = field->getContextClient() ; 
1122      for(auto field : fileInField) slaves[clientsId_[field->getContextClient()]] = field->getContextClient() ; 
1123    }
1124    else if (serviceType_==CServicesManager::GATHERER) 
1125      for(auto field : fileOutField) slaves[clientsId_[field->getContextClient()]] = field->getContextClient() ;
1126    for(auto& slave : slaves) slaveServers_.push_back(slave.second) ;   
1127
1128
1129    for(auto& slaveServer : slaveServers_) sendCloseDefinition(slaveServer) ;
1130
1131    if (serviceType_==CServicesManager::WRITER) 
1132    {
1133      createFileHeader();
1134    }
1135
1136    if (serviceType_==CServicesManager::CLIENT) startPrefetchingOfEnabledReadModeFiles();
1137   
1138    // send signal to couplerIn context that definition phasis is done
1139
1140    for(auto& couplerInClient : couplerInClient_) sendCouplerInCloseDefinition(couplerInClient.second) ;
1141
1142    // wait until all couplerIn signal that closeDefition is done.
1143    bool ok;
1144    do
1145    {
1146      ok = true ;
1147      for(auto& couplerOutClient : couplerOutClient_) ok &= isCouplerInCloseDefinition(couplerOutClient.second) ;
1148      this->scheduledEventLoop() ;
1149    } while (!ok) ;
1150
1151    // Now evaluate the size of the context client buffers
1152    map<CContextClient*,map<int,size_t>> fieldBufferEvaluation ;
1153    for(auto field : fileOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to server
1154    for(auto field : couplerOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to coupler
1155    for(auto field : fileInField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // server to client (for io servers)
1156   
1157    // fix size for each context client
1158    for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ;
1159
1160
1161     CTimer::get("Context : close definition").suspend() ;
1162  }
1163  CATCH_DUMP_ATTR
1164
1165
1166  vector<CField*> CContext::findAllEnabledFieldsInFileOut(const std::vector<CFile*>& activeFiles)
1167   TRY
1168   {
1169     vector<CField*> fields ;
1170     for(auto file : activeFiles)
1171     {
1172        const vector<CField*>&& fieldList=file->getEnabledFields() ;
1173        for(auto field : fieldList) field->setFileOut(file) ;
1174        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1175     }
1176     return fields ;
1177   }
1178   CATCH_DUMP_ATTR
1179
1180   vector<CField*> CContext::findAllEnabledFieldsInFileIn(const std::vector<CFile*>& activeFiles)
1181   TRY
1182   {
1183     vector<CField*> fields ;
1184     for(auto file : activeFiles)
1185     {
1186        const vector<CField*>&& fieldList=file->getEnabledFields() ;
1187        for(auto field : fieldList) field->setFileIn(file) ;
1188        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1189     }
1190     return fields ;
1191   }
1192   CATCH_DUMP_ATTR
1193
1194   vector<CField*> CContext::findAllEnabledFieldsCouplerOut(const std::vector<CCouplerOut*>& activeCouplerOut)
1195   TRY
1196   {
1197     vector<CField*> fields ;
1198     for (auto couplerOut :activeCouplerOut)
1199     {
1200        const vector<CField*>&& fieldList=couplerOut->getEnabledFields() ;
1201        for(auto field : fieldList) field->setCouplerOut(couplerOut) ;
1202        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1203     }
1204     return fields ;
1205   }
1206   CATCH_DUMP_ATTR
1207
1208   vector<CField*> CContext::findAllEnabledFieldsCouplerIn(const std::vector<CCouplerIn*>& activeCouplerIn)
1209   TRY
1210   {
1211     vector<CField*> fields ;
1212     for (auto couplerIn :activeCouplerIn)
1213     {
1214        const vector<CField*>&& fieldList=couplerIn->getEnabledFields() ;
1215        for(auto field : fieldList) field->setCouplerIn(couplerIn) ;
1216        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1217     }
1218     return fields ;
1219   }
1220   CATCH_DUMP_ATTR
1221
1222 /*!
1223  * Send context attribute and calendar to file server, it must be done once by context file server
1224  * \param[in] client : context client to send   
1225  */ 
1226  void CContext::sendContextToFileServer(CContextClient* client)
1227  {
1228    if (sendToFileServer_done_.count(client)!=0) return ;
1229    else sendToFileServer_done_.insert(client) ;
1230   
1231    this->sendAllAttributesToServer(client); // Send all attributes of current context to server
1232    CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(client); // Send all attributes of current cale
1233  }
1234
1235 
1236   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
1237   TRY
1238   {
1239      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
1240        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
1241   }
1242   CATCH_DUMP_ATTR
1243
1244
1245   void CContext::postProcessFilterGraph()
1246   TRY
1247   {
1248     int size = enabledFiles.size();
1249     for (int i = 0; i < size; ++i)
1250     {
1251        enabledFiles[i]->postProcessFilterGraph();
1252     }
1253   }
1254   CATCH_DUMP_ATTR
1255
1256   void CContext::startPrefetchingOfEnabledReadModeFiles()
1257   TRY
1258   {
1259     int size = enabledReadModeFiles.size();
1260     for (int i = 0; i < size; ++i)
1261     {
1262        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
1263     }
1264   }
1265   CATCH_DUMP_ATTR
1266
1267   void CContext::doPreTimestepOperationsForEnabledReadModeFiles()
1268   TRY
1269   {
1270     int size = enabledReadModeFiles.size();
1271     for (int i = 0; i < size; ++i)
1272     {
1273        enabledReadModeFiles[i]->doPreTimestepOperationsForEnabledReadModeFields();
1274     }
1275   }
1276   CATCH_DUMP_ATTR
1277
1278   void CContext::doPostTimestepOperationsForEnabledReadModeFiles()
1279   TRY
1280   {
1281     int size = enabledReadModeFiles.size();
1282     for (int i = 0; i < size; ++i)
1283     {
1284        enabledReadModeFiles[i]->doPostTimestepOperationsForEnabledReadModeFields();
1285     }
1286   }
1287   CATCH_DUMP_ATTR
1288
1289  void CContext::findFieldsWithReadAccess(void)
1290  TRY
1291  {
1292    fieldsWithReadAccess_.clear();
1293    const vector<CField*> allFields = CField::getAll();
1294    for (size_t i = 0; i < allFields.size(); ++i)
1295    {
1296      CField* field = allFields[i];
1297      if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
1298      {
1299        fieldsWithReadAccess_.push_back(field);
1300        field->setModelOut() ;
1301      }
1302    }
1303  }
1304  CATCH_DUMP_ATTR
1305
1306 
1307   void CContext::solveAllInheritance(bool apply)
1308   TRY
1309   {
1310     // Résolution des héritages descendants (càd des héritages de groupes)
1311     // pour chacun des contextes.
1312      solveDescInheritance(apply);
1313
1314     // Résolution des héritages par référence au niveau des fichiers.
1315      const vector<CFile*> allFiles=CFile::getAll();
1316      const vector<CCouplerIn*> allCouplerIn=CCouplerIn::getAll();
1317      const vector<CCouplerOut*> allCouplerOut=CCouplerOut::getAll();
1318      const vector<CGrid*> allGrids= CGrid::getAll();
1319
1320      if (serviceType_==CServicesManager::CLIENT)
1321      {
1322        for (unsigned int i = 0; i < allFiles.size(); i++)
1323          allFiles[i]->solveFieldRefInheritance(apply);
1324
1325        for (unsigned int i = 0; i < allCouplerIn.size(); i++)
1326          allCouplerIn[i]->solveFieldRefInheritance(apply);
1327
1328        for (unsigned int i = 0; i < allCouplerOut.size(); i++)
1329          allCouplerOut[i]->solveFieldRefInheritance(apply);
1330      }
1331
1332      unsigned int vecSize = allGrids.size();
1333      unsigned int i = 0;
1334      for (i = 0; i < vecSize; ++i)
1335        allGrids[i]->solveElementsRefInheritance(apply);
1336
1337   }
1338  CATCH_DUMP_ATTR
1339
1340   void CContext::findEnabledFiles(void)
1341   TRY
1342   {
1343      const std::vector<CFile*> allFiles = CFile::getAll();
1344      const CDate& initDate = calendar->getInitDate();
1345
1346      for (unsigned int i = 0; i < allFiles.size(); i++)
1347         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
1348         {
1349            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
1350            {
1351              if (allFiles[i]->output_freq.isEmpty())
1352              {
1353                 ERROR("CContext::findEnabledFiles()",
1354                     << "Mandatory attribute output_freq must be defined for file \""<<allFiles[i]->getFileOutputName()
1355                     <<" \".")
1356              }
1357              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
1358              {
1359                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
1360                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
1361                    <<"\" is less than the time step. File will not be written."<<endl;
1362              }
1363              else
1364               enabledFiles.push_back(allFiles[i]);
1365            }
1366            else // Si l'attribut 'enabled' est fixé à faux.
1367            {
1368              // disabled all fields contained in file (used in findFieldsWithReadAccess through field_ref dependencies)
1369              const vector<CField*>&& fieldList=allFiles[i]->getEnabledFields() ;
1370              for(auto field : fieldList) field->enabled.setValue(false);
1371            }
1372         }
1373         else
1374         {
1375           if (allFiles[i]->output_freq.isEmpty())
1376           {
1377              ERROR("CContext::findEnabledFiles()",
1378                  << "Mandatory attribute output_freq must be defined for file \""<<allFiles[i]->getFileOutputName()
1379                  <<" \".")
1380           }
1381           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
1382           {
1383             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
1384                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
1385                 <<"\" is less than the time step. File will not be written."<<endl;
1386           }
1387           else
1388             enabledFiles.push_back(allFiles[i]); // otherwise true by default
1389         }
1390
1391      if (enabledFiles.size() == 0)
1392         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
1393               << getId() << "\" !");
1394
1395   }
1396   CATCH_DUMP_ATTR
1397
1398   void CContext::findEnabledCouplerIn(void)
1399   TRY
1400   {
1401      const std::vector<CCouplerIn*> allCouplerIn = CCouplerIn::getAll();
1402      bool enabled ;
1403      for (size_t i = 0; i < allCouplerIn.size(); i++)
1404      {
1405        if (allCouplerIn[i]->enabled.isEmpty()) enabled=true ;
1406        else enabled=allCouplerIn[i]->enabled ;
1407        if (enabled) enabledCouplerIn.push_back(allCouplerIn[i]) ;
1408      }
1409   }
1410   CATCH_DUMP_ATTR
1411
1412   void CContext::findEnabledCouplerOut(void)
1413   TRY
1414   {
1415      const std::vector<CCouplerOut*> allCouplerOut = CCouplerOut::getAll();
1416      bool enabled ;
1417      for (size_t i = 0; i < allCouplerOut.size(); i++)
1418      {
1419        if (allCouplerOut[i]->enabled.isEmpty()) enabled=true ;
1420        else enabled=allCouplerOut[i]->enabled ;
1421        if (enabled) enabledCouplerOut.push_back(allCouplerOut[i]) ;
1422      }
1423   }
1424   CATCH_DUMP_ATTR
1425
1426
1427
1428
1429   void CContext::distributeFiles(const vector<CFile*>& files)
1430   TRY
1431   {
1432     bool distFileMemory=false ;
1433     distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory);
1434
1435     int nbPools = writerClientOut_.size();
1436     if (nbPools==1) distributeFileOverOne(files) ;
1437     else if (distFileMemory) distributeFileOverMemoryBandwith(files) ;
1438     else distributeFileOverBandwith(files) ;
1439   }
1440   CATCH_DUMP_ATTR
1441
1442   void CContext::distributeFileOverOne(const vector<CFile*>& files)
1443   TRY
1444   {
1445    for(auto& file : files) file->setContextClient(writerClientOut_[0]) ;
1446   }
1447   CATCH_DUMP_ATTR
1448
1449   void CContext::distributeFileOverBandwith(const vector<CFile*>& files)
1450   TRY
1451   {
1452     double eps=std::numeric_limits<double>::epsilon()*10 ;
1453     
1454     std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out);
1455     int nbPools = writerClientOut_.size();
1456
1457     // (1) Find all enabled files in write mode
1458     // for (int i = 0; i < this->enabledFiles.size(); ++i)
1459     // {
1460     //   if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write ))
1461     //    enabledWriteModeFiles.push_back(enabledFiles[i]);
1462     // }
1463
1464     // (2) Estimate the data volume for each file
1465     int size = files.size();
1466     std::vector<std::pair<double, CFile*> > dataSizeMap;
1467     double dataPerPool = 0;
1468     int nfield=0 ;
1469     ofs<<size<<endl ;
1470     for (size_t i = 0; i < size; ++i)
1471     {
1472       CFile* file = files[i];
1473       ofs<<file->getId()<<endl ;
1474       StdSize dataSize=0;
1475       std::vector<CField*> enabledFields = file->getEnabledFields();
1476       size_t numEnabledFields = enabledFields.size();
1477       ofs<<numEnabledFields<<endl ;
1478       for (size_t j = 0; j < numEnabledFields; ++j)
1479       {
1480         dataSize += enabledFields[j]->getGlobalWrittenSize() ;
1481         ofs<<enabledFields[j]->getGrid()->getId()<<endl ;
1482         ofs<<enabledFields[j]->getGlobalWrittenSize()<<endl ;
1483       }
1484       double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ;
1485       double dataSizeSec= dataSize/ outFreqSec;
1486       ofs<<dataSizeSec<<endl ;
1487       nfield++ ;
1488// add epsilon*nField to dataSizeSec in order to  preserve reproductive ordering when sorting
1489       dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file));
1490       dataPerPool += dataSizeSec;
1491     }
1492     dataPerPool /= nbPools;
1493     std::sort(dataSizeMap.begin(), dataSizeMap.end());
1494
1495     // (3) Assign contextClient to each enabled file
1496
1497     std::multimap<double,int> poolDataSize ;
1498// multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11
1499
1500     int j;
1501     double dataSize ;
1502     for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ; 
1503             
1504     for (int i = dataSizeMap.size()-1; i >= 0; --i)
1505     {
1506       dataSize=(*poolDataSize.begin()).first ;
1507       j=(*poolDataSize.begin()).second ;
1508       dataSizeMap[i].second->setContextClient(writerClientOut_[j]);
1509       dataSize+=dataSizeMap[i].first;
1510       poolDataSize.erase(poolDataSize.begin()) ;
1511       poolDataSize.insert(std::pair<double,int>(dataSize,j)) ; 
1512     }
1513
1514     for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) info(30)<<"Load Balancing for servers (perfect=1) : "<<it->second<<" :  ratio "<<it->first*1./dataPerPool<<endl ;
1515   }
1516   CATCH_DUMP_ATTR
1517
1518   void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList)
1519   TRY
1520   {
1521     int nbPools = writerClientOut_.size();
1522     double ratio=0.5 ;
1523     ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio);
1524
1525     int nFiles = filesList.size();
1526     vector<SDistFile> files(nFiles);
1527     vector<SDistGrid> grids;
1528     map<string,int> gridMap ;
1529     string gridId; 
1530     int gridIndex=0 ;
1531
1532     for (size_t i = 0; i < nFiles; ++i)
1533     {
1534       StdSize dataSize=0;
1535       CFile* file = filesList[i];
1536       std::vector<CField*> enabledFields = file->getEnabledFields();
1537       size_t numEnabledFields = enabledFields.size();
1538
1539       files[i].id_=file->getId() ;
1540       files[i].nbGrids_=numEnabledFields;
1541       files[i].assignedGrid_ = new int[files[i].nbGrids_] ;
1542         
1543       for (size_t j = 0; j < numEnabledFields; ++j)
1544       {
1545         gridId=enabledFields[j]->getGrid()->getId() ;
1546         if (gridMap.find(gridId)==gridMap.end())
1547         {
1548            gridMap[gridId]=gridIndex  ;
1549            SDistGrid newGrid; 
1550            grids.push_back(newGrid) ;
1551            gridIndex++ ;
1552         }
1553         files[i].assignedGrid_[j]=gridMap[gridId] ;
1554         grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ;
1555         dataSize += enabledFields[j]->getGlobalWrittenSize() ; // usefull
1556       }
1557       double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ;
1558       files[i].bandwith_= dataSize/ outFreqSec ;
1559     }
1560
1561     double bandwith=0 ;
1562     double memory=0 ;
1563   
1564     for(int i=0; i<nFiles; i++)  bandwith+=files[i].bandwith_ ;
1565     for(int i=0; i<nFiles; i++)  files[i].bandwith_ = files[i].bandwith_/bandwith * ratio ;
1566
1567     for(int i=0; i<grids.size(); i++)  memory+=grids[i].size_ ;
1568     for(int i=0; i<grids.size(); i++)  grids[i].size_ = grids[i].size_ / memory * (1.0-ratio) ;
1569       
1570     distributeFileOverServer2(nbPools, grids.size(), &grids[0], nFiles, &files[0]) ;
1571
1572     vector<double> memorySize(nbPools,0.) ;
1573     vector< set<int> > serverGrids(nbPools) ;
1574     vector<double> bandwithSize(nbPools,0.) ;
1575       
1576     for (size_t i = 0; i < nFiles; ++i)
1577     {
1578       bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ;
1579       for(int j=0 ; j<files[i].nbGrids_;j++)
1580       {
1581         if (serverGrids[files[i].assignedServer_].find(files[i].assignedGrid_[j]) == serverGrids[files[i].assignedServer_].end())
1582         {
1583           memorySize[files[i].assignedServer_]+= grids[files[i].assignedGrid_[j]].size_ * memory / (1.0-ratio);
1584           serverGrids[files[i].assignedServer_].insert(files[i].assignedGrid_[j]) ;
1585         }
1586       }
1587       filesList[i]->setContextClient(writerClientOut_[files[i].assignedServer_]) ;
1588       delete [] files[i].assignedGrid_ ;
1589     }
1590
1591     for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<"   assigned file bandwith "<<bandwithSize[i]*86400.*4./1024/1024.<<" Mb / days"<<endl ;
1592     for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<"   assigned grid memory "<<memorySize[i]*100/1024./1024.<<" Mb"<<endl ;
1593
1594   }
1595   CATCH_DUMP_ATTR
1596
1597   /*!
1598      Find all files in write mode
1599   */
1600   void CContext::findEnabledWriteModeFiles(void)
1601   TRY
1602   {
1603     int size = this->enabledFiles.size();
1604     for (int i = 0; i < size; ++i)
1605     {
1606       if (enabledFiles[i]->mode.isEmpty() || 
1607          (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write ))
1608        enabledWriteModeFiles.push_back(enabledFiles[i]);
1609     }
1610   }
1611   CATCH_DUMP_ATTR
1612
1613   /*!
1614      Find all files in read mode
1615   */
1616   void CContext::findEnabledReadModeFiles(void)
1617   TRY
1618   {
1619     int size = this->enabledFiles.size();
1620     for (int i = 0; i < size; ++i)
1621     {
1622       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
1623        enabledReadModeFiles.push_back(enabledFiles[i]);
1624     }
1625   }
1626   CATCH_DUMP_ATTR
1627
1628   void CContext::closeAllFile(void)
1629   TRY
1630   {
1631     std::vector<CFile*>::const_iterator
1632            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
1633
1634     for (; it != end; it++)
1635     {
1636       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
1637       (*it)->close();
1638     }
1639   }
1640   CATCH_DUMP_ATTR
1641
1642   /*!
1643   \brief Dispatch event received from client
1644      Whenever a message is received in buffer of server, it will be processed depending on
1645   its event type. A new event type should be added in the switch list to make sure
1646   it processed on server side.
1647   \param [in] event: Received message
1648   */
1649   bool CContext::dispatchEvent(CEventServer& event)
1650   TRY
1651   {
1652
1653      if (SuperClass::dispatchEvent(event)) return true;
1654      else
1655      {
1656        switch(event.type)
1657        {
1658           case EVENT_ID_CLOSE_DEFINITION :
1659             recvCloseDefinition(event);
1660             return true;
1661             break;
1662           case EVENT_ID_UPDATE_CALENDAR:
1663             recvUpdateCalendar(event);
1664             return true;
1665             break;
1666           case EVENT_ID_COUPLER_IN_READY:
1667             recvCouplerInReady(event);
1668             return true;
1669             break;
1670           case EVENT_ID_COUPLER_IN_CLOSE_DEFINITION:
1671             recvCouplerInCloseDefinition(event);
1672             return true;
1673             break;
1674           case EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED:
1675             recvCouplerInContextFinalized(event);
1676             return true;
1677             break; 
1678           default :
1679             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
1680                    <<"Unknown Event");
1681           return false;
1682         }
1683      }
1684   }
1685   CATCH
1686
1687   //  ! Client side: Send a message to server to make it close
1688   void CContext::sendCloseDefinition(CContextClient* client)
1689   TRY
1690   {
1691      if (sendCloseDefinition_done_.count(client)!=0) return ;
1692      else sendCloseDefinition_done_.insert(client) ;
1693
1694      CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
1695      if (client->isServerLeader())
1696      {
1697        CMessage msg;
1698        for(auto rank : client->getRanksServerLeader()) event.push(rank,1,msg);
1699        client->sendEvent(event);
1700      }
1701     else client->sendEvent(event);
1702   }
1703   CATCH_DUMP_ATTR
1704
1705   //! Server side: Receive a message of client announcing a context close
1706   void CContext::recvCloseDefinition(CEventServer& event)
1707   TRY
1708   {
1709      CBufferIn* buffer=event.subEvents.begin()->buffer;
1710      getCurrent()->closeDefinition();
1711   }
1712   CATCH
1713
1714   //! Client side: Send a message to update calendar in each time step
1715   void CContext::sendUpdateCalendar(int step)
1716   TRY
1717   {
1718     for(auto client : slaveServers_) 
1719     {
1720       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
1721       if (client->isServerLeader())
1722       {
1723         CMessage msg;
1724         msg<<step;
1725         for (auto& rank : client->getRanksServerLeader() ) event.push(rank,1,msg);
1726         client->sendEvent(event);
1727       }
1728       else client->sendEvent(event);
1729     }
1730   }
1731   CATCH_DUMP_ATTR
1732
1733   //! Server side: Receive a message of client annoucing calendar update
1734   void CContext::recvUpdateCalendar(CEventServer& event)
1735   TRY
1736   {
1737      CBufferIn* buffer=event.subEvents.begin()->buffer;
1738      getCurrent()->recvUpdateCalendar(*buffer);
1739   }
1740   CATCH
1741
1742   //! Server side: Receive a message of client annoucing calendar update
1743   void CContext::recvUpdateCalendar(CBufferIn& buffer)
1744   TRY
1745   {
1746      int step;
1747      buffer>>step;
1748      updateCalendar(step);
1749      if (serviceType_==CServicesManager::GATHERER)
1750      {       
1751        sendUpdateCalendar(step);
1752      }
1753   }
1754   CATCH_DUMP_ATTR
1755
1756
1757   void CContext::createCouplerInterCommunicator(void)
1758   TRY
1759   {
1760      int rank=this->getIntraCommRank() ;
1761      map<string,list<CCouplerOut*>> listCouplerOut ; 
1762      map<string,list<CCouplerIn*>> listCouplerIn ; 
1763
1764      for(auto couplerOut : enabledCouplerOut) listCouplerOut[couplerOut->getCouplingContextId()].push_back(couplerOut) ;
1765      for(auto couplerIn : enabledCouplerIn) listCouplerIn[couplerIn->getCouplingContextId()].push_back(couplerIn) ;
1766
1767      CCouplerManager* couplerManager = CXios::getCouplerManager() ;
1768      if (rank==0)
1769      {
1770        for(auto couplerOut : listCouplerOut) couplerManager->registerCoupling(this->getContextId(),couplerOut.first) ;
1771        for(auto couplerIn : listCouplerIn) couplerManager->registerCoupling(couplerIn.first,this->getContextId()) ;
1772      }
1773
1774      do
1775      {
1776        for(auto couplerOut : listCouplerOut) 
1777        {
1778          bool isNextCoupling ;
1779          if (rank==0) isNextCoupling = couplerManager->isNextCoupling(this->getContextId(),couplerOut.first) ;
1780          MPI_Bcast(&isNextCoupling,1,MPI_C_BOOL, 0, getIntraComm()) ; 
1781          if (isNextCoupling) 
1782          {
1783            addCouplingChanel(couplerOut.first, true) ;
1784            listCouplerOut.erase(couplerOut.first) ;
1785            break ;
1786          }           
1787        }
1788        for(auto couplerIn : listCouplerIn) 
1789        {
1790          bool isNextCoupling ;
1791          if (rank==0) isNextCoupling = couplerManager->isNextCoupling(couplerIn.first,this->getContextId());
1792          MPI_Bcast(&isNextCoupling,1,MPI_C_BOOL, 0, getIntraComm()) ; 
1793          if (isNextCoupling) 
1794          {
1795            addCouplingChanel(couplerIn.first, false) ;
1796            listCouplerIn.erase(couplerIn.first) ;
1797            break ;
1798          }           
1799        }
1800
1801      } while (!listCouplerOut.empty() || !listCouplerIn.empty()) ;
1802
1803   }
1804   CATCH_DUMP_ATTR
1805
1806 
1807     //! Client side: Send infomation of active files (files are enabled to write out)
1808   void CContext::sendEnabledFiles(const std::vector<CFile*>& activeFiles)
1809   TRY
1810   {
1811     int size = activeFiles.size();
1812
1813     // In a context, each type has a root definition, e.g: axis, domain, field.
1814     // Every object must be a child of one of these root definition. In this case
1815     // all new file objects created on server must be children of the root "file_definition"
1816     StdString fileDefRoot("file_definition");
1817     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
1818
1819     for (int i = 0; i < size; ++i)
1820     {
1821       CFile* f = activeFiles[i];
1822       cfgrpPtr->sendCreateChild(f->getId(),f->getContextClient());
1823       f->sendAllAttributesToServer(f->getContextClient());
1824       f->sendAddAllVariables(f->getContextClient());
1825     }
1826   }
1827   CATCH_DUMP_ATTR
1828
1829   //! Client side: Send information of active fields (ones are written onto files)
1830   void CContext::sendEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)
1831   TRY
1832   {
1833     int size = activeFiles.size();
1834     for (int i = 0; i < size; ++i)
1835     {
1836       activeFiles[i]->sendEnabledFields(activeFiles[i]->getContextClient());
1837     }
1838   }
1839   CATCH_DUMP_ATTR
1840
1841 
1842   //! Client side: Prepare the timeseries by adding the necessary files
1843   void CContext::prepareTimeseries()
1844   TRY
1845   {
1846     const std::vector<CFile*> allFiles = CFile::getAll();
1847     for (size_t i = 0; i < allFiles.size(); i++)
1848     {
1849       CFile* file = allFiles[i];
1850
1851       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1852       for (size_t k = 0; k < vars.size(); k++)
1853       {
1854         CVariable* var = vars[k];
1855
1856         if (var->ts_target.isEmpty()
1857              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1858           fileVars.push_back(var);
1859
1860         if (!var->ts_target.isEmpty()
1861              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1862           fieldVars.push_back(var);
1863       }
1864
1865       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1866       {
1867         StdString fileNameStr("%file_name%") ;
1868         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1869         
1870         StdString fileName=file->getFileOutputName();
1871         size_t pos=tsPrefix.find(fileNameStr) ;
1872         while (pos!=std::string::npos)
1873         {
1874           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1875           pos=tsPrefix.find(fileNameStr) ;
1876         }
1877       
1878         const std::vector<CField*> allFields = file->getAllFields();
1879         for (size_t j = 0; j < allFields.size(); j++)
1880         {
1881           CField* field = allFields[j];
1882
1883           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1884           {
1885             CFile* tsFile = CFile::create();
1886             tsFile->duplicateAttributes(file);
1887
1888             // Add variables originating from file and targeted to timeserie file
1889             for (size_t k = 0; k < fileVars.size(); k++)
1890               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1891
1892           
1893             tsFile->name = tsPrefix + "_";
1894             if (!field->name.isEmpty())
1895               tsFile->name.get() += field->name;
1896             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1897               tsFile->name.get() += field->field_ref;
1898             else
1899               tsFile->name.get() += field->getId();
1900
1901             if (!field->ts_split_freq.isEmpty())
1902               tsFile->split_freq = field->ts_split_freq;
1903
1904             CField* tsField = tsFile->addField();
1905             tsField->field_ref = field->getId();
1906
1907             // Add variables originating from file and targeted to timeserie field
1908             for (size_t k = 0; k < fieldVars.size(); k++)
1909               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1910
1911             vars = field->getAllVariables();
1912             for (size_t k = 0; k < vars.size(); k++)
1913             {
1914               CVariable* var = vars[k];
1915
1916               // Add variables originating from field and targeted to timeserie field
1917               if (var->ts_target.isEmpty()
1918                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1919                 tsField->getVirtualVariableGroup()->addChild(var);
1920
1921               // Add variables originating from field and targeted to timeserie file
1922               if (!var->ts_target.isEmpty()
1923                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1924                 tsFile->getVirtualVariableGroup()->addChild(var);
1925             }
1926
1927             tsFile->solveFieldRefInheritance(true);
1928
1929             if (file->timeseries == CFile::timeseries_attr::exclusive)
1930               field->enabled = false;
1931           }
1932         }
1933
1934         // Finally disable the original file is need be
1935         if (file->timeseries == CFile::timeseries_attr::only)
1936          file->enabled = false;
1937       }
1938     }
1939   }
1940   CATCH_DUMP_ATTR
1941
1942 
1943   //! Client side: Send information of reference domain, axis and scalar of active fields
1944   void CContext::sendRefDomainsAxisScalars(const std::vector<CFile*>& activeFiles)
1945   TRY
1946   {
1947     std::set<pair<StdString,CContextClient*>> domainIds, axisIds, scalarIds;
1948
1949     // Find all reference domain and axis of all active fields
1950     int numEnabledFiles = activeFiles.size();
1951     for (int i = 0; i < numEnabledFiles; ++i)
1952     {
1953       std::vector<CField*> enabledFields = activeFiles[i]->getEnabledFields();
1954       int numEnabledFields = enabledFields.size();
1955       for (int j = 0; j < numEnabledFields; ++j)
1956       {
1957         CContextClient* contextClient=enabledFields[j]->getContextClient() ;
1958         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1959         if ("" != prDomAxisScalarId[0]) domainIds.insert(make_pair(prDomAxisScalarId[0],contextClient));
1960         if ("" != prDomAxisScalarId[1]) axisIds.insert(make_pair(prDomAxisScalarId[1],contextClient));
1961         if ("" != prDomAxisScalarId[2]) scalarIds.insert(make_pair(prDomAxisScalarId[2],contextClient));
1962       }
1963     }
1964
1965     // Create all reference axis on server side
1966     std::set<StdString>::iterator itDom, itAxis, itScalar;
1967     std::set<StdString>::const_iterator itE;
1968
1969     StdString scalarDefRoot("scalar_definition");
1970     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1971     
1972     for (auto itScalar = scalarIds.begin(); itScalar != scalarIds.end(); ++itScalar)
1973     {
1974       if (!itScalar->first.empty())
1975       {
1976         scalarPtr->sendCreateChild(itScalar->first,itScalar->second);
1977         CScalar::get(itScalar->first)->sendAllAttributesToServer(itScalar->second);
1978       }
1979     }
1980
1981     StdString axiDefRoot("axis_definition");
1982     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1983     
1984     for (auto itAxis = axisIds.begin(); itAxis != axisIds.end(); ++itAxis)
1985     {
1986       if (!itAxis->first.empty())
1987       {
1988         axisPtr->sendCreateChild(itAxis->first, itAxis->second);
1989         CAxis::get(itAxis->first)->sendAllAttributesToServer(itAxis->second);
1990       }
1991     }
1992
1993     // Create all reference domains on server side
1994     StdString domDefRoot("domain_definition");
1995     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1996     
1997     for (auto itDom = domainIds.begin(); itDom != domainIds.end(); ++itDom)
1998     {
1999       if (!itDom->first.empty()) {
2000          domPtr->sendCreateChild(itDom->first, itDom->second);
2001          CDomain::get(itDom->first)->sendAllAttributesToServer(itDom->second);
2002       }
2003     }
2004   }
2005   CATCH_DUMP_ATTR
2006
2007   void CContext::triggerLateFields(void)
2008   TRY
2009   {
2010    for(auto& field : fileInFields_) field->triggerLateField() ;
2011    for(auto& field : couplerInFields_) field->triggerLateField() ;
2012   }
2013   CATCH_DUMP_ATTR
2014
2015   //! Update calendar in each time step
2016   void CContext::updateCalendar(int step)
2017   TRY
2018   {
2019      int prevStep = calendar->getStep();
2020
2021      if (prevStep < step)
2022      {
2023        if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data
2024        {
2025          triggerLateFields();
2026        }
2027
2028        info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
2029        calendar->update(step);
2030        info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
2031  #ifdef XIOS_MEMTRACK_LIGHT
2032        info(50) << " Current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<<step<<" of context "<<this->getId()<<endl ;
2033  #endif
2034
2035        if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data
2036        {
2037          doPostTimestepOperationsForEnabledReadModeFiles();
2038          garbageCollector.invalidate(calendar->getCurrentDate());
2039        }
2040      }
2041      else if (prevStep == step)
2042        info(50) << "updateCalendar: already at step " << step << ", no operation done." << endl;
2043      else // if (prevStep > step)
2044        ERROR("void CContext::updateCalendar(int step)",
2045              << "Illegal calendar update: previous step was " << prevStep << ", new step " << step << "is in the past!")
2046   }
2047   CATCH_DUMP_ATTR
2048
2049   void CContext::initReadFiles(void)
2050   TRY
2051   {
2052      vector<CFile*>::const_iterator it;
2053
2054      for (it=enabledReadModeFiles.begin(); it != enabledReadModeFiles.end(); it++)
2055      {
2056         (*it)->initRead();
2057      }
2058   }
2059   CATCH_DUMP_ATTR
2060
2061   //! Server side: Create header of netcdf file
2062   void CContext::createFileHeader(void)
2063   TRY
2064   {
2065      vector<CFile*>::const_iterator it;
2066
2067      //for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
2068      for (it=enabledWriteModeFiles.begin(); it != enabledWriteModeFiles.end(); it++)
2069      {
2070         (*it)->initWrite();
2071      }
2072   }
2073   CATCH_DUMP_ATTR
2074
2075   //! Get current context
2076   CContext* CContext::getCurrent(void)
2077   TRY
2078   {
2079     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
2080   }
2081   CATCH
2082
2083   /*!
2084   \brief Set context with an id be the current context
2085   \param [in] id identity of context to be set to current
2086   */
2087   void CContext::setCurrent(const string& id)
2088   TRY
2089   {
2090     CObjectFactory::SetCurrentContextId(id);
2091     CGroupFactory::SetCurrentContextId(id);
2092   }
2093   CATCH
2094
2095  /*!
2096  \brief Create a context with specific id
2097  \param [in] id identity of new context
2098  \return pointer to the new context or already-existed one with identity id
2099  */
2100  CContext* CContext::create(const StdString& id)
2101  TRY
2102  {
2103    CContext::setCurrent(id);
2104
2105    bool hasctxt = CContext::has(id);
2106    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
2107    getRoot();
2108    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
2109
2110#define DECLARE_NODE(Name_, name_) \
2111    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
2112#define DECLARE_NODE_PAR(Name_, name_)
2113#include "node_type.conf"
2114
2115    return (context);
2116  }
2117  CATCH
2118
2119 
2120  void CContext::sendFinalizeClient(CContextClient* contextClient, const string& contextClientId)
2121  TRY
2122  {
2123    CEventClient event(getType(),EVENT_ID_CONTEXT_FINALIZE_CLIENT);
2124    if (contextClient->isServerLeader())
2125    {
2126      CMessage msg;
2127      msg<<contextClientId ;
2128      const std::list<int>& ranks = contextClient->getRanksServerLeader();
2129      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
2130           event.push(*itRank,1,msg);
2131      contextClient->sendEvent(event);
2132    }
2133    else contextClient->sendEvent(event);
2134  }
2135  CATCH_DUMP_ATTR
2136
2137 
2138  void CContext::recvFinalizeClient(CEventServer& event)
2139  TRY
2140  {
2141    CBufferIn* buffer=event.subEvents.begin()->buffer;
2142    string id;
2143    *buffer>>id;
2144    get(id)->recvFinalizeClient(*buffer);
2145  }
2146  CATCH
2147
2148  void CContext::recvFinalizeClient(CBufferIn& buffer)
2149  TRY
2150  {
2151    countChildContextFinalized_++ ;
2152  }
2153  CATCH_DUMP_ATTR
2154
2155
2156
2157
2158 //! Client side: Send a message  announcing that context can receive grid definition from coupling
2159   void CContext::sendCouplerInReady(CContextClient* client)
2160   TRY
2161   {
2162      if (sendCouplerInReady_done_.count(client)!=0) return ;
2163      else sendCouplerInReady_done_.insert(client) ;
2164
2165      CEventClient event(getType(),EVENT_ID_COUPLER_IN_READY);
2166
2167      if (client->isServerLeader())
2168      {
2169        CMessage msg;
2170        msg<<this->getId();
2171        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2172        client->sendEvent(event);
2173      }
2174      else client->sendEvent(event);
2175   }
2176   CATCH_DUMP_ATTR
2177
2178   //! Server side: Receive a message announcing that context can send grid definition for context coupling
2179   void CContext::recvCouplerInReady(CEventServer& event)
2180   TRY
2181   {
2182      CBufferIn* buffer=event.subEvents.begin()->buffer;
2183      getCurrent()->recvCouplerInReady(*buffer);
2184   }
2185   CATCH
2186
2187   //! Server side: Receive a message announcing that context can send grid definition for context coupling
2188   void CContext::recvCouplerInReady(CBufferIn& buffer)
2189   TRY
2190   {
2191      string contextId ;
2192      buffer>>contextId;
2193      couplerInReady_.insert(getCouplerOutClient(contextId)) ;
2194   }
2195   CATCH_DUMP_ATTR
2196
2197
2198
2199
2200
2201 //! Client side: Send a message  announcing that a coupling context have done it closeDefinition, so data can be sent now.
2202   void CContext::sendCouplerInCloseDefinition(CContextClient* client)
2203   TRY
2204   {
2205      if (sendCouplerInCloseDefinition_done_.count(client)!=0) return ;
2206      else sendCouplerInCloseDefinition_done_.insert(client) ;
2207
2208      CEventClient event(getType(),EVENT_ID_COUPLER_IN_CLOSE_DEFINITION);
2209
2210      if (client->isServerLeader())
2211      {
2212        CMessage msg;
2213        msg<<this->getId();
2214        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2215        client->sendEvent(event);
2216      }
2217      else client->sendEvent(event);
2218   }
2219   CATCH_DUMP_ATTR
2220
2221   //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now.
2222   void CContext::recvCouplerInCloseDefinition(CEventServer& event)
2223   TRY
2224   {
2225      CBufferIn* buffer=event.subEvents.begin()->buffer;
2226      getCurrent()->recvCouplerInCloseDefinition(*buffer);
2227   }
2228   CATCH
2229
2230   //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now.
2231   void CContext::recvCouplerInCloseDefinition(CBufferIn& buffer)
2232   TRY
2233   {
2234      string contextId ;
2235      buffer>>contextId;
2236      couplerInCloseDefinition_.insert(getCouplerOutClient(contextId)) ;
2237   }
2238   CATCH_DUMP_ATTR
2239
2240
2241
2242
2243//! Client side: Send a message  announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2244   void CContext::sendCouplerInContextFinalized(CContextClient* client)
2245   TRY
2246   {
2247      if (sendCouplerInContextFinalized_done_.count(client)!=0) return ;
2248      else sendCouplerInContextFinalized_done_.insert(client) ;
2249
2250      CEventClient event(getType(),EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED);
2251
2252      if (client->isServerLeader())
2253      {
2254        CMessage msg;
2255        msg<<this->getId();
2256        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2257        client->sendEvent(event);
2258      }
2259      else client->sendEvent(event);
2260   }
2261   CATCH_DUMP_ATTR
2262
2263   //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2264   void CContext::recvCouplerInContextFinalized(CEventServer& event)
2265   TRY
2266   {
2267      CBufferIn* buffer=event.subEvents.begin()->buffer;
2268      getCurrent()->recvCouplerInContextFinalized(*buffer);
2269   }
2270   CATCH
2271
2272   //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2273   void CContext::recvCouplerInContextFinalized(CBufferIn& buffer)
2274   TRY
2275   {
2276      string contextId ;
2277      buffer>>contextId;
2278      couplerInContextFinalized_.insert(getCouplerOutClient(contextId)) ;
2279   }
2280   CATCH_DUMP_ATTR
2281
2282
2283
2284
2285  /*!
2286  * \fn bool CContext::isFinalized(void)
2287  * Context is finalized if it received context post finalize event.
2288  */
2289  bool CContext::isFinalized(void)
2290  TRY
2291  {
2292    return finalized;
2293  }
2294  CATCH_DUMP_ATTR
2295  ///--------------------------------------------------------------
2296  StdString CContext::dumpClassAttributes(void)
2297  {
2298    StdString str;
2299    str.append("enabled files=\"");
2300    int size = this->enabledFiles.size();
2301    for (int i = 0; i < size; ++i)
2302    {
2303      str.append(enabledFiles[i]->getId());
2304      str.append(" ");
2305    }
2306    str.append("\"");
2307    return str;
2308  }
2309
2310} // namespace xios
Note: See TracBrowser for help on using the repository browser.