source: XIOS3/dev/XIOS_FILE_SERVICES/src/server.cpp @ 2453

Last change on this file since 2453 was 2453, checked in by ymipsl, 18 months ago

Implementation of files service on dev branch

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 19.4 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "mem_checker.hpp"
16#include "event_scheduler.hpp"
17#include "string_tools.hpp"
18#include "ressources_manager.hpp"
19#include "services_manager.hpp"
20#include "contexts_manager.hpp"
21#include "servers_ressource.hpp"
22#include "services.hpp"
23#include "pool_node.hpp"
24#include <cstdio>
25#include "workflow_graph.hpp"
26#include "release_static_allocation.hpp"
27#include <sys/stat.h>
28#include <unistd.h>
29
30
31
32namespace xios
33{
34    MPI_Comm CServer::intraComm_ ;
35    MPI_Comm CServer::serversComm_ ;
36    std::list<MPI_Comm> CServer::interCommLeft ;
37    std::list<MPI_Comm> CServer::interCommRight ;
38    std::list<MPI_Comm> CServer::contextInterComms;
39    std::list<MPI_Comm> CServer::contextIntraComms;
40    int CServer::serverLevel = 0 ;
41    int CServer::nbContexts = 0;
42    bool CServer::isRoot = false ;
43    int CServer::rank_ = INVALID_RANK;
44    StdOFStream CServer::m_infoStream;
45    StdOFStream CServer::m_errorStream;
46    map<string,CContext*> CServer::contextList ;
47    vector<int> CServer::sndServerGlobalRanks;
48    bool CServer::finished=false ;
49    bool CServer::is_MPI_Initialized ;
50    CEventScheduler* CServer::eventScheduler = 0;
51    CServersRessource* CServer::serversRessource_=nullptr ;
52    CThirdPartyDriver* CServer::driver_ =nullptr ;
53
54       
55    void CServer::initialize(void)
56    {
57     
58      MPI_Comm serverComm ;
59      int initialized ;
60      MPI_Initialized(&initialized) ;
61      if (initialized) is_MPI_Initialized=true ;
62      else is_MPI_Initialized=false ;
63      MPI_Comm globalComm=CXios::getGlobalComm() ;
64      /////////////////////////////////////////
65      ///////////// PART 1 ////////////////////
66      /////////////////////////////////////////
67      // don't use OASIS
68      if (!CXios::usingOasis)
69      {
70        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
71       
72        // split the global communicator
73        // get hash from all model to attribute a unique color (int) and then split to get client communicator
74        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
75         
76        int commRank, commSize ;
77        MPI_Comm_rank(globalComm,&commRank) ;
78        MPI_Comm_size(globalComm,&commSize) ;
79
80        std::hash<string> hashString ;
81        size_t hashServer=hashString(CXios::xiosCodeId) ;
82         
83        size_t* hashAll = new size_t[commSize] ;
84        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
85         
86        int color=0 ;
87        map<size_t,int> listHash ;
88        for(int i=0 ; i<=commSize ; i++) 
89          if (listHash.count(hashAll[i])==0) 
90          {
91            listHash[hashAll[i]]=color ;
92            color=color+1 ;
93          }
94        color=listHash[hashServer] ;
95        delete[] hashAll ;
96
97        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
98      }
99      else // using OASIS
100      {
101        if (!is_MPI_Initialized) driver_ = new CThirdPartyDriver();
102
103        driver_->getComponentCommunicator( serverComm );
104      }
105      MPI_Comm_dup(serverComm, &intraComm_);
106     
107      CTimer::get("XIOS").resume() ;
108      CTimer::get("XIOS server").resume() ;
109      CTimer::get("XIOS initialize").resume() ;
110 
111      /////////////////////////////////////////
112      ///////////// PART 2 ////////////////////
113      /////////////////////////////////////////
114     
115
116      // Create the XIOS communicator for every process which is related
117      // to XIOS, as well on client side as on server side
118      MPI_Comm xiosGlobalComm ;
119      string strIds=CXios::getin<string>("clients_code_id","") ;
120      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
121      if (strIds.empty())
122      {
123        // no code Ids given, suppose XIOS initialisation is global           
124        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
125        MPI_Comm splitComm,interComm ;
126        MPI_Comm_rank(globalComm,&commGlobalRank) ;
127        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
128        MPI_Comm_rank(splitComm,&commRank) ;
129        if (commRank==0) serverLeader=commGlobalRank ;
130        else serverLeader=0 ;
131        clientLeader=0 ;
132        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
133        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
134        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
135        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
136        CXios::setXiosComm(xiosGlobalComm) ;
137      }
138      else
139      {
140
141        xiosGlobalCommByFileExchange(serverComm) ;
142
143      }
144     
145      /////////////////////////////////////////
146      ///////////// PART 4 ////////////////////
147      //  create servers intra communicator  //
148      /////////////////////////////////////////
149     
150      int commRank ;
151      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
152      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
153     
154      CXios::setUsingServer() ;
155
156      /////////////////////////////////////////
157      ///////////// PART 5 ////////////////////
158      //       redirect files output         //
159      /////////////////////////////////////////
160     
161      CServer::openInfoStream(CXios::serverFile);
162      CServer::openErrorStream(CXios::serverFile);
163
164      CMemChecker::logMem( "CServer::initialize" );
165
166      /////////////////////////////////////////
167      ///////////// PART 4 ////////////////////
168      /////////////////////////////////////////
169
170      CXios::launchDaemonsManager(true) ;
171     
172      /////////////////////////////////////////
173      ///////////// PART 5 ////////////////////
174      /////////////////////////////////////////
175
176      // create the services
177
178      auto ressourcesManager=CXios::getRessourcesManager() ;
179      auto servicesManager=CXios::getServicesManager() ;
180      auto contextsManager=CXios::getContextsManager() ;
181      auto daemonsManager=CXios::getDaemonsManager() ;
182      auto serversRessource=CServer::getServersRessource() ;
183
184      int rank;
185      MPI_Comm_rank(intraComm_, &rank) ;
186      if (rank==0) isRoot=true;
187      else isRoot=false;
188
189      if (serversRessource->isServerLeader())
190      {
191        // creating pool
192        CPoolNodeGroup::get("xios","pool_definition")->solveDescInheritance(true) ;
193        vector<CPoolNode*> pools = CPoolNodeGroup::get("xios","pool_definition")->getAllChildren();
194        for(auto& pool : pools) pool->allocateRessources() ;
195       
196        int nbRessources = ressourcesManager->getFreeRessourcesSize() ;
197        if (nbRessources>0)
198        {
199          if (!CXios::usingServer2)
200          {
201            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
202            ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
203            servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ;
204            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ;
205          }
206          else
207          {
208            int nprocsServer = nbRessources*CXios::ratioServer2/100.;
209            int nprocsGatherer = nbRessources - nprocsServer ;
210         
211            int nbPoolsServer2 = CXios::nbPoolsServer2 ;
212            if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
213            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
214            ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
215            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
216            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;
217            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;
218          }
219        }
220//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
221      }
222/*
223      MPI_Request req ;
224      MPI_Status status ;
225      MPI_Ibarrier(xiosGlobalComm,&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
226      int ok=false ;
227      while (!ok)
228      {
229        daemonsManager->eventLoop() ;
230        MPI_Test(&req,&ok,&status) ;
231      }
232*/
233      CTimer::get("XIOS initialize").suspend() ;
234
235      /////////////////////////////////////////
236      ///////////// PART 5 ////////////////////
237      /////////////////////////////////////////
238      // loop on event loop
239
240      bool finished=false ;
241      CTimer::get("XIOS event loop").resume() ;
242
243      while (!finished)
244      {
245        finished=daemonsManager->eventLoop() ;
246      }
247      CTimer::get("XIOS event loop").suspend() ;
248
249      // Delete CContext
250      //CObjectTemplate<CContext>::cleanStaticDataStructure();
251    }
252
253
254
255
256
257    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
258    {
259       
260      MPI_Comm globalComm=CXios::getGlobalComm() ;
261      MPI_Comm xiosGlobalComm ;
262     
263      string strIds=CXios::getin<string>("clients_code_id","") ;
264      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
265     
266      int commRank, globalRank ;
267      MPI_Comm_rank(serverComm, &commRank) ;
268      MPI_Comm_rank(globalComm, &globalRank) ;
269      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
270
271      if (commRank==0) // if root process publish name
272      { 
273        std::ofstream ofs (serverFileName, std::ofstream::out);
274        ofs<<globalRank ;
275        ofs.close();
276      }
277       
278      vector<int> clientsRank(clientsCodeId.size()) ;
279      for(int i=0;i<clientsRank.size();i++)
280      {
281        std::ifstream ifs ;
282        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
283        struct stat buffer;
284        do {
285        } while( stat(fileName.c_str(), &buffer) != 0 );
286        sleep(1);
287        ifs.open(fileName, ifstream::in) ;
288        ifs>>clientsRank[i] ;
289        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
290        ifs.close() ; 
291      }
292
293      MPI_Comm intraComm ;
294      MPI_Comm_dup(serverComm,&intraComm) ;
295      MPI_Comm interComm ;
296      for(int i=0 ; i<clientsRank.size(); i++)
297      { 
298        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
299        interCommLeft.push_back(interComm) ;
300        MPI_Comm_free(&intraComm) ;
301        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
302      }
303      xiosGlobalComm=intraComm ; 
304      MPI_Barrier(xiosGlobalComm);
305      if (commRank==0) std::remove(serverFileName.c_str()) ;
306      MPI_Barrier(xiosGlobalComm);
307
308      CXios::setXiosComm(xiosGlobalComm) ;
309     
310    }
311
312
313    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
314    {
315        // untested, need to be tested on a true MPI-2 compliant library
316
317        // try to discover other client/server
318/*
319        // publish server name
320        char portName[MPI_MAX_PORT_NAME];
321        int ierr ;
322        int commRank ;
323        MPI_Comm_rank(serverComm, &commRank) ;
324       
325        if (commRank==0) // if root process publish name
326        { 
327          MPI_Open_port(MPI_INFO_NULL, portName);
328          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
329        }
330
331        MPI_Comm intraComm=serverComm ;
332        MPI_Comm interComm ;
333        for(int i=0 ; i<clientsCodeId.size(); i++)
334        { 
335          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
336          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
337        }
338*/     
339    }
340
341   /*!
342    * Root process is listening for an order sent by client to call "oasis_enddef".
343    * The root client of a compound send the order (tag 5). It is probed and received.
344    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
345    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
346    */
347   
348     void CServer::listenOasisEnddef(void)
349     {
350        int flag ;
351        MPI_Status status ;
352        list<MPI_Comm>::iterator it;
353        int msg ;
354        static int nbCompound=0 ;
355        int size ;
356        static bool sent=false ;
357        static MPI_Request* allRequests ;
358        static MPI_Status* allStatus ;
359
360
361        if (sent)
362        {
363          MPI_Comm_size(intraComm_,&size) ;
364          MPI_Testall(size,allRequests, &flag, allStatus) ;
365          if (flag==true)
366          {
367            delete [] allRequests ;
368            delete [] allStatus ;
369            sent=false ;
370          }
371        }
372       
373
374        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
375        {
376           MPI_Status status ;
377           traceOff() ;
378           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
379           traceOn() ;
380           if (flag==true)
381           {
382              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
383              nbCompound++ ;
384              if (nbCompound==interCommLeft.size())
385              {
386                MPI_Comm_size(intraComm_,&size) ;
387                allRequests= new MPI_Request[size] ;
388                allStatus= new MPI_Status[size] ;
389                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
390                sent=true ;
391              }
392           }
393        }
394}
395     
396   /*!
397    * Processes probes message from root process if oasis_enddef call must be done.
398    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
399    */
400     void CServer::listenRootOasisEnddef(void)
401     {
402       int flag ;
403       MPI_Status status ;
404       const int root=0 ;
405       int msg ;
406       static bool eventSent=false ;
407
408       if (eventSent)
409       {
410         boost::hash<string> hashString;
411         size_t hashId = hashString("oasis_enddef");
412         if (CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->queryEvent(0,hashId))
413         {
414           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->popEvent() ;
415           driver_->endSynchronizedDefinition() ;
416           eventSent=false ;
417         }
418       }
419         
420       traceOff() ;
421       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
422       traceOn() ;
423       if (flag==true)
424       {
425           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
426           boost::hash<string> hashString;
427           size_t hashId = hashString("oasis_enddef");
428           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->registerEvent(0,hashId);
429           eventSent=true ;
430       }
431     }
432
433    void CServer::finalize(void)
434    {
435      CTimer::get("XIOS").suspend() ;
436      CTimer::get("XIOS server").suspend() ;
437      delete eventScheduler ;
438
439      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
440        MPI_Comm_free(&(*it));
441
442      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
443        MPI_Comm_free(&(*it));
444
445        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
446          MPI_Comm_free(&(*it));
447
448//      MPI_Comm_free(&intraComm);
449      CXios::finalizeDaemonsManager();
450      finalizeServersRessource();
451     
452      CContext::removeAllContexts() ; // free memory for related context
453         
454      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
455
456      CMemChecker::logMem( "CServer::finalize", true );
457      if (!is_MPI_Initialized)
458      {
459        if (CXios::usingOasis) delete driver_;
460        else MPI_Finalize() ;
461      }
462      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
463      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
464      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
465      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
466      report(100)<<CMemChecker::getAllCumulatedMem()<<endl ;
467     
468      CWorkflowGraph::drawWorkFlowGraph_server();
469      xios::releaseStaticAllocation() ; // free memory from static allocation
470    }
471
472    /*!
473    * Open a file specified by a suffix and an extension and use it for the given file buffer.
474    * The file name will be suffix+rank+extension.
475    *
476    * \param fileName[in] protype file name
477    * \param ext [in] extension of the file
478    * \param fb [in/out] the file buffer
479    */
480    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
481    {
482      StdStringStream fileNameServer;
483      int numDigit = 0;
484      int commSize = 0;
485      int commRank ;
486      int id;
487     
488      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
489      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
490
491      while (commSize)
492      {
493        commSize /= 10;
494        ++numDigit;
495      }
496      id = commRank;
497
498      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
499      fb->open(fileNameServer.str().c_str(), std::ios::out);
500      if (!fb->is_open())
501        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
502              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
503    }
504
505    /*!
506    * \brief Open a file stream to write the info logs
507    * Open a file stream with a specific file name suffix+rank
508    * to write the info logs.
509    * \param fileName [in] protype file name
510    */
511    void CServer::openInfoStream(const StdString& fileName)
512    {
513      std::filebuf* fb = m_infoStream.rdbuf();
514      openStream(fileName, ".out", fb);
515
516      info.write2File(fb);
517      report.write2File(fb);
518    }
519
520    //! Write the info logs to standard output
521    void CServer::openInfoStream()
522    {
523      info.write2StdOut();
524      report.write2StdOut();
525    }
526
527    //! Close the info logs file if it opens
528    void CServer::closeInfoStream()
529    {
530      if (m_infoStream.is_open()) m_infoStream.close();
531    }
532
533    /*!
534    * \brief Open a file stream to write the error log
535    * Open a file stream with a specific file name suffix+rank
536    * to write the error log.
537    * \param fileName [in] protype file name
538    */
539    void CServer::openErrorStream(const StdString& fileName)
540    {
541      std::filebuf* fb = m_errorStream.rdbuf();
542      openStream(fileName, ".err", fb);
543
544      error.write2File(fb);
545    }
546
547    //! Write the error log to standard error output
548    void CServer::openErrorStream()
549    {
550      error.write2StdErr();
551    }
552
553    //! Close the error log file if it opens
554    void CServer::closeErrorStream()
555    {
556      if (m_errorStream.is_open()) m_errorStream.close();
557    }
558
559    void CServer::launchServersRessource(MPI_Comm serverComm)
560    {
561      serversRessource_ = new CServersRessource(serverComm) ;
562    }
563
564    void  CServer::finalizeServersRessource(void) 
565    { 
566      delete serversRessource_; serversRessource_=nullptr ;
567    }
568}
Note: See TracBrowser for help on using the repository browser.