source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1115

Last change on this file since 1115 was 1115, checked in by yushan, 7 years ago

bug corrected in MPI_Gatherv

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 42.2 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  //shared_ptr<CContextGroup> CContext::root;
21  shared_ptr<CContextGroup> * CContext::root_ptr = 0;
22
23   /// ////////////////////// Dfinitions ////////////////////// ///
24
25   CContext::CContext(void)
26      : CObjectTemplate<CContext>(), CContextAttributes()
27      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
28      , idServer_(), client(0), server(0)
29   { /* Ne rien faire de plus */ }
30
31   CContext::CContext(const StdString & id)
32      : CObjectTemplate<CContext>(id), CContextAttributes()
33      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
34      , idServer_(), client(0), server(0)
35   { /* Ne rien faire de plus */ }
36
37   CContext::~CContext(void)
38   {
39     delete client;
40     delete server;
41   }
42
43   //----------------------------------------------------------------
44   //! Get name of context
45   StdString CContext::GetName(void)   { return (StdString("context")); }
46   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
47   ENodeType CContext::GetType(void)   { return (eContext); }
48
49   //----------------------------------------------------------------
50
51   /*!
52   \brief Get context group (context root)
53   \return Context root
54   */
55   CContextGroup* CContext::getRoot(void)
56   {
57      //if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
58      //return root.get();
59
60      //static shared_ptr<CContextGroup> *root_ptr;
61      if(root_ptr == 0) //root_ptr = new shared_ptr<CContextGroup>;
62      // if (root_ptr->get()==NULL)
63      root_ptr = new shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
64      return root_ptr->get();
65   }
66
67   //----------------------------------------------------------------
68
69   /*!
70   \brief Get calendar of a context
71   \return Calendar
72   */
73   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
74   {
75      return (this->calendar);
76   }
77
78   //----------------------------------------------------------------
79
80   /*!
81   \brief Set a context with a calendar
82   \param[in] newCalendar new calendar
83   */
84   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
85   {
86      this->calendar = newCalendar;
87   }
88
89   //----------------------------------------------------------------
90   /*!
91   \brief Parse xml file and write information into context object
92   \param [in] node xmld node corresponding in xml file
93   */
94   void CContext::parse(xml::CXMLNode & node)
95   {
96      CContext::SuperClass::parse(node);
97
98      // PARSING POUR GESTION DES ENFANTS
99      xml::THashAttributes attributes = node.getAttributes();
100
101      if (attributes.end() != attributes.find("src"))
102      {
103         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
104         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
105            ERROR("void CContext::parse(xml::CXMLNode & node)",
106                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
107         if (!ifs.good())
108            ERROR("CContext::parse(xml::CXMLNode & node)",
109                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
110         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
111      }
112
113      if (node.getElementName().compare(CContext::GetName()))
114         DEBUG("Le noeud is wrong defined but will be considered as a context !");
115
116      if (!(node.goToChildElement()))
117      {
118         DEBUG("Le context ne contient pas d'enfant !");
119      }
120      else
121      {
122         do { // Parcours des contextes pour traitement.
123
124            StdString name = node.getElementName();
125            attributes.clear();
126            attributes = node.getAttributes();
127
128            if (attributes.end() != attributes.find("id"))
129            {
130              DEBUG(<< "Definition node has an id,"
131                    << "it will not be taking account !");
132            }
133
134#define DECLARE_NODE(Name_, name_)    \
135   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
136   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
137#define DECLARE_NODE_PAR(Name_, name_)
138#include "node_type.conf"
139
140            DEBUG(<< "The element \'"     << name
141                  << "\' in the context \'" << CContext::getCurrent()->getId()
142                  << "\' is not a definition !");
143
144         } while (node.goToNextElement());
145
146         node.goToParentElement(); // Retour au parent
147      }
148   }
149
150   //----------------------------------------------------------------
151   //! Show tree structure of context
152   void CContext::ShowTree(StdOStream & out)
153   {
154      StdString currentContextId = CContext::getCurrent() -> getId();
155      std::vector<CContext*> def_vector =
156         CContext::getRoot()->getChildList();
157      std::vector<CContext*>::iterator
158         it = def_vector.begin(), end = def_vector.end();
159
160      out << "<? xml version=\"1.0\" ?>" << std::endl;
161      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
162
163      for (; it != end; it++)
164      {
165         CContext* context = *it;
166         CContext::setCurrent(context->getId());
167         out << *context << std::endl;
168      }
169
170      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
171      CContext::setCurrent(currentContextId);
172   }
173
174
175   //----------------------------------------------------------------
176
177   //! Convert context object into string (to print)
178   StdString CContext::toString(void) const
179   {
180      StdOStringStream oss;
181      oss << "<" << CContext::GetName()
182          << " id=\"" << this->getId() << "\" "
183          << SuperClassAttribute::toString() << ">" << std::endl;
184      if (!this->hasChild())
185      {
186         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrmentation
187      }
188      else
189      {
190
191#define DECLARE_NODE(Name_, name_)    \
192   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
193   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
194#define DECLARE_NODE_PAR(Name_, name_)
195#include "node_type.conf"
196
197      }
198
199      oss << "</" << CContext::GetName() << " >";
200
201      return (oss.str());
202   }
203
204   //----------------------------------------------------------------
205
206   /*!
207   \brief Find all inheritace among objects in a context.
208   \param [in] apply (true) write attributes of parent into ones of child if they are empty
209                     (false) write attributes of parent into a new container of child
210   \param [in] parent unused
211   */
212   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
213   {
214#define DECLARE_NODE(Name_, name_)    \
215   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
216     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
217#define DECLARE_NODE_PAR(Name_, name_)
218#include "node_type.conf"
219   }
220
221   //----------------------------------------------------------------
222
223   //! Verify if all root definition in the context have child.
224   bool CContext::hasChild(void) const
225   {
226      return (
227#define DECLARE_NODE(Name_, name_)    \
228   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
229#define DECLARE_NODE_PAR(Name_, name_)
230#include "node_type.conf"
231      false);
232}
233
234   //----------------------------------------------------------------
235
236   void CContext::CleanTree(void)
237   {
238#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
239#define DECLARE_NODE_PAR(Name_, name_)
240#include "node_type.conf"
241   }
242   ///---------------------------------------------------------------
243
244   //! Initialize client side
245   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
246   {
247     hasClient=true;
248     
249     #pragma omp critical
250     client = new CContextClient(this, intraComm, interComm, cxtServer);
251
252     int tmp_rank;
253     MPI_Comm_rank(intraComm, &tmp_rank);
254     MPI_Barrier(intraComm);
255
256     // #pragma omp critical (_output)
257     // printf("Client %d : context.cpp client = new CContextClient, client add = %p, clientRank = %d\n", tmp_rank, &(*client), client->clientRank) ;
258     
259     #pragma omp critical
260     registryIn=new CRegistry(intraComm);
261     
262
263     registryIn->setPath(getId()) ;
264     
265     // #pragma omp critical (_output)
266     // printf("Client %d : context.cpp registryIn->setPath, client add = %p, clientRank = %d\n", tmp_rank, &(*client), client->clientRank) ;
267
268     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
269     registryIn->bcastRegistry() ;
270
271     registryOut=new CRegistry(intraComm) ;
272     registryOut->setPath(getId()) ;
273     
274
275     ep_lib::MPI_Comm intraCommServer, interCommServer;
276     if (cxtServer) // Attached mode
277     {
278       intraCommServer = intraComm;
279       interCommServer = interComm;
280     }
281     else
282     {
283       MPI_Comm_dup(intraComm, &intraCommServer);
284       comms.push_back(intraCommServer);
285       MPI_Comm_dup(interComm, &interCommServer);
286       comms.push_back(interCommServer);
287       
288     }
289     server = new CContextServer(this,intraCommServer,interCommServer);
290   }
291
292   void CContext::setClientServerBuffer()
293   {
294     size_t minBufferSize = CXios::minBufferSize;
295#define DECLARE_NODE(Name_, name_)    \
296     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
297#define DECLARE_NODE_PAR(Name_, name_)
298#include "node_type.conf"
299#undef DECLARE_NODE
300#undef DECLARE_NODE_PAR
301
302     std::map<int, StdSize> maxEventSize;
303     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
304     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
305
306     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
307     for (it = dataBufferSize.begin(); it != ite; ++it)
308       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
309
310     ite = bufferSize.end();
311     for (it = bufferSize.begin(); it != ite; ++it)
312     {
313       it->second *= CXios::bufferSizeFactor;
314       if (it->second < minBufferSize) it->second = minBufferSize;
315     }
316
317     // We consider that the minimum buffer size is also the minimum event size
318     ite = maxEventSize.end();
319     for (it = maxEventSize.begin(); it != ite; ++it)
320       if (it->second < minBufferSize) it->second = minBufferSize;
321
322     if (client->isServerLeader())
323     {
324       const std::list<int>& ranks = client->getRanksServerLeader();
325       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
326         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
327     }
328
329     client->setBufferSize(bufferSize, maxEventSize);
330   }
331
332   //! Verify whether a context is initialized
333   bool CContext::isInitialized(void)
334   {
335     return hasClient;
336   }
337
338   //! Initialize server
339   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
340   {
341     hasServer=true;
342     server = new CContextServer(this,intraComm,interComm);
343
344     registryIn=new CRegistry(intraComm);
345     registryIn->setPath(getId()) ;
346     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
347     registryIn->bcastRegistry() ;
348     registryOut=new CRegistry(intraComm) ;
349     registryOut->setPath(getId()) ;
350
351     ep_lib::MPI_Comm intraCommClient, interCommClient;
352     if (cxtClient) // Attached mode
353     {
354       intraCommClient = intraComm;
355       interCommClient = interComm;
356     }
357     else
358     {
359       MPI_Comm_dup(intraComm, &intraCommClient);
360       comms.push_back(intraCommClient);
361       MPI_Comm_dup(interComm, &interCommClient);
362       comms.push_back(interCommClient);
363     }
364     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
365   }
366
367   //! Server side: Put server into a loop in order to listen message from client
368   bool CContext::eventLoop(void)
369   {
370     return server->eventLoop();
371   }
372
373   //! Try to send the buffers and receive possible answers
374   bool CContext::checkBuffersAndListen(void)
375   {
376     client->checkBuffers();
377     return server->eventLoop();
378   }
379
380   //! Terminate a context
381   void CContext::finalize(void)
382   {
383     
384      if (!finalized)
385      {
386        finalized = true;
387        if (hasClient) sendRegistry() ;
388        client->finalize();
389        while (!server->hasFinished())
390        {
391          server->eventLoop();
392          //printf("server->hasFinished() = %d\n", server->hasFinished());
393        }
394
395        if (hasServer)
396        {
397          closeAllFile();
398          registryOut->hierarchicalGatherRegistry() ;
399          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
400        }
401
402        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
403          MPI_Comm_free(&(*it));
404        comms.clear();
405      }
406   }
407
408   /*!
409   \brief Close all the context defintion and do processing data
410      After everything is well defined on client side, they will be processed and sent to server
411   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
412   all necessary information to server, from which each server can build its own database.
413   Because the role of server is to write out field data on a specific netcdf file,
414   the only information that it needs is the enabled files
415   and the active fields (fields will be written onto active files)
416   */
417   void CContext::closeDefinition(void)
418   {
419
420     // There is nothing client need to send to server
421     if (hasClient)
422     {
423       // After xml is parsed, there are some more works with post processing
424       postProcessing(); 
425     }
426     setClientServerBuffer(); 
427
428     if (hasClient && !hasServer)
429     {
430      // Send all attributes of current context to server
431      this->sendAllAttributesToServer();
432
433      // Send all attributes of current calendar
434      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
435
436      // We have enough information to send to server
437      // First of all, send all enabled files
438       sendEnabledFiles(); 
439
440      // Then, send all enabled fields
441       sendEnabledFields(); 
442
443      // At last, we have all info of domain and axis, then send them
444       sendRefDomainsAxis(); 
445      // After that, send all grid (if any)
446       sendRefGrid(); 
447    }
448
449    // We have a xml tree on the server side and now, it should be also processed
450    if (hasClient && !hasServer) sendPostProcessing(); 
451
452    // There are some processings that should be done after all of above. For example: check mask or index
453    if (hasClient)
454    {
455      this->buildFilterGraphOfEnabledFields(); 
456      buildFilterGraphOfFieldsWithReadAccess(); 
457      this->solveAllRefOfEnabledFields(true); 
458    }
459
460    // Now tell server that it can process all messages from client
461    if (hasClient && !hasServer) this->sendCloseDefinition();
462
463    // Nettoyage de l'arborescence
464    if (hasClient && !hasServer) CleanTree();
465
466    if (hasClient)
467    {
468      sendCreateFileHeader(); 
469
470      startPrefetchingOfEnabledReadModeFiles(); 
471    }
472   }
473
474   void CContext::findAllEnabledFields(void)
475   {
476     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
477     (void)this->enabledFiles[i]->getEnabledFields();
478   }
479
480   void CContext::findAllEnabledFieldsInReadModeFiles(void)
481   {
482     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
483     (void)this->enabledReadModeFiles[i]->getEnabledFields();
484   }
485
486   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
487   {
488      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
489        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
490   }
491
492   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
493   {
494     int size = this->enabledFiles.size();
495     for (int i = 0; i < size; ++i)
496     {
497       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
498     }
499
500     for (int i = 0; i < size; ++i)
501     {
502       this->enabledFiles[i]->generateNewTransformationGridDest();
503     }
504   }
505
506   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
507   {
508     int size = this->enabledFiles.size();
509     
510     for (int i = 0; i < size; ++i)
511     {
512       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
513     }
514   }
515
516   void CContext::buildFilterGraphOfEnabledFields()
517   {
518     int size = this->enabledFiles.size();
519     for (int i = 0; i < size; ++i)
520     {
521       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
522     }
523   }
524
525   void CContext::startPrefetchingOfEnabledReadModeFiles()
526   {
527     int size = enabledReadModeFiles.size();
528     for (int i = 0; i < size; ++i)
529     {
530        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
531     }
532   }
533
534   void CContext::checkPrefetchingOfEnabledReadModeFiles()
535   {
536     int size = enabledReadModeFiles.size();
537     for (int i = 0; i < size; ++i)
538     {
539        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
540     }
541   }
542
543  void CContext::findFieldsWithReadAccess(void)
544  {
545    fieldsWithReadAccess.clear();
546    const vector<CField*> allFields = CField::getAll();
547    for (size_t i = 0; i < allFields.size(); ++i)
548    {
549      CField* field = allFields[i];
550
551      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
552        field->read_access = true;
553      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
554        fieldsWithReadAccess.push_back(field);
555    }
556  }
557
558  void CContext::solveAllRefOfFieldsWithReadAccess()
559  {
560    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
561      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
562  }
563
564  void CContext::buildFilterGraphOfFieldsWithReadAccess()
565  {
566    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
567      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
568  }
569
570   void CContext::solveAllInheritance(bool apply)
571   {
572     // Rsolution des hritages descendants (cd des hritages de groupes)
573     // pour chacun des contextes.
574      solveDescInheritance(apply);
575
576     // Rsolution des hritages par rfrence au niveau des fichiers.
577      const vector<CFile*> allFiles=CFile::getAll();
578      const vector<CGrid*> allGrids= CGrid::getAll();
579
580     //if (hasClient && !hasServer)
581      if (hasClient)
582      {
583        for (unsigned int i = 0; i < allFiles.size(); i++)
584          allFiles[i]->solveFieldRefInheritance(apply);
585      }
586
587      unsigned int vecSize = allGrids.size();
588      unsigned int i = 0;
589      for (i = 0; i < vecSize; ++i)
590        allGrids[i]->solveDomainAxisRefInheritance(apply);
591
592   }
593
594   void CContext::findEnabledFiles(void)
595   {
596      const std::vector<CFile*> allFiles = CFile::getAll();
597
598      for (unsigned int i = 0; i < allFiles.size(); i++)
599         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est dfini.
600         {
601            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix  vrai.
602               enabledFiles.push_back(allFiles[i]);
603         }
604         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
605
606
607      if (enabledFiles.size() == 0)
608         DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm \""
609               << getId() << "\" !");
610   }
611
612   void CContext::findEnabledReadModeFiles(void)
613   {
614     int size = this->enabledFiles.size();
615     for (int i = 0; i < size; ++i)
616     {
617       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
618        enabledReadModeFiles.push_back(enabledFiles[i]);
619     }
620   }
621
622   void CContext::closeAllFile(void)
623   {
624     std::vector<CFile*>::const_iterator
625            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
626
627     for (; it != end; it++)
628     {
629       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
630       (*it)->close();
631     }
632   }
633
634   /*!
635   \brief Dispatch event received from client
636      Whenever a message is received in buffer of server, it will be processed depending on
637   its event type. A new event type should be added in the switch list to make sure
638   it processed on server side.
639   \param [in] event: Received message
640   */
641   bool CContext::dispatchEvent(CEventServer& event)
642   {
643
644      if (SuperClass::dispatchEvent(event)) return true;
645      else
646      {
647        switch(event.type)
648        {
649           case EVENT_ID_CLOSE_DEFINITION :
650             recvCloseDefinition(event);
651             return true;
652             break;
653           case EVENT_ID_UPDATE_CALENDAR:
654             recvUpdateCalendar(event);
655             return true;
656             break;
657           case EVENT_ID_CREATE_FILE_HEADER :
658             recvCreateFileHeader(event);
659             return true;
660             break;
661           case EVENT_ID_POST_PROCESS:
662             recvPostProcessing(event);
663             return true;
664            case EVENT_ID_SEND_REGISTRY:
665             recvRegistry(event);
666             return true;
667            break;
668
669           default :
670             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
671                    <<"Unknown Event");
672           return false;
673         }
674      }
675   }
676
677   //! Client side: Send a message to server to make it close
678   void CContext::sendCloseDefinition(void)
679   {
680     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
681     if (client->isServerLeader())
682     {
683       CMessage msg;
684       msg<<this->getIdServer();
685       const std::list<int>& ranks = client->getRanksServerLeader();
686       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
687         event.push(*itRank,1,msg);
688       client->sendEvent(event);
689     }
690     else client->sendEvent(event);
691   }
692
693   //! Server side: Receive a message of client announcing a context close
694   void CContext::recvCloseDefinition(CEventServer& event)
695   {
696
697      CBufferIn* buffer=event.subEvents.begin()->buffer;
698      string id;
699      *buffer>>id;
700      get(id)->closeDefinition();
701   }
702
703   //! Client side: Send a message to update calendar in each time step
704   void CContext::sendUpdateCalendar(int step)
705   {
706     if (!hasServer)
707     {
708       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
709       if (client->isServerLeader())
710       {
711         CMessage msg;
712         msg<<this->getIdServer()<<step;
713         const std::list<int>& ranks = client->getRanksServerLeader();
714         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
715           event.push(*itRank,1,msg);
716         client->sendEvent(event);
717       }
718       else client->sendEvent(event);
719     }
720   }
721
722   //! Server side: Receive a message of client annoucing calendar update
723   void CContext::recvUpdateCalendar(CEventServer& event)
724   {
725      CBufferIn* buffer=event.subEvents.begin()->buffer;
726      string id;
727      *buffer>>id;
728      get(id)->recvUpdateCalendar(*buffer);
729   }
730
731   //! Server side: Receive a message of client annoucing calendar update
732   void CContext::recvUpdateCalendar(CBufferIn& buffer)
733   {
734      int step;
735      buffer>>step;
736      updateCalendar(step);
737   }
738
739   //! Client side: Send a message to create header part of netcdf file
740   void CContext::sendCreateFileHeader(void)
741   {
742     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
743     if (client->isServerLeader())
744     {
745       CMessage msg;
746       msg<<this->getIdServer();
747       const std::list<int>& ranks = client->getRanksServerLeader();
748       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
749         event.push(*itRank,1,msg) ;
750       client->sendEvent(event);
751     }
752     else client->sendEvent(event);
753   }
754
755   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
756   void CContext::recvCreateFileHeader(CEventServer& event)
757   {
758      CBufferIn* buffer=event.subEvents.begin()->buffer;
759      string id;
760      *buffer>>id;
761      get(id)->recvCreateFileHeader(*buffer);
762   }
763
764   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
765   void CContext::recvCreateFileHeader(CBufferIn& buffer)
766   {
767      createFileHeader();
768   }
769
770   //! Client side: Send a message to do some post processing on server
771   void CContext::sendPostProcessing()
772   {
773     if (!hasServer)
774     {
775       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
776       if (client->isServerLeader())
777       {
778         CMessage msg;
779         msg<<this->getIdServer();
780         const std::list<int>& ranks = client->getRanksServerLeader();
781         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
782           event.push(*itRank,1,msg);
783         client->sendEvent(event);
784       }
785       else client->sendEvent(event);
786     }
787   }
788
789   //! Server side: Receive a message to do some post processing
790   void CContext::recvPostProcessing(CEventServer& event)
791   {
792      CBufferIn* buffer=event.subEvents.begin()->buffer;
793      string id;
794      *buffer>>id;
795      get(id)->recvPostProcessing(*buffer);
796   }
797
798   //! Server side: Receive a message to do some post processing
799   void CContext::recvPostProcessing(CBufferIn& buffer)
800   {
801      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
802      postProcessing();
803   }
804
805   const StdString& CContext::getIdServer()
806   {
807      if (hasClient)
808      {
809        idServer_ = this->getId();
810        idServer_ += "_server";
811        return idServer_;
812      }
813      if (hasServer) return (this->getId());
814   }
815
816   /*!
817   \brief Do some simple post processings after parsing xml file
818      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
819   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
820   which will be written out into netcdf files, are processed
821   */
822   void CContext::postProcessing()
823   {
824     int myRank;
825     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
826
827     //printf("myRank = %d, in postProcessing, isPostProcessed = %d\n", myRank, isPostProcessed);
828     if (isPostProcessed) return;
829
830      // Make sure the calendar was correctly created
831      if (!calendar)
832        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
833      else if (calendar->getTimeStep() == NoneDu)
834        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
835      // Calendar first update to set the current date equals to the start date
836      calendar->update(0);  //printf("myRank = %d, calendar->update(0) OK\n", myRank);
837
838      // Find all inheritance in xml structure
839      this->solveAllInheritance();  //printf("myRank = %d, this->solveAllInheritance OK\n", myRank);
840
841      // Check if some axis, domains or grids are eligible to for compressed indexed output.
842      // Warning: This must be done after solving the inheritance and before the rest of post-processing
843      checkAxisDomainsGridsEligibilityForCompressedOutput();  //printf("myRank = %d, checkAxisDomainsGridsEligibilityForCompressedOutput OK\n", myRank);
844
845      // Check if some automatic time series should be generated
846      // Warning: This must be done after solving the inheritance and before the rest of post-processing
847      prepareTimeseries();  //printf("myRank = %d, prepareTimeseries OK\n", myRank);
848
849      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers  sortir.
850      this->findEnabledFiles();  //printf("myRank = %d, this->findEnabledFiles OK\n", myRank);
851      this->findEnabledReadModeFiles();  //printf("myRank = %d, this->findEnabledReadModeFiles OK\n", myRank);
852
853      // Find all enabled fields of each file
854      this->findAllEnabledFields();  //printf("myRank = %d, this->findAllEnabledFields OK\n", myRank);
855      this->findAllEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->findAllEnabledFieldsInReadModeFiles OK\n", myRank);
856
857     if (hasClient && !hasServer)
858     {
859      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
860      this->readAttributesOfEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->readAttributesOfEnabledFieldsInReadModeFiles OK\n", myRank);
861     }
862
863      // Only search and rebuild all reference objects of enable fields, don't transform
864      this->solveOnlyRefOfEnabledFields(false);  //printf("myRank = %d, this->solveOnlyRefOfEnabledFields(false) OK\n", myRank);
865
866      // Search and rebuild all reference object of enabled fields
867      this->solveAllRefOfEnabledFields(false);  //printf("myRank = %d, this->solveAllRefOfEnabledFields(false) OK\n", myRank);
868
869      // Find all fields with read access from the public API
870      findFieldsWithReadAccess();  //printf("myRank = %d, findFieldsWithReadAccess OK\n", myRank);
871      // and solve the all reference for them
872      solveAllRefOfFieldsWithReadAccess();  //printf("myRank = %d, solveAllRefOfFieldsWithReadAccess OK\n", myRank);
873
874      isPostProcessed = true;
875   }
876
877   /*!
878    * Compute the required buffer size to send the attributes (mostly those grid related).
879    *
880    * \param maxEventSize [in/out] the size of the bigger event for each connected server
881    */
882   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
883   {
884     std::map<int, StdSize> attributesSize;
885
886     if (hasClient)
887     {
888       size_t numEnabledFiles = this->enabledFiles.size();
889       for (size_t i = 0; i < numEnabledFiles; ++i)
890       {
891         CFile* file = this->enabledFiles[i];
892
893         std::vector<CField*> enabledFields = file->getEnabledFields();
894         size_t numEnabledFields = enabledFields.size();
895         for (size_t j = 0; j < numEnabledFields; ++j)
896         {
897           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
898           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
899           for (; it != itE; ++it)
900           {
901             // If attributesSize[it->first] does not exist, it will be zero-initialized
902             // so we can use it safely without checking for its existance
903             if (attributesSize[it->first] < it->second)
904               attributesSize[it->first] = it->second;
905
906             if (maxEventSize[it->first] < it->second)
907               maxEventSize[it->first] = it->second;
908           }
909         }
910       }
911     }
912
913     return attributesSize;
914   }
915
916   /*!
917    * Compute the required buffer size to send the fields data.
918    *
919    * \param maxEventSize [in/out] the size of the bigger event for each connected server
920    */
921   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
922   {
923     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
924
925     std::map<int, StdSize> dataSize;
926
927     // Find all reference domain and axis of all active fields
928     size_t numEnabledFiles = this->enabledFiles.size();
929     for (size_t i = 0; i < numEnabledFiles; ++i)
930     {
931       CFile* file = this->enabledFiles[i];
932       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
933
934       if (fileMode == mode)
935       {
936         std::vector<CField*> enabledFields = file->getEnabledFields();
937         size_t numEnabledFields = enabledFields.size();
938         for (size_t j = 0; j < numEnabledFields; ++j)
939         {
940           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
941           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
942           for (; it != itE; ++it)
943           {
944             // If dataSize[it->first] does not exist, it will be zero-initialized
945             // so we can use it safely without checking for its existance
946             if (CXios::isOptPerformance)
947               dataSize[it->first] += it->second;
948             else if (dataSize[it->first] < it->second)
949               dataSize[it->first] = it->second;
950
951             if (maxEventSize[it->first] < it->second)
952               maxEventSize[it->first] = it->second;
953           }
954         }
955       }
956     }
957
958     return dataSize;
959   }
960
961   //! Client side: Send infomation of active files (files are enabled to write out)
962   void CContext::sendEnabledFiles()
963   {
964     int size = this->enabledFiles.size();
965
966     // In a context, each type has a root definition, e.g: axis, domain, field.
967     // Every object must be a child of one of these root definition. In this case
968     // all new file objects created on server must be children of the root "file_definition"
969     StdString fileDefRoot("file_definition");
970     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
971
972     for (int i = 0; i < size; ++i)
973     {
974       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
975       this->enabledFiles[i]->sendAllAttributesToServer();
976       this->enabledFiles[i]->sendAddAllVariables();
977     }
978   }
979
980   //! Client side: Send information of active fields (ones are written onto files)
981   void CContext::sendEnabledFields()
982   {
983     int size = this->enabledFiles.size();
984     for (int i = 0; i < size; ++i)
985     {
986       this->enabledFiles[i]->sendEnabledFields();
987     }
988   }
989
990   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
991   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
992   {
993     if (!hasClient) return;
994
995     const vector<CAxis*> allAxis = CAxis::getAll();
996     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
997       (*it)->checkEligibilityForCompressedOutput();
998
999     const vector<CDomain*> allDomains = CDomain::getAll();
1000     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
1001       (*it)->checkEligibilityForCompressedOutput();
1002
1003     const vector<CGrid*> allGrids = CGrid::getAll();
1004     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
1005       (*it)->checkEligibilityForCompressedOutput();
1006   }
1007
1008   //! Client side: Prepare the timeseries by adding the necessary files
1009   void CContext::prepareTimeseries()
1010   {
1011     if (!hasClient) return;
1012
1013     const std::vector<CFile*> allFiles = CFile::getAll();
1014     for (size_t i = 0; i < allFiles.size(); i++)
1015     {
1016       CFile* file = allFiles[i];
1017
1018       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1019       {
1020         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1021
1022         const std::vector<CField*> allFields = file->getAllFields();
1023         for (size_t j = 0; j < allFields.size(); j++)
1024         {
1025           CField* field = allFields[j];
1026
1027           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1028           {
1029             CFile* tsFile = CFile::create();
1030             tsFile->duplicateAttributes(file);
1031             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1032
1033             tsFile->name = tsPrefix + "_";
1034             if (!field->name.isEmpty())
1035               tsFile->name.get() += field->name;
1036             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1037               tsFile->name.get() += field->field_ref;
1038             else
1039               tsFile->name.get() += field->getId();
1040
1041             if (!field->ts_split_freq.isEmpty())
1042               tsFile->split_freq = field->ts_split_freq;
1043
1044             CField* tsField = tsFile->addField();
1045             tsField->field_ref = field->getId();
1046             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1047
1048             tsFile->solveFieldRefInheritance(true);
1049
1050             if (file->timeseries == CFile::timeseries_attr::exclusive)
1051               field->enabled = false;
1052           }
1053         }
1054
1055         // Finally disable the original file is need be
1056         if (file->timeseries == CFile::timeseries_attr::only)
1057          file->enabled = false;
1058       }
1059     }
1060   }
1061
1062   //! Client side: Send information of reference grid of active fields
1063   void CContext::sendRefGrid()
1064   {
1065     std::set<StdString> gridIds;
1066     int sizeFile = this->enabledFiles.size();
1067     CFile* filePtr(NULL);
1068
1069     // Firstly, find all reference grids of all active fields
1070     for (int i = 0; i < sizeFile; ++i)
1071     {
1072       filePtr = this->enabledFiles[i];
1073       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1074       int sizeField = enabledFields.size();
1075       for (int numField = 0; numField < sizeField; ++numField)
1076       {
1077         if (0 != enabledFields[numField]->getRelGrid())
1078           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1079       }
1080     }
1081
1082     // Create all reference grids on server side
1083     StdString gridDefRoot("grid_definition");
1084     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1085     std::set<StdString>::const_iterator it, itE = gridIds.end();
1086     for (it = gridIds.begin(); it != itE; ++it)
1087     {
1088       gridPtr->sendCreateChild(*it);
1089       CGrid::get(*it)->sendAllAttributesToServer();
1090       CGrid::get(*it)->sendAllDomains();
1091       CGrid::get(*it)->sendAllAxis();
1092       CGrid::get(*it)->sendAllScalars();
1093     }
1094   }
1095
1096
1097   //! Client side: Send information of reference domain and axis of active fields
1098   void CContext::sendRefDomainsAxis()
1099   {
1100     std::set<StdString> domainIds, axisIds, scalarIds;
1101
1102     // Find all reference domain and axis of all active fields
1103     int numEnabledFiles = this->enabledFiles.size();
1104     for (int i = 0; i < numEnabledFiles; ++i)
1105     {
1106       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1107       int numEnabledFields = enabledFields.size();
1108       for (int j = 0; j < numEnabledFields; ++j)
1109       {
1110         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1111         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1112         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1113         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1114       }
1115     }
1116
1117     // Create all reference axis on server side
1118     std::set<StdString>::iterator itDom, itAxis, itScalar;
1119     std::set<StdString>::const_iterator itE;
1120
1121     StdString scalarDefRoot("scalar_definition");
1122     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1123     itE = scalarIds.end();
1124     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1125     {
1126       if (!itScalar->empty())
1127       {
1128         scalarPtr->sendCreateChild(*itScalar);
1129         CScalar::get(*itScalar)->sendAllAttributesToServer();
1130       }
1131     }
1132
1133     StdString axiDefRoot("axis_definition");
1134     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1135     itE = axisIds.end();
1136     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1137     {
1138       if (!itAxis->empty())
1139       {
1140         axisPtr->sendCreateChild(*itAxis);
1141         CAxis::get(*itAxis)->sendAllAttributesToServer();
1142       }
1143     }
1144
1145     // Create all reference domains on server side
1146     StdString domDefRoot("domain_definition");
1147     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1148     itE = domainIds.end();
1149     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1150     {
1151       if (!itDom->empty()) {
1152          domPtr->sendCreateChild(*itDom);
1153          CDomain::get(*itDom)->sendAllAttributesToServer();
1154       }
1155     }
1156   }
1157
1158   //! Update calendar in each time step
1159   void CContext::updateCalendar(int step)
1160   {
1161      //info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1162      calendar->update(step);
1163      //info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1164
1165      if (hasClient)
1166      {
1167        checkPrefetchingOfEnabledReadModeFiles();
1168        garbageCollector.invalidate(calendar->getCurrentDate());
1169      }
1170   }
1171
1172   //! Server side: Create header of netcdf file
1173   void CContext::createFileHeader(void )
1174   {
1175      vector<CFile*>::const_iterator it;
1176
1177      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1178      {
1179         (*it)->initFile();
1180      }
1181   }
1182
1183   //! Get current context
1184   CContext* CContext::getCurrent(void)
1185   {
1186     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1187   }
1188
1189   /*!
1190   \brief Set context with an id be the current context
1191   \param [in] id identity of context to be set to current
1192   */
1193   void CContext::setCurrent(const string& id)
1194   {
1195     CObjectFactory::SetCurrentContextId(id);
1196     CGroupFactory::SetCurrentContextId(id);
1197   }
1198
1199  /*!
1200  \brief Create a context with specific id
1201  \param [in] id identity of new context
1202  \return pointer to the new context or already-existed one with identity id
1203  */
1204   //bkp
1205  CContext* CContext::create(const StdString& id)
1206  {
1207    CContext::setCurrent(id);
1208
1209    bool hasctxt = CContext::has(id);
1210    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1211    getRoot();
1212    //if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1213    if (!hasctxt) CGroupFactory::AddChild(*root_ptr, context->getShared());
1214
1215#define DECLARE_NODE(Name_, name_) \
1216    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1217#define DECLARE_NODE_PAR(Name_, name_)
1218#include "node_type.conf"
1219
1220    return (context);
1221  }
1222
1223
1224
1225     //! Server side: Receive a message to do some post processing
1226  void CContext::recvRegistry(CEventServer& event)
1227  {
1228    CBufferIn* buffer=event.subEvents.begin()->buffer;
1229    string id;
1230    *buffer>>id;
1231    get(id)->recvRegistry(*buffer);
1232  }
1233
1234  void CContext::recvRegistry(CBufferIn& buffer)
1235  {
1236    if (server->intraCommRank==0)
1237    {
1238      CRegistry registry(server->intraComm) ;
1239      registry.fromBuffer(buffer) ;
1240      registryOut->mergeRegistry(registry) ;
1241    }
1242  }
1243
1244  void CContext::sendRegistry(void)
1245  {
1246    registryOut->hierarchicalGatherRegistry() ;
1247
1248    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1249    if (client->isServerLeader())
1250    {
1251       CMessage msg ;
1252       msg<<this->getIdServer();
1253       if (client->clientRank==0) msg<<*registryOut ;
1254       const std::list<int>& ranks = client->getRanksServerLeader();
1255       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1256         event.push(*itRank,1,msg);
1257       client->sendEvent(event);
1258    }
1259    else client->sendEvent(event);
1260  }
1261
1262} // namespace xios
Note: See TracBrowser for help on using the repository browser.