source: XIOS3/trunk/src/server.cpp @ 2407

Last change on this file since 2407 was 2407, checked in by ymipsl, 21 months ago

Implement separate "reader" and "writer" service. Default reader live on same ressources that "writer" or "gatherer" services.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 18.4 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "event_scheduler.hpp"
16#include "string_tools.hpp"
17#include "ressources_manager.hpp"
18#include "services_manager.hpp"
19#include "contexts_manager.hpp"
20#include "servers_ressource.hpp"
21#include "services.hpp"
22#include <cstdio>
23#include "workflow_graph.hpp"
24#include "release_static_allocation.hpp"
25#include <sys/stat.h>
26#include <unistd.h>
27
28
29
30namespace xios
31{
32    MPI_Comm CServer::intraComm_ ;
33    MPI_Comm CServer::serversComm_ ;
34    std::list<MPI_Comm> CServer::interCommLeft ;
35    std::list<MPI_Comm> CServer::interCommRight ;
36    std::list<MPI_Comm> CServer::contextInterComms;
37    std::list<MPI_Comm> CServer::contextIntraComms;
38    int CServer::serverLevel = 0 ;
39    int CServer::nbContexts = 0;
40    bool CServer::isRoot = false ;
41    int CServer::rank_ = INVALID_RANK;
42    StdOFStream CServer::m_infoStream;
43    StdOFStream CServer::m_errorStream;
44    map<string,CContext*> CServer::contextList ;
45    vector<int> CServer::sndServerGlobalRanks;
46    bool CServer::finished=false ;
47    bool CServer::is_MPI_Initialized ;
48    CEventScheduler* CServer::eventScheduler = 0;
49    CServersRessource* CServer::serversRessource_=nullptr ;
50    CThirdPartyDriver* CServer::driver_ =nullptr ;
51
52       
53    void CServer::initialize(void)
54    {
55     
56      MPI_Comm serverComm ;
57      int initialized ;
58      MPI_Initialized(&initialized) ;
59      if (initialized) is_MPI_Initialized=true ;
60      else is_MPI_Initialized=false ;
61      MPI_Comm globalComm=CXios::getGlobalComm() ;
62      CTimer::get("XIOS server").resume() ;
63      /////////////////////////////////////////
64      ///////////// PART 1 ////////////////////
65      /////////////////////////////////////////
66      // don't use OASIS
67      if (!CXios::usingOasis)
68      {
69        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
70       
71        // split the global communicator
72        // get hash from all model to attribute a unique color (int) and then split to get client communicator
73        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
74         
75        int commRank, commSize ;
76        MPI_Comm_rank(globalComm,&commRank) ;
77        MPI_Comm_size(globalComm,&commSize) ;
78
79        std::hash<string> hashString ;
80        size_t hashServer=hashString(CXios::xiosCodeId) ;
81         
82        size_t* hashAll = new size_t[commSize] ;
83        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
84         
85        int color=0 ;
86        map<size_t,int> listHash ;
87        for(int i=0 ; i<=commSize ; i++) 
88          if (listHash.count(hashAll[i])==0) 
89          {
90            listHash[hashAll[i]]=color ;
91            color=color+1 ;
92          }
93        color=listHash[hashServer] ;
94        delete[] hashAll ;
95
96        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
97      }
98      else // using OASIS
99      {
100        if (!is_MPI_Initialized) driver_ = new CThirdPartyDriver();
101
102        driver_->getComponentCommunicator( serverComm );
103      }
104      MPI_Comm_dup(serverComm, &intraComm_);
105     
106      CTimer::get("XIOS").resume() ;
107      CTimer::get("XIOS initialize").resume() ;
108 
109      /////////////////////////////////////////
110      ///////////// PART 2 ////////////////////
111      /////////////////////////////////////////
112     
113
114      // Create the XIOS communicator for every process which is related
115      // to XIOS, as well on client side as on server side
116      MPI_Comm xiosGlobalComm ;
117      string strIds=CXios::getin<string>("clients_code_id","") ;
118      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
119      if (strIds.empty())
120      {
121        // no code Ids given, suppose XIOS initialisation is global           
122        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
123        MPI_Comm splitComm,interComm ;
124        MPI_Comm_rank(globalComm,&commGlobalRank) ;
125        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
126        MPI_Comm_rank(splitComm,&commRank) ;
127        if (commRank==0) serverLeader=commGlobalRank ;
128        else serverLeader=0 ;
129        clientLeader=0 ;
130        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
131        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
132        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
133        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
134        CXios::setXiosComm(xiosGlobalComm) ;
135      }
136      else
137      {
138
139        xiosGlobalCommByFileExchange(serverComm) ;
140
141      }
142     
143      /////////////////////////////////////////
144      ///////////// PART 4 ////////////////////
145      //  create servers intra communicator  //
146      /////////////////////////////////////////
147     
148      int commRank ;
149      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
150      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
151     
152      CXios::setUsingServer() ;
153
154      /////////////////////////////////////////
155      ///////////// PART 5 ////////////////////
156      //       redirect files output         //
157      /////////////////////////////////////////
158     
159      CServer::openInfoStream(CXios::serverFile);
160      CServer::openErrorStream(CXios::serverFile);
161
162      /////////////////////////////////////////
163      ///////////// PART 4 ////////////////////
164      /////////////////////////////////////////
165
166      CXios::launchDaemonsManager(true) ;
167     
168      /////////////////////////////////////////
169      ///////////// PART 5 ////////////////////
170      /////////////////////////////////////////
171
172      // create the services
173
174      auto ressourcesManager=CXios::getRessourcesManager() ;
175      auto servicesManager=CXios::getServicesManager() ;
176      auto contextsManager=CXios::getContextsManager() ;
177      auto daemonsManager=CXios::getDaemonsManager() ;
178      auto serversRessource=CServer::getServersRessource() ;
179
180      int rank;
181      MPI_Comm_rank(intraComm_, &rank) ;
182      if (rank==0) isRoot=true;
183      else isRoot=false;
184
185      if (serversRessource->isServerLeader())
186      {
187        int nbRessources = ressourcesManager->getRessourcesSize() ;
188        if (!CXios::usingServer2)
189        {
190          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
191          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ;
192          servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ;
193        }
194        else
195        {
196          int nprocsServer = nbRessources*CXios::ratioServer2/100.;
197          int nprocsGatherer = nbRessources - nprocsServer ;
198         
199          int nbPoolsServer2 = CXios::nbPoolsServer2 ;
200          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
201          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
202          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
203          servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;
204          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;
205
206
207        }
208//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
209      }
210      CTimer::get("XIOS initialize").suspend() ;
211
212      /////////////////////////////////////////
213      ///////////// PART 5 ////////////////////
214      /////////////////////////////////////////
215      // loop on event loop
216
217      bool finished=false ;
218      CTimer::get("XIOS event loop").resume() ;
219
220      while (!finished)
221      {
222        finished=daemonsManager->eventLoop() ;
223      }
224      CTimer::get("XIOS event loop").suspend() ;
225
226      // Delete CContext
227      //CObjectTemplate<CContext>::cleanStaticDataStructure();
228    }
229
230
231
232
233
234    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
235    {
236       
237      MPI_Comm globalComm=CXios::getGlobalComm() ;
238      MPI_Comm xiosGlobalComm ;
239     
240      string strIds=CXios::getin<string>("clients_code_id","") ;
241      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
242     
243      int commRank, globalRank ;
244      MPI_Comm_rank(serverComm, &commRank) ;
245      MPI_Comm_rank(globalComm, &globalRank) ;
246      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
247
248      if (commRank==0) // if root process publish name
249      { 
250        std::ofstream ofs (serverFileName, std::ofstream::out);
251        ofs<<globalRank ;
252        ofs.close();
253      }
254       
255      vector<int> clientsRank(clientsCodeId.size()) ;
256      for(int i=0;i<clientsRank.size();i++)
257      {
258        std::ifstream ifs ;
259        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
260        struct stat buffer;
261        do {
262        } while( stat(fileName.c_str(), &buffer) != 0 );
263        sleep(1);
264        ifs.open(fileName, ifstream::in) ;
265        ifs>>clientsRank[i] ;
266        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
267        ifs.close() ; 
268      }
269
270      MPI_Comm intraComm ;
271      MPI_Comm_dup(serverComm,&intraComm) ;
272      MPI_Comm interComm ;
273      for(int i=0 ; i<clientsRank.size(); i++)
274      { 
275        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
276        interCommLeft.push_back(interComm) ;
277        MPI_Comm_free(&intraComm) ;
278        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
279      }
280      xiosGlobalComm=intraComm ; 
281      MPI_Barrier(xiosGlobalComm);
282      if (commRank==0) std::remove(serverFileName.c_str()) ;
283      MPI_Barrier(xiosGlobalComm);
284
285      CXios::setXiosComm(xiosGlobalComm) ;
286     
287    }
288
289
290    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
291    {
292        // untested, need to be tested on a true MPI-2 compliant library
293
294        // try to discover other client/server
295/*
296        // publish server name
297        char portName[MPI_MAX_PORT_NAME];
298        int ierr ;
299        int commRank ;
300        MPI_Comm_rank(serverComm, &commRank) ;
301       
302        if (commRank==0) // if root process publish name
303        { 
304          MPI_Open_port(MPI_INFO_NULL, portName);
305          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
306        }
307
308        MPI_Comm intraComm=serverComm ;
309        MPI_Comm interComm ;
310        for(int i=0 ; i<clientsCodeId.size(); i++)
311        { 
312          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
313          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
314        }
315*/     
316    }
317
318   /*!
319    * Root process is listening for an order sent by client to call "oasis_enddef".
320    * The root client of a compound send the order (tag 5). It is probed and received.
321    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
322    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
323    */
324   
325     void CServer::listenOasisEnddef(void)
326     {
327        int flag ;
328        MPI_Status status ;
329        list<MPI_Comm>::iterator it;
330        int msg ;
331        static int nbCompound=0 ;
332        int size ;
333        static bool sent=false ;
334        static MPI_Request* allRequests ;
335        static MPI_Status* allStatus ;
336
337
338        if (sent)
339        {
340          MPI_Comm_size(intraComm_,&size) ;
341          MPI_Testall(size,allRequests, &flag, allStatus) ;
342          if (flag==true)
343          {
344            delete [] allRequests ;
345            delete [] allStatus ;
346            sent=false ;
347          }
348        }
349       
350
351        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
352        {
353           MPI_Status status ;
354           traceOff() ;
355           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
356           traceOn() ;
357           if (flag==true)
358           {
359              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
360              nbCompound++ ;
361              if (nbCompound==interCommLeft.size())
362              {
363                MPI_Comm_size(intraComm_,&size) ;
364                allRequests= new MPI_Request[size] ;
365                allStatus= new MPI_Status[size] ;
366                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
367                sent=true ;
368              }
369           }
370        }
371}
372     
373   /*!
374    * Processes probes message from root process if oasis_enddef call must be done.
375    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
376    */
377     void CServer::listenRootOasisEnddef(void)
378     {
379       int flag ;
380       MPI_Status status ;
381       const int root=0 ;
382       int msg ;
383       static bool eventSent=false ;
384
385       if (eventSent)
386       {
387         boost::hash<string> hashString;
388         size_t hashId = hashString("oasis_enddef");
389         if (CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->queryEvent(0,hashId))
390         {
391           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->popEvent() ;
392           driver_->endSynchronizedDefinition() ;
393           eventSent=false ;
394         }
395       }
396         
397       traceOff() ;
398       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
399       traceOn() ;
400       if (flag==true)
401       {
402           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
403           boost::hash<string> hashString;
404           size_t hashId = hashString("oasis_enddef");
405           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->registerEvent(0,hashId);
406           eventSent=true ;
407       }
408     }
409
410    void CServer::finalize(void)
411    {
412      CTimer::get("XIOS").suspend() ;
413      CTimer::get("XIOS server").suspend() ;
414      delete eventScheduler ;
415
416      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
417        MPI_Comm_free(&(*it));
418
419      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
420        MPI_Comm_free(&(*it));
421
422        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
423          MPI_Comm_free(&(*it));
424
425//      MPI_Comm_free(&intraComm);
426      CXios::finalizeDaemonsManager();
427      finalizeServersRessource();
428     
429      CContext::removeAllContexts() ; // free memory for related context
430         
431      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
432
433      if (!is_MPI_Initialized)
434      {
435        if (CXios::usingOasis) delete driver_;
436        else MPI_Finalize() ;
437      }
438      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
439      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
440      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
441      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
442     
443      CWorkflowGraph::drawWorkFlowGraph_server();
444      xios::releaseStaticAllocation() ; // free memory from static allocation
445    }
446
447    /*!
448    * Open a file specified by a suffix and an extension and use it for the given file buffer.
449    * The file name will be suffix+rank+extension.
450    *
451    * \param fileName[in] protype file name
452    * \param ext [in] extension of the file
453    * \param fb [in/out] the file buffer
454    */
455    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
456    {
457      StdStringStream fileNameServer;
458      int numDigit = 0;
459      int commSize = 0;
460      int commRank ;
461      int id;
462     
463      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
464      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
465
466      while (commSize)
467      {
468        commSize /= 10;
469        ++numDigit;
470      }
471      id = commRank;
472
473      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
474      fb->open(fileNameServer.str().c_str(), std::ios::out);
475      if (!fb->is_open())
476        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
477              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
478    }
479
480    /*!
481    * \brief Open a file stream to write the info logs
482    * Open a file stream with a specific file name suffix+rank
483    * to write the info logs.
484    * \param fileName [in] protype file name
485    */
486    void CServer::openInfoStream(const StdString& fileName)
487    {
488      std::filebuf* fb = m_infoStream.rdbuf();
489      openStream(fileName, ".out", fb);
490
491      info.write2File(fb);
492      report.write2File(fb);
493    }
494
495    //! Write the info logs to standard output
496    void CServer::openInfoStream()
497    {
498      info.write2StdOut();
499      report.write2StdOut();
500    }
501
502    //! Close the info logs file if it opens
503    void CServer::closeInfoStream()
504    {
505      if (m_infoStream.is_open()) m_infoStream.close();
506    }
507
508    /*!
509    * \brief Open a file stream to write the error log
510    * Open a file stream with a specific file name suffix+rank
511    * to write the error log.
512    * \param fileName [in] protype file name
513    */
514    void CServer::openErrorStream(const StdString& fileName)
515    {
516      std::filebuf* fb = m_errorStream.rdbuf();
517      openStream(fileName, ".err", fb);
518
519      error.write2File(fb);
520    }
521
522    //! Write the error log to standard error output
523    void CServer::openErrorStream()
524    {
525      error.write2StdErr();
526    }
527
528    //! Close the error log file if it opens
529    void CServer::closeErrorStream()
530    {
531      if (m_errorStream.is_open()) m_errorStream.close();
532    }
533
534    void CServer::launchServersRessource(MPI_Comm serverComm)
535    {
536      serversRessource_ = new CServersRessource(serverComm) ;
537    }
538
539    void  CServer::finalizeServersRessource(void) 
540    { 
541      delete serversRessource_; serversRessource_=nullptr ;
542    }
543}
Note: See TracBrowser for help on using the repository browser.