source: XIOS/dev/branch_yushan/src/node/context.cpp @ 1063

Last change on this file since 1063 was 1063, checked in by yushan, 7 years ago

server mode OK for both multiple and one file mode. Tested with test_client

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
File size: 42.3 KB
Line 
1#include "context.hpp"
2#include "attribute_template.hpp"
3#include "object_template.hpp"
4#include "group_template.hpp"
5
6#include "calendar_type.hpp"
7#include "duration.hpp"
8
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "nc4_data_output.hpp"
12#include "node_type.hpp"
13#include "message.hpp"
14#include "type.hpp"
15#include "xios_spl.hpp"
16
17
18namespace xios {
19
20  shared_ptr<CContextGroup> CContext::root;
21
22   /// ////////////////////// Dfinitions ////////////////////// ///
23
24   CContext::CContext(void)
25      : CObjectTemplate<CContext>(), CContextAttributes()
26      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
27      , idServer_(), client(0), server(0)
28   { /* Ne rien faire de plus */ }
29
30   CContext::CContext(const StdString & id)
31      : CObjectTemplate<CContext>(id), CContextAttributes()
32      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false)
33      , idServer_(), client(0), server(0)
34   { /* Ne rien faire de plus */ }
35
36   CContext::~CContext(void)
37   {
38     delete client;
39     delete server;
40   }
41
42   //----------------------------------------------------------------
43   //! Get name of context
44   StdString CContext::GetName(void)   { return (StdString("context")); }
45   StdString CContext::GetDefName(void){ return (CContext::GetName()); }
46   ENodeType CContext::GetType(void)   { return (eContext); }
47
48   //----------------------------------------------------------------
49
50   /*!
51   \brief Get context group (context root)
52   \return Context root
53   */
54   CContextGroup* CContext::getRoot(void)
55   {
56      if (root.get()==NULL) root=shared_ptr<CContextGroup>(new CContextGroup(xml::CXMLNode::GetRootName()));
57      return root.get();
58   }
59
60   //----------------------------------------------------------------
61
62   /*!
63   \brief Get calendar of a context
64   \return Calendar
65   */
66   boost::shared_ptr<CCalendar> CContext::getCalendar(void) const
67   {
68      return (this->calendar);
69   }
70
71   //----------------------------------------------------------------
72
73   /*!
74   \brief Set a context with a calendar
75   \param[in] newCalendar new calendar
76   */
77   void CContext::setCalendar(boost::shared_ptr<CCalendar> newCalendar)
78   {
79      this->calendar = newCalendar;
80   }
81
82   //----------------------------------------------------------------
83   /*!
84   \brief Parse xml file and write information into context object
85   \param [in] node xmld node corresponding in xml file
86   */
87   void CContext::parse(xml::CXMLNode & node)
88   {
89      CContext::SuperClass::parse(node);
90
91      // PARSING POUR GESTION DES ENFANTS
92      xml::THashAttributes attributes = node.getAttributes();
93
94      if (attributes.end() != attributes.find("src"))
95      {
96         StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in );
97         if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
98            ERROR("void CContext::parse(xml::CXMLNode & node)",
99                  <<endl<< "Can not open <"<<attributes["src"].c_str()<<"> file" );
100         if (!ifs.good())
101            ERROR("CContext::parse(xml::CXMLNode & node)",
102                  << "[ filename = " << attributes["src"] << " ] Bad xml stream !");
103         xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this);
104      }
105
106      if (node.getElementName().compare(CContext::GetName()))
107         DEBUG("Le noeud is wrong defined but will be considered as a context !");
108
109      if (!(node.goToChildElement()))
110      {
111         DEBUG("Le context ne contient pas d'enfant !");
112      }
113      else
114      {
115         do { // Parcours des contextes pour traitement.
116
117            StdString name = node.getElementName();
118            attributes.clear();
119            attributes = node.getAttributes();
120
121            if (attributes.end() != attributes.find("id"))
122            {
123              DEBUG(<< "Definition node has an id,"
124                    << "it will not be taking account !");
125            }
126
127#define DECLARE_NODE(Name_, name_)    \
128   if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
129   { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
130#define DECLARE_NODE_PAR(Name_, name_)
131#include "node_type.conf"
132
133            DEBUG(<< "The element \'"     << name
134                  << "\' in the context \'" << CContext::getCurrent()->getId()
135                  << "\' is not a definition !");
136
137         } while (node.goToNextElement());
138
139         node.goToParentElement(); // Retour au parent
140      }
141   }
142
143   //----------------------------------------------------------------
144   //! Show tree structure of context
145   void CContext::ShowTree(StdOStream & out)
146   {
147      StdString currentContextId = CContext::getCurrent() -> getId();
148      std::vector<CContext*> def_vector =
149         CContext::getRoot()->getChildList();
150      std::vector<CContext*>::iterator
151         it = def_vector.begin(), end = def_vector.end();
152
153      out << "<? xml version=\"1.0\" ?>" << std::endl;
154      out << "<"  << xml::CXMLNode::GetRootName() << " >" << std::endl;
155
156      for (; it != end; it++)
157      {
158         CContext* context = *it;
159         CContext::setCurrent(context->getId());
160         out << *context << std::endl;
161      }
162
163      out << "</" << xml::CXMLNode::GetRootName() << " >" << std::endl;
164      CContext::setCurrent(currentContextId);
165   }
166
167
168   //----------------------------------------------------------------
169
170   //! Convert context object into string (to print)
171   StdString CContext::toString(void) const
172   {
173      StdOStringStream oss;
174      oss << "<" << CContext::GetName()
175          << " id=\"" << this->getId() << "\" "
176          << SuperClassAttribute::toString() << ">" << std::endl;
177      if (!this->hasChild())
178      {
179         //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrmentation
180      }
181      else
182      {
183
184#define DECLARE_NODE(Name_, name_)    \
185   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
186   oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
187#define DECLARE_NODE_PAR(Name_, name_)
188#include "node_type.conf"
189
190      }
191
192      oss << "</" << CContext::GetName() << " >";
193
194      return (oss.str());
195   }
196
197   //----------------------------------------------------------------
198
199   /*!
200   \brief Find all inheritace among objects in a context.
201   \param [in] apply (true) write attributes of parent into ones of child if they are empty
202                     (false) write attributes of parent into a new container of child
203   \param [in] parent unused
204   */
205   void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent))
206   {
207#define DECLARE_NODE(Name_, name_)    \
208   if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209     C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
210#define DECLARE_NODE_PAR(Name_, name_)
211#include "node_type.conf"
212   }
213
214   //----------------------------------------------------------------
215
216   //! Verify if all root definition in the context have child.
217   bool CContext::hasChild(void) const
218   {
219      return (
220#define DECLARE_NODE(Name_, name_)    \
221   C##Name_##Definition::has(C##Name_##Definition::GetDefName())   ||
222#define DECLARE_NODE_PAR(Name_, name_)
223#include "node_type.conf"
224      false);
225}
226
227   //----------------------------------------------------------------
228
229   void CContext::CleanTree(void)
230   {
231#define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
232#define DECLARE_NODE_PAR(Name_, name_)
233#include "node_type.conf"
234   }
235   ///---------------------------------------------------------------
236
237   //! Initialize client side
238   void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/)
239   {
240     hasClient=true;
241     client = new CContextClient(this,intraComm, interComm, cxtServer);
242     registryIn=new CRegistry(intraComm);
243     registryIn->setPath(getId()) ;
244     if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ;
245     registryIn->bcastRegistry() ;
246
247     registryOut=new CRegistry(intraComm) ;
248     registryOut->setPath(getId()) ;
249
250     ep_lib::MPI_Comm intraCommServer, interCommServer;
251     if (cxtServer) // Attached mode
252     {
253       intraCommServer = intraComm;
254       interCommServer = interComm;
255     }
256     else
257     {
258       MPI_Comm_dup(intraComm, &intraCommServer);
259       comms.push_back(intraCommServer);
260       MPI_Comm_dup(interComm, &interCommServer);
261       comms.push_back(interCommServer);
262       
263       printf("comm_dup OK\n");
264     }
265     server = new CContextServer(this,intraCommServer,interCommServer);
266   }
267
268   void CContext::setClientServerBuffer()
269   {
270     size_t minBufferSize = CXios::minBufferSize;
271#define DECLARE_NODE(Name_, name_)    \
272     if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
273#define DECLARE_NODE_PAR(Name_, name_)
274#include "node_type.conf"
275#undef DECLARE_NODE
276#undef DECLARE_NODE_PAR
277
278     std::map<int, StdSize> maxEventSize;
279     std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize);
280     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize);
281
282     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
283     for (it = dataBufferSize.begin(); it != ite; ++it)
284       if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
285
286     ite = bufferSize.end();
287     for (it = bufferSize.begin(); it != ite; ++it)
288     {
289       it->second *= CXios::bufferSizeFactor;
290       if (it->second < minBufferSize) it->second = minBufferSize;
291     }
292
293     // We consider that the minimum buffer size is also the minimum event size
294     ite = maxEventSize.end();
295     for (it = maxEventSize.begin(); it != ite; ++it)
296       if (it->second < minBufferSize) it->second = minBufferSize;
297
298     if (client->isServerLeader())
299     {
300       const std::list<int>& ranks = client->getRanksServerLeader();
301       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
302         if (!bufferSize.count(*itRank)) bufferSize[*itRank] = maxEventSize[*itRank] = minBufferSize;
303     }
304
305     client->setBufferSize(bufferSize, maxEventSize);
306   }
307
308   //! Verify whether a context is initialized
309   bool CContext::isInitialized(void)
310   {
311     return hasClient;
312   }
313
314   //! Initialize server
315   void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/)
316   {
317     hasServer=true;
318     server = new CContextServer(this,intraComm,interComm);
319
320     registryIn=new CRegistry(intraComm);
321     registryIn->setPath(getId()) ;
322     if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ;
323     registryIn->bcastRegistry() ;
324     registryOut=new CRegistry(intraComm) ;
325     registryOut->setPath(getId()) ;
326
327     ep_lib::MPI_Comm intraCommClient, interCommClient;
328     if (cxtClient) // Attached mode
329     {
330       intraCommClient = intraComm;
331       interCommClient = interComm;
332     }
333     else
334     {
335       MPI_Comm_dup(intraComm, &intraCommClient);
336       comms.push_back(intraCommClient);
337       MPI_Comm_dup(interComm, &interCommClient);
338       comms.push_back(interCommClient);
339     }
340     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient);
341   }
342
343   //! Server side: Put server into a loop in order to listen message from client
344   bool CContext::eventLoop(void)
345   {
346     return server->eventLoop();
347   }
348
349   //! Try to send the buffers and receive possible answers
350   bool CContext::checkBuffersAndListen(void)
351   {
352     client->checkBuffers();
353     return server->eventLoop();
354   }
355
356   //! Terminate a context
357   void CContext::finalize(void)
358   {
359      if (!finalized)
360      {
361        finalized = true;
362        if (hasClient) sendRegistry() ;
363        client->finalize();
364        while (!server->hasFinished())
365        {
366          server->eventLoop();
367        }
368
369        if (hasServer)
370        {
371          closeAllFile();
372          registryOut->hierarchicalGatherRegistry() ;
373          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;
374        }
375
376        for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
377          MPI_Comm_free(&(*it));
378        comms.clear();
379      }
380   }
381
382   /*!
383   \brief Close all the context defintion and do processing data
384      After everything is well defined on client side, they will be processed and sent to server
385   From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send
386   all necessary information to server, from which each server can build its own database.
387   Because the role of server is to write out field data on a specific netcdf file,
388   the only information that it needs is the enabled files
389   and the active fields (fields will be written onto active files)
390   */
391   void CContext::closeDefinition(void)
392   {
393     int myRank;
394     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
395
396     //printf("myRank = %d, hasClient = %d, hasServer = %d\n", myRank, hasClient, hasServer);
397
398     // There is nothing client need to send to server
399     if (hasClient)
400     {
401       // After xml is parsed, there are some more works with post processing
402       postProcessing(); 
403       //printf("myRank = %d,                postProcessing OK\n", myRank);
404     }
405     setClientServerBuffer(); //printf("myRank = %d, setClientServerBuffer OK\n", myRank);
406
407     //printf("hasClient = %d, hasServer = %d\n", hasClient, hasServer);
408
409     if (hasClient && !hasServer)
410     {
411      // Send all attributes of current context to server
412      this->sendAllAttributesToServer(); //printf("myRank = %d, this->sendAllAttributesToServer OK\n", myRank);
413
414      // Send all attributes of current calendar
415      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer();
416      //printf("myRank = %d, CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer OK\n", myRank);
417
418      // We have enough information to send to server
419      // First of all, send all enabled files
420       sendEnabledFiles();  //printf("myRank = %d, sendEnabledFiles OK\n", myRank);
421
422      // Then, send all enabled fields
423       sendEnabledFields();  //printf("myRank = %d, sendEnabledFields OK\n", myRank);
424
425      // At last, we have all info of domain and axis, then send them
426       sendRefDomainsAxis();  //printf("myRank = %d, sendRefDomainsAxis OK\n", myRank);
427
428      // After that, send all grid (if any)
429       sendRefGrid(); //printf("myRank = %d, sendRefGrid OK\n", myRank);
430    }
431
432    // We have a xml tree on the server side and now, it should be also processed
433    if (hasClient && !hasServer) sendPostProcessing(); 
434
435    // There are some processings that should be done after all of above. For example: check mask or index
436    if (hasClient)
437    {
438      this->buildFilterGraphOfEnabledFields();  //printf("myRank = %d, buildFilterGraphOfEnabledFields OK\n", myRank);
439      buildFilterGraphOfFieldsWithReadAccess();  //printf("myRank = %d, buildFilterGraphOfFieldsWithReadAccess OK\n", myRank);
440      this->solveAllRefOfEnabledFields(true);  //printf("myRank = %d, solveAllRefOfEnabledFields OK\n", myRank);
441    }
442
443    // Now tell server that it can process all messages from client
444    if (hasClient && !hasServer) this->sendCloseDefinition();
445
446    // Nettoyage de l'arborescence
447    if (hasClient && !hasServer) CleanTree();
448
449    if (hasClient)
450    {
451      sendCreateFileHeader();  //printf("myRank = %d, sendCreateFileHeader OK\n", myRank);
452
453      startPrefetchingOfEnabledReadModeFiles();  //printf("myRank = %d, startPrefetchingOfEnabledReadModeFiles OK\n", myRank);
454    }
455   }
456
457   void CContext::findAllEnabledFields(void)
458   {
459     for (unsigned int i = 0; i < this->enabledFiles.size(); i++)
460     (void)this->enabledFiles[i]->getEnabledFields();
461   }
462
463   void CContext::findAllEnabledFieldsInReadModeFiles(void)
464   {
465     for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
466     (void)this->enabledReadModeFiles[i]->getEnabledFields();
467   }
468
469   void CContext::readAttributesOfEnabledFieldsInReadModeFiles()
470   {
471      for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i)
472        (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode();
473   }
474
475   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)
476   {
477     int size = this->enabledFiles.size();
478     for (int i = 0; i < size; ++i)
479     {
480       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
481     }
482
483     for (int i = 0; i < size; ++i)
484     {
485       this->enabledFiles[i]->generateNewTransformationGridDest();
486     }
487   }
488
489   void CContext::solveAllRefOfEnabledFields(bool sendToServer)
490   {
491     int size = this->enabledFiles.size();
492     
493     for (int i = 0; i < size; ++i)
494     {
495       this->enabledFiles[i]->solveAllRefOfEnabledFields(sendToServer);
496     }
497   }
498
499   void CContext::buildFilterGraphOfEnabledFields()
500   {
501     int size = this->enabledFiles.size();
502     for (int i = 0; i < size; ++i)
503     {
504       this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);
505     }
506   }
507
508   void CContext::startPrefetchingOfEnabledReadModeFiles()
509   {
510     int size = enabledReadModeFiles.size();
511     for (int i = 0; i < size; ++i)
512     {
513        enabledReadModeFiles[i]->prefetchEnabledReadModeFields();
514     }
515   }
516
517   void CContext::checkPrefetchingOfEnabledReadModeFiles()
518   {
519     int size = enabledReadModeFiles.size();
520     for (int i = 0; i < size; ++i)
521     {
522        enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded();
523     }
524   }
525
526  void CContext::findFieldsWithReadAccess(void)
527  {
528    fieldsWithReadAccess.clear();
529    const vector<CField*> allFields = CField::getAll();
530    for (size_t i = 0; i < allFields.size(); ++i)
531    {
532      CField* field = allFields[i];
533
534      if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)
535        field->read_access = true;
536      else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
537        fieldsWithReadAccess.push_back(field);
538    }
539  }
540
541  void CContext::solveAllRefOfFieldsWithReadAccess()
542  {
543    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
544      fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false);
545  }
546
547  void CContext::buildFilterGraphOfFieldsWithReadAccess()
548  {
549    for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i)
550      fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true);
551  }
552
553   void CContext::solveAllInheritance(bool apply)
554   {
555     // Rsolution des hritages descendants (cd des hritages de groupes)
556     // pour chacun des contextes.
557      solveDescInheritance(apply);
558
559     // Rsolution des hritages par rfrence au niveau des fichiers.
560      const vector<CFile*> allFiles=CFile::getAll();
561      const vector<CGrid*> allGrids= CGrid::getAll();
562
563     //if (hasClient && !hasServer)
564      if (hasClient)
565      {
566        for (unsigned int i = 0; i < allFiles.size(); i++)
567          allFiles[i]->solveFieldRefInheritance(apply);
568      }
569
570      unsigned int vecSize = allGrids.size();
571      unsigned int i = 0;
572      for (i = 0; i < vecSize; ++i)
573        allGrids[i]->solveDomainAxisRefInheritance(apply);
574
575   }
576
577   void CContext::findEnabledFiles(void)
578   {
579      const std::vector<CFile*> allFiles = CFile::getAll();
580
581      for (unsigned int i = 0; i < allFiles.size(); i++)
582         if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est dfini.
583         {
584            if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix  vrai.
585               enabledFiles.push_back(allFiles[i]);
586         }
587         else enabledFiles.push_back(allFiles[i]); // otherwise true by default
588
589
590      if (enabledFiles.size() == 0)
591         DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm \""
592               << getId() << "\" !");
593   }
594
595   void CContext::findEnabledReadModeFiles(void)
596   {
597     int size = this->enabledFiles.size();
598     for (int i = 0; i < size; ++i)
599     {
600       if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read)
601        enabledReadModeFiles.push_back(enabledFiles[i]);
602     }
603   }
604
605   void CContext::closeAllFile(void)
606   {
607     std::vector<CFile*>::const_iterator
608            it = this->enabledFiles.begin(), end = this->enabledFiles.end();
609
610     for (; it != end; it++)
611     {
612       info(30)<<"Closing File : "<<(*it)->getId()<<endl;
613       (*it)->close();
614     }
615   }
616
617   /*!
618   \brief Dispatch event received from client
619      Whenever a message is received in buffer of server, it will be processed depending on
620   its event type. A new event type should be added in the switch list to make sure
621   it processed on server side.
622   \param [in] event: Received message
623   */
624   bool CContext::dispatchEvent(CEventServer& event)
625   {
626
627      if (SuperClass::dispatchEvent(event)) return true;
628      else
629      {
630        switch(event.type)
631        {
632           case EVENT_ID_CLOSE_DEFINITION :
633             recvCloseDefinition(event);
634             return true;
635             break;
636           case EVENT_ID_UPDATE_CALENDAR:
637             recvUpdateCalendar(event);
638             return true;
639             break;
640           case EVENT_ID_CREATE_FILE_HEADER :
641             recvCreateFileHeader(event);
642             return true;
643             break;
644           case EVENT_ID_POST_PROCESS:
645             recvPostProcessing(event);
646             return true;
647            case EVENT_ID_SEND_REGISTRY:
648             recvRegistry(event);
649             return true;
650            break;
651
652           default :
653             ERROR("bool CContext::dispatchEvent(CEventServer& event)",
654                    <<"Unknown Event");
655           return false;
656         }
657      }
658   }
659
660   //! Client side: Send a message to server to make it close
661   void CContext::sendCloseDefinition(void)
662   {
663     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION);
664     if (client->isServerLeader())
665     {
666       CMessage msg;
667       msg<<this->getIdServer();
668       const std::list<int>& ranks = client->getRanksServerLeader();
669       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
670         event.push(*itRank,1,msg);
671       client->sendEvent(event);
672     }
673     else client->sendEvent(event);
674   }
675
676   //! Server side: Receive a message of client announcing a context close
677   void CContext::recvCloseDefinition(CEventServer& event)
678   {
679
680      CBufferIn* buffer=event.subEvents.begin()->buffer;
681      string id;
682      *buffer>>id;
683      get(id)->closeDefinition();
684   }
685
686   //! Client side: Send a message to update calendar in each time step
687   void CContext::sendUpdateCalendar(int step)
688   {
689     if (!hasServer)
690     {
691       CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
692       if (client->isServerLeader())
693       {
694         CMessage msg;
695         msg<<this->getIdServer()<<step;
696         const std::list<int>& ranks = client->getRanksServerLeader();
697         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
698           event.push(*itRank,1,msg);
699         client->sendEvent(event);
700       }
701       else client->sendEvent(event);
702     }
703   }
704
705   //! Server side: Receive a message of client annoucing calendar update
706   void CContext::recvUpdateCalendar(CEventServer& event)
707   {
708      CBufferIn* buffer=event.subEvents.begin()->buffer;
709      string id;
710      *buffer>>id;
711      get(id)->recvUpdateCalendar(*buffer);
712   }
713
714   //! Server side: Receive a message of client annoucing calendar update
715   void CContext::recvUpdateCalendar(CBufferIn& buffer)
716   {
717      int step;
718      buffer>>step;
719      updateCalendar(step);
720   }
721
722   //! Client side: Send a message to create header part of netcdf file
723   void CContext::sendCreateFileHeader(void)
724   {
725     CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER);
726     if (client->isServerLeader())
727     {
728       CMessage msg;
729       msg<<this->getIdServer();
730       const std::list<int>& ranks = client->getRanksServerLeader();
731       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
732         event.push(*itRank,1,msg) ;
733       client->sendEvent(event);
734     }
735     else client->sendEvent(event);
736   }
737
738   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
739   void CContext::recvCreateFileHeader(CEventServer& event)
740   {
741      CBufferIn* buffer=event.subEvents.begin()->buffer;
742      string id;
743      *buffer>>id;
744      get(id)->recvCreateFileHeader(*buffer);
745   }
746
747   //! Server side: Receive a message of client annoucing the creation of header part of netcdf file
748   void CContext::recvCreateFileHeader(CBufferIn& buffer)
749   {
750      createFileHeader();
751   }
752
753   //! Client side: Send a message to do some post processing on server
754   void CContext::sendPostProcessing()
755   {
756     if (!hasServer)
757     {
758       CEventClient event(getType(),EVENT_ID_POST_PROCESS);
759       if (client->isServerLeader())
760       {
761         CMessage msg;
762         msg<<this->getIdServer();
763         const std::list<int>& ranks = client->getRanksServerLeader();
764         for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
765           event.push(*itRank,1,msg);
766         client->sendEvent(event);
767       }
768       else client->sendEvent(event);
769     }
770   }
771
772   //! Server side: Receive a message to do some post processing
773   void CContext::recvPostProcessing(CEventServer& event)
774   {
775      CBufferIn* buffer=event.subEvents.begin()->buffer;
776      string id;
777      *buffer>>id;
778      get(id)->recvPostProcessing(*buffer);
779   }
780
781   //! Server side: Receive a message to do some post processing
782   void CContext::recvPostProcessing(CBufferIn& buffer)
783   {
784      CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar();
785      postProcessing();
786   }
787
788   const StdString& CContext::getIdServer()
789   {
790      if (hasClient)
791      {
792        idServer_ = this->getId();
793        idServer_ += "_server";
794        return idServer_;
795      }
796      if (hasServer) return (this->getId());
797   }
798
799   /*!
800   \brief Do some simple post processings after parsing xml file
801      After the xml file (iodef.xml) is parsed, it is necessary to build all relations among
802   created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields),
803   which will be written out into netcdf files, are processed
804   */
805   void CContext::postProcessing()
806   {
807     int myRank;
808     MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
809
810     //printf("myRank = %d, in postProcessing, isPostProcessed = %d\n", myRank, isPostProcessed);
811     if (isPostProcessed) return;
812
813      // Make sure the calendar was correctly created
814      if (!calendar)
815        ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"")
816      else if (calendar->getTimeStep() == NoneDu)
817        ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"")
818      // Calendar first update to set the current date equals to the start date
819      calendar->update(0);  //printf("myRank = %d, calendar->update(0) OK\n", myRank);
820
821      // Find all inheritance in xml structure
822      this->solveAllInheritance();  //printf("myRank = %d, this->solveAllInheritance OK\n", myRank);
823
824      // Check if some axis, domains or grids are eligible to for compressed indexed output.
825      // Warning: This must be done after solving the inheritance and before the rest of post-processing
826      checkAxisDomainsGridsEligibilityForCompressedOutput();  //printf("myRank = %d, checkAxisDomainsGridsEligibilityForCompressedOutput OK\n", myRank);
827
828      // Check if some automatic time series should be generated
829      // Warning: This must be done after solving the inheritance and before the rest of post-processing
830      prepareTimeseries();  //printf("myRank = %d, prepareTimeseries OK\n", myRank);
831
832      //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers  sortir.
833      this->findEnabledFiles();  //printf("myRank = %d, this->findEnabledFiles OK\n", myRank);
834      this->findEnabledReadModeFiles();  //printf("myRank = %d, this->findEnabledReadModeFiles OK\n", myRank);
835
836      // Find all enabled fields of each file
837      this->findAllEnabledFields();  //printf("myRank = %d, this->findAllEnabledFields OK\n", myRank);
838      this->findAllEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->findAllEnabledFieldsInReadModeFiles OK\n", myRank);
839
840     if (hasClient && !hasServer)
841     {
842      // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis)
843      this->readAttributesOfEnabledFieldsInReadModeFiles();  //printf("myRank = %d, this->readAttributesOfEnabledFieldsInReadModeFiles OK\n", myRank);
844     }
845
846      // Only search and rebuild all reference objects of enable fields, don't transform
847      this->solveOnlyRefOfEnabledFields(false);  //printf("myRank = %d, this->solveOnlyRefOfEnabledFields(false) OK\n", myRank);
848
849      // Search and rebuild all reference object of enabled fields
850      this->solveAllRefOfEnabledFields(false);  //printf("myRank = %d, this->solveAllRefOfEnabledFields(false) OK\n", myRank);
851
852      // Find all fields with read access from the public API
853      findFieldsWithReadAccess();  //printf("myRank = %d, findFieldsWithReadAccess OK\n", myRank);
854      // and solve the all reference for them
855      solveAllRefOfFieldsWithReadAccess();  //printf("myRank = %d, solveAllRefOfFieldsWithReadAccess OK\n", myRank);
856
857      isPostProcessed = true;
858   }
859
860   /*!
861    * Compute the required buffer size to send the attributes (mostly those grid related).
862    *
863    * \param maxEventSize [in/out] the size of the bigger event for each connected server
864    */
865   std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize)
866   {
867     std::map<int, StdSize> attributesSize;
868
869     if (hasClient)
870     {
871       size_t numEnabledFiles = this->enabledFiles.size();
872       for (size_t i = 0; i < numEnabledFiles; ++i)
873       {
874         CFile* file = this->enabledFiles[i];
875
876         std::vector<CField*> enabledFields = file->getEnabledFields();
877         size_t numEnabledFields = enabledFields.size();
878         for (size_t j = 0; j < numEnabledFields; ++j)
879         {
880           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize();
881           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
882           for (; it != itE; ++it)
883           {
884             // If attributesSize[it->first] does not exist, it will be zero-initialized
885             // so we can use it safely without checking for its existance
886             if (attributesSize[it->first] < it->second)
887               attributesSize[it->first] = it->second;
888
889             if (maxEventSize[it->first] < it->second)
890               maxEventSize[it->first] = it->second;
891           }
892         }
893       }
894     }
895
896     return attributesSize;
897   }
898
899   /*!
900    * Compute the required buffer size to send the fields data.
901    *
902    * \param maxEventSize [in/out] the size of the bigger event for each connected server
903    */
904   std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize)
905   {
906     CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read;
907
908     std::map<int, StdSize> dataSize;
909
910     // Find all reference domain and axis of all active fields
911     size_t numEnabledFiles = this->enabledFiles.size();
912     for (size_t i = 0; i < numEnabledFiles; ++i)
913     {
914       CFile* file = this->enabledFiles[i];
915       CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue();
916
917       if (fileMode == mode)
918       {
919         std::vector<CField*> enabledFields = file->getEnabledFields();
920         size_t numEnabledFields = enabledFields.size();
921         for (size_t j = 0; j < numEnabledFields; ++j)
922         {
923           const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize();
924           std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
925           for (; it != itE; ++it)
926           {
927             // If dataSize[it->first] does not exist, it will be zero-initialized
928             // so we can use it safely without checking for its existance
929             if (CXios::isOptPerformance)
930               dataSize[it->first] += it->second;
931             else if (dataSize[it->first] < it->second)
932               dataSize[it->first] = it->second;
933
934             if (maxEventSize[it->first] < it->second)
935               maxEventSize[it->first] = it->second;
936           }
937         }
938       }
939     }
940
941     return dataSize;
942   }
943
944   //! Client side: Send infomation of active files (files are enabled to write out)
945   void CContext::sendEnabledFiles()
946   {
947     int size = this->enabledFiles.size();
948
949     // In a context, each type has a root definition, e.g: axis, domain, field.
950     // Every object must be a child of one of these root definition. In this case
951     // all new file objects created on server must be children of the root "file_definition"
952     StdString fileDefRoot("file_definition");
953     CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
954
955     for (int i = 0; i < size; ++i)
956     {
957       cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId());
958       this->enabledFiles[i]->sendAllAttributesToServer();
959       this->enabledFiles[i]->sendAddAllVariables();
960     }
961   }
962
963   //! Client side: Send information of active fields (ones are written onto files)
964   void CContext::sendEnabledFields()
965   {
966     int size = this->enabledFiles.size();
967     for (int i = 0; i < size; ++i)
968     {
969       this->enabledFiles[i]->sendEnabledFields();
970     }
971   }
972
973   //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output
974   void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput()
975   {
976     if (!hasClient) return;
977
978     const vector<CAxis*> allAxis = CAxis::getAll();
979     for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
980       (*it)->checkEligibilityForCompressedOutput();
981
982     const vector<CDomain*> allDomains = CDomain::getAll();
983     for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
984       (*it)->checkEligibilityForCompressedOutput();
985
986     const vector<CGrid*> allGrids = CGrid::getAll();
987     for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
988       (*it)->checkEligibilityForCompressedOutput();
989   }
990
991   //! Client side: Prepare the timeseries by adding the necessary files
992   void CContext::prepareTimeseries()
993   {
994     if (!hasClient) return;
995
996     const std::vector<CFile*> allFiles = CFile::getAll();
997     for (size_t i = 0; i < allFiles.size(); i++)
998     {
999       CFile* file = allFiles[i];
1000
1001       if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1002       {
1003         StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : file->getFileOutputName();
1004
1005         const std::vector<CField*> allFields = file->getAllFields();
1006         for (size_t j = 0; j < allFields.size(); j++)
1007         {
1008           CField* field = allFields[j];
1009
1010           if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1011           {
1012             CFile* tsFile = CFile::create();
1013             tsFile->duplicateAttributes(file);
1014             tsFile->setVirtualVariableGroup(file->getVirtualVariableGroup());
1015
1016             tsFile->name = tsPrefix + "_";
1017             if (!field->name.isEmpty())
1018               tsFile->name.get() += field->name;
1019             else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet
1020               tsFile->name.get() += field->field_ref;
1021             else
1022               tsFile->name.get() += field->getId();
1023
1024             if (!field->ts_split_freq.isEmpty())
1025               tsFile->split_freq = field->ts_split_freq;
1026
1027             CField* tsField = tsFile->addField();
1028             tsField->field_ref = field->getId();
1029             tsField->setVirtualVariableGroup(field->getVirtualVariableGroup());
1030
1031             tsFile->solveFieldRefInheritance(true);
1032
1033             if (file->timeseries == CFile::timeseries_attr::exclusive)
1034               field->enabled = false;
1035           }
1036         }
1037
1038         // Finally disable the original file is need be
1039         if (file->timeseries == CFile::timeseries_attr::only)
1040          file->enabled = false;
1041       }
1042     }
1043   }
1044
1045   //! Client side: Send information of reference grid of active fields
1046   void CContext::sendRefGrid()
1047   {
1048     std::set<StdString> gridIds;
1049     int sizeFile = this->enabledFiles.size();
1050     CFile* filePtr(NULL);
1051
1052     // Firstly, find all reference grids of all active fields
1053     for (int i = 0; i < sizeFile; ++i)
1054     {
1055       filePtr = this->enabledFiles[i];
1056       std::vector<CField*> enabledFields = filePtr->getEnabledFields();
1057       int sizeField = enabledFields.size();
1058       for (int numField = 0; numField < sizeField; ++numField)
1059       {
1060         if (0 != enabledFields[numField]->getRelGrid())
1061           gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1062       }
1063     }
1064
1065     // Create all reference grids on server side
1066     StdString gridDefRoot("grid_definition");
1067     CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1068     std::set<StdString>::const_iterator it, itE = gridIds.end();
1069     for (it = gridIds.begin(); it != itE; ++it)
1070     {
1071       gridPtr->sendCreateChild(*it);
1072       CGrid::get(*it)->sendAllAttributesToServer();
1073       CGrid::get(*it)->sendAllDomains();
1074       CGrid::get(*it)->sendAllAxis();
1075       CGrid::get(*it)->sendAllScalars();
1076     }
1077   }
1078
1079
1080   //! Client side: Send information of reference domain and axis of active fields
1081   void CContext::sendRefDomainsAxis()
1082   {
1083     std::set<StdString> domainIds, axisIds, scalarIds;
1084
1085     // Find all reference domain and axis of all active fields
1086     int numEnabledFiles = this->enabledFiles.size();
1087     for (int i = 0; i < numEnabledFiles; ++i)
1088     {
1089       std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();
1090       int numEnabledFields = enabledFields.size();
1091       for (int j = 0; j < numEnabledFields; ++j)
1092       {
1093         const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1094         if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1095         if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1096         if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1097       }
1098     }
1099
1100     // Create all reference axis on server side
1101     std::set<StdString>::iterator itDom, itAxis, itScalar;
1102     std::set<StdString>::const_iterator itE;
1103
1104     StdString scalarDefRoot("scalar_definition");
1105     CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1106     itE = scalarIds.end();
1107     for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1108     {
1109       if (!itScalar->empty())
1110       {
1111         scalarPtr->sendCreateChild(*itScalar);
1112         CScalar::get(*itScalar)->sendAllAttributesToServer();
1113       }
1114     }
1115
1116     StdString axiDefRoot("axis_definition");
1117     CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1118     itE = axisIds.end();
1119     for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1120     {
1121       if (!itAxis->empty())
1122       {
1123         axisPtr->sendCreateChild(*itAxis);
1124         CAxis::get(*itAxis)->sendAllAttributesToServer();
1125       }
1126     }
1127
1128     // Create all reference domains on server side
1129     StdString domDefRoot("domain_definition");
1130     CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1131     itE = domainIds.end();
1132     for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1133     {
1134       if (!itDom->empty()) {
1135          domPtr->sendCreateChild(*itDom);
1136          CDomain::get(*itDom)->sendAllAttributesToServer();
1137       }
1138     }
1139   }
1140
1141   //! Update calendar in each time step
1142   void CContext::updateCalendar(int step)
1143   {
1144      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl;
1145      calendar->update(step);
1146      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl;
1147
1148      if (hasClient)
1149      {
1150        checkPrefetchingOfEnabledReadModeFiles();
1151        garbageCollector.invalidate(calendar->getCurrentDate());
1152      }
1153   }
1154
1155   //! Server side: Create header of netcdf file
1156   void CContext::createFileHeader(void )
1157   {
1158      vector<CFile*>::const_iterator it;
1159
1160      for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)
1161      {
1162         (*it)->initFile();
1163      }
1164   }
1165
1166   //! Get current context
1167   CContext* CContext::getCurrent(void)
1168   {
1169     return CObjectFactory::GetObject<CContext>(CObjectFactory::GetCurrentContextId()).get();
1170   }
1171
1172   /*!
1173   \brief Set context with an id be the current context
1174   \param [in] id identity of context to be set to current
1175   */
1176   void CContext::setCurrent(const string& id)
1177   {
1178     CObjectFactory::SetCurrentContextId(id);
1179     CGroupFactory::SetCurrentContextId(id);
1180   }
1181
1182  /*!
1183  \brief Create a context with specific id
1184  \param [in] id identity of new context
1185  \return pointer to the new context or already-existed one with identity id
1186  */
1187  CContext* CContext::create(const StdString& id)
1188  {
1189    CContext::setCurrent(id);
1190
1191    bool hasctxt = CContext::has(id);
1192    CContext* context = CObjectFactory::CreateObject<CContext>(id).get();
1193    getRoot();
1194    if (!hasctxt) CGroupFactory::AddChild(root, context->getShared());
1195
1196#define DECLARE_NODE(Name_, name_) \
1197    C##Name_##Definition::create(C##Name_##Definition::GetDefName());
1198#define DECLARE_NODE_PAR(Name_, name_)
1199#include "node_type.conf"
1200
1201    return (context);
1202  }
1203
1204
1205
1206     //! Server side: Receive a message to do some post processing
1207  void CContext::recvRegistry(CEventServer& event)
1208  {
1209    CBufferIn* buffer=event.subEvents.begin()->buffer;
1210    string id;
1211    *buffer>>id;
1212    get(id)->recvRegistry(*buffer);
1213  }
1214
1215  void CContext::recvRegistry(CBufferIn& buffer)
1216  {
1217    if (server->intraCommRank==0)
1218    {
1219      CRegistry registry(server->intraComm) ;
1220      registry.fromBuffer(buffer) ;
1221      registryOut->mergeRegistry(registry) ;
1222    }
1223  }
1224
1225  void CContext::sendRegistry(void)
1226  {
1227    registryOut->hierarchicalGatherRegistry() ;
1228
1229    CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY);
1230    if (client->isServerLeader())
1231    {
1232       CMessage msg ;
1233       msg<<this->getIdServer();
1234       if (client->clientRank==0) msg<<*registryOut ;
1235       const std::list<int>& ranks = client->getRanksServerLeader();
1236       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1237         event.push(*itRank,1,msg);
1238       client->sendEvent(event);
1239    }
1240    else client->sendEvent(event);
1241  }
1242
1243} // namespace xios
Note: See TracBrowser for help on using the repository browser.