source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1060

Last change on this file since 1060 was 1060, checked in by yushan, 7 years ago

minor corrections + clean up

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 42.3 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Dfinitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrmentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     ep_lib::MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262     }
263     server = new CContextServer(this,intraCommServer,interCommServer);
264   }
265
266   void CContext::setClientServerBuffer()
267   {
268     size_t minBufferSize = CXios::minBufferSize;
269#define DECLARE_NODE(Name_, name_)    \
270     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
271#define DECLARE_NODE_PAR(Name_, name_)
272#include "node_type.conf"
273#undef DECLARE_NODE
274#undef DECLARE_NODE_PAR
275
276     std::map<int, StdSize> maxEventSize;
277     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
278     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
279
280     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
281     for (it = dataBufferSize.begin(); it != ite; ++it)
282       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
283
284     ite = bufferSize.end();
285     for (it = bufferSize.begin(); it != ite; ++it)
286     {
287       it->second *= CXios::bufferSizeFactor;
288       if (it->second < minBufferSize) it->second = minBufferSize;
289     }
290
291     // We consider that the minimum buffer size is also the minimum event size
292     ite = maxEventSize.end();
293     for (it = maxEventSize.begin(); it != ite; ++it)
294       if (it->second < minBufferSize) it->second = minBufferSize;
295
296     if (client->isServerLeader())
297     {
298       const std::list<int>& ranks = client->getRanksServerLeader();
299       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
300         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
301     }
302
303     client->setBufferSize(bufferSize, maxEventSize);
304   }
305
306   //! Verify whether a context is initialized
307   bool CContext::isInitialized(void)
308   {
309     return hasClient;
310   }
311
312   //! Initialize server
313   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
314   {
315     hasServer=true;
316     server = new CContextServer(this,intraComm,interComm);
317
318     registryIn=new CRegistry(intraComm);
319     registryIn->setPath(getId()) ;
320     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
321     registryIn->bcastRegistry() ;
322     registryOut=new CRegistry(intraComm) ;
323     registryOut->setPath(getId()) ;
324
325     ep_lib::MPI_Comm intraCommClient, interCommClient;
326     if (cxtClient) // Attached mode
327     {
328       intraCommClient = intraComm;
329       interCommClient = interComm;
330     }
331     else
332     {
333       MPI_Comm_dup(intraComm, &intraCommClient);
334       comms.push_back(intraCommClient);
335       MPI_Comm_dup(interComm, &interCommClient);
336       comms.push_back(interCommClient);
337     }
338     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
339   }
340
341   //! Server side: Put server into a loop in order to listen message from client
342   bool CContext::eventLoop(void)
343   {
344     return server->eventLoop();
345   }
346
347   //! Try to send the buffers and receive possible answers
348   bool CContext::checkBuffersAndListen(void)
349   {
350     client->checkBuffers();
351     return server->eventLoop();
352   }
353
354   //! Terminate a context
355   void CContext::finalize(void)
356   {
357      if (!finalized)
358      {
359        finalized = true;
360        if (hasClient) sendRegistry() ;
361        client->finalize();
362        while (!server->hasFinished())
363        {
364          server->eventLoop();
365        }
366
367        if (hasServer)
368        {
369          closeAllFile();
370          registryOut->hierarchicalGatherRegistry() ;
371          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
372        }
373
374        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
375          MPI_Comm_free(&(*it));
376        comms.clear();
377      }
378   }
379
380   /*!
381   \brief Close all the context defintion and do processing data
382      After everything is well defined on client side, they will be processed and sent to server
383   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
384   all necessary information to server, from which each server can build its own database.
385   Because the role of server is to write out field data on a specific netcdf file,
386   the only information that it needs is the enabled files
387   and the active fields (fields will be written onto active files)
388   */
389   void CContext::closeDefinition(void)
390   {
391     int myRank;
392     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
393
394     //printf("myRank = %d, hasClient = %d, hasServer = %d\n", myRank, hasClient, hasServer);
395
396     // There is nothing client need to send to server
397     if (hasClient)
398     {
399       // After xml is parsed, there are some more works with post processing
400       postProcessing(); 
401       //printf("myRank = %d,                postProcessing OK\n", myRank);
402     }
403     setClientServerBuffer(); //printf("myRank = %d, setClientServerBuffer OK\n", myRank);
404
405     //printf("hasClient = %d, hasServer = %d\n", hasClient, hasServer);
406
407     if (hasClient && !hasServer)
408     {
409      // Send all attributes of current context to server
410      this->sendAllAttributesToServer(); //printf("myRank = %d, this->sendAllAttributesToServer OK\n", myRank);
411
412      // Send all attributes of current calendar
413      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
414      //printf("myRank = %d, CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer OK\n", myRank);
415
416      // We have enough information to send to server
417      // First of all, send all enabled files
418       sendEnabledFiles();  //printf("myRank = %d, sendEnabledFiles OK\n", myRank);
419
420      // Then, send all enabled fields
421       sendEnabledFields();  //printf("myRank = %d, sendEnabledFields OK\n", myRank);
422
423      // At last, we have all info of domain and axis, then send them
424       sendRefDomainsAxis();  //printf("myRank = %d, sendRefDomainsAxis OK\n", myRank);
425
426      // After that, send all grid (if any)
427       sendRefGrid(); //printf("myRank = %d, sendRefGrid OK\n", myRank);
428    }
429
430    // We have a xml tree on the server side and now, it should be also processed
431    if (hasClient && !hasServer) sendPostProcessing(); 
432
433    // There are some processings that should be done after all of above. For example: check mask or index
434    if (hasClient)
435    {
436      this->buildFilterGraphOfEnabledFields();  //printf("myRank = %d, buildFilterGraphOfEnabledFields OK\n", myRank);
437      buildFilterGraphOfFieldsWithReadAccess();  //printf("myRank = %d, buildFilterGraphOfFieldsWithReadAccess OK\n", myRank);
438      this->solveAllRefOfEnabledFields(true);  //printf("myRank = %d, solveAllRefOfEnabledFields OK\n", myRank);
439    }
440
441    // Now tell server that it can process all messages from client
442    if (hasClient && !hasServer) this->sendCloseDefinition();
443
444    // Nettoyage de l'arborescence
445    if (hasClient && !hasServer) CleanTree();
446
447    if (hasClient)
448    {
449      sendCreateFileHeader();  //printf("myRank = %d, sendCreateFileHeader OK\n", myRank);
450
451      startPrefetchingOfEnabledReadModeFiles();  //printf("myRank = %d, startPrefetchingOfEnabledReadModeFiles OK\n", myRank);
452    }
453   }
454
455   void CContext::findAllEnabledFields(void)
456   {
457     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
458     (void)this->enabledFiles[i]->getEnabledFields();
459   }
460
461   void CContext::findAllEnabledFieldsInReadModeFiles(void)
462   {
463     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
464     (void)this->enabledReadModeFiles[i]->getEnabledFields();
465   }
466
467   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
468   {
469      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
470        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
471   }
472
473   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
474   {
475     int size = this->enabledFiles.size();
476     for (int i = 0; i < size; ++i)
477     {
478       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
479     }
480
481     for (int i = 0; i < size; ++i)
482     {
483       this->enabledFiles[i]->generateNewTransformationGridDest();
484     }
485   }
486
487   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
488   {
489     int size = this->enabledFiles.size();
490     
491     for (int i = 0; i < size; ++i)
492     {
493       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
494     }
495   }
496
497   void CContext::buildFilterGraphOfEnabledFields()
498   {
499     int size = this->enabledFiles.size();
500     for (int i = 0; i < size; ++i)
501     {
502       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
503     }
504   }
505
506   void CContext::startPrefetchingOfEnabledReadModeFiles()
507   {
508     int size = enabledReadModeFiles.size();
509     for (int i = 0; i < size; ++i)
510     {
511        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
512     }
513   }
514
515   void CContext::checkPrefetchingOfEnabledReadModeFiles()
516   {
517     int size = enabledReadModeFiles.size();
518     for (int i = 0; i < size; ++i)
519     {
520        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
521     }
522   }
523
524  void CContext::findFieldsWithReadAccess(void)
525  {
526    fieldsWithReadAccess.clear();
527    const vector<CField*> allFields = CField::getAll();
528    for (size_t i = 0; i < allFields.size(); ++i)
529    {
530      CField* field = allFields[i];
531
532      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
533        field->read_access = true;
534      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
535        fieldsWithReadAccess.push_back(field);
536    }
537  }
538
539  void CContext::solveAllRefOfFieldsWithReadAccess()
540  {
541    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
542      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
543  }
544
545  void CContext::buildFilterGraphOfFieldsWithReadAccess()
546  {
547    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
548      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
549  }
550
551   void CContext::solveAllInheritance(bool apply)
552   {
553     // Rsolution des hritages descendants (cd des hritages de groupes)
554     // pour chacun des contextes.
555      solveDescInheritance(apply);
556
557     // Rsolution des hritages par rfrence au niveau des fichiers.
558      const vector<CFile*> allFiles=CFile::getAll();
559      const vector<CGrid*> allGrids= CGrid::getAll();
560
561     //if (hasClient && !hasServer)
562      if (hasClient)
563      {
564        for (unsigned int i = 0; i < allFiles.size(); i++)
565          allFiles[i]->solveFieldRefInheritance(apply);
566      }
567
568      unsigned int vecSize = allGrids.size();
569      unsigned int i = 0;
570      for (i = 0; i < vecSize; ++i)
571        allGrids[i]->solveDomainAxisRefInheritance(apply);
572
573   }
574
575   void CContext::findEnabledFiles(void)
576   {
577      const std::vector<CFile*> allFiles = CFile::getAll();
578
579      for (unsigned int i = 0; i < allFiles.size(); i++)
580         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est dfini.
581         {
582            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix  vrai.
583               enabledFiles.push_back(allFiles[i]);
584         }
585         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
586
587
588      if (enabledFiles.size() == 0)
589         DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm \""
590               << getId() << "\" !");
591   }
592
593   void CContext::findEnabledReadModeFiles(void)
594   {
595     int size = this->enabledFiles.size();
596     for (int i = 0; i < size; ++i)
597     {
598       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
599        enabledReadModeFiles.push_back(enabledFiles[i]);
600     }
601   }
602
603   void CContext::closeAllFile(void)
604   {
605     std::vector<CFile*>::const_iterator
606            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
607
608     for (; it != end; it++)
609     {
610       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
611       (*it)->close();
612     }
613   }
614
615   /*!
616   \brief Dispatch event received from client
617      Whenever a message is received in buffer of server, it will be processed depending on
618   its event type. A new event type should be added in the switch list to make sure
619   it processed on server side.
620   \param [in] event: Received message
621   */
622   bool CContext::dispatchEvent(CEventServer& event)
623   {
624
625      if (SuperClass::dispatchEvent(event)) return true;
626      else
627      {
628        switch(event.type)
629        {
630           case EVENT_ID_CLOSE_DEFINITION :
631             recvCloseDefinition(event);
632             return true;
633             break;
634           case EVENT_ID_UPDATE_CALENDAR:
635             recvUpdateCalendar(event);
636             return true;
637             break;
638           case EVENT_ID_CREATE_FILE_HEADER :
639             recvCreateFileHeader(event);
640             return true;
641             break;
642           case EVENT_ID_POST_PROCESS:
643             recvPostProcessing(event);
644             return true;
645            case EVENT_ID_SEND_REGISTRY:
646             recvRegistry(event);
647             return true;
648            break;
649
650           default :
651             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
652                    <<"Unknown Event");
653           return false;
654         }
655      }
656   }
657
658   //! Client side: Send a message to server to make it close
659   void CContext::sendCloseDefinition(void)
660   {
661     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
662     if (client->isServerLeader())
663     {
664       CMessage msg;
665       msg<<this->getIdServer();
666       const std::list<int>& ranks = client->getRanksServerLeader();
667       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
668         event.push(*itRank,1,msg);
669       client->sendEvent(event);
670     }
671     else client->sendEvent(event);
672   }
673
674   //! Server side: Receive a message of client announcing a context close
675   void CContext::recvCloseDefinition(CEventServer& event)
676   {
677
678      CBufferIn* buffer=event.subEvents.begin()->buffer;
679      string id;
680      *buffer>>id;
681      get(id)->closeDefinition();
682   }
683
684   //! Client side: Send a message to update calendar in each time step
685   void CContext::sendUpdateCalendar(int step)
686   {
687     if (!hasServer)
688     {
689       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
690       if (client->isServerLeader())
691       {
692         CMessage msg;
693         msg<<this->getIdServer()<<step;
694         const std::list<int>& ranks = client->getRanksServerLeader();
695         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
696           event.push(*itRank,1,msg);
697         client->sendEvent(event);
698       }
699       else client->sendEvent(event);
700     }
701   }
702
703   //! Server side: Receive a message of client annoucing calendar update
704   void CContext::recvUpdateCalendar(CEventServer& event)
705   {
706      CBufferIn* buffer=event.subEvents.begin()->buffer;
707      string id;
708      *buffer>>id;
709      get(id)->recvUpdateCalendar(*buffer);
710   }
711
712   //! Server side: Receive a message of client annoucing calendar update
713   void CContext::recvUpdateCalendar(CBufferIn& buffer)
714   {
715      int step;
716      buffer>>step;
717      updateCalendar(step);
718   }
719
720   //! Client side: Send a message to create header part of netcdf file
721   void CContext::sendCreateFileHeader(void)
722   {
723     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
724     if (client->isServerLeader())
725     {
726       CMessage msg;
727       msg<<this->getIdServer();
728       const std::list<int>& ranks = client->getRanksServerLeader();
729       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
730         event.push(*itRank,1,msg) ;
731       client->sendEvent(event);
732     }
733     else client->sendEvent(event);
734   }
735
736   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
737   void CContext::recvCreateFileHeader(CEventServer& event)
738   {
739      CBufferIn* buffer=event.subEvents.begin()->buffer;
740      string id;
741      *buffer>>id;
742      get(id)->recvCreateFileHeader(*buffer);
743   }
744
745   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
746   void CContext::recvCreateFileHeader(CBufferIn& buffer)
747   {
748      createFileHeader();
749   }
750
751   //! Client side: Send a message to do some post processing on server
752   void CContext::sendPostProcessing()
753   {
754     if (!hasServer)
755     {
756       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
757       if (client->isServerLeader())
758       {
759         CMessage msg;
760         msg<<this->getIdServer();
761         const std::list<int>& ranks = client->getRanksServerLeader();
762         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
763           event.push(*itRank,1,msg);
764         client->sendEvent(event);
765       }
766       else client->sendEvent(event);
767     }
768   }
769
770   //! Server side: Receive a message to do some post processing
771   void CContext::recvPostProcessing(CEventServer& event)
772   {
773      CBufferIn* buffer=event.subEvents.begin()->buffer;
774      string id;
775      *buffer>>id;
776      get(id)->recvPostProcessing(*buffer);
777   }
778
779   //! Server side: Receive a message to do some post processing
780   void CContext::recvPostProcessing(CBufferIn& buffer)
781   {
782      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
783      postProcessing();
784   }
785
786   const StdString& CContext::getIdServer()
787   {
788      if (hasClient)
789      {
790        idServer_ = this->getId();
791        idServer_ += "_server";
792        return idServer_;
793      }
794      if (hasServer) return (this->getId());
795   }
796
797   /*!
798   \brief Do some simple post processings after parsing xml file
799      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
800   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
801   which will be written out into netcdf files, are processed
802   */
803   void CContext::postProcessing()
804   {
805     int myRank;
806     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
807
808     //printf("myRank = %d, in postProcessing, isPostProcessed = %d\n", myRank, isPostProcessed);
809     if (isPostProcessed) return;
810
811      // Make sure the calendar was correctly created
812      if (!calendar)
813        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
814      else if (calendar->getTimeStep() == NoneDu)
815        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
816      // Calendar first update to set the current date equals to the start date
817      calendar->update(0);  //printf("myRank = %d, calendar->update(0) OK\n", myRank);
818
819      // Find all inheritance in xml structure
820      this->solveAllInheritance();  //printf("myRank = %d, this->solveAllInheritance OK\n", myRank);
821
822      // Check if some axis, domains or grids are eligible to for compressed indexed output.
823      // Warning: This must be done after solving the inheritance and before the rest of post-processing
824      checkAxisDomainsGridsEligibilityForCompressedOutput();  //printf("myRank = %d, checkAxisDomainsGridsEligibilityForCompressedOutput OK\n", myRank);
825
826      // Check if some automatic time series should be generated
827      // Warning: This must be done after solving the inheritance and before the rest of post-processing
828      prepareTimeseries();  //printf("myRank = %d, prepareTimeseries OK\n", myRank);
829
830      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers  sortir.
831      this->findEnabledFiles();  //printf("myRank = %d, this->findEnabledFiles OK\n", myRank);
832      this->findEnabledReadModeFiles();  //printf("myRank = %d, this->findEnabledReadModeFiles OK\n", myRank);
833
834      // Find all enabled fields of each file
835      this->findAllEnabledFields();  //printf("myRank = %d, this->findAllEnabledFields OK\n", myRank);
836      this->findAllEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->findAllEnabledFieldsInReadModeFiles OK\n", myRank);
837
838     if (hasClient && !hasServer)
839     {
840      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
841      this->readAttributesOfEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->readAttributesOfEnabledFieldsInReadModeFiles OK\n", myRank);
842     }
843
844      // Only search and rebuild all reference objects of enable fields, don't transform
845      this->solveOnlyRefOfEnabledFields(false);  //printf("myRank = %d, this->solveOnlyRefOfEnabledFields(false) OK\n", myRank);
846
847      // Search and rebuild all reference object of enabled fields
848      this->solveAllRefOfEnabledFields(false);  //printf("myRank = %d, this->solveAllRefOfEnabledFields(false) OK\n", myRank);
849
850      // Find all fields with read access from the public API
851      findFieldsWithReadAccess();  //printf("myRank = %d, findFieldsWithReadAccess OK\n", myRank);
852      // and solve the all reference for them
853      solveAllRefOfFieldsWithReadAccess();  //printf("myRank = %d, solveAllRefOfFieldsWithReadAccess OK\n", myRank);
854
855      isPostProcessed = true;
856   }
857
858   /*!
859    * Compute the required buffer size to send the attributes (mostly those grid related).
860    *
861    * \param maxEventSize [in/out] the size of the bigger event for each connected server
862    */
863   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
864   {
865     std::map<int, StdSize> attributesSize;
866
867     if (hasClient)
868     {
869       size_t numEnabledFiles = this->enabledFiles.size();
870       for (size_t i = 0; i < numEnabledFiles; ++i)
871       {
872         CFile* file = this->enabledFiles[i];
873
874         std::vector<CField*> enabledFields = file->getEnabledFields();
875         size_t numEnabledFields = enabledFields.size();
876         for (size_t j = 0; j < numEnabledFields; ++j)
877         {
878           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
879           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
880           for (; it != itE; ++it)
881           {
882             // If attributesSize[it->first] does not exist, it will be zero-initialized
883             // so we can use it safely without checking for its existance
884             if (attributesSize[it->first] < it->second)
885               attributesSize[it->first] = it->second;
886
887             if (maxEventSize[it->first] < it->second)
888               maxEventSize[it->first] = it->second;
889           }
890         }
891       }
892     }
893
894     return attributesSize;
895   }
896
897   /*!
898    * Compute the required buffer size to send the fields data.
899    *
900    * \param maxEventSize [in/out] the size of the bigger event for each connected server
901    */
902   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
903   {
904     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
905
906     std::map<int, StdSize> dataSize;
907
908     // Find all reference domain and axis of all active fields
909     size_t numEnabledFiles = this->enabledFiles.size();
910     for (size_t i = 0; i < numEnabledFiles; ++i)
911     {
912       CFile* file = this->enabledFiles[i];
913       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
914
915       if (fileMode == mode)
916       {
917         std::vector<CField*> enabledFields = file->getEnabledFields();
918         size_t numEnabledFields = enabledFields.size();
919         for (size_t j = 0; j < numEnabledFields; ++j)
920         {
921           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
922           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
923           for (; it != itE; ++it)
924           {
925             // If dataSize[it->first] does not exist, it will be zero-initialized
926             // so we can use it safely without checking for its existance
927             if (CXios::isOptPerformance)
928               dataSize[it->first] += it->second;
929             else if (dataSize[it->first] < it->second)
930               dataSize[it->first] = it->second;
931
932             if (maxEventSize[it->first] < it->second)
933               maxEventSize[it->first] = it->second;
934           }
935         }
936       }
937     }
938
939     return dataSize;
940   }
941
942   //! Client side: Send infomation of active files (files are enabled to write out)
943   void CContext::sendEnabledFiles()
944   {
945     int size = this->enabledFiles.size();
946
947     // In a context, each type has a root definition, e.g: axis, domain, field.
948     // Every object must be a child of one of these root definition. In this case
949     // all new file objects created on server must be children of the root "file_definition"
950     StdString fileDefRoot("file_definition");
951     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
952
953     for (int i = 0; i < size; ++i)
954     {
955       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
956       this->enabledFiles[i]->sendAllAttributesToServer();
957       this->enabledFiles[i]->sendAddAllVariables();
958     }
959   }
960
961   //! Client side: Send information of active fields (ones are written onto files)
962   void CContext::sendEnabledFields()
963   {
964     int size = this->enabledFiles.size();
965     for (int i = 0; i < size; ++i)
966     {
967       this->enabledFiles[i]->sendEnabledFields();
968     }
969   }
970
971   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
972   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
973   {
974     if (!hasClient) return;
975
976     const vector<CAxis*> allAxis = CAxis::getAll();
977     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
978       (*it)->checkEligibilityForCompressedOutput();
979
980     const vector<CDomain*> allDomains = CDomain::getAll();
981     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
982       (*it)->checkEligibilityForCompressedOutput();
983
984     const vector<CGrid*> allGrids = CGrid::getAll();
985     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
986       (*it)->checkEligibilityForCompressedOutput();
987   }
988
989   //! Client side: Prepare the timeseries by adding the necessary files
990   void CContext::prepareTimeseries()
991   {
992     if (!hasClient) return;
993
994     const std::vector<CFile*> allFiles = CFile::getAll();
995     for (size_t i = 0; i < allFiles.size(); i++)
996     {
997       CFile* file = allFiles[i];
998
999       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1000       {
1001         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1002
1003         const std::vector<CField*> allFields = file->getAllFields();
1004         for (size_t j = 0; j < allFields.size(); j++)
1005         {
1006           CField* field = allFields[j];
1007
1008           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1009           {
1010             CFile* tsFile = CFile::create();
1011             tsFile->duplicateAttributes(file);
1012             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1013
1014             tsFile->name = tsPrefix + "_";
1015             if (!field->name.isEmpty())
1016               tsFile->name.get() += field->name;
1017             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1018               tsFile->name.get() += field->field_ref;
1019             else
1020               tsFile->name.get() += field->getId();
1021
1022             if (!field->ts_split_freq.isEmpty())
1023               tsFile->split_freq = field->ts_split_freq;
1024
1025             CField* tsField = tsFile->addField();
1026             tsField->field_ref = field->getId();
1027             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1028
1029             tsFile->solveFieldRefInheritance(true);
1030
1031             if (file->timeseries == CFile::timeseries_attr::exclusive)
1032               field->enabled = false;
1033           }
1034         }
1035
1036         // Finally disable the original file is need be
1037         if (file->timeseries == CFile::timeseries_attr::only)
1038          file->enabled = false;
1039       }
1040     }
1041   }
1042
1043   //! Client side: Send information of reference grid of active fields
1044   void CContext::sendRefGrid()
1045   {
1046     std::set<StdString> gridIds;
1047     int sizeFile = this->enabledFiles.size();
1048     CFile* filePtr(NULL);
1049
1050     // Firstly, find all reference grids of all active fields
1051     for (int i = 0; i < sizeFile; ++i)
1052     {
1053       filePtr = this->enabledFiles[i];
1054       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1055       int sizeField = enabledFields.size();
1056       for (int numField = 0; numField < sizeField; ++numField)
1057       {
1058         if (0 != enabledFields[numField]->getRelGrid())
1059           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1060       }
1061     }
1062
1063     // Create all reference grids on server side
1064     StdString gridDefRoot("grid_definition");
1065     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1066     std::set<StdString>::const_iterator it, itE = gridIds.end();
1067     for (it = gridIds.begin(); it != itE; ++it)
1068     {
1069       gridPtr->sendCreateChild(*it);
1070       CGrid::get(*it)->sendAllAttributesToServer();
1071       CGrid::get(*it)->sendAllDomains();
1072       CGrid::get(*it)->sendAllAxis();
1073       CGrid::get(*it)->sendAllScalars();
1074     }
1075   }
1076
1077
1078   //! Client side: Send information of reference domain and axis of active fields
1079   void CContext::sendRefDomainsAxis()
1080   {
1081     std::set<StdString> domainIds, axisIds, scalarIds;
1082
1083     // Find all reference domain and axis of all active fields
1084     int numEnabledFiles = this->enabledFiles.size();
1085     for (int i = 0; i < numEnabledFiles; ++i)
1086     {
1087       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1088       int numEnabledFields = enabledFields.size();
1089       for (int j = 0; j < numEnabledFields; ++j)
1090       {
1091         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1092         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1093         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1094         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1095       }
1096     }
1097
1098     // Create all reference axis on server side
1099     std::set<StdString>::iterator itDom, itAxis, itScalar;
1100     std::set<StdString>::const_iterator itE;
1101
1102     StdString scalarDefRoot("scalar_definition");
1103     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1104     itE = scalarIds.end();
1105     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1106     {
1107       if (!itScalar->empty())
1108       {
1109         scalarPtr->sendCreateChild(*itScalar);
1110         CScalar::get(*itScalar)->sendAllAttributesToServer();
1111       }
1112     }
1113
1114     StdString axiDefRoot("axis_definition");
1115     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1116     itE = axisIds.end();
1117     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1118     {
1119       if (!itAxis->empty())
1120       {
1121         axisPtr->sendCreateChild(*itAxis);
1122         CAxis::get(*itAxis)->sendAllAttributesToServer();
1123       }
1124     }
1125
1126     // Create all reference domains on server side
1127     StdString domDefRoot("domain_definition");
1128     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1129     itE = domainIds.end();
1130     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1131     {
1132       if (!itDom->empty()) {
1133          domPtr->sendCreateChild(*itDom);
1134          CDomain::get(*itDom)->sendAllAttributesToServer();
1135       }
1136     }
1137   }
1138
1139   //! Update calendar in each time step
1140   void CContext::updateCalendar(int step)
1141   {
1142      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1143      calendar->update(step);
1144      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1145
1146      if (hasClient)
1147      {
1148        checkPrefetchingOfEnabledReadModeFiles();
1149        garbageCollector.invalidate(calendar->getCurrentDate());
1150      }
1151   }
1152
1153   //! Server side: Create header of netcdf file
1154   void CContext::createFileHeader(void )
1155   {
1156      vector<CFile*>::const_iterator it;
1157
1158      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1159      {
1160         (*it)->initFile();
1161      }
1162   }
1163
1164   //! Get current context
1165   CContext* CContext::getCurrent(void)
1166   {
1167     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1168   }
1169
1170   /*!
1171   \brief Set context with an id be the current context
1172   \param [in] id identity of context to be set to current
1173   */
1174   void CContext::setCurrent(const string& id)
1175   {
1176     CObjectFactory::SetCurrentContextId(id);
1177     CGroupFactory::SetCurrentContextId(id);
1178   }
1179
1180  /*!
1181  \brief Create a context with specific id
1182  \param [in] id identity of new context
1183  \return pointer to the new context or already-existed one with identity id
1184  */
1185  CContext* CContext::create(const StdString& id)
1186  {
1187    CContext::setCurrent(id);
1188
1189    bool hasctxt = CContext::has(id);
1190    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1191    getRoot();
1192    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1193
1194#define DECLARE_NODE(Name_, name_) \
1195    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1196#define DECLARE_NODE_PAR(Name_, name_)
1197#include "node_type.conf"
1198
1199    return (context);
1200  }
1201
1202
1203
1204     //! Server side: Receive a message to do some post processing
1205  void CContext::recvRegistry(CEventServer& event)
1206  {
1207    CBufferIn* buffer=event.subEvents.begin()->buffer;
1208    string id;
1209    *buffer>>id;
1210    get(id)->recvRegistry(*buffer);
1211  }
1212
1213  void CContext::recvRegistry(CBufferIn& buffer)
1214  {
1215    if (server->intraCommRank==0)
1216    {
1217      CRegistry registry(server->intraComm) ;
1218      registry.fromBuffer(buffer) ;
1219      registryOut->mergeRegistry(registry) ;
1220    }
1221  }
1222
1223  void CContext::sendRegistry(void)
1224  {
1225    registryOut->hierarchicalGatherRegistry() ;
1226
1227    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1228    if (client->isServerLeader())
1229    {
1230       CMessage msg ;
1231       msg<<this->getIdServer();
1232       if (client->clientRank==0) msg<<*registryOut ;
1233       const std::list<int>& ranks = client->getRanksServerLeader();
1234       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1235         event.push(*itRank,1,msg);
1236       client->sendEvent(event);
1237    }
1238    else client->sendEvent(event);
1239  }
1240
1241} // namespace xios
Note: See TracBrowser for help on using the repository browser.