source: XIOS/dev/dev_olga/src/cxios.cpp @ 1677

Last change on this file since 1677 was 1653, checked in by oabramkina, 5 years ago

Developments for visualization of XIOS workflow.

Branch is spawned from trunk r1649.

Boost library is used for producing Graphviz DOT files. Current results: a DOT file representing a static workflow. For a complete proof of concept, DOT files for each timestamp should be generated. The necessary information has been collected by XIOS, it only requires rearranging the information for graphing (changes in classes CWorkflowGraph and CGraphviz).

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 9.5 KB
Line 
1
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "client.hpp"
5#include "server.hpp"
6#include "xml_parser.hpp"
7#include <boost/functional/hash.hpp>
8#include "mpi.hpp"
9#include "memory.hpp"
10#include <new>
11#include "memtrack.hpp"
12#include "registry.hpp"
13#include "graphviz.hpp"
14
15namespace xios
16{
17  string CXios::rootFile="./iodef.xml" ;
18  string CXios::xiosCodeId="xios.x" ;
19  string CXios::clientFile="./xios_client";
20  string CXios::serverFile="./xios_server";
21  string CXios::serverPrmFile="./xios_server1";
22  string CXios::serverSndFile="./xios_server2";
23
24  bool CXios::xiosStack = true;
25  bool CXios::systemStack = false;
26
27  bool CXios::isClient ;
28  bool CXios::isServer ;
29  MPI_Comm CXios::globalComm ;
30  bool CXios::usingOasis ;
31  bool CXios::usingServer = false;
32  bool CXios::usingServer2 = false;
33  int CXios::ratioServer2 = 50;
34  int CXios::nbPoolsServer2 = 1;
35  double CXios::bufferSizeFactor = 1.0;
36  const double CXios::defaultBufferSizeFactor = 1.0;
37  StdSize CXios::minBufferSize = 1024 * sizeof(double);
38  StdSize CXios::maxBufferSize = std::numeric_limits<int>::max() ;
39  bool CXios::printLogs2Files;
40  bool CXios::isOptPerformance = true;
41  CRegistry* CXios::globalRegistry = 0;
42  double CXios::recvFieldTimeout = 300.0;
43  bool CXios::checkEventSync=false ;
44 
45  //! Parse configuration file and create some objects from it
46  void CXios::initialize()
47  {
48    set_new_handler(noMemory);
49    parseFile(rootFile);
50    parseXiosConfig();
51  }
52
53  /*!
54  \brief Parse xios part of configuration file (.iodef.xml)
55   Both client and server need information returned from this function
56  */
57  void CXios::parseXiosConfig()
58  {
59    usingOasis=getin<bool>("using_oasis",false) ;
60    usingServer=getin<bool>("using_server",false) ;
61    usingServer2=getin<bool>("using_server2",false) ;
62    ratioServer2=getin<int>("ratio_server2",50);
63    nbPoolsServer2=getin<int>("number_pools_server2",0);
64    info.setLevel(getin<int>("info_level",0)) ;
65    report.setLevel(getin<int>("info_level",50));
66    printLogs2Files=getin<bool>("print_file",false);
67
68    xiosStack=getin<bool>("xios_stack",true) ;
69    systemStack=getin<bool>("system_stack",false) ;
70    if (xiosStack && systemStack)
71    {
72      xiosStack = false;
73    }
74
75    StdString bufMemory("memory");
76    StdString bufPerformance("performance");
77    StdString bufOpt = getin<StdString>("optimal_buffer_size", bufPerformance);
78    std::transform(bufOpt.begin(), bufOpt.end(), bufOpt.begin(), ::tolower);
79    if (0 == bufOpt.compare(bufMemory)) isOptPerformance = false;
80    else if (0 != bufOpt.compare(bufPerformance))
81    {
82      ERROR("CXios::parseXiosConfig()", << "optimal_buffer_size must be memory or performance "<< endl );
83    }
84
85    bufferSizeFactor = getin<double>("buffer_size_factor", defaultBufferSizeFactor);
86    minBufferSize = getin<int>("min_buffer_size", 1024 * sizeof(double));
87    maxBufferSize = getin<int>("max_buffer_size", std::numeric_limits<int>::max());
88    recvFieldTimeout = getin<double>("recv_field_timeout", recvFieldTimeout);
89    if (recvFieldTimeout < 0.0)
90      ERROR("CXios::parseXiosConfig()", "recv_field_timeout cannot be negative.");
91
92    checkEventSync = getin<bool>("check_event_sync", checkEventSync);
93
94    globalComm=MPI_COMM_WORLD ;
95  }
96
97  /*!
98  Initialize client
99  \param [in] codeId identity of context
100  \param [in] localComm local communicator
101  \param [in/out] returnComm communicator corresponding to group of client with same codeId
102  */
103  void CXios::initClientSide(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
104  TRY
105  {
106    initialize() ;
107
108    isClient = true;
109
110    CClient::initialize(codeId,localComm,returnComm) ;
111    if (CClient::getRank()==0) globalRegistry = new CRegistry(returnComm) ;
112
113    // If there are no server processes then we are in attached mode
114    // and the clients are also servers
115    isServer = !usingServer;
116
117    if (printLogs2Files)
118    {
119      CClient::openInfoStream(clientFile);
120      CClient::openErrorStream(clientFile);
121    }
122    else
123    {
124      CClient::openInfoStream();
125      CClient::openErrorStream();
126    }
127  }
128  CATCH
129
130  void CXios::clientFinalize(void)
131  {
132     CClient::finalize() ;
133     if (CClient::getRank()==0)
134     {
135       info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
136       globalRegistry->toFile("xios_registry.bin") ;
137       delete globalRegistry ;
138       
139       CGraphviz::buildStaticWorkflowGraph();
140     }
141
142#ifdef XIOS_MEMTRACK
143
144#ifdef XIOS_MEMTRACK_LIGHT
145       report(10) << " Memory report : current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
146       report(10) << " Memory report : maximum memory used by XIOS : "<<  MemTrack::getMaxMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
147#endif
148
149#ifdef XIOS_MEMTRACK_FULL
150     MemTrack::TrackListMemoryUsage() ;
151     MemTrack::TrackDumpBlocks();
152#endif
153
154     CClient::closeInfoStream();
155
156#endif
157  }
158
159  //! Init server by parsing only xios part of config file
160  void CXios::initServer()
161  {
162    set_new_handler(noMemory);
163    std::set<StdString> parseList;
164    parseList.insert("xios");
165    xml::CXMLParser::ParseFile(rootFile, parseList);
166    parseXiosConfig();
167  }
168
169  //! Initialize server then put it into listening state
170  void CXios::initServerSide(void)
171  {
172    initServer();
173    isClient = false;
174    isServer = true;
175
176    // Initialize all aspects MPI
177    CServer::initialize();
178    if (CServer::getRank()==0 && CServer::serverLevel != 1) globalRegistry = new CRegistry(CServer::intraComm) ;
179   
180    if (printLogs2Files)
181    {
182      if (CServer::serverLevel == 0)
183      {
184        CServer::openInfoStream(serverFile);
185        CServer::openErrorStream(serverFile);
186      }
187      else if (CServer::serverLevel == 1)
188      {
189        CServer::openInfoStream(serverPrmFile);
190        CServer::openErrorStream(serverPrmFile);
191      }
192      else
193      {
194        CServer::openInfoStream(serverSndFile);
195        CServer::openErrorStream(serverSndFile);
196      }
197    }
198    else
199    {
200      CServer::openInfoStream();
201      CServer::openErrorStream();
202    }
203
204    // Enter the loop to listen message from Client
205    CServer::eventLoop();
206
207    // Finalize
208    if (CServer::serverLevel == 0)
209    {
210      if (CServer::getRank()==0)
211      {
212        info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
213        globalRegistry->toFile("xios_registry.bin") ;
214        delete globalRegistry ;
215      }
216    }
217    else
218    {
219      // If using two server levels:
220      // (1) merge registries on each pool
221      // (2) send merged registries to the first pool
222      // (3) merge received registries on the first pool
223      if (CServer::serverLevel == 2)
224      {
225        vector<int>& secondaryServerGlobalRanks = CServer::getSecondaryServerGlobalRanks();
226        int firstPoolGlobalRank = secondaryServerGlobalRanks[0];
227        int rankGlobal;
228        MPI_Comm_rank(globalComm, &rankGlobal);
229
230        // Merge registries defined on each pools
231        CRegistry globalRegistrySndServers (CServer::intraComm);
232
233        // All pools (except the first): send globalRegistry to the first pool
234        for (int i=1; i<secondaryServerGlobalRanks.size(); i++)
235        {
236          if (rankGlobal == secondaryServerGlobalRanks[i])
237          {
238            globalRegistrySndServers.mergeRegistry(*globalRegistry) ;
239            int registrySize = globalRegistrySndServers.size();
240            MPI_Send(&registrySize,1,MPI_LONG,firstPoolGlobalRank,15,CXios::globalComm) ;
241            CBufferOut buffer(registrySize) ;
242            globalRegistrySndServers.toBuffer(buffer) ;
243            MPI_Send(buffer.start(),registrySize,MPI_CHAR,firstPoolGlobalRank,15,CXios::globalComm) ;
244          }
245        }
246
247        // First pool: receive globalRegistry of all secondary server pools, merge and write the resultant registry
248        if (rankGlobal == firstPoolGlobalRank)
249        {
250          MPI_Status status;
251          char* recvBuff;
252
253          globalRegistrySndServers.mergeRegistry(*globalRegistry) ;
254
255          for (int i=1; i< secondaryServerGlobalRanks.size(); i++)
256          {
257            int rank = secondaryServerGlobalRanks[i];
258            int registrySize = 0;
259            MPI_Recv(&registrySize, 1, MPI_LONG, rank, 15, CXios::globalComm, &status);
260            recvBuff = new char[registrySize];
261            MPI_Recv(recvBuff, registrySize, MPI_CHAR, rank, 15, CXios::globalComm, &status);
262            CBufferIn buffer(recvBuff, registrySize) ;
263            CRegistry recvRegistry;
264            recvRegistry.fromBuffer(buffer) ;
265            globalRegistrySndServers.mergeRegistry(recvRegistry) ;
266            delete[] recvBuff;
267          }
268
269          info(80)<<"Write data base Registry"<<endl<<globalRegistrySndServers.toString()<<endl ;
270          globalRegistrySndServers.toFile("xios_registry.bin") ;
271
272        }
273      }
274      delete globalRegistry;
275    }
276    CServer::finalize();
277
278#ifdef XIOS_MEMTRACK
279
280#ifdef XIOS_MEMTRACK_LIGHT
281       report(10) << " Memory report : current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
282       report(10) << " Memory report : maximum memory used by XIOS : "<<  MemTrack::getMaxMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
283#endif
284
285#ifdef XIOS_MEMTRACK_FULL
286     MemTrack::TrackListMemoryUsage() ;
287     MemTrack::TrackDumpBlocks();
288#endif
289#endif
290    CServer::closeInfoStream();
291  }
292
293  //! Parse configuration file
294  void CXios::parseFile(const string& filename)
295  {
296    xml::CXMLParser::ParseFile(filename);
297  }
298
299  //! Set using server
300  void CXios::setUsingServer()
301  {
302    usingServer = true;
303  }
304
305  //! Unset using server
306  void CXios::setNotUsingServer()
307  {
308    usingServer = false;
309  }
310}
Note: See TracBrowser for help on using the repository browser.