source: XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp @ 2258

Last change on this file since 2258 was 2258, checked in by ymipsl, 3 years ago

One sided protocol improvment.
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 79.2 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16#include "timer.hpp"
17#include "memtrack.hpp"
18#include <limits>
19#include <fstream>
20#include "server.hpp"
21#include "distribute_file_server2.hpp"
22#include "services_manager.hpp"
23#include "contexts_manager.hpp"
24#include "cxios.hpp"
25#include "client.hpp"
26#include "coupler_in.hpp"
27#include "coupler_out.hpp"
28#include "servers_ressource.hpp"
29#include "pool_ressource.hpp"
30#include "services.hpp"
31#include "contexts_manager.hpp"
32#include <chrono>
33#include <random>
34
35namespace xios
36{
37
38  std::shared_ptr<CContextGroup> CContext::root;
39
40   /// ////////////////////// Définitions ////////////////////// ///
41
42   CContext::CContext(void)
43      : CObjectTemplate<CContext>(), CContextAttributes()
44      , calendar(), hasClient(false), hasServer(false)
45      , isPostProcessed(false), finalized(false)
46      , client(nullptr), server(nullptr)
47      , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false)
48
49   { /* Ne rien faire de plus */ }
50
51   CContext::CContext(const StdString & id)
52      : CObjectTemplate<CContext>(id), CContextAttributes()
53      , calendar(), hasClient(false), hasServer(false)
54      , isPostProcessed(false), finalized(false)
55      , client(nullptr), server(nullptr)
56      , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false)
57   { /* Ne rien faire de plus */ }
58
59   CContext::~CContext(void)
60   {
61     delete client;
62     delete server;
63     for (std::vector<CContextClient*>::iterator it = clientPrimServer.begin(); it != clientPrimServer.end(); it++)  delete *it;
64     for (std::vector<CContextServer*>::iterator it = serverPrimServer.begin(); it != serverPrimServer.end(); it++)  delete *it;
65
66   }
67
68   //----------------------------------------------------------------
69   //! Get name of context
70   StdString CContext::GetName(void)   { return (StdString("context")); }
71   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
72   ENodeType CContext::GetType(void)   { return (eContext); }
73
74   //----------------------------------------------------------------
75
76  void CContext::initEventScheduler(void)
77  {
78    SRegisterContextInfo contextInfo ;
79    CXios::getContextsManager()->getContextInfo(this->getId(), contextInfo, getIntraComm()) ;
80
81    eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
82 
83    // generate unique hash for server
84    auto time=chrono::system_clock::now().time_since_epoch().count() ;
85    std::default_random_engine rd(time); // not reproducible from a run to another
86    std::uniform_int_distribution<size_t> dist;
87    hashId_=dist(rd) ;
88    MPI_Bcast(&hashId_,1,MPI_SIZE_T,0,getIntraComm()) ; // Bcast to all server of the context
89  }
90   /*!
91   \brief Get context group (context root)
92   \return Context root
93   */
94   CContextGroup* CContext::getRoot(void)
95   TRY
96   {
97      if (root.get()==NULL) root=std::shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
98      return root.get();
99   }
100   CATCH
101
102   //----------------------------------------------------------------
103
104   /*!
105   \brief Get calendar of a context
106   \return Calendar
107   */
108   std::shared_ptr<CCalendar> CContext::getCalendar(void) const
109   TRY
110   {
111      return (this->calendar);
112   }
113   CATCH
114
115   //----------------------------------------------------------------
116
117   /*!
118   \brief Set a context with a calendar
119   \param[in] newCalendar new calendar
120   */
121   void CContext::setCalendar(std::shared_ptr<CCalendar> newCalendar)
122   TRY
123   {
124      this->calendar = newCalendar;
125   }
126   CATCH_DUMP_ATTR
127
128   //----------------------------------------------------------------
129   /*!
130   \brief Parse xml file and write information into context object
131   \param [in] node xmld node corresponding in xml file
132   */
133   void CContext::parse(xml::CXMLNode & node)
134   TRY
135   {
136      CContext::SuperClass::parse(node);
137
138      // PARSING POUR GESTION DES ENFANTS
139      xml::THashAttributes attributes = node.getAttributes();
140
141      if (attributes.end() != attributes.find("src"))
142      {
143         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
144         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
145            ERROR("void CContext::parse(xml::CXMLNode & node)",
146                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
147         if (!ifs.good())
148            ERROR("CContext::parse(xml::CXMLNode & node)",
149                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
150         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
151      }
152
153      if (node.getElementName().compare(CContext::GetName()))
154         DEBUG("Le noeud is wrong defined but will be considered as a context !");
155
156      if (!(node.goToChildElement()))
157      {
158         DEBUG("Le context ne contient pas d'enfant !");
159      }
160      else
161      {
162         do { // Parcours des contextes pour traitement.
163
164            StdString name = node.getElementName();
165            attributes.clear();
166            attributes = node.getAttributes();
167
168            if (attributes.end() != attributes.find("id"))
169            {
170              DEBUG(<< "Definition node has an id,"
171                    << "it will not be taking account !");
172            }
173
174#define DECLARE_NODE(Name_, name_)    \
175   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
176   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
177#define DECLARE_NODE_PAR(Name_, name_)
178#include "node_type.conf"
179
180            DEBUG(<< "The element \'"     << name
181                  << "\' in the context \'" << CContext::getCurrent()->getId()
182                  << "\' is not a definition !");
183
184         } while (node.goToNextElement());
185
186         node.goToParentElement(); // Retour au parent
187      }
188   }
189   CATCH_DUMP_ATTR
190
191   //----------------------------------------------------------------
192   //! Show tree structure of context
193   void CContext::ShowTree(StdOStream & out)
194   TRY
195   {
196      StdString currentContextId = CContext::getCurrent() -> getId();
197      std::vector<CContext*> def_vector =
198         CContext::getRoot()->getChildList();
199      std::vector<CContext*>::iterator
200         it = def_vector.begin(), end = def_vector.end();
201
202      out << "<? xml version=\"1.0\" ?>" << std::endl;
203      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
204
205      for (; it != end; it++)
206      {
207         CContext* context = *it;
208         CContext::setCurrent(context->getId());
209         out << *context << std::endl;
210      }
211
212      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
213      CContext::setCurrent(currentContextId);
214   }
215   CATCH
216
217   //----------------------------------------------------------------
218
219   //! Convert context object into string (to print)
220   StdString CContext::toString(void) const
221   TRY
222   {
223      StdOStringStream oss;
224      oss << "<" << CContext::GetName()
225          << " id=\"" << this->getId() << "\" "
226          << SuperClassAttribute::toString() << ">" << std::endl;
227      if (!this->hasChild())
228      {
229         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation
230      }
231      else
232      {
233
234#define DECLARE_NODE(Name_, name_)    \
235   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
236   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
237#define DECLARE_NODE_PAR(Name_, name_)
238#include "node_type.conf"
239
240      }
241      oss << "</" << CContext::GetName() << " >";
242      return (oss.str());
243   }
244   CATCH
245
246   //----------------------------------------------------------------
247
248   /*!
249   \brief Find all inheritace among objects in a context.
250   \param [in] apply (true) write attributes of parent into ones of child if they are empty
251                     (false) write attributes of parent into a new container of child
252   \param [in] parent unused
253   */
254   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
255   TRY
256   {
257#define DECLARE_NODE(Name_, name_)    \
258   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
259     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
260#define DECLARE_NODE_PAR(Name_, name_)
261#include "node_type.conf"
262   }
263   CATCH_DUMP_ATTR
264
265   //----------------------------------------------------------------
266
267   //! Verify if all root definition in the context have child.
268   bool CContext::hasChild(void) const
269   TRY
270   {
271      return (
272#define DECLARE_NODE(Name_, name_)    \
273   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
274#define DECLARE_NODE_PAR(Name_, name_)
275#include "node_type.conf"
276      false);
277}
278   CATCH
279
280   //----------------------------------------------------------------
281
282   void CContext::CleanTree(void)
283   TRY
284   {
285#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
286#define DECLARE_NODE_PAR(Name_, name_)
287#include "node_type.conf"
288   }
289   CATCH
290
291   ///---------------------------------------------------------------
292
293
294 /*!
295    * Compute the required buffer size to send the fields data.
296    * \param maxEventSize [in/out] the size of the bigger event for each connected server
297    * \param [in] contextClient
298    * \param [in] bufferForWriting True if buffers are used for sending data for writing
299      This flag is only true for client and server-1 for communication with server-2
300    */
301   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize,
302                                                      CContextClient* contextClient, bool bufferForWriting /*= "false"*/)
303   TRY
304   {
305     std::map<int, StdSize> dataSize;
306
307     // Find all reference domain and axis of all active fields
308     std::vector<CFile*>& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles;
309     size_t numEnabledFiles = fileList.size();
310     for (size_t i = 0; i < numEnabledFiles; ++i)
311     {
312       CFile* file = fileList[i];
313       if (file->getContextClient() == contextClient)
314       {
315         std::vector<CField*> enabledFields = file->getEnabledFields();
316         size_t numEnabledFields = enabledFields.size();
317         for (size_t j = 0; j < numEnabledFields; ++j)
318         {
319           // const std::vector<std::map<int, StdSize> > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient);
320           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting);
321           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
322           for (; it != itE; ++it)
323           {
324             // If dataSize[it->first] does not exist, it will be zero-initialized
325             // so we can use it safely without checking for its existance
326           if (CXios::isOptPerformance)
327               dataSize[it->first] += it->second;
328             else if (dataSize[it->first] < it->second)
329               dataSize[it->first] = it->second;
330
331           if (maxEventSize[it->first] < it->second)
332               maxEventSize[it->first] = it->second;
333           }
334         }
335       }
336     }
337     return dataSize;
338   }
339   CATCH_DUMP_ATTR
340
341/*!
342    * Compute the required buffer size to send the attributes (mostly those grid related).
343    * \param maxEventSize [in/out] the size of the bigger event for each connected server
344    * \param [in] contextClient
345    * \param [in] bufferForWriting True if buffers are used for sending data for writing
346      This flag is only true for client and server-1 for communication with server-2
347    */
348   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize,
349                                                           CContextClient* contextClient, bool bufferForWriting /*= "false"*/)
350   TRY
351   {
352   // As calendar attributes are sent even if there are no active files or fields, maps are initialized according the size of calendar attributes
353     std::map<int, StdSize> attributesSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient);
354     maxEventSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient);
355
356     std::vector<CFile*>& fileList = this->enabledFiles;
357     size_t numEnabledFiles = fileList.size();
358     for (size_t i = 0; i < numEnabledFiles; ++i)
359     {
360//         CFile* file = this->enabledWriteModeFiles[i];
361        CFile* file = fileList[i];
362        std::vector<CField*> enabledFields = file->getEnabledFields();
363        size_t numEnabledFields = enabledFields.size();
364        for (size_t j = 0; j < numEnabledFields; ++j)
365        {
366          const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting);
367          std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
368          for (; it != itE; ++it)
369          {
370         // If attributesSize[it->first] does not exist, it will be zero-initialized
371         // so we can use it safely without checking for its existence
372             if (attributesSize[it->first] < it->second)
373         attributesSize[it->first] = it->second;
374
375         if (maxEventSize[it->first] < it->second)
376         maxEventSize[it->first] = it->second;
377          }
378        }
379     }
380     return attributesSize;
381   }
382   CATCH_DUMP_ATTR
383
384
385
386   //! Verify whether a context is initialized
387   bool CContext::isInitialized(void)
388   TRY
389   {
390     return hasClient;
391   }
392   CATCH_DUMP_ATTR
393
394
395   void CContext::init(CServerContext* parentServerContext, MPI_Comm intraComm, int serviceType)
396   TRY
397   {
398     parentServerContext_ = parentServerContext ;
399     if (serviceType==CServicesManager::CLIENT) 
400       initClient(intraComm, serviceType) ;
401     else
402       initServer(intraComm, serviceType) ;
403     initEventScheduler() ;
404    }
405    CATCH_DUMP_ATTR
406
407
408
409//! Initialize client side
410   void CContext::initClient(MPI_Comm intraComm, int serviceType)
411   TRY
412   {
413      intraComm_=intraComm ;
414      MPI_Comm_rank(intraComm_, &intraCommRank_) ;
415      MPI_Comm_size(intraComm_, &intraCommSize_) ;
416
417      serviceType_ = CServicesManager::CLIENT ;
418      if (serviceType_==CServicesManager::CLIENT)
419      {
420        hasClient=true ;
421        hasServer=false ;
422      }
423      contextId_ = getId() ;
424     
425      attached_mode=true ;
426      if (!CXios::isUsingServer()) attached_mode=false ;
427
428
429      string contextRegistryId=getId() ;
430      registryIn=new CRegistry(CXios::getRegistryManager()->getRegistryIn());
431      registryIn->setPath(contextRegistryId) ;
432     
433      registryOut=new CRegistry(intraComm_) ;
434      registryOut->setPath(contextRegistryId) ;
435     
436   }
437   CATCH_DUMP_ATTR
438
439   
440   void CContext::initServer(MPI_Comm intraComm, int serviceType)
441   TRY
442   {
443     hasServer=true;
444     intraComm_=intraComm ;
445     MPI_Comm_rank(intraComm_, &intraCommRank_) ;
446     MPI_Comm_size(intraComm_, &intraCommSize_) ;
447
448     serviceType_=serviceType ;
449
450     if (serviceType_==CServicesManager::GATHERER)
451     {
452       hasClient=true ;
453       hasServer=true ;
454     }
455     else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)
456     {
457       hasClient=false ;
458       hasServer=true ;
459     }
460
461     CXios::getContextsManager()->getContextId(getId(), contextId_, intraComm) ;
462     
463     string contextRegistryId=getId() ;
464     registryIn=new CRegistry(CXios::getRegistryManager()->getRegistryIn());
465     registryIn->setPath(contextRegistryId) ;
466     
467     registryOut=new CRegistry(intraComm_) ;
468     registryOut->setPath(contextRegistryId) ;
469
470   }
471   CATCH_DUMP_ATTR
472
473
474  void CContext::createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) // for servers
475  TRY
476  {
477    MPI_Comm intraCommClient ;
478    MPI_Comm_dup(intraComm_, &intraCommClient);
479    comms.push_back(intraCommClient);
480    // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context
481    server = new CContextServer(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ?
482    client = new CContextClient(this,intraCommClient,interCommClient);
483    client->setAssociatedServer(server) ; 
484    server->setAssociatedClient(client) ; 
485
486  }
487  CATCH_DUMP_ATTR
488
489  void CContext::createServerInterComm(void) 
490  TRY
491  {
492   
493    MPI_Comm interCommClient, interCommServer ;
494
495    if (serviceType_ == CServicesManager::CLIENT)
496    {
497
498      int commRank ;
499      MPI_Comm_rank(intraComm_,&commRank) ;
500      if (commRank==0)
501      {
502        if (attached_mode) CXios::getContextsManager()->createServerContext(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, getContextId()) ;
503        else if (CXios::usingServer2) CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId()) ;
504        else  CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId()) ;
505      }
506
507      MPI_Comm interComm ;
508     
509      if (attached_mode)
510      {
511        parentServerContext_->createIntercomm(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, getContextId(), intraComm_, 
512                                              interCommClient, interCommServer) ;
513        int type ; 
514        if (commRank==0) CXios::getServicesManager()->getServiceType(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, type) ;
515        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
516        setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble
517      }
518      else if (CXios::usingServer2)
519      { 
520//      CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, interComm) ;
521        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_,
522                                              interCommClient, interCommServer) ;
523        int type ; 
524        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultGathererId, 0, type) ;
525        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
526      }
527      else
528      {
529        //CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, interComm) ;
530        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_,
531                                              interCommClient, interCommServer) ;
532        int type ; 
533        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ;
534        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
535      }
536
537        // intraComm client is not duplicated. In all the code we use client->intraComm for MPI
538        // in future better to replace it by intracommuncator associated to the context
539   
540      MPI_Comm intraCommClient, intraCommServer ;
541      intraCommClient=intraComm_ ;
542      MPI_Comm_dup(intraComm_, &intraCommServer) ;
543      client = new CContextClient(this, intraCommClient, interCommClient);
544      server = new CContextServer(this, intraCommServer, interCommServer);
545      client->setAssociatedServer(server) ;
546      server->setAssociatedClient(client) ;
547    }
548   
549    if (serviceType_ == CServicesManager::GATHERER)
550    {
551      int commRank ;
552      MPI_Comm_rank(intraComm_,&commRank) ;
553     
554      int nbPartitions ;
555      if (commRank==0) 
556      { 
557        CXios::getServicesManager()->getServiceNbPartitions(CXios::defaultPoolId, CXios::defaultServerId, 0, nbPartitions) ;
558        for(int i=0 ; i<nbPartitions; i++)
559          CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId()) ;
560      }     
561      MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ;
562     
563      MPI_Comm interComm ;
564      for(int i=0 ; i<nbPartitions; i++)
565      {
566        parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interCommClient, interCommServer) ;
567        int type ; 
568        if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ;
569        MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ;
570        primServerId_.push_back(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultServerId, i, type, getContextId())) ;
571
572        // intraComm client is not duplicated. In all the code we use client->intraComm for MPI
573        // in future better to replace it by intracommuncator associated to the context
574     
575        MPI_Comm intraCommClient, intraCommServer ;
576
577        intraCommClient=intraComm_ ;
578        MPI_Comm_dup(intraComm_, &intraCommServer) ;
579
580        CContextClient* client = new CContextClient(this, intraCommClient, interCommClient) ;
581        CContextServer* server = new CContextServer(this, intraCommServer, interCommServer) ;
582        client->setAssociatedServer(server) ;
583        server->setAssociatedClient(client) ;
584        clientPrimServer.push_back(client);
585        serverPrimServer.push_back(server); 
586
587     
588      }
589    }
590  }
591  CATCH_DUMP_ATTR
592
593  void CContext::globalEventLoop(void)
594  {
595    lockContext() ;
596    CXios::getDaemonsManager()->eventLoop() ;
597    unlockContext() ;
598    setCurrent(getId()) ;
599  }
600
601  bool CContext::scheduledEventLoop(bool enableEventsProcessing) 
602  {
603    bool out, finished; 
604    size_t timeLine=timeLine_ ;
605    if (serviceType_==CServicesManager::CLIENT)
606    {
607      timeLine_++ ;
608      eventScheduler_->registerEvent(timeLine, hashId_) ;
609    }
610
611    do
612    { 
613      finished=eventLoop(enableEventsProcessing) ;
614      if (serviceType_==CServicesManager::CLIENT) 
615      { 
616        out = eventScheduler_->queryEvent(timeLine,hashId_) ;
617        if (out) eventScheduler_->popEvent() ;
618      }
619
620      else out=true ;
621    }  while(!out) ;
622   
623    return finished ;
624  }
625
626  bool CContext::eventLoop(bool enableEventsProcessing)
627  {
628    bool  finished(true); 
629    if (isLockedContext()) return false;
630   
631    setCurrent(getId()) ;
632
633    if (client!=nullptr && !finalized) client->checkBuffers();
634   
635    for (int i = 0; i < clientPrimServer.size(); ++i)
636    {
637      if (!finalized) clientPrimServer[i]->checkBuffers();
638      if (!finalized) finished &= serverPrimServer[i]->eventLoop(enableEventsProcessing);
639    }
640
641    for (auto couplerOut : couplerOutClient_)
642      if (!finalized) couplerOut.second->checkBuffers();
643   
644    for (auto couplerIn : couplerInClient_)
645      if (!finalized) couplerIn.second->checkBuffers();
646   
647    for (auto couplerOut : couplerOutServer_)
648      if (!finalized) couplerOut.second->eventLoop(enableEventsProcessing);
649
650    for (auto couplerIn : couplerInServer_)
651      if (!finalized) couplerIn.second->eventLoop(enableEventsProcessing);
652   
653    if (server!=nullptr) if (!finalized) finished &= server->eventLoop(enableEventsProcessing);
654    setCurrent(getId()) ;
655    return finalized && finished ;
656  }
657
658  void CContext::addCouplingChanel(const std::string& fullContextId, bool out)
659  {
660     int contextLeader ;
661     
662     if (out)
663     { 
664       if (couplerOutClient_.find(fullContextId)==couplerOutClient_.end()) 
665       {
666         bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ;
667     
668         MPI_Comm interComm, interCommClient, interCommServer  ;
669         MPI_Comm intraCommClient, intraCommServer ;
670
671         if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;
672
673        MPI_Comm_dup(intraComm_, &intraCommClient) ;
674        MPI_Comm_dup(intraComm_, &intraCommServer) ;
675        MPI_Comm_dup(interComm, &interCommClient) ;
676        MPI_Comm_dup(interComm, &interCommServer) ;
677        CContextClient* client = new CContextClient(this, intraCommClient, interCommClient);
678        CContextServer* server = new CContextServer(this, intraCommServer, interCommServer);
679        client->setAssociatedServer(server) ;
680        server->setAssociatedClient(client) ;
681        MPI_Comm_free(&interComm) ;
682        couplerOutClient_[fullContextId] = client ;
683        couplerOutServer_[fullContextId] = server ;
684      }
685    }
686    else if (couplerInClient_.find(fullContextId)==couplerInClient_.end())
687    {
688      bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ;
689     
690       MPI_Comm interComm, interCommClient, interCommServer  ;
691       MPI_Comm intraCommClient, intraCommServer ;
692
693       if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;
694
695       MPI_Comm_dup(intraComm_, &intraCommClient) ;
696       MPI_Comm_dup(intraComm_, &intraCommServer) ;
697       MPI_Comm_dup(interComm, &interCommServer) ;
698       MPI_Comm_dup(interComm, &interCommClient) ;
699       CContextServer* server = new CContextServer(this, intraCommServer, interCommServer);
700       CContextClient* client = new CContextClient(this, intraCommClient, interCommClient);
701       client->setAssociatedServer(server) ;
702       server->setAssociatedClient(client) ;
703       MPI_Comm_free(&interComm) ;
704
705       couplerInClient_[fullContextId] = client ;
706       couplerInServer_[fullContextId] = server ;       
707    }
708  }
709 
710   void CContext::finalize(void)
711   TRY
712   {
713      registryOut->hierarchicalGatherRegistry() ;
714      if (server->intraCommRank==0) CXios::getRegistryManager()->merge(*registryOut) ;
715
716      if (serviceType_==CServicesManager::CLIENT)
717      {
718//ym        doPreTimestepOperationsForEnabledReadModeFiles(); // For now we only use server level 1 to read data
719
720        triggerLateFields() ;
721
722        // inform couplerIn that I am finished
723        for(auto& couplerInClient : couplerInClient_) sendCouplerInContextFinalized(couplerInClient.second) ;
724
725        // wait until received message from couplerOut that they have finished
726        bool couplersInFinalized ;
727        do
728        {
729          couplersInFinalized=true ;
730          for(auto& couplerOutClient : couplerOutClient_) couplersInFinalized &= isCouplerInContextFinalized(couplerOutClient.second) ; 
731          globalEventLoop() ;
732        } while (!couplersInFinalized) ;
733
734        info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ;
735        client->finalize();
736        info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ;
737        while (client->havePendingRequests()) client->checkBuffers();
738        info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ;
739        bool notifiedFinalized=false ;
740        do
741        {
742          notifiedFinalized=client->isNotifiedFinalized() ;
743        } while (!notifiedFinalized) ;
744
745        server->releaseBuffers();
746        client->releaseBuffers();
747        info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ;
748      }
749      else if (serviceType_==CServicesManager::GATHERER)
750      {
751         for (int i = 0; i < clientPrimServer.size(); ++i)
752         {
753           clientPrimServer[i]->finalize();
754           bool bufferReleased;
755           do
756           {
757             clientPrimServer[i]->checkBuffers();
758             bufferReleased = !clientPrimServer[i]->havePendingRequests();
759           } while (!bufferReleased);
760           
761           bool notifiedFinalized=false ;
762           do
763           {
764             notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ;
765           } while (!notifiedFinalized) ;
766           clientPrimServer[i]->releaseBuffers();
767         }
768         closeAllFile();
769
770      }
771      else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)
772      {
773        closeAllFile();
774        client->releaseBuffers();
775      }
776
777      freeComms() ;
778       
779      parentServerContext_->freeComm() ;
780      finalized = true;
781      info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl;
782   }
783   CATCH_DUMP_ATTR
784
785   //! Free internally allocated communicators
786   void CContext::freeComms(void)
787   TRY
788   {
789     for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
790       MPI_Comm_free(&(*it));
791     comms.clear();
792   }
793   CATCH_DUMP_ATTR
794
795   //! Deallocate buffers allocated by clientContexts
796   void CContext::releaseClientBuffers(void)
797   TRY
798   {
799     client->releaseBuffers();
800     for (int i = 0; i < clientPrimServer.size(); ++i)
801       clientPrimServer[i]->releaseBuffers();
802   }
803   CATCH_DUMP_ATTR
804
805   
806   /*!
807   \brief Close all the context defintion and do processing data
808      After everything is well defined on client side, they will be processed and sent to server
809   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
810   all necessary information to server, from which each server can build its own database.
811   Because the role of server is to write out field data on a specific netcdf file,
812   the only information that it needs is the enabled files
813   and the active fields (fields will be written onto active files)
814   */
815  void CContext::closeDefinition(void)
816   TRY
817   {
818     CTimer::get("Context : close definition").resume() ;
819     
820     // create intercommunicator with servers.
821     // not sure it is the good place to be called here
822     createServerInterComm() ;
823
824
825     // After xml is parsed, there are some more works with post processing
826//     postProcessing();
827
828   
829    // Make sure the calendar was correctly created
830    if (serviceType_!=CServicesManager::CLIENT) CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
831    if (!calendar)
832      ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
833    else if (calendar->getTimeStep() == NoneDu)
834      ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
835    // Calendar first update to set the current date equals to the start date
836    calendar->update(0);
837
838    // Résolution des héritages descendants (càd des héritages de groupes)
839    // pour chacun des contextes.
840    solveDescInheritance(true);
841 
842    // Solve inheritance for field to know if enabled or not.
843    for (auto field : CField::getAll()) field->solveRefInheritance();
844
845    // Check if some axis, domains or grids are eligible to for compressed indexed output.
846    // Warning: This must be done after solving the inheritance and before the rest of post-processing
847    // --> later ????    checkAxisDomainsGridsEligibilityForCompressedOutput();     
848
849      // Check if some automatic time series should be generated
850      // Warning: This must be done after solving the inheritance and before the rest of post-processing     
851
852    // The timeseries should only be prepared in client
853    prepareTimeseries();
854
855    //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir.
856    findEnabledFiles();
857    findEnabledWriteModeFiles();
858    findEnabledReadModeFiles();
859    findEnabledCouplerIn();
860    findEnabledCouplerOut();
861    createCouplerInterCommunicator() ;
862
863    // Find all enabled fields of each file     
864    vector<CField*>&& fileOutField = findAllEnabledFieldsInFileOut(this->enabledWriteModeFiles);
865    vector<CField*>&& fileInField = findAllEnabledFieldsInFileIn(this->enabledReadModeFiles);
866    vector<CField*>&& couplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut);
867    vector<CField*>&& couplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn);
868    findFieldsWithReadAccess();
869    vector<CField*>& fieldWithReadAccess = fieldsWithReadAccess_ ;
870    vector<CField*> fieldModelIn ; // fields potentially from model
871     
872    // define if files are on clientSied or serverSide
873    if (serviceType_==CServicesManager::CLIENT)
874    {
875      for (auto& file : enabledWriteModeFiles) file->setClientSide() ;
876      for (auto& file : enabledReadModeFiles) file->setClientSide() ;
877    }
878    else
879    {
880      for (auto& file : enabledWriteModeFiles) file->setServerSide() ;
881      for (auto& file : enabledReadModeFiles) file->setServerSide() ;
882    }
883
884   
885    for (auto& field : couplerInField)
886    {
887      field->unsetGridCompleted() ;
888    }
889// find all field potentially at workflow end
890    vector<CField*> endWorkflowFields ;
891    endWorkflowFields.reserve(fileOutField.size()+couplerOutField.size()+fieldWithReadAccess.size()) ;
892    endWorkflowFields.insert(endWorkflowFields.end(),fileOutField.begin(), fileOutField.end()) ;
893    endWorkflowFields.insert(endWorkflowFields.end(),couplerOutField.begin(), couplerOutField.end()) ;
894    endWorkflowFields.insert(endWorkflowFields.end(),fieldWithReadAccess.begin(), fieldWithReadAccess.end()) ;
895
896    bool workflowGraphIsCompleted ;
897   
898    bool first=true ;
899   
900    do
901    {
902      workflowGraphIsCompleted=true; 
903      for(auto endWorkflowField : endWorkflowFields) 
904      {
905        workflowGraphIsCompleted &= endWorkflowField->buildWorkflowGraph(garbageCollector) ;
906      }
907   
908      for(auto couplerIn : enabledCouplerIn) couplerIn->assignContext() ;
909      for(auto field : couplerInField) field->makeGridAliasForCoupling();
910      for(auto field : couplerInField) this->sendCouplerInReady(field->getContextClient()) ;
911   
912
913      // assign context to coupler out and related fields
914      for(auto couplerOut : enabledCouplerOut) couplerOut->assignContext() ;
915      // for now supose that all coupling out endpoint are succesfull. The difficultie is client/server buffer evaluation
916      for(auto field : couplerOutField) 
917      {
918        // connect to couplerOut -> to do
919      }
920
921      bool couplersReady ;
922      do 
923      {
924        couplersReady=true ;
925        for(auto field : couplerOutField)
926        {
927          bool ready = isCouplerInReady(field->getContextClient()) ; 
928          if (ready) field->sendFieldToCouplerOut() ;
929          couplersReady &= ready ;
930        }
931        this->scheduledEventLoop() ;
932
933      } while (!couplersReady) ;
934     
935      first=false ;
936      this->scheduledEventLoop() ;
937
938    } while (!workflowGraphIsCompleted) ;
939
940
941    for( auto field : couplerInField) couplerInFields_.push_back(field) ;
942
943    // get all field coming potentially from model
944    for (auto field : CField::getAll() ) if (field->getModelIn()) fieldModelIn.push_back(field) ;
945
946    // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields
947    if (serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles);
948    else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ;
949
950    // client side, assign context for file reading
951    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ;
952   
953    // server side, assign context where to send file data read
954    if (serviceType_==CServicesManager::CServicesManager::GATHERER || serviceType_==CServicesManager::IO_SERVER) 
955      for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ;
956   
957    // workflow endpoint => sent to IO/SERVER
958    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER)
959    {
960      for(auto field : fileOutField) 
961      {
962        field->connectToFileServer(garbageCollector) ; // connect the field to server filter
963      }
964      for(auto field : fileOutField) field->sendFieldToFileServer() ;
965    }
966
967    // workflow endpoint => write to file
968    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)
969    {
970      for(auto field : fileOutField) 
971      {
972        field->connectToFileWriter(garbageCollector) ; // connect the field to server filter
973      }
974    }
975   
976    // workflow endpoint => Send data from server to client
977    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER)
978    {
979      for(auto field : fileInField) 
980      {
981        field->connectToServerToClient(garbageCollector) ;
982      }
983    }
984
985    // workflow endpoint => sent to model on client side
986    if (serviceType_==CServicesManager::CLIENT)
987    {
988      for(auto field : fieldWithReadAccess) field->connectToModelOutput(garbageCollector) ;
989    }
990
991
992    // workflow startpoint => data from model
993    if (serviceType_==CServicesManager::CLIENT)
994    {
995      for(auto field : fieldModelIn) 
996      {
997        field->connectToModelInput(garbageCollector) ; // connect the field to server filter
998        // grid index will be computed on the fly
999      }
1000    }
1001   
1002    // workflow startpoint => data from client on server side
1003    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER || serviceType_==CServicesManager::OUT_SERVER)
1004    {
1005      for(auto field : fieldModelIn) 
1006      {
1007        field->connectToClientInput(garbageCollector) ; // connect the field to server filter
1008      }
1009    }
1010
1011   
1012    for(auto field : couplerInField) 
1013    {
1014      field->connectToCouplerIn(garbageCollector) ; // connect the field to server filter
1015    }
1016   
1017   
1018    for(auto field : couplerOutField) 
1019    {
1020      field->connectToCouplerOut(garbageCollector) ; // for now the same kind of filter that for file server
1021    }
1022
1023     // workflow startpoint => data from server on client side
1024    if (serviceType_==CServicesManager::CLIENT)
1025    {
1026      for(auto field : fileInField) 
1027      {
1028        field->sendFieldToInputFileServer() ;
1029        field->connectToServerInput(garbageCollector) ; // connect the field to server filter
1030        fileInFields_.push_back(field) ;
1031      }
1032    }
1033
1034    // workflow startpoint => data read from file on server side
1035    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER)
1036    {
1037      for(auto field : fileInField) 
1038      {
1039        field->connectToFileReader(garbageCollector) ;
1040      }
1041    }
1042   
1043    // construct slave server list
1044    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) 
1045    {
1046      for(auto field : fileOutField) slaveServers_.insert(field->getContextClient()) ; 
1047      for(auto field : fileInField)  slaveServers_.insert(field->getContextClient()) ; 
1048    }
1049
1050    for(auto& slaveServer : slaveServers_) sendCloseDefinition(slaveServer) ;
1051
1052    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 
1053    {
1054      createFileHeader();
1055    }
1056
1057    if (serviceType_==CServicesManager::CLIENT) startPrefetchingOfEnabledReadModeFiles();
1058   
1059    // send signal to couplerIn context that definition phasis is done
1060
1061    for(auto& couplerInClient : couplerInClient_) sendCouplerInCloseDefinition(couplerInClient.second) ;
1062
1063    // wait until all couplerIn signal that closeDefition is done.
1064    bool ok;
1065    do
1066    {
1067      ok = true ;
1068      for(auto& couplerOutClient : couplerOutClient_) ok &= isCouplerInCloseDefinition(couplerOutClient.second) ;
1069      this->scheduledEventLoop() ;
1070    } while (!ok) ;
1071
1072    // Now evaluate the size of the context client buffers
1073    map<CContextClient*,map<int,size_t>> fieldBufferEvaluation ;
1074    for(auto field : fileOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to server
1075    for(auto field : couplerOutField) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // output to coupler
1076    for(auto field : fieldModelIn) field->evaluateBufferSize(fieldBufferEvaluation, CXios::isOptPerformance) ; // server to client (for io servers)
1077   
1078    // fix size for each context client
1079    for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ;
1080
1081
1082     CTimer::get("Context : close definition").suspend() ;
1083  }
1084  CATCH_DUMP_ATTR
1085
1086
1087  vector<CField*> CContext::findAllEnabledFieldsInFileOut(const std::vector<CFile*>& activeFiles)
1088   TRY
1089   {
1090     vector<CField*> fields ;
1091     for(auto file : activeFiles)
1092     {
1093        const vector<CField*>&& fieldList=file->getEnabledFields() ;
1094        for(auto field : fieldList) field->setFileOut(file) ;
1095        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1096     }
1097     return fields ;
1098   }
1099   CATCH_DUMP_ATTR
1100
1101   vector<CField*> CContext::findAllEnabledFieldsInFileIn(const std::vector<CFile*>& activeFiles)
1102   TRY
1103   {
1104     vector<CField*> fields ;
1105     for(auto file : activeFiles)
1106     {
1107        const vector<CField*>&& fieldList=file->getEnabledFields() ;
1108        for(auto field : fieldList) field->setFileIn(file) ;
1109        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1110     }
1111     return fields ;
1112   }
1113   CATCH_DUMP_ATTR
1114
1115   vector<CField*> CContext::findAllEnabledFieldsCouplerOut(const std::vector<CCouplerOut*>& activeCouplerOut)
1116   TRY
1117   {
1118     vector<CField*> fields ;
1119     for (auto couplerOut :activeCouplerOut)
1120     {
1121        const vector<CField*>&& fieldList=couplerOut->getEnabledFields() ;
1122        for(auto field : fieldList) field->setCouplerOut(couplerOut) ;
1123        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1124     }
1125     return fields ;
1126   }
1127   CATCH_DUMP_ATTR
1128
1129   vector<CField*> CContext::findAllEnabledFieldsCouplerIn(const std::vector<CCouplerIn*>& activeCouplerIn)
1130   TRY
1131   {
1132     vector<CField*> fields ;
1133     for (auto couplerIn :activeCouplerIn)
1134     {
1135        const vector<CField*>&& fieldList=couplerIn->getEnabledFields() ;
1136        for(auto field : fieldList) field->setCouplerIn(couplerIn) ;
1137        fields.insert(fields.end(),fieldList.begin(),fieldList.end());
1138     }
1139     return fields ;
1140   }
1141   CATCH_DUMP_ATTR
1142
1143 /*!
1144  * Send context attribute and calendar to file server, it must be done once by context file server
1145  * \param[in] client : context client to send   
1146  */ 
1147  void CContext::sendContextToFileServer(CContextClient* client)
1148  {
1149    if (sendToFileServer_done_.count(client)!=0) return ;
1150    else sendToFileServer_done_.insert(client) ;
1151   
1152    this->sendAllAttributesToServer(client); // Send all attributes of current context to server
1153    CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(client); // Send all attributes of current cale
1154  }
1155
1156 
1157   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
1158   TRY
1159   {
1160      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
1161        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
1162   }
1163   CATCH_DUMP_ATTR
1164
1165
1166   void CContext::postProcessFilterGraph()
1167   TRY
1168   {
1169     int size = enabledFiles.size();
1170     for (int i = 0; i < size; ++i)
1171     {
1172        enabledFiles[i]->postProcessFilterGraph();
1173     }
1174   }
1175   CATCH_DUMP_ATTR
1176
1177   void CContext::startPrefetchingOfEnabledReadModeFiles()
1178   TRY
1179   {
1180     int size = enabledReadModeFiles.size();
1181     for (int i = 0; i < size; ++i)
1182     {
1183        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
1184     }
1185   }
1186   CATCH_DUMP_ATTR
1187
1188   void CContext::doPreTimestepOperationsForEnabledReadModeFiles()
1189   TRY
1190   {
1191     int size = enabledReadModeFiles.size();
1192     for (int i = 0; i < size; ++i)
1193     {
1194        enabledReadModeFiles[i]->doPreTimestepOperationsForEnabledReadModeFields();
1195     }
1196   }
1197   CATCH_DUMP_ATTR
1198
1199   void CContext::doPostTimestepOperationsForEnabledReadModeFiles()
1200   TRY
1201   {
1202     int size = enabledReadModeFiles.size();
1203     for (int i = 0; i < size; ++i)
1204     {
1205        enabledReadModeFiles[i]->doPostTimestepOperationsForEnabledReadModeFields();
1206     }
1207   }
1208   CATCH_DUMP_ATTR
1209
1210  void CContext::findFieldsWithReadAccess(void)
1211  TRY
1212  {
1213    fieldsWithReadAccess_.clear();
1214    const vector<CField*> allFields = CField::getAll();
1215    for (size_t i = 0; i < allFields.size(); ++i)
1216    {
1217      CField* field = allFields[i];
1218      if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
1219      {
1220        fieldsWithReadAccess_.push_back(field);
1221        field->setModelOut() ;
1222      }
1223    }
1224  }
1225  CATCH_DUMP_ATTR
1226
1227 
1228   void CContext::solveAllInheritance(bool apply)
1229   TRY
1230   {
1231     // Résolution des héritages descendants (càd des héritages de groupes)
1232     // pour chacun des contextes.
1233      solveDescInheritance(apply);
1234
1235     // Résolution des héritages par référence au niveau des fichiers.
1236      const vector<CFile*> allFiles=CFile::getAll();
1237      const vector<CCouplerIn*> allCouplerIn=CCouplerIn::getAll();
1238      const vector<CCouplerOut*> allCouplerOut=CCouplerOut::getAll();
1239      const vector<CGrid*> allGrids= CGrid::getAll();
1240
1241      if (serviceType_==CServicesManager::CLIENT)
1242      {
1243        for (unsigned int i = 0; i < allFiles.size(); i++)
1244          allFiles[i]->solveFieldRefInheritance(apply);
1245
1246        for (unsigned int i = 0; i < allCouplerIn.size(); i++)
1247          allCouplerIn[i]->solveFieldRefInheritance(apply);
1248
1249        for (unsigned int i = 0; i < allCouplerOut.size(); i++)
1250          allCouplerOut[i]->solveFieldRefInheritance(apply);
1251      }
1252
1253      unsigned int vecSize = allGrids.size();
1254      unsigned int i = 0;
1255      for (i = 0; i < vecSize; ++i)
1256        allGrids[i]->solveElementsRefInheritance(apply);
1257
1258   }
1259  CATCH_DUMP_ATTR
1260
1261   void CContext::findEnabledFiles(void)
1262   TRY
1263   {
1264      const std::vector<CFile*> allFiles = CFile::getAll();
1265      const CDate& initDate = calendar->getInitDate();
1266
1267      for (unsigned int i = 0; i < allFiles.size(); i++)
1268         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini.
1269         {
1270            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai.
1271            {
1272              if (allFiles[i]->output_freq.isEmpty())
1273              {
1274                 ERROR("CContext::findEnabledFiles()",
1275                     << "Mandatory attribute output_freq must be defined for file \""<<allFiles[i]->getFileOutputName()
1276                     <<" \".")
1277              }
1278              if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
1279              {
1280                error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
1281                    << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
1282                    <<"\" is less than the time step. File will not be written."<<endl;
1283              }
1284              else
1285               enabledFiles.push_back(allFiles[i]);
1286            }
1287         }
1288         else
1289         {
1290           if (allFiles[i]->output_freq.isEmpty())
1291           {
1292              ERROR("CContext::findEnabledFiles()",
1293                  << "Mandatory attribute output_freq must be defined for file \""<<allFiles[i]->getFileOutputName()
1294                  <<" \".")
1295           }
1296           if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep()))
1297           {
1298             error(0)<<"WARNING: void CContext::findEnabledFiles()"<<endl
1299                 << "Output frequency in file \""<<allFiles[i]->getFileOutputName()
1300                 <<"\" is less than the time step. File will not be written."<<endl;
1301           }
1302           else
1303             enabledFiles.push_back(allFiles[i]); // otherwise true by default
1304         }
1305
1306      if (enabledFiles.size() == 0)
1307         DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \""
1308               << getId() << "\" !");
1309
1310   }
1311   CATCH_DUMP_ATTR
1312
1313   void CContext::findEnabledCouplerIn(void)
1314   TRY
1315   {
1316      const std::vector<CCouplerIn*> allCouplerIn = CCouplerIn::getAll();
1317      bool enabled ;
1318      for (size_t i = 0; i < allCouplerIn.size(); i++)
1319      {
1320        if (allCouplerIn[i]->enabled.isEmpty()) enabled=true ;
1321        else enabled=allCouplerIn[i]->enabled ;
1322        if (enabled) enabledCouplerIn.push_back(allCouplerIn[i]) ;
1323      }
1324   }
1325   CATCH_DUMP_ATTR
1326
1327   void CContext::findEnabledCouplerOut(void)
1328   TRY
1329   {
1330      const std::vector<CCouplerOut*> allCouplerOut = CCouplerOut::getAll();
1331      bool enabled ;
1332      for (size_t i = 0; i < allCouplerOut.size(); i++)
1333      {
1334        if (allCouplerOut[i]->enabled.isEmpty()) enabled=true ;
1335        else enabled=allCouplerOut[i]->enabled ;
1336        if (enabled) enabledCouplerOut.push_back(allCouplerOut[i]) ;
1337      }
1338   }
1339   CATCH_DUMP_ATTR
1340
1341
1342
1343
1344   void CContext::distributeFiles(const vector<CFile*>& files)
1345   TRY
1346   {
1347     bool distFileMemory=false ;
1348     distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory);
1349
1350     if (distFileMemory) distributeFileOverMemoryBandwith(files) ;
1351     else distributeFileOverBandwith(files) ;
1352   }
1353   CATCH_DUMP_ATTR
1354
1355   void CContext::distributeFileOverBandwith(const vector<CFile*>& files)
1356   TRY
1357   {
1358     double eps=std::numeric_limits<double>::epsilon()*10 ;
1359     
1360     std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out);
1361     int nbPools = clientPrimServer.size();
1362
1363     // (1) Find all enabled files in write mode
1364     // for (int i = 0; i < this->enabledFiles.size(); ++i)
1365     // {
1366     //   if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write ))
1367     //    enabledWriteModeFiles.push_back(enabledFiles[i]);
1368     // }
1369
1370     // (2) Estimate the data volume for each file
1371     int size = files.size();
1372     std::vector<std::pair<double, CFile*> > dataSizeMap;
1373     double dataPerPool = 0;
1374     int nfield=0 ;
1375     ofs<<size<<endl ;
1376     for (size_t i = 0; i < size; ++i)
1377     {
1378       CFile* file = files[i];
1379       ofs<<file->getId()<<endl ;
1380       StdSize dataSize=0;
1381       std::vector<CField*> enabledFields = file->getEnabledFields();
1382       size_t numEnabledFields = enabledFields.size();
1383       ofs<<numEnabledFields<<endl ;
1384       for (size_t j = 0; j < numEnabledFields; ++j)
1385       {
1386         dataSize += enabledFields[j]->getGlobalWrittenSize() ;
1387         ofs<<enabledFields[j]->getGrid()->getId()<<endl ;
1388         ofs<<enabledFields[j]->getGlobalWrittenSize()<<endl ;
1389       }
1390       double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ;
1391       double dataSizeSec= dataSize/ outFreqSec;
1392       ofs<<dataSizeSec<<endl ;
1393       nfield++ ;
1394// add epsilon*nField to dataSizeSec in order to  preserve reproductive ordering when sorting
1395       dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file));
1396       dataPerPool += dataSizeSec;
1397     }
1398     dataPerPool /= nbPools;
1399     std::sort(dataSizeMap.begin(), dataSizeMap.end());
1400
1401     // (3) Assign contextClient to each enabled file
1402
1403     std::multimap<double,int> poolDataSize ;
1404// multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11
1405
1406     int j;
1407     double dataSize ;
1408     for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ; 
1409             
1410     for (int i = dataSizeMap.size()-1; i >= 0; --i)
1411     {
1412       dataSize=(*poolDataSize.begin()).first ;
1413       j=(*poolDataSize.begin()).second ;
1414       dataSizeMap[i].second->setContextClient(clientPrimServer[j]);
1415       dataSize+=dataSizeMap[i].first;
1416       poolDataSize.erase(poolDataSize.begin()) ;
1417       poolDataSize.insert(std::pair<double,int>(dataSize,j)) ; 
1418     }
1419
1420     for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) info(30)<<"Load Balancing for servers (perfect=1) : "<<it->second<<" :  ratio "<<it->first*1./dataPerPool<<endl ;
1421   }
1422   CATCH_DUMP_ATTR
1423
1424   void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList)
1425   TRY
1426   {
1427     int nbPools = clientPrimServer.size();
1428     double ratio=0.5 ;
1429     ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio);
1430
1431     int nFiles = filesList.size();
1432     vector<SDistFile> files(nFiles);
1433     vector<SDistGrid> grids;
1434     map<string,int> gridMap ;
1435     string gridId; 
1436     int gridIndex=0 ;
1437
1438     for (size_t i = 0; i < nFiles; ++i)
1439     {
1440       StdSize dataSize=0;
1441       CFile* file = filesList[i];
1442       std::vector<CField*> enabledFields = file->getEnabledFields();
1443       size_t numEnabledFields = enabledFields.size();
1444
1445       files[i].id_=file->getId() ;
1446       files[i].nbGrids_=numEnabledFields;
1447       files[i].assignedGrid_ = new int[files[i].nbGrids_] ;
1448         
1449       for (size_t j = 0; j < numEnabledFields; ++j)
1450       {
1451         gridId=enabledFields[j]->getGrid()->getId() ;
1452         if (gridMap.find(gridId)==gridMap.end())
1453         {
1454            gridMap[gridId]=gridIndex  ;
1455            SDistGrid newGrid; 
1456            grids.push_back(newGrid) ;
1457            gridIndex++ ;
1458         }
1459         files[i].assignedGrid_[j]=gridMap[gridId] ;
1460         grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ;
1461         dataSize += enabledFields[j]->getGlobalWrittenSize() ; // usefull
1462       }
1463       double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ;
1464       files[i].bandwith_= dataSize/ outFreqSec ;
1465     }
1466
1467     double bandwith=0 ;
1468     double memory=0 ;
1469   
1470     for(int i=0; i<nFiles; i++)  bandwith+=files[i].bandwith_ ;
1471     for(int i=0; i<nFiles; i++)  files[i].bandwith_ = files[i].bandwith_/bandwith * ratio ;
1472
1473     for(int i=0; i<grids.size(); i++)  memory+=grids[i].size_ ;
1474     for(int i=0; i<grids.size(); i++)  grids[i].size_ = grids[i].size_ / memory * (1.0-ratio) ;
1475       
1476     distributeFileOverServer2(nbPools, grids.size(), &grids[0], nFiles, &files[0]) ;
1477
1478     vector<double> memorySize(nbPools,0.) ;
1479     vector< set<int> > serverGrids(nbPools) ;
1480     vector<double> bandwithSize(nbPools,0.) ;
1481       
1482     for (size_t i = 0; i < nFiles; ++i)
1483     {
1484       bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ;
1485       for(int j=0 ; j<files[i].nbGrids_;j++)
1486       {
1487         if (serverGrids[files[i].assignedServer_].find(files[i].assignedGrid_[j]) == serverGrids[files[i].assignedServer_].end())
1488         {
1489           memorySize[files[i].assignedServer_]+= grids[files[i].assignedGrid_[j]].size_ * memory / (1.0-ratio);
1490           serverGrids[files[i].assignedServer_].insert(files[i].assignedGrid_[j]) ;
1491         }
1492       }
1493       filesList[i]->setContextClient(clientPrimServer[files[i].assignedServer_]) ;
1494       delete [] files[i].assignedGrid_ ;
1495     }
1496
1497     for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<"   assigned file bandwith "<<bandwithSize[i]*86400.*4./1024/1024.<<" Mb / days"<<endl ;
1498     for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<"   assigned grid memory "<<memorySize[i]*100/1024./1024.<<" Mb"<<endl ;
1499
1500   }
1501   CATCH_DUMP_ATTR
1502
1503   /*!
1504      Find all files in write mode
1505   */
1506   void CContext::findEnabledWriteModeFiles(void)
1507   TRY
1508   {
1509     int size = this->enabledFiles.size();
1510     for (int i = 0; i < size; ++i)
1511     {
1512       if (enabledFiles[i]->mode.isEmpty() || 
1513          (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write ))
1514        enabledWriteModeFiles.push_back(enabledFiles[i]);
1515     }
1516   }
1517   CATCH_DUMP_ATTR
1518
1519   /*!
1520      Find all files in read mode
1521   */
1522   void CContext::findEnabledReadModeFiles(void)
1523   TRY
1524   {
1525     int size = this->enabledFiles.size();
1526     for (int i = 0; i < size; ++i)
1527     {
1528       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
1529        enabledReadModeFiles.push_back(enabledFiles[i]);
1530     }
1531   }
1532   CATCH_DUMP_ATTR
1533
1534   void CContext::closeAllFile(void)
1535   TRY
1536   {
1537     std::vector<CFile*>::const_iterator
1538            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
1539
1540     for (; it != end; it++)
1541     {
1542       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
1543       (*it)->close();
1544     }
1545   }
1546   CATCH_DUMP_ATTR
1547
1548   /*!
1549   \brief Dispatch event received from client
1550      Whenever a message is received in buffer of server, it will be processed depending on
1551   its event type. A new event type should be added in the switch list to make sure
1552   it processed on server side.
1553   \param [in] event: Received message
1554   */
1555   bool CContext::dispatchEvent(CEventServer& event)
1556   TRY
1557   {
1558
1559      if (SuperClass::dispatchEvent(event)) return true;
1560      else
1561      {
1562        switch(event.type)
1563        {
1564           case EVENT_ID_CLOSE_DEFINITION :
1565             recvCloseDefinition(event);
1566             return true;
1567             break;
1568           case EVENT_ID_UPDATE_CALENDAR:
1569             recvUpdateCalendar(event);
1570             return true;
1571             break;
1572           case EVENT_ID_CREATE_FILE_HEADER :
1573             recvCreateFileHeader(event);
1574             return true;
1575             break;
1576           case EVENT_ID_COUPLER_IN_READY:
1577             recvCouplerInReady(event);
1578             return true;
1579             break;
1580           case EVENT_ID_COUPLER_IN_CLOSE_DEFINITION:
1581             recvCouplerInCloseDefinition(event);
1582             return true;
1583             break;
1584           case EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED:
1585             recvCouplerInContextFinalized(event);
1586             return true;
1587             break; 
1588           default :
1589             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
1590                    <<"Unknown Event");
1591           return false;
1592         }
1593      }
1594   }
1595   CATCH
1596
1597   //! Client side: Send a message to server to make it close
1598   // ym obsolete
1599   void CContext::sendCloseDefinition(void)
1600   TRY
1601   {
1602    int nbSrvPools ;
1603    if (serviceType_==CServicesManager::CLIENT) nbSrvPools = 1 ;
1604    else if (serviceType_==CServicesManager::GATHERER) nbSrvPools = this->clientPrimServer.size() ;
1605    else nbSrvPools = 0 ;
1606    CContextClient* contextClientTmp ;
1607
1608    for (int i = 0; i < nbSrvPools; ++i)
1609     {
1610       if (serviceType_==CServicesManager::CLIENT) contextClientTmp = client ;
1611       else if (serviceType_==CServicesManager::GATHERER ) contextClientTmp = clientPrimServer[i] ;
1612       CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
1613       if (contextClientTmp->isServerLeader())
1614       {
1615         CMessage msg;
1616         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1617         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1618           event.push(*itRank,1,msg);
1619         contextClientTmp->sendEvent(event);
1620       }
1621       else contextClientTmp->sendEvent(event);
1622     }
1623   }
1624   CATCH_DUMP_ATTR
1625   
1626   //  ! Client side: Send a message to server to make it close
1627   void CContext::sendCloseDefinition(CContextClient* client)
1628   TRY
1629   {
1630      if (sendCloseDefinition_done_.count(client)!=0) return ;
1631      else sendCloseDefinition_done_.insert(client) ;
1632
1633      CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
1634      if (client->isServerLeader())
1635      {
1636        CMessage msg;
1637        for(auto rank : client->getRanksServerLeader()) event.push(rank,1,msg);
1638        client->sendEvent(event);
1639      }
1640     else client->sendEvent(event);
1641   }
1642   CATCH_DUMP_ATTR
1643
1644   //! Server side: Receive a message of client announcing a context close
1645   void CContext::recvCloseDefinition(CEventServer& event)
1646   TRY
1647   {
1648      CBufferIn* buffer=event.subEvents.begin()->buffer;
1649      getCurrent()->closeDefinition();
1650   }
1651   CATCH
1652
1653   //! Client side: Send a message to update calendar in each time step
1654   void CContext::sendUpdateCalendar(int step)
1655   TRY
1656   {
1657     CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
1658     for(auto client : slaveServers_) 
1659     {
1660       if (client->isServerLeader())
1661       {
1662         CMessage msg;
1663         msg<<step;
1664         for (auto& rank : client->getRanksServerLeader() ) event.push(rank,1,msg);
1665         client->sendEvent(event);
1666       }
1667       else client->sendEvent(event);
1668     }
1669   }
1670   CATCH_DUMP_ATTR
1671
1672   //! Server side: Receive a message of client annoucing calendar update
1673   void CContext::recvUpdateCalendar(CEventServer& event)
1674   TRY
1675   {
1676      CBufferIn* buffer=event.subEvents.begin()->buffer;
1677      getCurrent()->recvUpdateCalendar(*buffer);
1678   }
1679   CATCH
1680
1681   //! Server side: Receive a message of client annoucing calendar update
1682   void CContext::recvUpdateCalendar(CBufferIn& buffer)
1683   TRY
1684   {
1685      int step;
1686      buffer>>step;
1687      updateCalendar(step);
1688      if (serviceType_==CServicesManager::GATHERER)
1689      {       
1690        sendUpdateCalendar(step);
1691      }
1692   }
1693   CATCH_DUMP_ATTR
1694
1695   //! Client side: Send a message to create header part of netcdf file
1696   void CContext::sendCreateFileHeader(void)
1697   TRY
1698   {
1699     int nbSrvPools ;
1700     if (serviceType_==CServicesManager::CLIENT) nbSrvPools = 1 ;
1701     else if (serviceType_==CServicesManager::GATHERER) nbSrvPools = this->clientPrimServer.size() ;
1702     else nbSrvPools = 0 ;
1703     CContextClient* contextClientTmp ;
1704
1705     for (int i = 0; i < nbSrvPools; ++i)
1706     {
1707       if (serviceType_==CServicesManager::CLIENT) contextClientTmp = client ;
1708       else if (serviceType_==CServicesManager::GATHERER ) contextClientTmp = clientPrimServer[i] ;
1709       CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
1710
1711       if (contextClientTmp->isServerLeader())
1712       {
1713         CMessage msg;
1714         const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
1715         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1716           event.push(*itRank,1,msg) ;
1717         contextClientTmp->sendEvent(event);
1718       }
1719       else contextClientTmp->sendEvent(event);
1720     }
1721   }
1722   CATCH_DUMP_ATTR
1723
1724   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
1725   void CContext::recvCreateFileHeader(CEventServer& event)
1726   TRY
1727   {
1728      CBufferIn* buffer=event.subEvents.begin()->buffer;
1729      getCurrent()->recvCreateFileHeader(*buffer);
1730   }
1731   CATCH
1732
1733   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
1734   void CContext::recvCreateFileHeader(CBufferIn& buffer)
1735   TRY
1736   {
1737      if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 
1738        createFileHeader();
1739   }
1740   CATCH_DUMP_ATTR
1741
1742   void CContext::createCouplerInterCommunicator(void)
1743   TRY
1744   {
1745      int rank=this->getIntraCommRank() ;
1746      map<string,list<CCouplerOut*>> listCouplerOut ; 
1747      map<string,list<CCouplerIn*>> listCouplerIn ; 
1748
1749      for(auto couplerOut : enabledCouplerOut) listCouplerOut[couplerOut->getCouplingContextId()].push_back(couplerOut) ;
1750      for(auto couplerIn : enabledCouplerIn) listCouplerIn[couplerIn->getCouplingContextId()].push_back(couplerIn) ;
1751
1752      CCouplerManager* couplerManager = CXios::getCouplerManager() ;
1753      if (rank==0)
1754      {
1755        for(auto couplerOut : listCouplerOut) couplerManager->registerCoupling(this->getContextId(),couplerOut.first) ;
1756        for(auto couplerIn : listCouplerIn) couplerManager->registerCoupling(couplerIn.first,this->getContextId()) ;
1757      }
1758
1759      do
1760      {
1761        for(auto couplerOut : listCouplerOut) 
1762        {
1763          bool isNextCoupling ;
1764          if (rank==0) isNextCoupling = couplerManager->isNextCoupling(this->getContextId(),couplerOut.first) ;
1765          MPI_Bcast(&isNextCoupling,1,MPI_C_BOOL, 0, getIntraComm()) ; 
1766          if (isNextCoupling) 
1767          {
1768            addCouplingChanel(couplerOut.first, true) ;
1769            listCouplerOut.erase(couplerOut.first) ;
1770            break ;
1771          }           
1772        }
1773        for(auto couplerIn : listCouplerIn) 
1774        {
1775          bool isNextCoupling ;
1776          if (rank==0) isNextCoupling = couplerManager->isNextCoupling(couplerIn.first,this->getContextId());
1777          MPI_Bcast(&isNextCoupling,1,MPI_C_BOOL, 0, getIntraComm()) ; 
1778          if (isNextCoupling) 
1779          {
1780            addCouplingChanel(couplerIn.first, false) ;
1781            listCouplerIn.erase(couplerIn.first) ;
1782            break ;
1783          }           
1784        }
1785
1786      } while (!listCouplerOut.empty() || !listCouplerIn.empty()) ;
1787
1788   }
1789   CATCH_DUMP_ATTR
1790
1791 
1792     //! Client side: Send infomation of active files (files are enabled to write out)
1793   void CContext::sendEnabledFiles(const std::vector<CFile*>& activeFiles)
1794   TRY
1795   {
1796     int size = activeFiles.size();
1797
1798     // In a context, each type has a root definition, e.g: axis, domain, field.
1799     // Every object must be a child of one of these root definition. In this case
1800     // all new file objects created on server must be children of the root "file_definition"
1801     StdString fileDefRoot("file_definition");
1802     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
1803
1804     for (int i = 0; i < size; ++i)
1805     {
1806       CFile* f = activeFiles[i];
1807       cfgrpPtr->sendCreateChild(f->getId(),f->getContextClient());
1808       f->sendAllAttributesToServer(f->getContextClient());
1809       f->sendAddAllVariables(f->getContextClient());
1810     }
1811   }
1812   CATCH_DUMP_ATTR
1813
1814   //! Client side: Send information of active fields (ones are written onto files)
1815   void CContext::sendEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)
1816   TRY
1817   {
1818     int size = activeFiles.size();
1819     for (int i = 0; i < size; ++i)
1820     {
1821       activeFiles[i]->sendEnabledFields(activeFiles[i]->getContextClient());
1822     }
1823   }
1824   CATCH_DUMP_ATTR
1825
1826 
1827   //! Client side: Prepare the timeseries by adding the necessary files
1828   void CContext::prepareTimeseries()
1829   TRY
1830   {
1831     const std::vector<CFile*> allFiles = CFile::getAll();
1832     for (size_t i = 0; i < allFiles.size(); i++)
1833     {
1834       CFile* file = allFiles[i];
1835
1836       std::vector<CVariable*> fileVars, fieldVars, vars = file->getAllVariables();
1837       for (size_t k = 0; k < vars.size(); k++)
1838       {
1839         CVariable* var = vars[k];
1840
1841         if (var->ts_target.isEmpty()
1842              || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1843           fileVars.push_back(var);
1844
1845         if (!var->ts_target.isEmpty()
1846              && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1847           fieldVars.push_back(var);
1848       }
1849
1850       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1851       {
1852         StdString fileNameStr("%file_name%") ;
1853         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1854         
1855         StdString fileName=file->getFileOutputName();
1856         size_t pos=tsPrefix.find(fileNameStr) ;
1857         while (pos!=std::string::npos)
1858         {
1859           tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1860           pos=tsPrefix.find(fileNameStr) ;
1861         }
1862       
1863         const std::vector<CField*> allFields = file->getAllFields();
1864         for (size_t j = 0; j < allFields.size(); j++)
1865         {
1866           CField* field = allFields[j];
1867
1868           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1869           {
1870             CFile* tsFile = CFile::create();
1871             tsFile->duplicateAttributes(file);
1872
1873             // Add variables originating from file and targeted to timeserie file
1874             for (size_t k = 0; k < fileVars.size(); k++)
1875               tsFile->getVirtualVariableGroup()->addChild(fileVars[k]);
1876
1877           
1878             tsFile->name = tsPrefix + "_";
1879             if (!field->name.isEmpty())
1880               tsFile->name.get() += field->name;
1881             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1882               tsFile->name.get() += field->field_ref;
1883             else
1884               tsFile->name.get() += field->getId();
1885
1886             if (!field->ts_split_freq.isEmpty())
1887               tsFile->split_freq = field->ts_split_freq;
1888
1889             CField* tsField = tsFile->addField();
1890             tsField->field_ref = field->getId();
1891
1892             // Add variables originating from file and targeted to timeserie field
1893             for (size_t k = 0; k < fieldVars.size(); k++)
1894               tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1895
1896             vars = field->getAllVariables();
1897             for (size_t k = 0; k < vars.size(); k++)
1898             {
1899               CVariable* var = vars[k];
1900
1901               // Add variables originating from field and targeted to timeserie field
1902               if (var->ts_target.isEmpty()
1903                    || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1904                 tsField->getVirtualVariableGroup()->addChild(var);
1905
1906               // Add variables originating from field and targeted to timeserie file
1907               if (!var->ts_target.isEmpty()
1908                    && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1909                 tsFile->getVirtualVariableGroup()->addChild(var);
1910             }
1911
1912             tsFile->solveFieldRefInheritance(true);
1913
1914             if (file->timeseries == CFile::timeseries_attr::exclusive)
1915               field->enabled = false;
1916           }
1917         }
1918
1919         // Finally disable the original file is need be
1920         if (file->timeseries == CFile::timeseries_attr::only)
1921          file->enabled = false;
1922       }
1923     }
1924   }
1925   CATCH_DUMP_ATTR
1926
1927 
1928   //! Client side: Send information of reference domain, axis and scalar of active fields
1929   void CContext::sendRefDomainsAxisScalars(const std::vector<CFile*>& activeFiles)
1930   TRY
1931   {
1932     std::set<pair<StdString,CContextClient*>> domainIds, axisIds, scalarIds;
1933
1934     // Find all reference domain and axis of all active fields
1935     int numEnabledFiles = activeFiles.size();
1936     for (int i = 0; i < numEnabledFiles; ++i)
1937     {
1938       std::vector<CField*> enabledFields = activeFiles[i]->getEnabledFields();
1939       int numEnabledFields = enabledFields.size();
1940       for (int j = 0; j < numEnabledFields; ++j)
1941       {
1942         CContextClient* contextClient=enabledFields[j]->getContextClient() ;
1943         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1944         if ("" != prDomAxisScalarId[0]) domainIds.insert(make_pair(prDomAxisScalarId[0],contextClient));
1945         if ("" != prDomAxisScalarId[1]) axisIds.insert(make_pair(prDomAxisScalarId[1],contextClient));
1946         if ("" != prDomAxisScalarId[2]) scalarIds.insert(make_pair(prDomAxisScalarId[2],contextClient));
1947       }
1948     }
1949
1950     // Create all reference axis on server side
1951     std::set<StdString>::iterator itDom, itAxis, itScalar;
1952     std::set<StdString>::const_iterator itE;
1953
1954     StdString scalarDefRoot("scalar_definition");
1955     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1956     
1957     for (auto itScalar = scalarIds.begin(); itScalar != scalarIds.end(); ++itScalar)
1958     {
1959       if (!itScalar->first.empty())
1960       {
1961         scalarPtr->sendCreateChild(itScalar->first,itScalar->second);
1962         CScalar::get(itScalar->first)->sendAllAttributesToServer(itScalar->second);
1963       }
1964     }
1965
1966     StdString axiDefRoot("axis_definition");
1967     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1968     
1969     for (auto itAxis = axisIds.begin(); itAxis != axisIds.end(); ++itAxis)
1970     {
1971       if (!itAxis->first.empty())
1972       {
1973         axisPtr->sendCreateChild(itAxis->first, itAxis->second);
1974         CAxis::get(itAxis->first)->sendAllAttributesToServer(itAxis->second);
1975       }
1976     }
1977
1978     // Create all reference domains on server side
1979     StdString domDefRoot("domain_definition");
1980     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1981     
1982     for (auto itDom = domainIds.begin(); itDom != domainIds.end(); ++itDom)
1983     {
1984       if (!itDom->first.empty()) {
1985          domPtr->sendCreateChild(itDom->first, itDom->second);
1986          CDomain::get(itDom->first)->sendAllAttributesToServer(itDom->second);
1987       }
1988     }
1989   }
1990   CATCH_DUMP_ATTR
1991
1992   void CContext::triggerLateFields(void)
1993   TRY
1994   {
1995    for(auto& field : fileInFields_) field->triggerLateField() ;
1996    for(auto& field : couplerInFields_) field->triggerLateField() ;
1997   }
1998   CATCH_DUMP_ATTR
1999
2000   //! Update calendar in each time step
2001   void CContext::updateCalendar(int step)
2002   TRY
2003   {
2004      int prevStep = calendar->getStep();
2005
2006      if (prevStep < step)
2007      {
2008        if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data
2009        {
2010          triggerLateFields();
2011        }
2012
2013        info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
2014        calendar->update(step);
2015        info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
2016  #ifdef XIOS_MEMTRACK_LIGHT
2017        info(50) << " Current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<<step<<" of context "<<this->getId()<<endl ;
2018  #endif
2019
2020        if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data
2021        {
2022          doPostTimestepOperationsForEnabledReadModeFiles();
2023          garbageCollector.invalidate(calendar->getCurrentDate());
2024        }
2025      }
2026      else if (prevStep == step)
2027        info(50) << "updateCalendar: already at step " << step << ", no operation done." << endl;
2028      else // if (prevStep > step)
2029        ERROR("void CContext::updateCalendar(int step)",
2030              << "Illegal calendar update: previous step was " << prevStep << ", new step " << step << "is in the past!")
2031   }
2032   CATCH_DUMP_ATTR
2033
2034   void CContext::initReadFiles(void)
2035   TRY
2036   {
2037      vector<CFile*>::const_iterator it;
2038
2039      for (it=enabledReadModeFiles.begin(); it != enabledReadModeFiles.end(); it++)
2040      {
2041         (*it)->initRead();
2042      }
2043   }
2044   CATCH_DUMP_ATTR
2045
2046   //! Server side: Create header of netcdf file
2047   void CContext::createFileHeader(void)
2048   TRY
2049   {
2050      vector<CFile*>::const_iterator it;
2051
2052      //for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
2053      for (it=enabledWriteModeFiles.begin(); it != enabledWriteModeFiles.end(); it++)
2054      {
2055         (*it)->initWrite();
2056      }
2057   }
2058   CATCH_DUMP_ATTR
2059
2060   //! Get current context
2061   CContext* CContext::getCurrent(void)
2062   TRY
2063   {
2064     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
2065   }
2066   CATCH
2067
2068   /*!
2069   \brief Set context with an id be the current context
2070   \param [in] id identity of context to be set to current
2071   */
2072   void CContext::setCurrent(const string& id)
2073   TRY
2074   {
2075     CObjectFactory::SetCurrentContextId(id);
2076     CGroupFactory::SetCurrentContextId(id);
2077   }
2078   CATCH
2079
2080  /*!
2081  \brief Create a context with specific id
2082  \param [in] id identity of new context
2083  \return pointer to the new context or already-existed one with identity id
2084  */
2085  CContext* CContext::create(const StdString& id)
2086  TRY
2087  {
2088    CContext::setCurrent(id);
2089
2090    bool hasctxt = CContext::has(id);
2091    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
2092    getRoot();
2093    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
2094
2095#define DECLARE_NODE(Name_, name_) \
2096    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
2097#define DECLARE_NODE_PAR(Name_, name_)
2098#include "node_type.conf"
2099
2100    return (context);
2101  }
2102  CATCH
2103
2104 
2105  void CContext::sendFinalizeClient(CContextClient* contextClient, const string& contextClientId)
2106  TRY
2107  {
2108    CEventClient event(getType(),EVENT_ID_CONTEXT_FINALIZE_CLIENT);
2109    if (contextClient->isServerLeader())
2110    {
2111      CMessage msg;
2112      msg<<contextClientId ;
2113      const std::list<int>& ranks = contextClient->getRanksServerLeader();
2114      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
2115           event.push(*itRank,1,msg);
2116      contextClient->sendEvent(event);
2117    }
2118    else contextClient->sendEvent(event);
2119  }
2120  CATCH_DUMP_ATTR
2121
2122 
2123  void CContext::recvFinalizeClient(CEventServer& event)
2124  TRY
2125  {
2126    CBufferIn* buffer=event.subEvents.begin()->buffer;
2127    string id;
2128    *buffer>>id;
2129    get(id)->recvFinalizeClient(*buffer);
2130  }
2131  CATCH
2132
2133  void CContext::recvFinalizeClient(CBufferIn& buffer)
2134  TRY
2135  {
2136    countChildContextFinalized_++ ;
2137  }
2138  CATCH_DUMP_ATTR
2139
2140
2141
2142
2143 //! Client side: Send a message  announcing that context can receive grid definition from coupling
2144   void CContext::sendCouplerInReady(CContextClient* client)
2145   TRY
2146   {
2147      if (sendCouplerInReady_done_.count(client)!=0) return ;
2148      else sendCouplerInReady_done_.insert(client) ;
2149
2150      CEventClient event(getType(),EVENT_ID_COUPLER_IN_READY);
2151
2152      if (client->isServerLeader())
2153      {
2154        CMessage msg;
2155        msg<<this->getId();
2156        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2157        client->sendEvent(event);
2158      }
2159      else client->sendEvent(event);
2160   }
2161   CATCH_DUMP_ATTR
2162
2163   //! Server side: Receive a message announcing that context can send grid definition for context coupling
2164   void CContext::recvCouplerInReady(CEventServer& event)
2165   TRY
2166   {
2167      CBufferIn* buffer=event.subEvents.begin()->buffer;
2168      getCurrent()->recvCouplerInReady(*buffer);
2169   }
2170   CATCH
2171
2172   //! Server side: Receive a message announcing that context can send grid definition for context coupling
2173   void CContext::recvCouplerInReady(CBufferIn& buffer)
2174   TRY
2175   {
2176      string contextId ;
2177      buffer>>contextId;
2178      couplerInReady_.insert(getCouplerOutClient(contextId)) ;
2179   }
2180   CATCH_DUMP_ATTR
2181
2182
2183
2184
2185
2186 //! Client side: Send a message  announcing that a coupling context have done it closeDefinition, so data can be sent now.
2187   void CContext::sendCouplerInCloseDefinition(CContextClient* client)
2188   TRY
2189   {
2190      if (sendCouplerInCloseDefinition_done_.count(client)!=0) return ;
2191      else sendCouplerInCloseDefinition_done_.insert(client) ;
2192
2193      CEventClient event(getType(),EVENT_ID_COUPLER_IN_CLOSE_DEFINITION);
2194
2195      if (client->isServerLeader())
2196      {
2197        CMessage msg;
2198        msg<<this->getId();
2199        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2200        client->sendEvent(event);
2201      }
2202      else client->sendEvent(event);
2203   }
2204   CATCH_DUMP_ATTR
2205
2206   //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now.
2207   void CContext::recvCouplerInCloseDefinition(CEventServer& event)
2208   TRY
2209   {
2210      CBufferIn* buffer=event.subEvents.begin()->buffer;
2211      getCurrent()->recvCouplerInCloseDefinition(*buffer);
2212   }
2213   CATCH
2214
2215   //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now.
2216   void CContext::recvCouplerInCloseDefinition(CBufferIn& buffer)
2217   TRY
2218   {
2219      string contextId ;
2220      buffer>>contextId;
2221      couplerInCloseDefinition_.insert(getCouplerOutClient(contextId)) ;
2222   }
2223   CATCH_DUMP_ATTR
2224
2225
2226
2227
2228//! Client side: Send a message  announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2229   void CContext::sendCouplerInContextFinalized(CContextClient* client)
2230   TRY
2231   {
2232      if (sendCouplerInContextFinalized_done_.count(client)!=0) return ;
2233      else sendCouplerInContextFinalized_done_.insert(client) ;
2234
2235      CEventClient event(getType(),EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED);
2236
2237      if (client->isServerLeader())
2238      {
2239        CMessage msg;
2240        msg<<this->getId();
2241        for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg);
2242        client->sendEvent(event);
2243      }
2244      else client->sendEvent(event);
2245   }
2246   CATCH_DUMP_ATTR
2247
2248   //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2249   void CContext::recvCouplerInContextFinalized(CEventServer& event)
2250   TRY
2251   {
2252      CBufferIn* buffer=event.subEvents.begin()->buffer;
2253      getCurrent()->recvCouplerInContextFinalized(*buffer);
2254   }
2255   CATCH
2256
2257   //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context.
2258   void CContext::recvCouplerInContextFinalized(CBufferIn& buffer)
2259   TRY
2260   {
2261      string contextId ;
2262      buffer>>contextId;
2263      couplerInContextFinalized_.insert(getCouplerOutClient(contextId)) ;
2264   }
2265   CATCH_DUMP_ATTR
2266
2267
2268
2269
2270  /*!
2271  * \fn bool CContext::isFinalized(void)
2272  * Context is finalized if it received context post finalize event.
2273  */
2274  bool CContext::isFinalized(void)
2275  TRY
2276  {
2277    return finalized;
2278  }
2279  CATCH_DUMP_ATTR
2280  ///--------------------------------------------------------------
2281  StdString CContext::dumpClassAttributes(void)
2282  {
2283    StdString str;
2284    str.append("enabled files=\"");
2285    int size = this->enabledFiles.size();
2286    for (int i = 0; i < size; ++i)
2287    {
2288      str.append(enabledFiles[i]->getId());
2289      str.append(" ");
2290    }
2291    str.append("\"");
2292    return str;
2293  }
2294
2295} // namespace xios
Note: See TracBrowser for help on using the repository browser.