source: XIOS3/branches/xios-3.0-beta/src/server.cpp @ 2463

Last change on this file since 2463 was 2463, checked in by jderouillat, 17 months ago

On servers, move first call of CTimer after the first step of servers initialisations (IntelMPI considers MPI_Wtime as a MPI call, need to be after MPI_Init)

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 18.3 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "mem_checker.hpp"
16#include "event_scheduler.hpp"
17#include "string_tools.hpp"
18#include "ressources_manager.hpp"
19#include "services_manager.hpp"
20#include "contexts_manager.hpp"
21#include "servers_ressource.hpp"
22#include "services.hpp"
23#include <cstdio>
24#include "workflow_graph.hpp"
25#include "release_static_allocation.hpp"
26#include <sys/stat.h>
27#include <unistd.h>
28
29
30
31namespace xios
32{
33    MPI_Comm CServer::intraComm_ ;
34    MPI_Comm CServer::serversComm_ ;
35    std::list<MPI_Comm> CServer::interCommLeft ;
36    std::list<MPI_Comm> CServer::interCommRight ;
37    std::list<MPI_Comm> CServer::contextInterComms;
38    std::list<MPI_Comm> CServer::contextIntraComms;
39    int CServer::serverLevel = 0 ;
40    int CServer::nbContexts = 0;
41    bool CServer::isRoot = false ;
42    int CServer::rank_ = INVALID_RANK;
43    StdOFStream CServer::m_infoStream;
44    StdOFStream CServer::m_errorStream;
45    map<string,CContext*> CServer::contextList ;
46    vector<int> CServer::sndServerGlobalRanks;
47    bool CServer::finished=false ;
48    bool CServer::is_MPI_Initialized ;
49    CEventScheduler* CServer::eventScheduler = 0;
50    CServersRessource* CServer::serversRessource_=nullptr ;
51    CThirdPartyDriver* CServer::driver_ =nullptr ;
52
53       
54    void CServer::initialize(void)
55    {
56     
57      MPI_Comm serverComm ;
58      int initialized ;
59      MPI_Initialized(&initialized) ;
60      if (initialized) is_MPI_Initialized=true ;
61      else is_MPI_Initialized=false ;
62      MPI_Comm globalComm=CXios::getGlobalComm() ;
63      /////////////////////////////////////////
64      ///////////// PART 1 ////////////////////
65      /////////////////////////////////////////
66      // don't use OASIS
67      if (!CXios::usingOasis)
68      {
69        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
70       
71        // split the global communicator
72        // get hash from all model to attribute a unique color (int) and then split to get client communicator
73        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
74         
75        int commRank, commSize ;
76        MPI_Comm_rank(globalComm,&commRank) ;
77        MPI_Comm_size(globalComm,&commSize) ;
78
79        std::hash<string> hashString ;
80        size_t hashServer=hashString(CXios::xiosCodeId) ;
81         
82        size_t* hashAll = new size_t[commSize] ;
83        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
84         
85        int color=0 ;
86        map<size_t,int> listHash ;
87        for(int i=0 ; i<=commSize ; i++) 
88          if (listHash.count(hashAll[i])==0) 
89          {
90            listHash[hashAll[i]]=color ;
91            color=color+1 ;
92          }
93        color=listHash[hashServer] ;
94        delete[] hashAll ;
95
96        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
97      }
98      else // using OASIS
99      {
100        if (!is_MPI_Initialized) driver_ = new CThirdPartyDriver();
101
102        driver_->getComponentCommunicator( serverComm );
103      }
104      MPI_Comm_dup(serverComm, &intraComm_);
105     
106      CTimer::get("XIOS").resume() ;
107      CTimer::get("XIOS server").resume() ;
108      CTimer::get("XIOS initialize").resume() ;
109 
110      /////////////////////////////////////////
111      ///////////// PART 2 ////////////////////
112      /////////////////////////////////////////
113     
114
115      // Create the XIOS communicator for every process which is related
116      // to XIOS, as well on client side as on server side
117      MPI_Comm xiosGlobalComm ;
118      string strIds=CXios::getin<string>("clients_code_id","") ;
119      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
120      if (strIds.empty())
121      {
122        // no code Ids given, suppose XIOS initialisation is global           
123        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
124        MPI_Comm splitComm,interComm ;
125        MPI_Comm_rank(globalComm,&commGlobalRank) ;
126        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
127        MPI_Comm_rank(splitComm,&commRank) ;
128        if (commRank==0) serverLeader=commGlobalRank ;
129        else serverLeader=0 ;
130        clientLeader=0 ;
131        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
132        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
133        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
134        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
135        CXios::setXiosComm(xiosGlobalComm) ;
136      }
137      else
138      {
139
140        xiosGlobalCommByFileExchange(serverComm) ;
141
142      }
143     
144      /////////////////////////////////////////
145      ///////////// PART 4 ////////////////////
146      //  create servers intra communicator  //
147      /////////////////////////////////////////
148     
149      int commRank ;
150      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
151      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
152     
153      CXios::setUsingServer() ;
154
155      /////////////////////////////////////////
156      ///////////// PART 5 ////////////////////
157      //       redirect files output         //
158      /////////////////////////////////////////
159     
160      CServer::openInfoStream(CXios::serverFile);
161      CServer::openErrorStream(CXios::serverFile);
162
163      CMemChecker::logMem( "CServer::initialize" );
164
165      /////////////////////////////////////////
166      ///////////// PART 4 ////////////////////
167      /////////////////////////////////////////
168
169      CXios::launchDaemonsManager(true) ;
170     
171      /////////////////////////////////////////
172      ///////////// PART 5 ////////////////////
173      /////////////////////////////////////////
174
175      // create the services
176
177      auto ressourcesManager=CXios::getRessourcesManager() ;
178      auto servicesManager=CXios::getServicesManager() ;
179      auto contextsManager=CXios::getContextsManager() ;
180      auto daemonsManager=CXios::getDaemonsManager() ;
181      auto serversRessource=CServer::getServersRessource() ;
182
183      int rank;
184      MPI_Comm_rank(intraComm_, &rank) ;
185      if (rank==0) isRoot=true;
186      else isRoot=false;
187
188      if (serversRessource->isServerLeader())
189      {
190        int nbRessources = ressourcesManager->getRessourcesSize() ;
191        if (!CXios::usingServer2)
192        {
193          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
194          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ;
195        }
196        else
197        {
198          int nprocsServer = nbRessources*CXios::ratioServer2/100.;
199          int nprocsGatherer = nbRessources - nprocsServer ;
200         
201          int nbPoolsServer2 = CXios::nbPoolsServer2 ;
202          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
203          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
204          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
205          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ;
206
207
208        }
209        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
210      }
211      CTimer::get("XIOS initialize").suspend() ;
212
213      /////////////////////////////////////////
214      ///////////// PART 5 ////////////////////
215      /////////////////////////////////////////
216      // loop on event loop
217
218      bool finished=false ;
219      CTimer::get("XIOS event loop").resume() ;
220
221      while (!finished)
222      {
223        finished=daemonsManager->eventLoop() ;
224      }
225      CTimer::get("XIOS event loop").suspend() ;
226
227      // Delete CContext
228      //CObjectTemplate<CContext>::cleanStaticDataStructure();
229    }
230
231
232
233
234
235    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
236    {
237       
238      MPI_Comm globalComm=CXios::getGlobalComm() ;
239      MPI_Comm xiosGlobalComm ;
240     
241      string strIds=CXios::getin<string>("clients_code_id","") ;
242      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
243     
244      int commRank, globalRank ;
245      MPI_Comm_rank(serverComm, &commRank) ;
246      MPI_Comm_rank(globalComm, &globalRank) ;
247      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
248
249      if (commRank==0) // if root process publish name
250      { 
251        std::ofstream ofs (serverFileName, std::ofstream::out);
252        ofs<<globalRank ;
253        ofs.close();
254      }
255       
256      vector<int> clientsRank(clientsCodeId.size()) ;
257      for(int i=0;i<clientsRank.size();i++)
258      {
259        std::ifstream ifs ;
260        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
261        struct stat buffer;
262        do {
263        } while( stat(fileName.c_str(), &buffer) != 0 );
264        sleep(1);
265        ifs.open(fileName, ifstream::in) ;
266        ifs>>clientsRank[i] ;
267        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
268        ifs.close() ; 
269      }
270
271      MPI_Comm intraComm ;
272      MPI_Comm_dup(serverComm,&intraComm) ;
273      MPI_Comm interComm ;
274      for(int i=0 ; i<clientsRank.size(); i++)
275      { 
276        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
277        interCommLeft.push_back(interComm) ;
278        MPI_Comm_free(&intraComm) ;
279        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
280      }
281      xiosGlobalComm=intraComm ; 
282      MPI_Barrier(xiosGlobalComm);
283      if (commRank==0) std::remove(serverFileName.c_str()) ;
284      MPI_Barrier(xiosGlobalComm);
285
286      CXios::setXiosComm(xiosGlobalComm) ;
287     
288    }
289
290
291    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
292    {
293        // untested, need to be tested on a true MPI-2 compliant library
294
295        // try to discover other client/server
296/*
297        // publish server name
298        char portName[MPI_MAX_PORT_NAME];
299        int ierr ;
300        int commRank ;
301        MPI_Comm_rank(serverComm, &commRank) ;
302       
303        if (commRank==0) // if root process publish name
304        { 
305          MPI_Open_port(MPI_INFO_NULL, portName);
306          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
307        }
308
309        MPI_Comm intraComm=serverComm ;
310        MPI_Comm interComm ;
311        for(int i=0 ; i<clientsCodeId.size(); i++)
312        { 
313          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
314          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
315        }
316*/     
317    }
318
319   /*!
320    * Root process is listening for an order sent by client to call "oasis_enddef".
321    * The root client of a compound send the order (tag 5). It is probed and received.
322    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
323    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
324    */
325   
326     void CServer::listenOasisEnddef(void)
327     {
328        int flag ;
329        MPI_Status status ;
330        list<MPI_Comm>::iterator it;
331        int msg ;
332        static int nbCompound=0 ;
333        int size ;
334        static bool sent=false ;
335        static MPI_Request* allRequests ;
336        static MPI_Status* allStatus ;
337
338
339        if (sent)
340        {
341          MPI_Comm_size(intraComm_,&size) ;
342          MPI_Testall(size,allRequests, &flag, allStatus) ;
343          if (flag==true)
344          {
345            delete [] allRequests ;
346            delete [] allStatus ;
347            sent=false ;
348          }
349        }
350       
351
352        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
353        {
354           MPI_Status status ;
355           traceOff() ;
356           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
357           traceOn() ;
358           if (flag==true)
359           {
360              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
361              nbCompound++ ;
362              if (nbCompound==interCommLeft.size())
363              {
364                MPI_Comm_size(intraComm_,&size) ;
365                allRequests= new MPI_Request[size] ;
366                allStatus= new MPI_Status[size] ;
367                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
368                sent=true ;
369              }
370           }
371        }
372}
373     
374   /*!
375    * Processes probes message from root process if oasis_enddef call must be done.
376    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
377    */
378     void CServer::listenRootOasisEnddef(void)
379     {
380       int flag ;
381       MPI_Status status ;
382       const int root=0 ;
383       int msg ;
384       static bool eventSent=false ;
385
386       if (eventSent)
387       {
388         boost::hash<string> hashString;
389         size_t hashId = hashString("oasis_enddef");
390         if (CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->queryEvent(0,hashId))
391         {
392           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->popEvent() ;
393           driver_->endSynchronizedDefinition() ;
394           eventSent=false ;
395         }
396       }
397         
398       traceOff() ;
399       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
400       traceOn() ;
401       if (flag==true)
402       {
403           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
404           boost::hash<string> hashString;
405           size_t hashId = hashString("oasis_enddef");
406           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->registerEvent(0,hashId);
407           eventSent=true ;
408       }
409     }
410
411    void CServer::finalize(void)
412    {
413      CTimer::get("XIOS").suspend() ;
414      CTimer::get("XIOS server").suspend() ;
415      delete eventScheduler ;
416
417      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
418        MPI_Comm_free(&(*it));
419
420      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
421        MPI_Comm_free(&(*it));
422
423        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
424          MPI_Comm_free(&(*it));
425
426//      MPI_Comm_free(&intraComm);
427      CXios::finalizeDaemonsManager();
428      finalizeServersRessource();
429     
430      CContext::removeAllContexts() ; // free memory for related context
431         
432      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
433
434      CMemChecker::logMem( "CServer::finalize", true );
435      if (!is_MPI_Initialized)
436      {
437        if (CXios::usingOasis) delete driver_;
438        else MPI_Finalize() ;
439      }
440      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
441      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
442      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
443      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
444      report(100)<<CMemChecker::getAllCumulatedMem()<<endl ;
445     
446      CWorkflowGraph::drawWorkFlowGraph_server();
447      xios::releaseStaticAllocation() ; // free memory from static allocation
448    }
449
450    /*!
451    * Open a file specified by a suffix and an extension and use it for the given file buffer.
452    * The file name will be suffix+rank+extension.
453    *
454    * \param fileName[in] protype file name
455    * \param ext [in] extension of the file
456    * \param fb [in/out] the file buffer
457    */
458    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
459    {
460      StdStringStream fileNameServer;
461      int numDigit = 0;
462      int commSize = 0;
463      int commRank ;
464      int id;
465     
466      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
467      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
468
469      while (commSize)
470      {
471        commSize /= 10;
472        ++numDigit;
473      }
474      id = commRank;
475
476      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
477      fb->open(fileNameServer.str().c_str(), std::ios::out);
478      if (!fb->is_open())
479        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
480              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
481    }
482
483    /*!
484    * \brief Open a file stream to write the info logs
485    * Open a file stream with a specific file name suffix+rank
486    * to write the info logs.
487    * \param fileName [in] protype file name
488    */
489    void CServer::openInfoStream(const StdString& fileName)
490    {
491      std::filebuf* fb = m_infoStream.rdbuf();
492      openStream(fileName, ".out", fb);
493
494      info.write2File(fb);
495      report.write2File(fb);
496    }
497
498    //! Write the info logs to standard output
499    void CServer::openInfoStream()
500    {
501      info.write2StdOut();
502      report.write2StdOut();
503    }
504
505    //! Close the info logs file if it opens
506    void CServer::closeInfoStream()
507    {
508      if (m_infoStream.is_open()) m_infoStream.close();
509    }
510
511    /*!
512    * \brief Open a file stream to write the error log
513    * Open a file stream with a specific file name suffix+rank
514    * to write the error log.
515    * \param fileName [in] protype file name
516    */
517    void CServer::openErrorStream(const StdString& fileName)
518    {
519      std::filebuf* fb = m_errorStream.rdbuf();
520      openStream(fileName, ".err", fb);
521
522      error.write2File(fb);
523    }
524
525    //! Write the error log to standard error output
526    void CServer::openErrorStream()
527    {
528      error.write2StdErr();
529    }
530
531    //! Close the error log file if it opens
532    void CServer::closeErrorStream()
533    {
534      if (m_errorStream.is_open()) m_errorStream.close();
535    }
536
537    void CServer::launchServersRessource(MPI_Comm serverComm)
538    {
539      serversRessource_ = new CServersRessource(serverComm) ;
540    }
541
542    void  CServer::finalizeServersRessource(void) 
543    { 
544      delete serversRessource_; serversRessource_=nullptr ;
545    }
546}
Note: See TracBrowser for help on using the repository browser.