source: XIOS/dev/dev_ym/XIOS_COUPLING/src/server.cpp @ 2335

Last change on this file since 2335 was 2335, checked in by jderouillat, 2 years ago

Introduced a new service (named defaultServicesId which contains all gatherers and all IO servers) to manage oasis_enddef through the different levels of servers. OASIS integration is moved in a dedicated virtual class CThirdPartyDriver whose an instance is owned by CServer.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 18.0 KB
RevLine 
[490]1#include "globalScopeData.hpp"
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "server.hpp"
[983]5#include "client.hpp"
[300]6#include "type.hpp"
7#include "context.hpp"
[352]8#include "object_template.hpp"
[300]9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
[382]12#include "mpi.hpp"
[347]13#include "tracer.hpp"
14#include "timer.hpp"
[492]15#include "event_scheduler.hpp"
[1587]16#include "string_tools.hpp"
[1761]17#include "ressources_manager.hpp"
18#include "services_manager.hpp"
19#include "contexts_manager.hpp"
20#include "servers_ressource.hpp"
[2333]21#include "services.hpp"
[1761]22#include <cstdio>
[2146]23#include "workflow_graph.hpp"
[2274]24#include "release_static_allocation.hpp"
[2333]25#include <sys/stat.h>
26#include <unistd.h>
[300]27
[1761]28
[2274]29
[335]30namespace xios
[490]31{
[2332]32    MPI_Comm CServer::intraComm_ ;
[1761]33    MPI_Comm CServer::serversComm_ ;
[1639]34    std::list<MPI_Comm> CServer::interCommLeft ;
35    std::list<MPI_Comm> CServer::interCommRight ;
36    std::list<MPI_Comm> CServer::contextInterComms;
37    std::list<MPI_Comm> CServer::contextIntraComms;
[1021]38    int CServer::serverLevel = 0 ;
[1148]39    int CServer::nbContexts = 0;
[983]40    bool CServer::isRoot = false ;
[1077]41    int CServer::rank_ = INVALID_RANK;
[490]42    StdOFStream CServer::m_infoStream;
[523]43    StdOFStream CServer::m_errorStream;
[490]44    map<string,CContext*> CServer::contextList ;
[1152]45    vector<int> CServer::sndServerGlobalRanks;
[300]46    bool CServer::finished=false ;
47    bool CServer::is_MPI_Initialized ;
[597]48    CEventScheduler* CServer::eventScheduler = 0;
[1761]49    CServersRessource* CServer::serversRessource_=nullptr ;
[2335]50    CThirdPartyDriver* CServer::driver_ =nullptr ;
[983]51
[1765]52       
[1761]53    void CServer::initialize(void)
54    {
55     
56      MPI_Comm serverComm ;
57      int initialized ;
58      MPI_Initialized(&initialized) ;
59      if (initialized) is_MPI_Initialized=true ;
60      else is_MPI_Initialized=false ;
61      MPI_Comm globalComm=CXios::getGlobalComm() ;
62
63      /////////////////////////////////////////
64      ///////////// PART 1 ////////////////////
65      /////////////////////////////////////////
66      // don't use OASIS
67      if (!CXios::usingOasis)
68      {
69        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
70       
71        // split the global communicator
72        // get hash from all model to attribute a unique color (int) and then split to get client communicator
73        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
74         
75        int commRank, commSize ;
76        MPI_Comm_rank(globalComm,&commRank) ;
77        MPI_Comm_size(globalComm,&commSize) ;
78
79        std::hash<string> hashString ;
80        size_t hashServer=hashString(CXios::xiosCodeId) ;
81         
82        size_t* hashAll = new size_t[commSize] ;
[2242]83        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
[1761]84         
85        int color=0 ;
[2242]86        map<size_t,int> listHash ;
87        for(int i=0 ; i<=commSize ; i++) 
88          if (listHash.count(hashAll[i])==0) 
[1761]89          {
[2242]90            listHash[hashAll[i]]=color ;
[1761]91            color=color+1 ;
92          }
[2242]93        color=listHash[hashServer] ;
[1761]94        delete[] hashAll ;
95
96        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
97      }
98      else // using OASIS
99      {
[2335]100        if (!is_MPI_Initialized) driver_ = new CThirdPartyDriver();
[1761]101
[2335]102        driver_->getComponentCommunicator( serverComm );
[1761]103      }
[2334]104      MPI_Comm_dup(serverComm, &intraComm_);
105     
[2290]106      CTimer::get("XIOS").resume() ;
107      CTimer::get("XIOS initialize").resume() ;
[1761]108 
109      /////////////////////////////////////////
110      ///////////// PART 2 ////////////////////
111      /////////////////////////////////////////
112     
113
114      // Create the XIOS communicator for every process which is related
115      // to XIOS, as well on client side as on server side
116      MPI_Comm xiosGlobalComm ;
117      string strIds=CXios::getin<string>("clients_code_id","") ;
118      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
119      if (strIds.empty())
120      {
121        // no code Ids given, suppose XIOS initialisation is global           
122        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
123        MPI_Comm splitComm,interComm ;
124        MPI_Comm_rank(globalComm,&commGlobalRank) ;
125        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
126        MPI_Comm_rank(splitComm,&commRank) ;
127        if (commRank==0) serverLeader=commGlobalRank ;
128        else serverLeader=0 ;
129        clientLeader=0 ;
130        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
131        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
132        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
133        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
134        CXios::setXiosComm(xiosGlobalComm) ;
135      }
136      else
137      {
138
139        xiosGlobalCommByFileExchange(serverComm) ;
140
141      }
142     
143      /////////////////////////////////////////
144      ///////////// PART 4 ////////////////////
145      //  create servers intra communicator  //
146      /////////////////////////////////////////
147     
148      int commRank ;
149      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
150      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
151     
152      CXios::setUsingServer() ;
153
154      /////////////////////////////////////////
155      ///////////// PART 5 ////////////////////
156      //       redirect files output         //
157      /////////////////////////////////////////
158     
159      CServer::openInfoStream(CXios::serverFile);
160      CServer::openErrorStream(CXios::serverFile);
161
162      /////////////////////////////////////////
163      ///////////// PART 4 ////////////////////
164      /////////////////////////////////////////
165
166      CXios::launchDaemonsManager(true) ;
167     
168      /////////////////////////////////////////
169      ///////////// PART 5 ////////////////////
170      /////////////////////////////////////////
171
172      // create the services
173
174      auto ressourcesManager=CXios::getRessourcesManager() ;
175      auto servicesManager=CXios::getServicesManager() ;
176      auto contextsManager=CXios::getContextsManager() ;
177      auto daemonsManager=CXios::getDaemonsManager() ;
178      auto serversRessource=CServer::getServersRessource() ;
179
[2333]180      int rank;
181      MPI_Comm_rank(intraComm_, &rank) ;
182      if (rank==0) isRoot=true;
183      else isRoot=false;
184
[1761]185      if (serversRessource->isServerLeader())
186      {
187        int nbRessources = ressourcesManager->getRessourcesSize() ;
188        if (!CXios::usingServer2)
189        {
190          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
191          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ;
192        }
193        else
194        {
195          int nprocsServer = nbRessources*CXios::ratioServer2/100.;
196          int nprocsGatherer = nbRessources - nprocsServer ;
197         
198          int nbPoolsServer2 = CXios::nbPoolsServer2 ;
199          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
200          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
201          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
202          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ;
[2335]203
204
[1761]205        }
[2335]206        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
[1761]207      }
[2242]208      CTimer::get("XIOS initialize").suspend() ;
[1761]209
210      /////////////////////////////////////////
211      ///////////// PART 5 ////////////////////
212      /////////////////////////////////////////
213      // loop on event loop
214
215      bool finished=false ;
[2242]216      CTimer::get("XIOS event loop").resume() ;
217
[1761]218      while (!finished)
219      {
220        finished=daemonsManager->eventLoop() ;
221      }
[2242]222      CTimer::get("XIOS event loop").suspend() ;
[2243]223
224      // Delete CContext
[2274]225      //CObjectTemplate<CContext>::cleanStaticDataStructure();
[1761]226    }
227
228
229
230
231
232    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
233    {
234       
235      MPI_Comm globalComm=CXios::getGlobalComm() ;
236      MPI_Comm xiosGlobalComm ;
237     
238      string strIds=CXios::getin<string>("clients_code_id","") ;
239      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
240     
241      int commRank, globalRank ;
242      MPI_Comm_rank(serverComm, &commRank) ;
243      MPI_Comm_rank(globalComm, &globalRank) ;
244      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
245
246      if (commRank==0) // if root process publish name
247      { 
248        std::ofstream ofs (serverFileName, std::ofstream::out);
249        ofs<<globalRank ;
250        ofs.close();
251      }
252       
253      vector<int> clientsRank(clientsCodeId.size()) ;
254      for(int i=0;i<clientsRank.size();i++)
255      {
256        std::ifstream ifs ;
257        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
[2333]258        struct stat buffer;
259        do {
260        } while( stat(fileName.c_str(), &buffer) != 0 );
261        sleep(1);
262        ifs.open(fileName, ifstream::in) ;
[1761]263        ifs>>clientsRank[i] ;
[2333]264        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
[1761]265        ifs.close() ; 
266      }
267
268      MPI_Comm intraComm ;
269      MPI_Comm_dup(serverComm,&intraComm) ;
270      MPI_Comm interComm ;
271      for(int i=0 ; i<clientsRank.size(); i++)
272      { 
273        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
[2333]274        interCommLeft.push_back(interComm) ;
[1761]275        MPI_Comm_free(&intraComm) ;
276        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
277      }
278      xiosGlobalComm=intraComm ; 
279      MPI_Barrier(xiosGlobalComm);
280      if (commRank==0) std::remove(serverFileName.c_str()) ;
281      MPI_Barrier(xiosGlobalComm);
282
283      CXios::setXiosComm(xiosGlobalComm) ;
284     
285    }
286
287
288    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
289    {
290        // untested, need to be tested on a true MPI-2 compliant library
291
292        // try to discover other client/server
293/*
294        // publish server name
295        char portName[MPI_MAX_PORT_NAME];
296        int ierr ;
297        int commRank ;
298        MPI_Comm_rank(serverComm, &commRank) ;
299       
300        if (commRank==0) // if root process publish name
301        { 
302          MPI_Open_port(MPI_INFO_NULL, portName);
303          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
304        }
305
306        MPI_Comm intraComm=serverComm ;
307        MPI_Comm interComm ;
308        for(int i=0 ; i<clientsCodeId.size(); i++)
309        { 
310          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
311          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
312        }
313*/     
314    }
315
[2333]316   /*!
317    * Root process is listening for an order sent by client to call "oasis_enddef".
318    * The root client of a compound send the order (tag 5). It is probed and received.
319    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
320    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
321    */
322   
323     void CServer::listenOasisEnddef(void)
324     {
325        int flag ;
326        MPI_Status status ;
327        list<MPI_Comm>::iterator it;
328        int msg ;
329        static int nbCompound=0 ;
330        int size ;
331        static bool sent=false ;
332        static MPI_Request* allRequests ;
333        static MPI_Status* allStatus ;
[490]334
[2333]335
336        if (sent)
337        {
338          MPI_Comm_size(intraComm_,&size) ;
339          MPI_Testall(size,allRequests, &flag, allStatus) ;
340          if (flag==true)
341          {
342            delete [] allRequests ;
343            delete [] allStatus ;
344            sent=false ;
345          }
346        }
347       
348
349        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
350        {
351           MPI_Status status ;
352           traceOff() ;
353           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
354           traceOn() ;
355           if (flag==true)
356           {
357              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
358              nbCompound++ ;
359              if (nbCompound==interCommLeft.size())
360              {
361                MPI_Comm_size(intraComm_,&size) ;
362                allRequests= new MPI_Request[size] ;
363                allStatus= new MPI_Status[size] ;
364                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
365                sent=true ;
366              }
367           }
368        }
369}
370     
371   /*!
372    * Processes probes message from root process if oasis_enddef call must be done.
373    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
374    */
375     void CServer::listenRootOasisEnddef(void)
376     {
377       int flag ;
378       MPI_Status status ;
379       const int root=0 ;
380       int msg ;
381       static bool eventSent=false ;
382
383       if (eventSent)
384       {
385         boost::hash<string> hashString;
386         size_t hashId = hashString("oasis_enddef");
[2335]387         if (CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->queryEvent(0,hashId))
[2333]388         {
[2335]389           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->popEvent() ;
390           driver_->endSynchronizedDefinition() ;
[2333]391           eventSent=false ;
392         }
393       }
394         
395       traceOff() ;
396       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
397       traceOn() ;
398       if (flag==true)
399       {
400           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
401           boost::hash<string> hashString;
402           size_t hashId = hashString("oasis_enddef");
[2335]403           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->registerEvent(0,hashId);
[2333]404           eventSent=true ;
405       }
406     }
407
[300]408    void CServer::finalize(void)
409    {
[361]410      CTimer::get("XIOS").suspend() ;
[697]411     
[492]412      delete eventScheduler ;
[655]413
[1639]414      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
415        MPI_Comm_free(&(*it));
[983]416
[1639]417      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
418        MPI_Comm_free(&(*it));
[1071]419
[1639]420        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
421          MPI_Comm_free(&(*it));
[992]422
[1761]423//      MPI_Comm_free(&intraComm);
[2266]424      CXios::finalizeDaemonsManager();
[2274]425      finalizeServersRessource();
[1764]426     
[2274]427      CContext::removeAllContexts() ; // free memory for related context
428         
[2310]429      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
430
[300]431      if (!is_MPI_Initialized)
[490]432      {
[2335]433        if (CXios::usingOasis) delete driver_;
[1639]434        else MPI_Finalize() ;
[300]435      }
[347]436      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
437      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
438      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
[1158]439      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
[2274]440     
[2146]441      CWorkflowGraph::drawWorkFlowGraph_server();
[2274]442      xios::releaseStaticAllocation() ; // free memory from static allocation
[300]443    }
[490]444
[523]445    /*!
446    * Open a file specified by a suffix and an extension and use it for the given file buffer.
447    * The file name will be suffix+rank+extension.
448    *
449    * \param fileName[in] protype file name
450    * \param ext [in] extension of the file
451    * \param fb [in/out] the file buffer
452    */
453    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
454    {
[1761]455      StdStringStream fileNameServer;
[523]456      int numDigit = 0;
[1761]457      int commSize = 0;
458      int commRank ;
[1021]459      int id;
[1761]460     
461      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
462      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
463
464      while (commSize)
[523]465      {
[1761]466        commSize /= 10;
[523]467        ++numDigit;
468      }
[1761]469      id = commRank;
[497]470
[1761]471      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
472      fb->open(fileNameServer.str().c_str(), std::ios::out);
[523]473      if (!fb->is_open())
474        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
[1761]475              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
[523]476    }
[490]477
[523]478    /*!
479    * \brief Open a file stream to write the info logs
480    * Open a file stream with a specific file name suffix+rank
481    * to write the info logs.
482    * \param fileName [in] protype file name
483    */
484    void CServer::openInfoStream(const StdString& fileName)
485    {
486      std::filebuf* fb = m_infoStream.rdbuf();
487      openStream(fileName, ".out", fb);
[490]488
[523]489      info.write2File(fb);
490      report.write2File(fb);
491    }
[490]492
[523]493    //! Write the info logs to standard output
494    void CServer::openInfoStream()
495    {
496      info.write2StdOut();
497      report.write2StdOut();
498    }
[490]499
[523]500    //! Close the info logs file if it opens
501    void CServer::closeInfoStream()
502    {
503      if (m_infoStream.is_open()) m_infoStream.close();
504    }
505
506    /*!
507    * \brief Open a file stream to write the error log
508    * Open a file stream with a specific file name suffix+rank
509    * to write the error log.
510    * \param fileName [in] protype file name
511    */
512    void CServer::openErrorStream(const StdString& fileName)
513    {
514      std::filebuf* fb = m_errorStream.rdbuf();
515      openStream(fileName, ".err", fb);
516
517      error.write2File(fb);
518    }
519
520    //! Write the error log to standard error output
521    void CServer::openErrorStream()
522    {
523      error.write2StdErr();
524    }
525
526    //! Close the error log file if it opens
527    void CServer::closeErrorStream()
528    {
529      if (m_errorStream.is_open()) m_errorStream.close();
530    }
[1761]531
532    void CServer::launchServersRessource(MPI_Comm serverComm)
533    {
534      serversRessource_ = new CServersRessource(serverComm) ;
535    }
[2274]536
537    void  CServer::finalizeServersRessource(void) 
538    { 
539      delete serversRessource_; serversRessource_=nullptr ;
540    }
[300]541}
Note: See TracBrowser for help on using the repository browser.