source: XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp @ 2343

Last change on this file since 2343 was 2343, checked in by ymipsl, 2 years ago
  • Implement new infrastructure for transfer protocols.
  • A new purely one-sided protocol is now available; the previous protocol (legacy, mixing send/recv and one-sided) is still available. Other specific protocols can be implemented more easily in the future.
  • The protocol can be selected with the "transport_protocol" variable in the XIOS context:

ex:
<variable id="transport_protocol" type="string">one_sided</variable>

Available protocols are : one_sided, legacy or default. The default protocol is "legacy".

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 81.8 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "legacy_context_client.hpp"
10#include "legacy_context_server.hpp"
11#include "one_sided_context_client.hpp"
12#include "one_sided_context_server.hpp"
13#include "nc4_data_output.hpp"
14#include "node_type.hpp"
15#include "message.hpp"
16#include "type.hpp"
17#include "xios_spl.hpp"
18#include "timer.hpp"
19#include "memtrack.hpp"
20#include <limits>
21#include <fstream>
22#include "server.hpp"
23#include "distribute_file_server2.hpp"
24#include "services_manager.hpp"
25#include "contexts_manager.hpp"
26#include "cxios.hpp"
27#include "client.hpp"
28#include "coupler_in.hpp"
29#include "coupler_out.hpp"
30#include "servers_ressource.hpp"
31#include "pool_ressource.hpp"
32#include "services.hpp"
33#include "contexts_manager.hpp"
34#include <chrono>
35#include <random>
36
37namespace xios
38{
39
40  std::shared_ptr<CContextGroup> CContext::root;
41
42   /// ////////////////////// Définitions ////////////////////// ///
43
44   CContext::CContext(void)
45      : CObjectTemplate<CContext>(), CContextAttributes()
46      , calendar(), hasClient(false), hasServer(false)
47      , isPostProcessed(false), finalized(false)
48      , client(nullptr), server(nullptr)
49      , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false)
50
51   { /* Ne rien faire de plus */ }
52
53   CContext::CContext(const StdString & id)
54      : CObjectTemplate<CContext>(id), CContextAttributes()
55      , calendar(), hasClient(false), hasServer(false)
56      , isPostProcessed(false), finalized(false)
57      , client(nullptr), server(nullptr)
58      , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false)
59   { /* Ne rien faire de plus */ }
60
61   CContext::~CContext(void)
62   {
63     delete client;
64     delete server;
65     for (std::vector<CContextClient*>::iterator it = clientPrimServer.begin(); it != clientPrimServer.end(); it++)  delete *it;
66     for (std::vector<CContextServer*>::iterator it = serverPrimServer.begin(); it != serverPrimServer.end(); it++)  delete *it;
67     if (registryIn!=nullptr) delete registryIn ;
68     if (registryOut!=nullptr) delete registryOut ;
69   }
70
71   //----------------------------------------------------------------
72   //! Get name of context
73   StdString CContext::GetName(void)   { return (StdString("context")); }
74   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
75   ENodeType CContext::GetType(void)   { return (eContext); }
76
77   //----------------------------------------------------------------
78
79  void CContext::initEventScheduler(void)
80  {
81    SRegisterContextInfo contextInfo ;
82    CXios::getContextsManager()->getContextInfo(this->getId(), contextInfo, getIntraComm()) ;
83
84    eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
85 
86    // generate unique hash for server
87    auto time=chrono::system_clock::now().time_since_epoch().count() ;
88    std::default_random_engine rd(time); // not reproducible from a run to another
89    std::uniform_int_distribution<size_t> dist;
90    hashId_=dist(rd) ;
91    MPI_Bcast(&hashId_,1,MPI_SIZE_T,0,getIntraComm()) ; // Bcast to all server of the context
92  }
93   /*!
94   \brief Get context group (context root)
95   \return Context root
96   */
97   CContextGroup* CContext::getRoot(void)
98   TRY
99   {
100      if (root.get()==NULL) root=std::shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
101      return root.get();
102   }
103   CATCH
104
105   void CContext::releaseStaticAllocation(void)
106   TRY
107   {
108      CDomain::releaseStaticAllocation();
109      CAxis::releaseStaticAllocation();
110      CScalar::releaseStaticAllocation();
111      if (root) root.reset() ;
112   }
113   CATCH
114   
115   //----------------------------------------------------------------
116
117   /*!
118   \brief Get calendar of a context
119   \return Calendar
120   */
121   std::shared_ptr<CCalendar> CContext::getCalendar(void) const
122   TRY
123   {
124      return (this->calendar);
125   }
126   CATCH
127
128   //----------------------------------------------------------------
129
130   /*!
131   \brief Set a context with a calendar
132   \param[in] newCalendar new calendar
133   */
134   void CContext::setCalendar(std::shared_ptr<CCalendar> newCalendar)
135   TRY
136   {
137      this->calendar = newCalendar;
138   }
139   CATCH_DUMP_ATTR
140
141   //----------------------------------------------------------------
142   /*!
143   \brief Parse xml file and write information into context object
144   \param [in] node xmld node corresponding in xml file
145   */
146   void CContext::parse(xml::CXMLNode & node)
147   TRY
148   {
149      CContext::SuperClass::parse(node);
150
151      // PARSING POUR GESTION DES ENFANTS
152      xml::THashAttributes attributes = node.getAttributes();
153
154      if (attributes.end() != attributes.find("src"))
155      {
156         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
157         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
158            ERROR("void CContext::parse(xml::CXMLNode & node)",
159                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
160         if (!ifs.good())
161            ERROR("CContext::parse(xml::CXMLNode & node)",
162                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
163         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
164      }
165
166      if (node.getElementName().compare(CContext::GetName()))
167         DEBUG("Le noeud is wrong defined but will be considered as a context !");
168
169      if (!(node.goToChildElement()))
170      {
171         DEBUG("Le context ne contient pas d'enfant !");
172      }
173      else
174      {
175         do { // Parcours des contextes pour traitement.
176
177            StdString name = node.getElementName();
178            attributes.clear();
179            attributes = node.getAttributes();
180
181            if (attributes.end() != attributes.find("id"))
182            {
183              DEBUG(<< "Definition node has an id,"
184                    << "it will not be taking account !");
185            }
186
187#define DECLARE_NODE(Name_, name_)    \
188   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
189   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
190#define DECLARE_NODE_PAR(Name_, name_)
191#include "node_type.conf"
192
193            DEBUG(<< "The element \'"     << name
194                  << "\' in the context \'" << CContext::getCurrent()->getId()
195                  << "\' is not a definition !");
196
197         } while (node.goToNextElement());
198
199         node.goToParentElement(); // Retour au parent
200      }
201   }
202   CATCH_DUMP_ATTR
203
204   //----------------------------------------------------------------
205   //! Show tree structure of context
206   void CContext::ShowTree(StdOStream & out)
207   TRY
208   {
209      StdString currentContextId = CContext::getCurrent() -> getId();
210      std::vector<CContext*> def_vector =
211         CContext::getRoot()->getChildList();
212      std::vector<CContext*>::iterator
213         it = def_vector.begin(), end = def_vector.end();
214
215      out << "<? xml version=\"1.0\" ?>" << std::endl;
216      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
217
218      for (; it != end; it++)
219      {
220         CContext* context = *it;
221         CContext::setCurrent(context->getId());
222         out << *context << std::endl;
223      }
224
225      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
226      CContext::setCurrent(currentContextId);
227   }
228   CATCH
229
230   //----------------------------------------------------------------
231
232   //! Convert context object into string (to print)
233   StdString CContext::toString(void) const
234   TRY
235   {
236      StdOStringStream oss;
237      oss << "<" << CContext::GetName()
238          << " id=\"" << this->getId() << "\" "
239          << SuperClassAttribute::toString() << ">" << std::endl;
240      if (!this->hasChild())
241      {
242         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
243      }
244      else
245      {
246
247#define DECLARE_NODE(Name_, name_)    \
248   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
249   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
250#define DECLARE_NODE_PAR(Name_, name_)
251#include "node_type.conf"
252
253      }
254      oss << "</" << CContext::GetName() << " >";
255      return (oss.str());
256   }
257   CATCH
258
259   //----------------------------------------------------------------
260
261   /*!
262   \brief Find all inheritace among objects in a context.
263   \param [in] apply (true) write attributes of parent into ones of child if they are empty
264                     (false) write attributes of parent into a new container of child
265   \param [in] parent unused
266   */
267   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
268   TRY
269   {
270#define DECLARE_NODE(Name_, name_)    \
271   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
272     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
273#define DECLARE_NODE_PAR(Name_, name_)
274#include "node_type.conf"
275   }
276   CATCH_DUMP_ATTR
277
278   //----------------------------------------------------------------
279
280   //! Verify if all root definition in the context have child.
281   bool CContext::hasChild(void) const
282   TRY
283   {
284      return (
285#define DECLARE_NODE(Name_, name_)    \
286   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
287#define DECLARE_NODE_PAR(Name_, name_)
288#include "node_type.conf"
289      false);
290}
291   CATCH
292
293   //----------------------------------------------------------------
294
295   void CContext::CleanTree(void)
296   TRY
297   {
298#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
299#define DECLARE_NODE_PAR(Name_, name_)
300#include "node_type.conf"
301   }
302   CATCH
303
304void CContext::removeContext(const string& contextId)
305{
306  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteContext< C##Name_ >(contextId);
307  #define DECLARE_NODE_PAR(Name_, name_) CObjectFactory::deleteContext< C##Name_ >(contextId);
308  #include "node_type.conf"
309  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteContext< C##Name_##Group >(contextId);
310  #define DECLARE_NODE_PAR(Name_, name_) 
311  #include "node_type.conf"
312
313/*
314  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_##Group >();
315  #define DECLARE_NODE_PAR(Name_, name_)
316  #include "node_type.conf"
317
318  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_>();
319  #define DECLARE_NODE_PAR(Name_, name_)
320  #include "node_type.conf"
321*/
322}
323
324void CContext::removeAllContexts(void)
325{
326  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteAllContexts< C##Name_ >();
327  #define DECLARE_NODE_PAR(Name_, name_) CObjectFactory::deleteAllContexts< C##Name_ >();
328  #include "node_type.conf"
329  #define DECLARE_NODE(Name_, name_)     CObjectFactory::deleteAllContexts< C##Name_##Group >();
330  #define DECLARE_NODE_PAR(Name_, name_) 
331  #include "node_type.conf"
332/*
333  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_##Group >();
334  #define DECLARE_NODE_PAR(Name_, name_)
335  #include "node_type.conf"
336
337  #define DECLARE_NODE(Name_, name_)     CObjectFactory::dumpObjects< C##Name_>();
338  #define DECLARE_NODE_PAR(Name_, name_)
339  #include "node_type.conf"
340*/
341  CObjectFactory::deleteAllContexts<CContext>() ;
342  CObjectFactory::deleteAllContexts<CContextGroup>() ;
343  CObjectFactory::clearCurrentContextId();
344  CGroupFactory::clearCurrentContextId();
345}
346   ///---------------------------------------------------------------
347
348
349 /*!
350    * Compute the required buffer size to send the fields data.
351    * \param maxEventSize [in/out] the size of the bigger event for each connected server
352    * \param [in] contextClient
353    * \param [in] bufferForWriting True if buffers are used for sending data for writing
354      This flag is only true for client and server-1 for communication with server-2
355    */
356   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize,
357                                                      CContextClient* contextClient, bool bufferForWriting /*= "false"*/)
358   TRY
359   {
360     std::map<int, StdSize> dataSize;
361
362     // Find all reference domain and axis of all active fields
363     std::vector<CFile*>& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles;
364     size_t numEnabledFiles = fileList.size();
365     for (size_t i = 0; i < numEnabledFiles; ++i)
366     {
367       CFile* file = fileList[i];
368       if (file->getContextClient() == contextClient)
369       {
370         std::vector<CField*> enabledFields = file->getEnabledFields();
371         size_t numEnabledFields = enabledFields.size();
372         for (size_t j = 0; j < numEnabledFields; ++j)
373         {
374           // const std::vector<std::map<int, StdSize> > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient);
375           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting);
376           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
377           for (; it != itE; ++it)
378           {
379             // If dataSize[it->first] does not exist, it will be zero-initialized
380             // so we can use it safely without checking for its existance
381           if (CXios::isOptPerformance)
382               dataSize[it->first] += it->second;
383             else if (dataSize[it->first] < it->second)
384               dataSize[it->first] = it->second;
385
386           if (maxEventSize[it->first] < it->second)
387               maxEventSize[it->first] = it->second;
388           }
389         }
390       }
391     }
392     return dataSize;
393   }
394   CATCH_DUMP_ATTR
395
396/*!
397    * Compute the required buffer size to send the attributes (mostly those grid related).
398    * \param maxEventSize [in/out] the size of the bigger event for each connected server
399    * \param [in] contextClient
400    * \param [in] bufferForWriting True if buffers are used for sending data for writing
401      This flag is only true for client and server-1 for communication with server-2
402    */
403   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize,
404                                                           CContextClient* contextClient, bool bufferForWriting /*= "false"*/)
405   TRY
406   {
407   // As calendar attributes are sent even if there are no active files or fields, maps are initialized according the size of calendar attributes
408     std::map<int, StdSize> attributesSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient);
409     maxEventSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient);
410
411     std::vector<CFile*>& fileList = this->enabledFiles;
412     size_t numEnabledFiles = fileList.size();
413     for (size_t i = 0; i < numEnabledFiles; ++i)
414     {
415//         CFile* file = this->enabledWriteModeFiles[i];
416        CFile* file = fileList[i];
417        std::vector<CField*> enabledFields = file->getEnabledFields();
418        size_t numEnabledFields = enabledFields.size();
419        for (size_t j = 0; j < numEnabledFields; ++j)
420        {
421          const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting);
422          std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
423          for (; it != itE; ++it)
424          {
425         // If attributesSize[it->first] does not exist, it will be zero-initialized
426         // so we can use it safely without checking for its existence
427             if (attributesSize[it->first] < it->second)
428         attributesSize[it->first] = it->second;
429
430         if (maxEventSize[it->first] < it->second)
431         maxEventSize[it->first] = it->second;
432          }
433        }
434     }
435     return attributesSize;
436   }
437   CATCH_DUMP_ATTR
438
439
440
441   //! Verify whether a context is initialized
442   bool CContext::isInitialized(void)
443   TRY
444   {
445     return hasClient;
446   }
447   CATCH_DUMP_ATTR
448
449
450   void CContext::init(CServerContext* parentServerContext, MPI_Comm intraComm, int serviceType)
451   TRY
452   {
453     parentServerContext_ = parentServerContext ;
454     if (serviceType==CServicesManager::CLIENT) 
455       initClient(intraComm, serviceType) ;
456     else
457       initServer(intraComm, serviceType) ;
458     initEventScheduler() ;
459    }
460    CATCH_DUMP_ATTR
461
462
463
464//! Initialize client side
465   void CContext::initClient(MPI_Comm intraComm, int serviceType)
466   TRY
467   {
468      intraComm_=intraComm ;
469      MPI_Comm_rank(intraComm_, &intraCommRank_) ;
470      MPI_Comm_size(intraComm_, &intraCommSize_) ;
471
472      serviceType_ = CServicesManager::CLIENT ;
473      if (serviceType_==CServicesManager::CLIENT)
474      {
475        hasClient=true ;
476        hasServer=false ;
477      }
478      contextId_ = getId() ;
479     
480      attached_mode=true ;
481      if (!CXios::isUsingServer()) attached_mode=false ;
482
483
484      string contextRegistryId=getId() ;
485      registryIn=new CRegistry(CXios::getRegistryManager()->getRegistryIn());
486      registryIn->setPath(contextRegistryId) ;
487     
488      registryOut=new CRegistry(intraComm_) ;
489      registryOut->setPath(contextRegistryId) ;
490     
491   }
492   CATCH_DUMP_ATTR
493
494   
495   void CContext::initServer(MPI_Comm intraComm, int serviceType)
496   TRY
497   {
498     hasServer=true;
499     intraComm_=intraComm ;
500     MPI_Comm_rank(intraComm_, &intraCommRank_) ;
501     MPI_Comm_size(intraComm_, &intraCommSize_) ;
502
503     serviceType_=serviceType ;
504
505     if (serviceType_==CServicesManager::GATHERER)
506     {
507       hasClient=true ;
508       hasServer=true ;
509     }
510     else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)
511     {
512       hasClient=false ;
513       hasServer=true ;
514     }
515
516     CXios::getContextsManager()->getContextId(getId(), contextId_, intraComm) ;
517     
518     string contextRegistryId=getId() ;
519     registryIn=new CRegistry(CXios::getRegistryManager()->getRegistryIn());
520     registryIn->setPath(contextRegistryId) ;
521     
522     registryOut=new CRegistry(intraComm_) ;
523     registryOut->setPath(contextRegistryId) ;
524
525   }
526   CATCH_DUMP_ATTR
527
528
529  void CContext::createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) // for servers
530  TRY
531  {
532    MPI_Comm intraCommClient ;
533    MPI_Comm_dup(intraComm_, &intraCommClient);
534    comms.push_back(intraCommClient);
535    // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context
536    server = CContextServer::getNew(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ?
537    client = CContextClient::getNew(this,intraCommClient,interCommClient);
538    client->setAssociatedServer(server) ; 
539    server->setAssociatedClient(client) ; 
540
541  }
542  CATCH_DUMP_ATTR
543
544  void CContext::createServerInterComm(void) 
545  TRY
546  {
547   
548    MPI_Comm interCommClient, interCommServer ;
549
550    if (serviceType_ == CServicesManager::CLIENT)
551    {
552
553      int commRank ;
554      MPI_Comm_rank(intraComm_,&commRank) ;
555      if (commRank==0)
556      {
557        if (attached_mode) CXios::getContextsManager()->createServerContext(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultServerId, 0, getContextId()) ;
558        else if (CXios::usingServer2) CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId()) ;
559        else  CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId()) ;
560      }
561
562      MPI_Comm interComm ;
563     
564      if (attached_mode)
565      {
566        parentServerContext_->createIntercomm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultServerId, 0, getContextId(), intraComm_, 
567                                              interCommClient, interCommServer) ;
568        int type ; 
569        if (commRank==0) CXios::getServicesManager()->getServiceType(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, type) ;
570        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
571        setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble
572      }
573      else if (CXios::usingServer2)
574      { 
575//      CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, interComm) ;
576        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_,
577                                              interCommClient, interCommServer) ;
578        int type ; 
579        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultGathererId, 0, type) ;
580        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
581      }
582      else
583      {
584        //CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, interComm) ;
585        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_,
586                                              interCommClient, interCommServer) ;
587        int type ; 
588        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ;
589        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
590      }
591
592        // intraComm client is not duplicated. In all the code we use client->intraComm for MPI
593        // in future better to replace it by intracommuncator associated to the context
594   
595      MPI_Comm intraCommClient, intraCommServer ;
596      intraCommClient=intraComm_ ;
597      MPI_Comm_dup(intraComm_, &intraCommServer) ;
598      client = CContextClient::getNew(this, intraCommClient, interCommClient);
599      server = CContextServer::getNew(this, intraCommServer, interCommServer);
600      client->setAssociatedServer(server) ;
601      server->setAssociatedClient(client) ;
602    }
603   
604    if (serviceType_ == CServicesManager::GATHERER)
605    {
606      int commRank ;
607      MPI_Comm_rank(intraComm_,&commRank) ;
608     
609      int nbPartitions ;
610      if (commRank==0) 
611      { 
612        CXios::getServicesManager()->getServiceNbPartitions(CXios::defaultPoolId, CXios::defaultServerId, 0, nbPartitions) ;
613        for(int i=0 ; i<nbPartitions; i++)
614          CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId()) ;
615      }     
616      MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ;
617     
618      MPI_Comm interComm ;
619      for(int i=0 ; i<nbPartitions; i++)
620      {
621        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interCommClient, interCommServer) ;
622        int type ; 
623        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ;
624        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
625        primServerId_.push_back(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultServerId, i, type, getContextId())) ;
626
627        // intraComm client is not duplicated. In all the code we use client->intraComm for MPI
628        // in future better to replace it by intracommuncator associated to the context
629     
630        MPI_Comm intraCommClient, intraCommServer ;
631
632        intraCommClient=intraComm_ ;
633        MPI_Comm_dup(intraComm_, &intraCommServer) ;
634
635        CContextClient* client = CContextClient::getNew(this, intraCommClient, interCommClient) ;
636        CContextServer* server = CContextServer::getNew(this, intraCommServer, interCommServer) ;
637        client->setAssociatedServer(server) ;
638        server->setAssociatedClient(client) ;
639        clientPrimServer.push_back(client);
640        serverPrimServer.push_back(server); 
641
642     
643      }
644    }
645  }
646  CATCH_DUMP_ATTR
647
648  void CContext::globalEventLoop(void)
649  {
650    lockContext() ;
651    CXios::getDaemonsManager()->eventLoop() ;
652    unlockContext() ;
653    setCurrent(getId()) ;
654  }
655
656  bool CContext::scheduledEventLoop(bool enableEventsProcessing) 
657  {
658    bool out, finished; 
659    size_t timeLine=timeLine_ ;
660    if (serviceType_==CServicesManager::CLIENT)
661    {
662      timeLine_++ ;
663      eventScheduler_->registerEvent(timeLine, hashId_) ;
664    }
665
666    do
667    { 
668      finished=eventLoop(enableEventsProcessing) ;
669      if (serviceType_==CServicesManager::CLIENT) 
670      { 
671        out = eventScheduler_->queryEvent(timeLine,hashId_) ;
672        if (out) eventScheduler_->popEvent() ;
673      }
674
675      else out=true ;
676    }  while(!out) ;
677   
678    return finished ;
679  }
680
681  bool CContext::eventLoop(bool enableEventsProcessing)
682  {
683    bool  finished(true); 
684    if (isLockedContext()) enableEventsProcessing=false;
685   
686    setCurrent(getId()) ;
687
688    if (client!=nullptr && !finalized) client->eventLoop();
689   
690    for (int i = 0; i < clientPrimServer.size(); ++i)
691    {
692      if (!finalized) clientPrimServer[i]->eventLoop();
693      if (!finalized) finished &= serverPrimServer[i]->eventLoop(enableEventsProcessing);
694    }
695
696    for (auto couplerOut : couplerOutClient_)
697      if (!finalized) couplerOut.second->eventLoop();
698   
699    for (auto couplerIn : couplerInClient_)
700      if (!finalized) couplerIn.second->eventLoop();
701   
702    for (auto couplerOut : couplerOutServer_)
703      if (!finalized) couplerOut.second->eventLoop(enableEventsProcessing);
704
705    for (auto couplerIn : couplerInServer_)
706      if (!finalized) couplerIn.second->eventLoop(enableEventsProcessing);
707   
708    if (server!=nullptr) if (!finalized) finished &= server->eventLoop(enableEventsProcessing);
709    setCurrent(getId()) ;
710    return finalized && finished ;
711  }
712
713  void CContext::addCouplingChanel(const std::string& fullContextId, bool out)
714  {
715     int contextLeader ;
716     
717     if (out)
718     { 
719       if (couplerOutClient_.find(fullContextId)==couplerOutClient_.end()) 
720       {
721         bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ;
722     
723         MPI_Comm interComm, interCommClient, interCommServer  ;
724         MPI_Comm intraCommClient, intraCommServer ;
725
726         if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;
727
728        MPI_Comm_dup(intraComm_, &intraCommClient) ;
729        MPI_Comm_dup(intraComm_, &intraCommServer) ;
730        MPI_Comm_dup(interComm, &interCommClient) ;
731        MPI_Comm_dup(interComm, &interCommServer) ;
732        CContextClient* client = CContextClient::getNew(this, intraCommClient, interCommClient);
733        CContextServer* server = CContextServer::getNew(this, intraCommServer, interCommServer);
734        client->setAssociatedServer(server) ;
735        server->setAssociatedClient(client) ;
736        MPI_Comm_free(&interComm) ;
737        couplerOutClient_[fullContextId] = client ;
738        couplerOutServer_[fullContextId] = server ;
739      }
740    }
741    else if (couplerInClient_.find(fullContextId)==couplerInClient_.end())
742    {
743      bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ;
744     
745       MPI_Comm interComm, interCommClient, interCommServer  ;
746       MPI_Comm intraCommClient, intraCommServer ;
747
748       if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;
749
750       MPI_Comm_dup(intraComm_, &intraCommClient) ;
751       MPI_Comm_dup(intraComm_, &intraCommServer) ;
752       MPI_Comm_dup(interComm, &interCommServer) ;
753       MPI_Comm_dup(interComm, &interCommClient) ;
754       CContextServer* server = CContextServer::getNew(this, intraCommServer, interCommServer);
755       CContextClient* client = CContextClient::getNew(this, intraCommClient, interCommClient);
756       client->setAssociatedServer(server) ;
757       server->setAssociatedClient(client) ;
758       MPI_Comm_free(&interComm) ;
759
760       couplerInClient_[fullContextId] = client ;
761       couplerInServer_[fullContextId] = server ;       
762    }
763  }
764 
765   void CContext::finalize(void)
766   TRY
767   {
768      registryOut->hierarchicalGatherRegistry() ;
769      if (server->getIntraCommRank()==0) CXios::getRegistryManager()->merge(*registryOut) ;
770
771      if (serviceType_==CServicesManager::CLIENT)
772      {
773//ym        doPreTimestepOperationsForEnabledReadModeFiles(); // For now we only use server level 1 to read data
774
775        triggerLateFields() ;
776
777        // inform couplerIn that I am finished
778        for(auto& couplerInClient : couplerInClient_) sendCouplerInContextFinalized(couplerInClient.second) ;
779
780        // wait until received message from couplerOut that they have finished
781        bool couplersInFinalized ;
782        do
783        {
784          couplersInFinalized=true ;
785          for(auto& couplerOutClient : couplerOutClient_) couplersInFinalized &= isCouplerInContextFinalized(couplerOutClient.second) ; 
786          globalEventLoop() ;
787        } while (!couplersInFinalized) ;
788
789        info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ;
790        client->finalize();
791        info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ;
792        while (client->havePendingRequests()) client->eventLoop();
793        info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ;
794        bool notifiedFinalized=false ;
795        do
796        {
797          notifiedFinalized=client->isNotifiedFinalized() ;
798        } while (!notifiedFinalized) ;
799
800        server->releaseBuffers();
801        client->releaseBuffers();
802        info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ;
803      }
804      else if (serviceType_==CServicesManager::GATHERER)
805      {
806         for (int i = 0; i < clientPrimServer.size(); ++i)
807         {
808           clientPrimServer[i]->finalize();
809           bool bufferReleased;
810           do
811           {
812             clientPrimServer[i]->eventLoop();
813             bufferReleased = !clientPrimServer[i]->havePendingRequests();
814           } while (!bufferReleased);
815           
816           bool notifiedFinalized=false ;
817           do
818           {
819             notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ;
820           } while (!notifiedFinalized) ;
821           clientPrimServer[i]->releaseBuffers();
822         }
823         closeAllFile();
824
825      }
826      else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)
827      {
828        closeAllFile();
829        client->releaseBuffers();
830        server->releaseBuffers();
831      }
832
833      freeComms() ;
834       
835      parentServerContext_->freeComm() ;
836      finalized = true;
837      info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl;
838   }
839   CATCH_DUMP_ATTR
840
841   //! Free internally allocated communicators
842   void CContext::freeComms(void)
843   TRY
844   {
845     for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
846       MPI_Comm_free(&(*it));
847     comms.clear();
848   }
849   CATCH_DUMP_ATTR
850
851   //! Deallocate buffers allocated by clientContexts
852   void CContext::releaseClientBuffers(void)
853   TRY
854   {
855     client->releaseBuffers();
856     for (int i = 0; i < clientPrimServer.size(); ++i)
857       clientPrimServer[i]->releaseBuffers();
858   }
859   CATCH_DUMP_ATTR
860
861   
862   /*!
863   \brief Close all the context defintion and do processing data
864      After everything is well defined on client side, they will be processed and sent to server
865   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
866   all necessary information to server, from which each server can build its own database.
867   Because the role of server is to write out field data on a specific netcdf file,
868   the only information that it needs is the enabled files
869   and the active fields (fields will be written onto active files)
870   */
871  void CContext::closeDefinition(void)
872   TRY
873   {
874     CTimer::get("Context : close definition").resume() ;
875     
876     // create intercommunicator with servers.
877     // not sure it is the good place to be called here
878     createServerInterComm() ;
879
880
881     // After xml is parsed, there are some more works with post processing
882//     postProcessing();
883
884   
885    // Make sure the calendar was correctly created
886    if (serviceType_!=CServicesManager::CLIENT) CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
887    if (!calendar)
888      ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
889    else if (calendar->getTimeStep() == NoneDu)
890      ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
891    // Calendar first update to set the current date equals to the start date
892    calendar->update(0);
893
894    // Résolution des héritages descendants (càd des héritages de groupes)
895    // pour chacun des contextes.
896    solveDescInheritance(true);
897 
898    // Check if some axis, domains or grids are eligible to for compressed indexed output.
899    // Warning: This must be done after solving the inheritance and before the rest of post-processing
900    // --> later ????    checkAxisDomainsGridsEligibilityForCompressedOutput();     
901
902      // Check if some automatic time series should be generated
903      // Warning: This must be done after solving the inheritance and before the rest of post-processing     
904
905    // The timeseries should only be prepared in client
906    prepareTimeseries();
907
908    //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
909    findEnabledFiles();
910
911    // Solve inheritance for field to know if enabled or not.
912    for (auto field : CField::getAll()) field->solveRefInheritance();
913
914    findEnabledWriteModeFiles();
915    findEnabledReadModeFiles();
916    findEnabledCouplerIn();
917    findEnabledCouplerOut();
918    createCouplerInterCommunicator() ;
919
920    // Find all enabled fields of each file     
921    vector<CField*>&& fileOutField = findAllEnabledFieldsInFileOut(this->enabledWriteModeFiles);
922    vector<CField*>&& fileInField = findAllEnabledFieldsInFileIn(this->enabledReadModeFiles);
923    vector<CField*>&& couplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut);
924    vector<CField*>&& couplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn);
925    findFieldsWithReadAccess();
926    vector<CField*>& fieldWithReadAccess = fieldsWithReadAccess_ ;
927    vector<CField*> fieldModelIn ; // fields potentially from model
928
929    // define if files are on clientSied or serverSide
930    if (serviceType_==CServicesManager::CLIENT)
931    {
932      for (auto& file : enabledWriteModeFiles) file->setClientSide() ;
933      for (auto& file : enabledReadModeFiles) file->setClientSide() ;
934    }
935    else
936    {
937      for (auto& file : enabledWriteModeFiles) file->setServerSide() ;
938      for (auto& file : enabledReadModeFiles) file->setServerSide() ;
939    }
940
941   
942    for (auto& field : couplerInField)
943    {
944      field->unsetGridCompleted() ;
945    }
946// find all field potentially at workflow end
947    vector<CField*> endWorkflowFields ;
948    endWorkflowFields.reserve(fileOutField.size()+couplerOutField.size()+fieldWithReadAccess.size()) ;
949    endWorkflowFields.insert(endWorkflowFields.end(),fileOutField.begin(), fileOutField.end()) ;
950    endWorkflowFields.insert(endWorkflowFields.end(),couplerOutField.begin(), couplerOutField.end()) ;
951    endWorkflowFields.insert(endWorkflowFields.end(),fieldWithReadAccess.begin(), fieldWithReadAccess.end()) ;
952
953    bool workflowGraphIsCompleted ;
954   
955    bool first=true ;
956   
957    do
958    {
959      workflowGraphIsCompleted=true; 
960      for(auto endWorkflowField : endWorkflowFields) 
961      {
962        workflowGraphIsCompleted &= endWorkflowField->buildWorkflowGraph(garbageCollector) ;
963      }
964   
965      for(auto couplerIn : enabledCouplerIn) couplerIn->assignContext() ;
966      for(auto field : couplerInField) field->makeGridAliasForCoupling();
967      for(auto field : couplerInField) this->sendCouplerInReady(field->getContextClient()) ;
968   
969
970      // assign context to coupler out and related fields
971      for(auto couplerOut : enabledCouplerOut) couplerOut->assignContext() ;
972      // for now supose that all coupling out endpoint are succesfull. The difficultie is client/server buffer evaluation
973      for(auto field : couplerOutField) 
974      {
975        // connect to couplerOut -> to do
976      }
977
978      bool couplersReady ;
979      do 
980      {
981        couplersReady=true ;
982        for(auto field : couplerOutField)
983        {
984          bool ready = isCouplerInReady(field->getContextClient()) ; 
985          if (ready) field->sendFieldToCouplerOut() ;
986          couplersReady &= ready ;
987        }
988        this->scheduledEventLoop() ;
989
990      } while (!couplersReady) ;
991     
992      first=false ;
993      this->scheduledEventLoop() ;
994
995    } while (!workflowGraphIsCompleted) ;
996
997
998    for( auto field : couplerInField) couplerInFields_.push_back(field) ;
999
1000    // get all field coming potentially from model
1001    for (auto field : CField::getAll() ) if (field->getModelIn()) fieldModelIn.push_back(field) ;
1002
1003    // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields
1004    if (serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles);
1005    else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ;
1006
1007    // client side, assign context for file reading
1008    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ;
1009   
1010    // server side, assign context where to send file data read
1011    if (serviceType_==CServicesManager::CServicesManager::GATHERER || serviceType_==CServicesManager::IO_SERVER) 
1012      for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ;
1013   
1014    // workflow endpoint => sent to IO/SERVER
1015    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER)
1016    {
1017      for(auto field : fileOutField) 
1018      {
1019        field->connectToFileServer(garbageCollector) ; // connect the field to server filter
1020      }
1021      for(auto field : fileOutField) field->sendFieldToFileServer() ;
1022    }
1023
1024    // workflow endpoint => write to file
1025    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)
1026    {
1027      for(auto field : fileOutField) 
1028      {
1029        field->connectToFileWriter(garbageCollector) ; // connect the field to server filter
1030      }
1031    }
1032   
1033    // workflow endpoint => Send data from server to client
1034    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER)
1035    {
1036      for(auto field : fileInField) 
1037      {
1038        field->connectToServerToClient(garbageCollector) ;
1039      }
1040    }
1041
1042    // workflow endpoint => sent to model on client side
1043    if (serviceType_==CServicesManager::CLIENT)
1044    {
1045      for(auto field : fieldWithReadAccess) field->connectToModelOutput(garbageCollector) ;
1046    }
1047
1048
1049    // workflow startpoint => data from model
1050    if (serviceType_==CServicesManager::CLIENT)
1051    {
1052      for(auto field : fieldModelIn) 
1053      {
1054        field->connectToModelInput(garbageCollector) ; // connect the field to server filter
1055        // grid index will be computed on the fly
1056      }
1057    }
1058   
1059    // workflow startpoint => data from client on server side
1060    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER || serviceType_==CServicesManager::OUT_SERVER)
1061    {
1062      for(auto field : fieldModelIn) 
1063      {
1064        field->connectToClientInput(garbageCollector) ; // connect the field to server filter
1065      }
1066    }
1067
1068   
1069    for(auto field : couplerInField) 
1070    {
1071      field->connectToCouplerIn(garbageCollector) ; // connect the field to server filter
1072    }
1073   
1074   
1075    for(auto field : couplerOutField) 
1076    {
1077      field->connectToCouplerOut(garbageCollector) ; // for now the same kind of filter that for file server
1078    }
1079
1080     // workflow startpoint => data from server on client side
1081    if (serviceType_==CServicesManager::CLIENT)
1082    {
1083      for(auto field : fileInField) 
1084      {
1085        field->sendFieldToInputFileServer() ;
1086        field->connectToServerInput(garbageCollector) ; // connect the field to server filter
1087        fileInFields_.push_back(field) ;
1088      }
1089    }
1090
1091    // workflow startpoint => data read from file on server side
1092    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER)
1093    {
1094      for(auto field : fileInField) 
1095      {
1096        field->connectToFileReader(garbageCollector) ;
1097      }
1098    }
1099   
1100    // construct slave server list
1101    if (serviceType_==CServicesManager::CLIENT) 
1102    {
1103      for(auto field : fileOutField) slaveServers_.insert(field->getContextClient()) ; 
1104      for(auto field : fileInField) slaveServers_.insert(field->getContextClient()) ; 
1105    }
1106    else if (serviceType_==CServicesManager::GATHERER) 
1107      for(auto field : fileOutField) slaveServers_.insert(field->getContextClient()) ; 
1108
1109    for(auto& slaveServer : slaveServers_) sendCloseDefinition(slaveServer) ;
1110
1111    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 
1112    {
1113      createFileHeader();
1114    }
1115
1116    if (serviceType_==CServicesManager::CLIENT) startPrefetchingOfEnabledReadModeFiles();
1117   
1118    // send signal to couplerIn context that definition phasis is done
1119
1120    for(auto& couplerInClient : couplerInClient_) sendCouplerInCloseDefinition(couplerInClient.second) ;
1121
1122    // wait until all couplerIn signal that closeDefition is done.
1123    bool ok;
1124    do
1125    {
1126      ok = true ;
1127      for(auto& couplerOutClient : couplerOutClient_) ok &= isCouplerInCloseDefinition(couplerOutClient.second) ;
1128      this->scheduledEventLoop() ;
1129    } while (!ok) ;
1130
1131    // Now evaluate the size of the context client buffers
1132    map<CContextClient*,map<int,size_t>> fieldBufferEvaluation ;
1133    for(auto field : fileOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to server
1134    for(auto field : couplerOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to coupler
1135    for(auto field : fileInField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // server to client (for io servers)
1136   
1137    // fix size for each context client
1138    for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ;
1139
1140
1141     CTimer::get("Context : close definition").suspend() ;
1142  }
1143  CATCH_DUMP_ATTR
1144
1145
1146  vector<CField*> CContext::findAllEnabledFieldsInFileOut(const std::vector<CFile*>& activeFiles)
1147   TRY
1148   {
1149     vector<CField*> fields ;
1150     for(auto file : activeFiles)
1151     {
1152        const vector<CField*>&& fieldList=file->getEnabledFields() ;
1153        for(auto field : fieldList) field->setFileOut(file) ;
1154        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1155     }
1156     return fields ;
1157   }
1158   CATCH_DUMP_ATTR
1159
1160   vector<CField*> CContext::findAllEnabledFieldsInFileIn(const std::vector<CFile*>& activeFiles)
1161   TRY
1162   {
1163     vector<CField*> fields ;
1164     for(auto file : activeFiles)
1165     {
1166        const vector<CField*>&& fieldList=file->getEnabledFields() ;
1167        for(auto field : fieldList) field->setFileIn(file) ;
1168        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1169     }
1170     return fields ;
1171   }
1172   CATCH_DUMP_ATTR
1173
1174   vector<CField*> CContext::findAllEnabledFieldsCouplerOut(const std::vector<CCouplerOut*>& activeCouplerOut)
1175   TRY
1176   {
1177     vector<CField*> fields ;
1178     for (auto couplerOut :activeCouplerOut)
1179     {
1180        const vector<CField*>&& fieldList=couplerOut->getEnabledFields() ;
1181        for(auto field : fieldList) field->setCouplerOut(couplerOut) ;
1182        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1183     }
1184     return fields ;
1185   }
1186   CATCH_DUMP_ATTR
1187
1188   vector<CField*> CContext::findAllEnabledFieldsCouplerIn(const std::vector<CCouplerIn*>& activeCouplerIn)
1189   TRY
1190   {
1191     vector<CField*> fields ;
1192     for (auto couplerIn :activeCouplerIn)
1193     {
1194        const vector<CField*>&& fieldList=couplerIn->getEnabledFields() ;
1195        for(auto field : fieldList) field->setCouplerIn(couplerIn) ;
1196        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1197     }
1198     return fields ;
1199   }
1200   CATCH_DUMP_ATTR
1201
1202 /*!
1203  * Send context attribute and calendar to file server, it must be done once by context file server
1204  * \param[in] client : context client to send   
1205  */ 
1206  void CContext::sendContextToFileServer(CContextClient* client)
1207  {
1208    if (sendToFileServer_done_.count(client)!=0) return ;
1209    else sendToFileServer_done_.insert(client) ;
1210   
1211    this->sendAllAttributesToServer(client); // Send all attributes of current context to server
1212    CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(client); // Send all attributes of current cale
1213  }
1214
1215 
1216   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
1217   TRY
1218   {
1219      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
1220        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
1221   }
1222   CATCH_DUMP_ATTR
1223
1224
1225   void CContext::postProcessFilterGraph()
1226   TRY
1227   {
1228     int size = enabledFiles.size();
1229     for (int i = 0; i < size; ++i)
1230     {
1231        enabledFiles[i]->postProcessFilterGraph();
1232     }
1233   }
1234   CATCH_DUMP_ATTR
1235
1236   void CContext::startPrefetchingOfEnabledReadModeFiles()
1237   TRY
1238   {
1239     int size = enabledReadModeFiles.size();
1240     for (int i = 0; i < size; ++i)
1241     {
1242        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
1243     }
1244   }
1245   CATCH_DUMP_ATTR
1246
1247   void CContext::doPreTimestepOperationsForEnabledReadModeFiles()
1248   TRY
1249   {
1250     int size = enabledReadModeFiles.size();
1251     for (int i = 0; i < size; ++i)
1252     {
1253        enabledReadModeFiles[i]->doPreTimestepOperationsForEnabledReadModeFields();
1254     }
1255   }
1256   CATCH_DUMP_ATTR
1257
1258   void CContext::doPostTimestepOperationsForEnabledReadModeFiles()
1259   TRY
1260   {
1261     int size = enabledReadModeFiles.size();
1262     for (int i = 0; i < size; ++i)
1263     {
1264        enabledReadModeFiles[i]->doPostTimestepOperationsForEnabledReadModeFields();
1265     }
1266   }
1267   CATCH_DUMP_ATTR
1268
1269  void CContext::findFieldsWithReadAccess(void)
1270  TRY
1271  {
1272    fieldsWithReadAccess_.clear();
1273    const vector<CField*> allFields = CField::getAll();
1274    for (size_t i = 0; i < allFields.size(); ++i)
1275    {
1276      CField* field = allFields[i];
1277      if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
1278      {
1279        fieldsWithReadAccess_.push_back(field);
1280        field->setModelOut() ;
1281      }
1282    }
1283  }
1284  CATCH_DUMP_ATTR
1285
1286 
1287   void CContext::solveAllInheritance(bool apply)
1288   TRY
1289   {
1290     // Résolution des héritages descendants (càd des héritages de groupes)
1291     // pour chacun des contextes.
1292      solveDescInheritance(apply);
1293
1294     // Résolution des héritages par référence au niveau des fichiers.
1295      const vector<CFile*> allFiles=CFile::getAll();
1296      const vector<CCouplerIn*> allCouplerIn=CCouplerIn::getAll();
1297      const vector<CCouplerOut*> allCouplerOut=CCouplerOut::getAll();
1298      const vector<CGrid*> allGrids= CGrid::getAll();
1299
1300      if (serviceType_==CServicesManager::CLIENT)
1301      {
1302        for (unsigned int i = 0; i < allFiles.size(); i++)
1303          allFiles[i]->solveFieldRefInheritance(apply);
1304
1305        for (unsigned int i = 0; i < allCouplerIn.size(); i++)
1306          allCouplerIn[i]->solveFieldRefInheritance(apply);
1307
1308        for (unsigned int i = 0; i < allCouplerOut.size(); i++)
1309          allCouplerOut[i]->solveFieldRefInheritance(apply);
1310      }
1311
1312      unsigned int vecSize = allGrids.size();
1313      unsigned int i = 0;
1314      for (i = 0; i < vecSize; ++i)
1315        allGrids[i]->solveElementsRefInheritance(apply);
1316
1317   }
1318  CATCH_DUMP_ATTR
1319
1320   void CContext::findEnabledFiles(void)
1321   TRY
1322   {
1323      const std::vector<CFile*> allFiles = CFile::getAll();
1324      const CDate& initDate = calendar->getInitDate();
1325
1326      for (unsigned int i = 0; i < allFiles.size(); i++)
1327         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
1328         {
1329            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
1330            {
1331              if (allFiles[i]->output_freq.isEmpty())
1332              {
1333                 ERROR("CContext::findEnabledFiles()",
1334                     << "Mandatory attribute output_freq must be defined for file \""<<allFiles[i]->getFileOutputName()
1335                     <<" \".")
1336              }
1337              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
1338              {
1339                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
1340                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
1341                    <<"\" is less than the time step. File will not be written."<<endl;
1342              }
1343              else
1344               enabledFiles.push_back(allFiles[i]);
1345            }
1346            else // Si l'attribut 'enabled' est fixé à faux.
1347            {
1348              // disabled all fields contained in file (used in findFieldsWithReadAccess through field_ref dependencies)
1349              const vector<CField*>&& fieldList=allFiles[i]->getEnabledFields() ;
1350              for(auto field : fieldList) field->enabled.setValue(false);
1351            }
1352         }
1353         else
1354         {
1355           if (allFiles[i]->output_freq.isEmpty())
1356           {
1357              ERROR("CContext::findEnabledFiles()",
1358                  << "Mandatory attribute output_freq must be defined for file \""<<allFiles[i]->getFileOutputName()
1359                  <<" \".")
1360           }
1361           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
1362           {
1363             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
1364                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
1365                 <<"\" is less than the time step. File will not be written."<<endl;
1366           }
1367           else
1368             enabledFiles.push_back(allFiles[i]); // otherwise true by default
1369         }
1370
1371      if (enabledFiles.size() == 0)
1372         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
1373               << getId() << "\" !");
1374
1375   }
1376   CATCH_DUMP_ATTR
1377
1378   void CContext::findEnabledCouplerIn(void)
1379   TRY
1380   {
1381      const std::vector<CCouplerIn*> allCouplerIn = CCouplerIn::getAll();
1382      bool enabled ;
1383      for (size_t i = 0; i < allCouplerIn.size(); i++)
1384      {
1385        if (allCouplerIn[i]->enabled.isEmpty()) enabled=true ;
1386        else enabled=allCouplerIn[i]->enabled ;
1387        if (enabled) enabledCouplerIn.push_back(allCouplerIn[i]) ;
1388      }
1389   }
1390   CATCH_DUMP_ATTR
1391
1392   void CContext::findEnabledCouplerOut(void)
1393   TRY
1394   {
1395      const std::vector<CCouplerOut*> allCouplerOut = CCouplerOut::getAll();
1396      bool enabled ;
1397      for (size_t i = 0; i < allCouplerOut.size(); i++)
1398      {
1399        if (allCouplerOut[i]->enabled.isEmpty()) enabled=true ;
1400        else enabled=allCouplerOut[i]->enabled ;
1401        if (enabled) enabledCouplerOut.push_back(allCouplerOut[i]) ;
1402      }
1403   }
1404   CATCH_DUMP_ATTR
1405
1406
1407
1408
1409   void CContext::distributeFiles(const vector<CFile*>& files)
1410   TRY
1411   {
1412     bool distFileMemory=false ;
1413     distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory);
1414
1415     if (distFileMemory) distributeFileOverMemoryBandwith(files) ;
1416     else distributeFileOverBandwith(files) ;
1417   }
1418   CATCH_DUMP_ATTR
1419
1420   void CContext::distributeFileOverBandwith(const vector<CFile*>& files)
1421   TRY
1422   {
1423     double eps=std::numeric_limits<double>::epsilon()*10 ;
1424     
1425     std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out);
1426     int nbPools = clientPrimServer.size();
1427
1428     // (1) Find all enabled files in write mode
1429     // for (int i = 0; i < this->enabledFiles.size(); ++i)
1430     // {
1431     //   if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write ))
1432     //    enabledWriteModeFiles.push_back(enabledFiles[i]);
1433     // }
1434
1435     // (2) Estimate the data volume for each file
1436     int size = files.size();
1437     std::vector<std::pair<double, CFile*> > dataSizeMap;
1438     double dataPerPool = 0;
1439     int nfield=0 ;
1440     ofs<<size<<endl ;
1441     for (size_t i = 0; i < size; ++i)
1442     {
1443       CFile* file = files[i];
1444       ofs<<file->getId()<<endl ;
1445       StdSize dataSize=0;
1446       std::vector<CField*> enabledFields = file->getEnabledFields();
1447       size_t numEnabledFields = enabledFields.size();
1448       ofs<<numEnabledFields<<endl ;
1449       for (size_t j = 0; j < numEnabledFields; ++j)
1450       {
1451         dataSize += enabledFields[j]->getGlobalWrittenSize() ;
1452         ofs<<enabledFields[j]->getGrid()->getId()<<endl ;
1453         ofs<<enabledFields[j]->getGlobalWrittenSize()<<endl ;
1454       }
1455       double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ;
1456       double dataSizeSec= dataSize/ outFreqSec;
1457       ofs<<dataSizeSec<<endl ;
1458       nfield++ ;
1459// add epsilon*nField to dataSizeSec in order to  preserve reproductive ordering when sorting
1460       dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file));
1461       dataPerPool += dataSizeSec;
1462     }
1463     dataPerPool /= nbPools;
1464     std::sort(dataSizeMap.begin(), dataSizeMap.end());
1465
1466     // (3) Assign contextClient to each enabled file
1467
1468     std::multimap<double,int> poolDataSize ;
1469// multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11
1470
1471     int j;
1472     double dataSize ;
1473     for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ; 
1474             
1475     for (int i = dataSizeMap.size()-1; i >= 0; --i)
1476     {
1477       dataSize=(*poolDataSize.begin()).first ;
1478       j=(*poolDataSize.begin()).second ;
1479       dataSizeMap[i].second->setContextClient(clientPrimServer[j]);
1480       dataSize+=dataSizeMap[i].first;
1481       poolDataSize.erase(poolDataSize.begin()) ;
1482       poolDataSize.insert(std::pair<double,int>(dataSize,j)) ; 
1483     }
1484
1485     for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) info(30)<<"Load Balancing for servers (perfect=1) : "<<it->second<<" :  ratio "<<it->first*1./dataPerPool<<endl ;
1486   }
1487   CATCH_DUMP_ATTR
1488
1489   void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList)
1490   TRY
1491   {
1492     int nbPools = clientPrimServer.size();
1493     double ratio=0.5 ;
1494     ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio);
1495
1496     int nFiles = filesList.size();
1497     vector<SDistFile> files(nFiles);
1498     vector<SDistGrid> grids;
1499     map<string,int> gridMap ;
1500     string gridId; 
1501     int gridIndex=0 ;
1502
1503     for (size_t i = 0; i < nFiles; ++i)
1504     {
1505       StdSize dataSize=0;
1506       CFile* file = filesList[i];
1507       std::vector<CField*> enabledFields = file->getEnabledFields();
1508       size_t numEnabledFields = enabledFields.size();
1509
1510       files[i].id_=file->getId() ;
1511       files[i].nbGrids_=numEnabledFields;
1512       files[i].assignedGrid_ = new int[files[i].nbGrids_] ;
1513         
1514       for (size_t j = 0; j < numEnabledFields; ++j)
1515       {
1516         gridId=enabledFields[j]->getGrid()->getId() ;
1517         if (gridMap.find(gridId)==gridMap.end())
1518         {
1519            gridMap[gridId]=gridIndex  ;
1520            SDistGrid newGrid; 
1521            grids.push_back(newGrid) ;
1522            gridIndex++ ;
1523         }
1524         files[i].assignedGrid_[j]=gridMap[gridId] ;
1525         grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ;
1526         dataSize += enabledFields[j]->getGlobalWrittenSize() ; // usefull
1527       }
1528       double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ;
1529       files[i].bandwith_= dataSize/ outFreqSec ;
1530     }
1531
1532     double bandwith=0 ;
1533     double memory=0 ;
1534   
1535     for(int i=0; i<nFiles; i++)  bandwith+=files[i].bandwith_ ;
1536     for(int i=0; i<nFiles; i++)  files[i].bandwith_ = files[i].bandwith_/bandwith * ratio ;
1537
1538     for(int i=0; i<grids.size(); i++)  memory+=grids[i].size_ ;
1539     for(int i=0; i<grids.size(); i++)  grids[i].size_ = grids[i].size_ / memory * (1.0-ratio) ;
1540       
1541     distributeFileOverServer2(nbPools, grids.size(), &grids[0], nFiles, &files[0]) ;
1542
1543     vector<double> memorySize(nbPools,0.) ;
1544     vector< set<int> > serverGrids(nbPools) ;
1545     vector<double> bandwithSize(nbPools,0.) ;
1546       
1547     for (size_t i = 0; i < nFiles; ++i)
1548     {
1549       bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ;
1550       for(int j=0 ; j<files[i].nbGrids_;j++)
1551       {
1552         if (serverGrids[files[i].assignedServer_].find(files[i].assignedGrid_[j]) == serverGrids[files[i].assignedServer_].end())
1553         {
1554           memorySize[files[i].assignedServer_]+= grids[files[i].assignedGrid_[j]].size_ * memory / (1.0-ratio);
1555           serverGrids[files[i].assignedServer_].insert(files[i].assignedGrid_[j]) ;
1556         }
1557       }
1558       filesList[i]->setContextClient(clientPrimServer[files[i].assignedServer_]) ;
1559       delete [] files[i].assignedGrid_ ;
1560     }
1561
1562     for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<"   assigned file bandwith "<<bandwithSize[i]*86400.*4./1024/1024.<<" Mb / days"<<endl ;
1563     for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<"   assigned grid memory "<<memorySize[i]*100/1024./1024.<<" Mb"<<endl ;
1564
1565   }
1566   CATCH_DUMP_ATTR
1567
1568   /*!
1569      Find all files in write mode
1570   */
1571   void CContext::findEnabledWriteModeFiles(void)
1572   TRY
1573   {
1574     int size = this->enabledFiles.size();
1575     for (int i = 0; i < size; ++i)
1576     {
1577       if (enabledFiles[i]->mode.isEmpty() || 
1578          (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write ))
1579        enabledWriteModeFiles.push_back(enabledFiles[i]);
1580     }
1581   }
1582   CATCH_DUMP_ATTR
1583
1584   /*!
1585      Find all files in read mode
1586   */
1587   void CContext::findEnabledReadModeFiles(void)
1588   TRY
1589   {
1590     int size = this->enabledFiles.size();
1591     for (int i = 0; i < size; ++i)
1592     {
1593       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
1594        enabledReadModeFiles.push_back(enabledFiles[i]);
1595     }
1596   }
1597   CATCH_DUMP_ATTR
1598
1599   void CContext::closeAllFile(void)
1600   TRY
1601   {
1602     std::vector<CFile*>::const_iterator
1603            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
1604
1605     for (; it != end; it++)
1606     {
1607       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
1608       (*it)->close();
1609     }
1610   }
1611   CATCH_DUMP_ATTR
1612
1613   /*!
1614   \brief Dispatch event received from client
1615      Whenever a message is received in buffer of server, it will be processed depending on
1616   its event type. A new event type should be added in the switch list to make sure
1617   it processed on server side.
1618   \param [in] event: Received message
1619   */
1620   bool CContext::dispatchEvent(CEventServer& event)
1621   TRY
1622   {
1623
1624      if (SuperClass::dispatchEvent(event)) return true;
1625      else
1626      {
1627        switch(event.type)
1628        {
1629           case EVENT_ID_CLOSE_DEFINITION :
1630             recvCloseDefinition(event);
1631             return true;
1632             break;
1633           case EVENT_ID_UPDATE_CALENDAR:
1634             recvUpdateCalendar(event);
1635             return true;
1636             break;
1637           case EVENT_ID_CREATE_FILE_HEADER :
1638             recvCreateFileHeader(event);
1639             return true;
1640             break;
1641           case EVENT_ID_COUPLER_IN_READY:
1642             recvCouplerInReady(event);
1643             return true;
1644             break;
1645           case EVENT_ID_COUPLER_IN_CLOSE_DEFINITION:
1646             recvCouplerInCloseDefinition(event);
1647             return true;
1648             break;
1649           case EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED:
1650             recvCouplerInContextFinalized(event);
1651             return true;
1652             break; 
1653           default :
1654             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
1655                    <<"Unknown Event");
1656           return false;
1657         }
1658      }
1659   }
1660   CATCH
1661
1662   //! Client side: Send a message to server to make it close
1663   // ym obsolete
1664   void CContext::sendCloseDefinition(void)
1665   TRY
1666   {
1667    int nbSrvPools ;
1668    if (serviceType_==CServicesManager::CLIENT) nbSrvPools = 1 ;
1669    else if (serviceType_==CServicesManager::GATHERER) nbSrvPools = this->clientPrimServer.size() ;
1670    else nbSrvPools = 0 ;
1671    CContextClient* contextClientTmp ;
1672
1673    for (int i = 0; i < nbSrvPools; ++i)
1674     {
1675       if (serviceType_==CServicesManager::CLIENT) contextClientTmp = client ;
1676       else if (serviceType_==CServicesManager::GATHERER ) contextClientTmp = clientPrimServer[i] ;
1677       CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
1678       if (contextClientTmp->isServerLeader())
1679       {
1680         CMessage msg;
1681         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1682         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1683           event.push(*itRank,1,msg);
1684         contextClientTmp->sendEvent(event);
1685       }
1686       else contextClientTmp->sendEvent(event);
1687     }
1688   }
1689   CATCH_DUMP_ATTR
1690   
1691   //  ! Client side: Send a message to server to make it close
1692   void CContext::sendCloseDefinition(CContextClient* client)
1693   TRY
1694   {
1695      if (sendCloseDefinition_done_.count(client)!=0) return ;
1696      else sendCloseDefinition_done_.insert(client) ;
1697
1698      CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
1699      if (client->isServerLeader())
1700      {
1701        CMessage msg;
1702        for(auto rank : client->getRanksServerLeader()) event.push(rank,1,msg);
1703        client->sendEvent(event);
1704      }
1705     else client->sendEvent(event);
1706   }
1707   CATCH_DUMP_ATTR
1708
1709   //! Server side: Receive a message of client announcing a context close
1710   void CContext::recvCloseDefinition(CEventServer& event)
1711   TRY
1712   {
1713      CBufferIn* buffer=event.subEvents.begin()->buffer;
1714      getCurrent()->closeDefinition();
1715   }
1716   CATCH
1717
1718   //! Client side: Send a message to update calendar in each time step
1719   void CContext::sendUpdateCalendar(int step)
1720   TRY
1721   {
1722     for(auto client : slaveServers_) 
1723     {
1724       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
1725       if (client->isServerLeader())
1726       {
1727         CMessage msg;
1728         msg<<step;
1729         for (auto& rank : client->getRanksServerLeader() ) event.push(rank,1,msg);
1730         client->sendEvent(event);
1731       }
1732       else client->sendEvent(event);
1733     }
1734   }
1735   CATCH_DUMP_ATTR
1736
1737   //! Server side: Receive a message of client annoucing calendar update
1738   void CContext::recvUpdateCalendar(CEventServer& event)
1739   TRY
1740   {
1741      CBufferIn* buffer=event.subEvents.begin()->buffer;
1742      getCurrent()->recvUpdateCalendar(*buffer);
1743   }
1744   CATCH
1745
1746   //! Server side: Receive a message of client annoucing calendar update
1747   void CContext::recvUpdateCalendar(CBufferIn& buffer)
1748   TRY
1749   {
1750      int step;
1751      buffer>>step;
1752      updateCalendar(step);
1753      if (serviceType_==CServicesManager::GATHERER)
1754      {       
1755        sendUpdateCalendar(step);
1756      }
1757   }
1758   CATCH_DUMP_ATTR
1759
1760   //! Client side: Send a message to create header part of netcdf file
1761   void CContext::sendCreateFileHeader(void)
1762   TRY
1763   {
1764     int nbSrvPools ;
1765     if (serviceType_==CServicesManager::CLIENT) nbSrvPools = 1 ;
1766     else if (serviceType_==CServicesManager::GATHERER) nbSrvPools = this->clientPrimServer.size() ;
1767     else nbSrvPools = 0 ;
1768     CContextClient* contextClientTmp ;
1769
1770     for (int i = 0; i < nbSrvPools; ++i)
1771     {
1772       if (serviceType_==CServicesManager::CLIENT) contextClientTmp = client ;
1773       else if (serviceType_==CServicesManager::GATHERER ) contextClientTmp = clientPrimServer[i] ;
1774       CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
1775
1776       if (contextClientTmp->isServerLeader())
1777       {
1778         CMessage msg;
1779         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1780         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1781           event.push(*itRank,1,msg) ;
1782         contextClientTmp->sendEvent(event);
1783       }
1784       else contextClientTmp->sendEvent(event);
1785     }
1786   }
1787   CATCH_DUMP_ATTR
1788
1789   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
1790   void CContext::recvCreateFileHeader(CEventServer& event)
1791   TRY
1792   {
1793      CBufferIn* buffer=event.subEvents.begin()->buffer;
1794      getCurrent()->recvCreateFileHeader(*buffer);
1795   }
1796   CATCH
1797
1798   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
1799   void CContext::recvCreateFileHeader(CBufferIn& buffer)
1800   TRY
1801   {
1802      if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 
1803        createFileHeader();
1804   }
1805   CATCH_DUMP_ATTR
1806
1807   void CContext::createCouplerInterCommunicator(void)
1808   TRY
1809   {
1810      int rank=this->getIntraCommRank() ;
1811      map<string,list<CCouplerOut*>> listCouplerOut ; 
1812      map<string,list<CCouplerIn*>> listCouplerIn ; 
1813
1814      for(auto couplerOut : enabledCouplerOut) listCouplerOut[couplerOut->getCouplingContextId()].push_back(couplerOut) ;
1815      for(auto couplerIn : enabledCouplerIn) listCouplerIn[couplerIn->getCouplingContextId()].push_back(couplerIn) ;
1816
1817      CCouplerManager* couplerManager = CXios::getCouplerManager() ;
1818      if (rank==0)
1819      {
1820        for(auto couplerOut : listCouplerOut) couplerManager->registerCoupling(this->getContextId(),couplerOut.first) ;
1821        for(auto couplerIn : listCouplerIn) couplerManager->registerCoupling(couplerIn.first,this->getContextId()) ;
1822      }
1823
1824      do
1825      {
1826        for(auto couplerOut : listCouplerOut) 
1827        {
1828          bool isNextCoupling ;
1829          if (rank==0) isNextCoupling = couplerManager->isNextCoupling(this->getContextId(),couplerOut.first) ;
1830          MPI_Bcast(&isNextCoupling,1,MPI_C_BOOL, 0, getIntraComm()) ; 
1831          if (isNextCoupling) 
1832          {
1833            addCouplingChanel(couplerOut.first, true) ;
1834            listCouplerOut.erase(couplerOut.first) ;
1835            break ;
1836          }           
1837        }
1838        for(auto couplerIn : listCouplerIn) 
1839        {
1840          bool isNextCoupling ;
1841          if (rank==0) isNextCoupling = couplerManager->isNextCoupling(couplerIn.first,this->getContextId());
1842          MPI_Bcast(&isNextCoupling,1,MPI_C_BOOL, 0, getIntraComm()) ; 
1843          if (isNextCoupling) 
1844          {
1845            addCouplingChanel(couplerIn.first, false) ;
1846            listCouplerIn.erase(couplerIn.first) ;
1847            break ;
1848          }           
1849        }
1850
1851      } while (!listCouplerOut.empty() || !listCouplerIn.empty()) ;
1852
1853   }
1854   CATCH_DUMP_ATTR
1855
1856 
1857     //! Client side: Send infomation of active files (files are enabled to write out)
1858   void CContext::sendEnabledFiles(const std::vector<CFile*>& activeFiles)
1859   TRY
1860   {
1861     int size = activeFiles.size();
1862
1863     // In a context, each type has a root definition, e.g: axis, domain, field.
1864     // Every object must be a child of one of these root definition. In this case
1865     // all new file objects created on server must be children of the root "file_definition"
1866     StdString fileDefRoot("file_definition");
1867     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
1868
1869     for (int i = 0; i < size; ++i)
1870     {
1871       CFile* f = activeFiles[i];
1872       cfgrpPtr->sendCreateChild(f->getId(),f->getContextClient());
1873       f->sendAllAttributesToServer(f->getContextClient());
1874       f->sendAddAllVariables(f->getContextClient());
1875     }
1876   }
1877   CATCH_DUMP_ATTR
1878
1879   //! Client side: Send information of active fields (ones are written onto files)
1880   void CContext::sendEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)
1881   TRY
1882   {
1883     int size = activeFiles.size();
1884     for (int i = 0; i < size; ++i)
1885     {
1886       activeFiles[i]->sendEnabledFields(activeFiles[i]->getContextClient());
1887     }
1888   }
1889   CATCH_DUMP_ATTR
1890
1891 
1892   //! Client side: Prepare the timeseries by adding the necessary files
1893   void CContext::prepareTimeseries()
1894   TRY
1895   {
1896     const std::vector<CFile*> allFiles = CFile::getAll();
1897     for (size_t i = 0; i < allFiles.size(); i++)
1898     {
1899       CFile* file = allFiles[i];
1900
1901       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1902       for (size_t k = 0; k < vars.size(); k++)
1903       {
1904         CVariable* var = vars[k];
1905
1906         if (var->ts_target.isEmpty()
1907              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1908           fileVars.push_back(var);
1909
1910         if (!var->ts_target.isEmpty()
1911              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1912           fieldVars.push_back(var);
1913       }
1914
1915       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1916       {
1917         StdString fileNameStr("%file_name%") ;
1918         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1919         
1920         StdString fileName=file->getFileOutputName();
1921         size_t pos=tsPrefix.find(fileNameStr) ;
1922         while (pos!=std::string::npos)
1923         {
1924           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1925           pos=tsPrefix.find(fileNameStr) ;
1926         }
1927       
1928         const std::vector<CField*> allFields = file->getAllFields();
1929         for (size_t j = 0; j < allFields.size(); j++)
1930         {
1931           CField* field = allFields[j];
1932
1933           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1934           {
1935             CFile* tsFile = CFile::create();
1936             tsFile->duplicateAttributes(file);
1937
1938             // Add variables originating from file and targeted to timeserie file
1939             for (size_t k = 0; k < fileVars.size(); k++)
1940               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1941
1942           
1943             tsFile->name = tsPrefix + "_";
1944             if (!field->name.isEmpty())
1945               tsFile->name.get() += field->name;
1946             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1947               tsFile->name.get() += field->field_ref;
1948             else
1949               tsFile->name.get() += field->getId();
1950
1951             if (!field->ts_split_freq.isEmpty())
1952               tsFile->split_freq = field->ts_split_freq;
1953
1954             CField* tsField = tsFile->addField();
1955             tsField->field_ref = field->getId();
1956
1957             // Add variables originating from file and targeted to timeserie field
1958             for (size_t k = 0; k < fieldVars.size(); k++)
1959               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1960
1961             vars = field->getAllVariables();
1962             for (size_t k = 0; k < vars.size(); k++)
1963             {
1964               CVariable* var = vars[k];
1965
1966               // Add variables originating from field and targeted to timeserie field
1967               if (var->ts_target.isEmpty()
1968                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1969                 tsField->getVirtualVariableGroup()->addChild(var);
1970
1971               // Add variables originating from field and targeted to timeserie file
1972               if (!var->ts_target.isEmpty()
1973                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1974                 tsFile->getVirtualVariableGroup()->addChild(var);
1975             }
1976
1977             tsFile->solveFieldRefInheritance(true);
1978
1979             if (file->timeseries == CFile::timeseries_attr::exclusive)
1980               field->enabled = false;
1981           }
1982         }
1983
1984         // Finally disable the original file is need be
1985         if (file->timeseries == CFile::timeseries_attr::only)
1986          file->enabled = false;
1987       }
1988     }
1989   }
1990   CATCH_DUMP_ATTR
1991
1992 
1993   //! Client side: Send information of reference domain, axis and scalar of active fields
1994   void CContext::sendRefDomainsAxisScalars(const std::vector<CFile*>& activeFiles)
1995   TRY
1996   {
1997     std::set<pair<StdString,CContextClient*>> domainIds, axisIds, scalarIds;
1998
1999     // Find all reference domain and axis of all active fields
2000     int numEnabledFiles = activeFiles.size();
2001     for (int i = 0; i < numEnabledFiles; ++i)
2002     {
2003       std::vector<CField*> enabledFields = activeFiles[i]->getEnabledFields();
2004       int numEnabledFields = enabledFields.size();
2005       for (int j = 0; j < numEnabledFields; ++j)
2006       {
2007         CContextClient* contextClient=enabledFields[j]->getContextClient() ;
2008         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
2009         if ("" != prDomAxisScalarId[0]) domainIds.insert(make_pair(prDomAxisScalarId[0],contextClient));
2010         if ("" != prDomAxisScalarId[1]) axisIds.insert(make_pair(prDomAxisScalarId[1],contextClient));
2011         if ("" != prDomAxisScalarId[2]) scalarIds.insert(make_pair(prDomAxisScalarId[2],contextClient));
2012       }
2013     }
2014
2015     // Create all reference axis on server side
2016     std::set<StdString>::iterator itDom, itAxis, itScalar;
2017     std::set<StdString>::const_iterator itE;
2018
2019     StdString scalarDefRoot("scalar_definition");
2020     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
2021     
2022     for (auto itScalar = scalarIds.begin(); itScalar != scalarIds.end(); ++itScalar)
2023     {
2024       if (!itScalar->first.empty())
2025       {
2026         scalarPtr->sendCreateChild(itScalar->first,itScalar->second);
2027         CScalar::get(itScalar->first)->sendAllAttributesToServer(itScalar->second);
2028       }
2029     }
2030
2031     StdString axiDefRoot("axis_definition");
2032     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
2033     
2034     for (auto itAxis = axisIds.begin(); itAxis != axisIds.end(); ++itAxis)
2035     {
2036       if (!itAxis->first.empty())
2037       {
2038         axisPtr->sendCreateChild(itAxis->first, itAxis->second);
2039         CAxis::get(itAxis->first)->sendAllAttributesToServer(itAxis->second);
2040       }
2041     }
2042
2043     // Create all reference domains on server side
2044     StdString domDefRoot("domain_definition");
2045     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
2046     
2047     for (auto itDom = domainIds.begin(); itDom != domainIds.end(); ++itDom)
2048     {
2049       if (!itDom->first.empty()) {
2050          domPtr->sendCreateChild(itDom->first, itDom->second);
2051          CDomain::get(itDom->first)->sendAllAttributesToServer(itDom->second);
2052       }
2053     }
2054   }
2055   CATCH_DUMP_ATTR
2056
2057   void CContext::triggerLateFields(void)
2058   TRY
2059   {
2060    for(auto& field : fileInFields_) field->triggerLateField() ;
2061    for(auto& field : couplerInFields_) field->triggerLateField() ;
2062   }
2063   CATCH_DUMP_ATTR
2064
2065   //! Update calendar in each time step
2066   void CContext::updateCalendar(int step)
2067   TRY
2068   {
2069      int prevStep = calendar->getStep();
2070
2071      if (prevStep < step)
2072      {
2073        if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data
2074        {
2075          triggerLateFields();
2076        }
2077
2078        info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
2079        calendar->update(step);
2080        info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
2081  #ifdef XIOS_MEMTRACK_LIGHT
2082        info(50) << " Current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<<step<<" of context "<<this->getId()<<endl ;
2083  #endif
2084
2085        if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data
2086        {
2087          doPostTimestepOperationsForEnabledReadModeFiles();
2088          garbageCollector.invalidate(calendar->getCurrentDate());
2089        }
2090      }
2091      else if (prevStep == step)
2092        info(50) << "updateCalendar: already at step " << step << ", no operation done." << endl;
2093      else // if (prevStep > step)
2094        ERROR("void CContext::updateCalendar(int step)",
2095              << "Illegal calendar update: previous step was " << prevStep << ", new step " << step << "is in the past!")
2096   }
2097   CATCH_DUMP_ATTR
2098
2099   void CContext::initReadFiles(void)
2100   TRY
2101   {
2102      vector<CFile*>::const_iterator it;
2103
2104      for (it=enabledReadModeFiles.begin(); it != enabledReadModeFiles.end(); it++)
2105      {
2106         (*it)->initRead();
2107      }
2108   }
2109   CATCH_DUMP_ATTR
2110
2111   //! Server side: Create header of netcdf file
2112   void CContext::createFileHeader(void)
2113   TRY
2114   {
2115      vector<CFile*>::const_iterator it;
2116
2117      //for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
2118      for (it=enabledWriteModeFiles.begin(); it != enabledWriteModeFiles.end(); it++)
2119      {
2120         (*it)->initWrite();
2121      }
2122   }
2123   CATCH_DUMP_ATTR
2124
2125   //! Get current context
2126   CContext* CContext::getCurrent(void)
2127   TRY
2128   {
2129     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
2130   }
2131   CATCH
2132
2133   /*!
2134   \brief Set context with an id be the current context
2135   \param [in] id identity of context to be set to current
2136   */
2137   void CContext::setCurrent(const string& id)
2138   TRY
2139   {
2140     CObjectFactory::SetCurrentContextId(id);
2141     CGroupFactory::SetCurrentContextId(id);
2142   }
2143   CATCH
2144
2145  /*!
2146  \brief Create a context with specific id
2147  \param [in] id identity of new context
2148  \return pointer to the new context or already-existed one with identity id
2149  */
2150  CContext* CContext::create(const StdString& id)
2151  TRY
2152  {
2153    CContext::setCurrent(id);
2154
2155    bool hasctxt = CContext::has(id);
2156    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
2157    getRoot();
2158    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
2159
2160#define DECLARE_NODE(Name_, name_) \
2161    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
2162#define DECLARE_NODE_PAR(Name_, name_)
2163#include "node_type.conf"
2164
2165    return (context);
2166  }
2167  CATCH
2168
2169 
2170  void CContext::sendFinalizeClient(CContextClient* contextClient, const string& contextClientId)
2171  TRY
2172  {
2173    CEventClient event(getType(),EVENT_ID_CONTEXT_FINALIZE_CLIENT);
2174    if (contextClient->isServerLeader())
2175    {
2176      CMessage msg;
2177      msg<<contextClientId ;
2178      const std::list<int>& ranks = contextClient->getRanksServerLeader();
2179      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
2180           event.push(*itRank,1,msg);
2181      contextClient->sendEvent(event);
2182    }
2183    else contextClient->sendEvent(event);
2184  }
2185  CATCH_DUMP_ATTR
2186
2187 
2188  void CContext::recvFinalizeClient(CEventServer& event)
2189  TRY
2190  {
2191    CBufferIn* buffer=event.subEvents.begin()->buffer;
2192    string id;
2193    *buffer>>id;
2194    get(id)->recvFinalizeClient(*buffer);
2195  }
2196  CATCH
2197
2198  void CContext::recvFinalizeClient(CBufferIn& buffer)
2199  TRY
2200  {
2201    countChildContextFinalized_++ ;
2202  }
2203  CATCH_DUMP_ATTR
2204
2205
2206
2207
2208 //! Client side: Send a message  announcing that context can receive grid definition from coupling
2209   void CContext::sendCouplerInReady(CContextClient* client)
2210   TRY
2211   {
2212      if (sendCouplerInReady_done_.count(client)!=0) return ;
2213      else sendCouplerInReady_done_.insert(client) ;
2214
2215      CEventClient event(getType(),EVENT_ID_COUPLER_IN_READY);
2216
2217      if (client->isServerLeader())
2218      {
2219        CMessage msg;
2220        msg<<this->getId();
2221        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2222        client->sendEvent(event);
2223      }
2224      else client->sendEvent(event);
2225   }
2226   CATCH_DUMP_ATTR
2227
2228   //! Server side: Receive a message announcing that context can send grid definition for context coupling
2229   void CContext::recvCouplerInReady(CEventServer& event)
2230   TRY
2231   {
2232      CBufferIn* buffer=event.subEvents.begin()->buffer;
2233      getCurrent()->recvCouplerInReady(*buffer);
2234   }
2235   CATCH
2236
2237   //! Server side: Receive a message announcing that context can send grid definition for context coupling
2238   void CContext::recvCouplerInReady(CBufferIn& buffer)
2239   TRY
2240   {
2241      string contextId ;
2242      buffer>>contextId;
2243      couplerInReady_.insert(getCouplerOutClient(contextId)) ;
2244   }
2245   CATCH_DUMP_ATTR
2246
2247
2248
2249
2250
2251 //! Client side: Send a message  announcing that a coupling context have done it closeDefinition, so data can be sent now.
2252   void CContext::sendCouplerInCloseDefinition(CContextClient* client)
2253   TRY
2254   {
2255      if (sendCouplerInCloseDefinition_done_.count(client)!=0) return ;
2256      else sendCouplerInCloseDefinition_done_.insert(client) ;
2257
2258      CEventClient event(getType(),EVENT_ID_COUPLER_IN_CLOSE_DEFINITION);
2259
2260      if (client->isServerLeader())
2261      {
2262        CMessage msg;
2263        msg<<this->getId();
2264        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2265        client->sendEvent(event);
2266      }
2267      else client->sendEvent(event);
2268   }
2269   CATCH_DUMP_ATTR
2270
2271   //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now.
2272   void CContext::recvCouplerInCloseDefinition(CEventServer& event)
2273   TRY
2274   {
2275      CBufferIn* buffer=event.subEvents.begin()->buffer;
2276      getCurrent()->recvCouplerInCloseDefinition(*buffer);
2277   }
2278   CATCH
2279
2280   //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now.
2281   void CContext::recvCouplerInCloseDefinition(CBufferIn& buffer)
2282   TRY
2283   {
2284      string contextId ;
2285      buffer>>contextId;
2286      couplerInCloseDefinition_.insert(getCouplerOutClient(contextId)) ;
2287   }
2288   CATCH_DUMP_ATTR
2289
2290
2291
2292
2293//! Client side: Send a message  announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2294   void CContext::sendCouplerInContextFinalized(CContextClient* client)
2295   TRY
2296   {
2297      if (sendCouplerInContextFinalized_done_.count(client)!=0) return ;
2298      else sendCouplerInContextFinalized_done_.insert(client) ;
2299
2300      CEventClient event(getType(),EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED);
2301
2302      if (client->isServerLeader())
2303      {
2304        CMessage msg;
2305        msg<<this->getId();
2306        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2307        client->sendEvent(event);
2308      }
2309      else client->sendEvent(event);
2310   }
2311   CATCH_DUMP_ATTR
2312
2313   //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2314   void CContext::recvCouplerInContextFinalized(CEventServer& event)
2315   TRY
2316   {
2317      CBufferIn* buffer=event.subEvents.begin()->buffer;
2318      getCurrent()->recvCouplerInContextFinalized(*buffer);
2319   }
2320   CATCH
2321
2322   //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2323   void CContext::recvCouplerInContextFinalized(CBufferIn& buffer)
2324   TRY
2325   {
2326      string contextId ;
2327      buffer>>contextId;
2328      couplerInContextFinalized_.insert(getCouplerOutClient(contextId)) ;
2329   }
2330   CATCH_DUMP_ATTR
2331
2332
2333
2334
2335  /*!
2336  * \fn bool CContext::isFinalized(void)
2337  * Context is finalized if it received context post finalize event.
2338  */
2339  bool CContext::isFinalized(void)
2340  TRY
2341  {
2342    return finalized;
2343  }
2344  CATCH_DUMP_ATTR
2345  ///--------------------------------------------------------------
2346  StdString CContext::dumpClassAttributes(void)
2347  {
2348    StdString str;
2349    str.append("enabled files=\"");
2350    int size = this->enabledFiles.size();
2351    for (int i = 0; i < size; ++i)
2352    {
2353      str.append(enabledFiles[i]->getId());
2354      str.append(" ");
2355    }
2356    str.append("\"");
2357    return str;
2358  }
2359
2360} // namespace xios
Note: See TracBrowser for help on using the repository browser.