source: XIOS/dev/dev_ym/XIOS_COUPLING/src/client.cpp @ 2238

Last change on this file since 2238 was 2238, checked in by ymipsl, 3 years ago

Bug fix in when split global communicator between clients and servers. Did'nt work with cyclic distribution of servers because it was assume than clients and servers partionning was contiguous
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 20.0 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "client.hpp"
5#include <boost/functional/hash.hpp>
6#include "type.hpp"
7#include "context.hpp"
8#include "context_client.hpp"
9#include "oasis_cinterface.hpp"
10#include "mpi.hpp"
11#include "timer.hpp"
12#include "buffer_client.hpp"
13#include "string_tools.hpp"
14#include "ressources_manager.hpp"
15#include "services_manager.hpp"
16#include <functional>
17#include <cstdio>
18#include "workflow_graph.hpp"
19
20namespace xios
21{
22
23    const double serverPublishDefaultTimeout=10;
24
25    MPI_Comm CClient::intraComm ;
26    MPI_Comm CClient::interComm ;
27    MPI_Comm CClient::clientsComm_ ;
28
29    std::list<MPI_Comm> CClient::contextInterComms;
30    int CClient::serverLeader ;
31    bool CClient::is_MPI_Initialized ;
32    int CClient::rank_ = INVALID_RANK;
33    StdOFStream CClient::m_infoStream;
34    StdOFStream CClient::m_errorStream;
35    CPoolRessource* CClient::poolRessource_=nullptr ;
36
37    MPI_Comm& CClient::getInterComm(void)   { return (interComm); }
38     
39///---------------------------------------------------------------
40/*!
41 * \fn void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
42 * Function creates intraComm (CClient::intraComm) for client group with id=codeId and interComm (CClient::interComm) between client and server groups.
43 * \param [in] codeId identity of context.
44 * \param [in/out] localComm local communicator.
45 * \param [in/out] returnComm (intra)communicator of client group.
46 */
47
48    void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
49    {
50   
51       MPI_Comm clientComm ;
52      // initialize MPI if not initialized
53      int initialized ;
54      MPI_Initialized(&initialized) ;
55      if (initialized) is_MPI_Initialized=true ;
56      else is_MPI_Initialized=false ;
57     
58      MPI_Comm globalComm=CXios::getGlobalComm() ;
59
60      /////////////////////////////////////////
61      ///////////// PART 1 ////////////////////
62      /////////////////////////////////////////
63     
64
65      // localComm isn't given
66      if (localComm == MPI_COMM_NULL)
67      {
68         
69        // don't use OASIS
70        if (!CXios::usingOasis)
71        {
72
73          if (!is_MPI_Initialized)
74          {
75            MPI_Init(NULL, NULL);
76          }
77          CTimer::get("XIOS").resume() ;
78          CTimer::get("XIOS init/finalize",false).resume() ;
79         
80          // split the global communicator
81          // get hash from all model to attribute a unique color (int) and then split to get client communicator
82          // every mpi process of globalComm (MPI_COMM_WORLD) must participate
83
84          int commRank, commSize ;
85          MPI_Comm_rank(globalComm,&commRank) ;
86          MPI_Comm_size(globalComm,&commSize) ;
87
88          std::hash<string> hashString ;
89          size_t hashClient=hashString(codeId) ;
90         
91          size_t* hashAll = new size_t[commSize] ;
92          MPI_Allgather(&hashClient,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
93         
94          int color=0 ;
95          map<size_t,int> listHash ;
96          for(int i=0 ; i<=commSize ; i++) 
97            if (listHash.count(hashAll[i])==0) 
98            {
99              listHash[hashAll[i]]=color ;
100              color=color+1 ;
101            }
102            color=listHash[hashClient] ;
103          delete[] hashAll ;
104
105          MPI_Comm_split(globalComm, color, commRank, &clientComm) ;
106        }
107        else // using oasis to split communicator
108        {
109          if (!is_MPI_Initialized) oasis_init(codeId) ;
110          oasis_get_localcomm(clientComm) ;
111        }
112      }
113      else // localComm is given
114      {
115        MPI_Comm_dup(localComm,&clientComm) ;
116      }
117     
118     
119      /////////////////////////////////////////
120      ///////////// PART 2 ////////////////////
121      /////////////////////////////////////////
122     
123
124      // Create the XIOS communicator for every process which is related
125      // to XIOS, as well on client side as on server side
126     
127      MPI_Comm xiosGlobalComm ;
128      string strIds=CXios::getin<string>("clients_code_id","") ;
129      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
130      if (strIds.empty())
131      {
132         // no code Ids given, suppose XIOS initialisation is global           
133         int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
134         MPI_Comm splitComm,interComm ;
135         MPI_Comm_rank(globalComm,&commGlobalRank) ;
136         MPI_Comm_split(globalComm, 0, commGlobalRank, &splitComm) ;
137         int splitCommSize, globalCommSize ;
138       
139         MPI_Comm_size(splitComm,&splitCommSize) ;
140         MPI_Comm_size(globalComm,&globalCommSize) ;
141         if (splitCommSize==globalCommSize) // no server
142         {
143           MPI_Comm_dup(globalComm,&xiosGlobalComm) ;
144           CXios::setXiosComm(xiosGlobalComm) ;
145         }
146         else
147         {
148           MPI_Comm_rank(splitComm,&commRank) ;
149           if (commRank==0) clientLeader=commGlobalRank ;
150           else clientLeader=0 ;
151           serverLeader=0 ;
152           MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
153           MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
154           MPI_Intercomm_create(splitComm, 0, globalComm, serverRemoteLeader,1341,&interComm) ;
155           MPI_Intercomm_merge(interComm,true,&xiosGlobalComm) ;
156           CXios::setXiosComm(xiosGlobalComm) ;
157         }
158      }
159      else
160      {
161
162        xiosGlobalCommByFileExchange(clientComm, codeId) ;
163     
164      }
165
166      int commRank ;
167      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
168      MPI_Comm_split(CXios::getXiosComm(),false,commRank, &clientsComm_) ;
169     
170      // is using server or not ?
171      int xiosCommSize, clientsCommSize ; 
172      MPI_Comm_size(CXios::getXiosComm(), &xiosCommSize) ;
173      MPI_Comm_size(clientsComm_, &clientsCommSize) ;
174      if (xiosCommSize==clientsCommSize) CXios::setUsingServer() ;
175      else CXios::setNotUsingServer() ;
176
177      /////////////////////////////////////////
178      ///////////// PART 3 ////////////////////
179      /////////////////////////////////////////
180     
181      CXios::launchDaemonsManager(false) ;
182      poolRessource_ = new CPoolRessource(clientComm, codeId) ;
183
184      /////////////////////////////////////////
185      ///////////// PART 4 ////////////////////
186      /////////////////////////////////////////     
187     
188      returnComm = clientComm ;
189    }
190
191
192    void CClient::xiosGlobalCommByFileExchange(MPI_Comm clientComm, const string& codeId)
193    {
194 
195      MPI_Comm globalComm=CXios::getGlobalComm() ;
196      MPI_Comm xiosGlobalComm ;
197
198      string strIds=CXios::getin<string>("clients_code_id","") ;
199      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
200
201      int commRank, globalRank, clientRank, serverRank ;
202      MPI_Comm_rank(clientComm, &commRank) ;
203      MPI_Comm_rank(globalComm, &globalRank) ;
204      string clientFileName("__xios_publisher::"+codeId+"__to_remove__") ;
205           
206      int error ;
207
208      if (commRank==0) // if root process publish name
209      { 
210        std::ofstream ofs (clientFileName, std::ofstream::out);
211        ofs<<globalRank ;
212        ofs.close();
213       
214  // get server root rank
215
216        std::ifstream ifs ;
217        string fileName=("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
218     
219        double timeout = CXios::getin<double>("server_puplish_timeout",serverPublishDefaultTimeout) ;
220        double time ;
221         
222        do
223        {
224          CTimer::get("server_publish_timeout").resume() ; 
225          ifs.clear() ;
226          ifs.open(fileName, std::ifstream::in) ;
227          CTimer::get("server_publish_timeout").suspend() ;
228        } while (ifs.fail() && CTimer::get("server_publish_timeout").getCumulatedTime()<timeout) ;
229       
230        if (CTimer::get("server_publish_timeout").getCumulatedTime()>=timeout || ifs.fail())
231        {
232          ifs.clear() ;
233          ifs.close() ;
234          ifs.clear() ;
235          error=true ;           
236        }
237        else 
238        {
239          ifs>>serverRank ;
240          ifs.close() ;
241          error=false ;
242        } 
243
244      } 
245     
246      MPI_Bcast(&error,1,MPI_INT,0,clientComm) ;
247     
248      if (error==false)  // you have a server
249      {
250        MPI_Comm intraComm ;
251        MPI_Comm_dup(clientComm,&intraComm) ;
252        MPI_Comm interComm ;
253       
254        int pos=0 ;
255        for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ;
256
257        bool high=true ;
258        for(int i=pos ; i<clientsCodeId.size(); i++)
259        { 
260          MPI_Intercomm_create(intraComm, 0, globalComm, serverRank, 3141, &interComm);
261          MPI_Comm_free(&intraComm) ;
262          MPI_Intercomm_merge(interComm,high, &intraComm ) ;
263          high=false ;
264        }
265        xiosGlobalComm=intraComm ;
266      }
267      else  // no server detected
268      {
269        vector<int> clientsRank(clientsCodeId.size()) ;
270       
271        if (commRank==0)
272        { 
273          for(int i=0;i<clientsRank.size();i++)
274          {
275            std::ifstream ifs ;
276            string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
277            do
278            {
279              ifs.clear() ;
280              ifs.open(fileName, std::ifstream::in) ;
281            } while (ifs.fail()) ;
282            ifs>>clientsRank[i] ;
283            ifs.close() ;
284          }
285        }
286         
287        int client ;
288        MPI_Comm intraComm ;
289        MPI_Comm_dup(clientComm,&intraComm) ;
290        MPI_Comm interComm ;
291       
292        int pos=0 ;
293        for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ;
294       
295        bool high=true ;
296        for(int i=pos+1 ; i<clientsCodeId.size(); i++)
297        { 
298          if (codeId==clientsCodeId[0])   // first model play the server rule
299          {         
300            MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
301            MPI_Intercomm_merge(interComm,false, &intraComm ) ;
302          }
303          else
304          {         
305            MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[0], 3141, &interComm);
306            MPI_Intercomm_merge(interComm,high, &intraComm ) ;
307            high=false ;
308          }
309        }
310        xiosGlobalComm=intraComm ;
311      }
312
313      MPI_Barrier(xiosGlobalComm);
314      if (commRank==0) std::remove(clientFileName.c_str()) ;         
315      MPI_Barrier(xiosGlobalComm);
316 
317      CXios::setXiosComm(xiosGlobalComm) ;
318
319      MPI_Comm commUnfree ;
320      MPI_Comm_dup(clientComm, &commUnfree ) ;
321 
322    }
323
324// to check on other architecture
325    void CClient::xiosGlobalCommByPublishing(MPI_Comm clientComm, const string& codeId)
326    {
327
328      // untested. need to be developped an a true MPI compliant library
329
330/*
331        // try to discover other client/server
332        // do you have a xios server ?
333        char portName[MPI_MAX_PORT_NAME];
334        int ierr ;
335        int commRank ;
336        MPI_Comm_rank(clientComm,&commRank) ;
337
338        MPI_Barrier(globalComm) ;
339        if (commRank==0)
340        {
341             
342          MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN );
343          const char* serviceName=CXios::xiosCodeId.c_str() ;
344          ierr=MPI_Lookup_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
345          MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL );
346        }
347        ierr=MPI_SUCCESS ;
348        MPI_Bcast(&ierr,1,MPI_INT,0,clientComm) ;
349
350        if (ierr==MPI_SUCCESS) // you have a server
351        { 
352          MPI_Comm intraComm=clientComm ;
353          MPI_Comm interComm ;
354          for(int i=0 ; i<clientsCodeId.size(); i++)
355          { 
356            MPI_Comm_connect(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
357            MPI_Intercomm_merge(interComm, true, &intraComm ) ;
358          }
359          xiosGlobalComm=intraComm ;
360        }
361        else  // you don't have any server
362        {
363          if (codeId==clientsCodeId[0]) // first code will publish his name
364          {
365
366            if (commRank==0) // if root process publish name
367            { 
368              MPI_Open_port(MPI_INFO_NULL, portName);
369              MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
370            }
371
372            MPI_Comm intraComm=clientComm ;
373            MPI_Comm interComm ;
374            for(int i=0 ; i<clientsCodeId.size()-1; i++)
375            { 
376              MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
377              MPI_Intercomm_merge(interComm,false, &intraComm ) ;
378            }
379          }
380          else  // other clients are connecting to the first one
381          {
382            if (commRank==0)
383            {
384
385              MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN );
386              ierr=MPI_Lookup_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
387              MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL );
388             }
389
390            MPI_Bcast(&ierr,1,MPI_INT,0,clientComm) ;
391
392            if (ierr==MPI_SUCCESS) // you can connect
393            { 
394              MPI_Comm intraComm=clientComm ;
395              MPI_Comm interComm ;
396              for(int i=0 ; i<clientsCodeId.size()-1; i++)
397              { 
398                MPI_Comm_connect(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
399                MPI_Intercomm_merge(interComm, true, &intraComm ) ;
400              }
401              xiosGlobalComm=intraComm ;
402            }
403          }
404        } 
405      */
406    }
407
408
409///---------------------------------------------------------------
410/*!
411 * \fn void CClient::registerContext(const string& id, MPI_Comm contextComm)
412 * \brief Sends a request to create a context to server. Creates client/server contexts.
413 * \param [in] id id of context.
414 * \param [in] contextComm.
415 * Function is only called by client.
416 */
417    void CClient::registerContext(const string& id, MPI_Comm contextComm)
418    {
419      int commRank, commSize ;
420      MPI_Comm_rank(contextComm,&commRank) ;
421      MPI_Comm_size(contextComm,&commSize) ;
422
423      getPoolRessource()->createService(contextComm, id, 0, CServicesManager::CLIENT, 1) ;
424      getPoolRessource()->createService(contextComm, CXios::defaultServerId, 0, CServicesManager::IO_SERVER, 1) ;
425
426      if (commRank==0) while (!CXios::getServicesManager()->hasService(getPoolRessource()->getId(), id, 0)) { CXios::getDaemonsManager()->eventLoop();}
427
428      if (commRank==0) CXios::getContextsManager()->createServerContext(getPoolRessource()->getId(), id, 0, id) ;
429      int type=CServicesManager::CLIENT ;
430      string name = CXios::getContextsManager()->getServerContextName(getPoolRessource()->getId(), id, 0, type, id) ;
431      double time ;
432      double lastTime=0 ;
433      double latency=1e-2 ;
434      bool out=false ;
435      while (!out)
436      {
437        time=MPI_Wtime() ;
438        if (time-lastTime > latency) 
439        {
440          out=CXios::getContextsManager()->hasContext(name, contextComm);
441          lastTime=time ;
442        }
443        if (!out) CXios::getDaemonsManager()->eventLoop() ;
444      }
445
446    }
447
448
449
450/*!
451 * \fn void CClient::callOasisEnddef(void)
452 * \brief Send the order to the servers to call "oasis_enddef". It must be done by each compound of models before calling oasis_enddef on client side
453 * Function is only called by client.
454 */
455    void CClient::callOasisEnddef(void)
456    {
457      bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ;
458      if (!oasisEnddef) ERROR("void CClient::callOasisEnddef(void)", <<"Function xios_oasis_enddef called but variable <call_oasis_enddef> is set to false."<<endl
459                                                                     <<"Variable <call_oasis_enddef> must be set to true"<<endl) ;
460      if (CXios::isServer)
461      // Attached mode
462      {
463        // nothing to do   
464      }
465      else
466      {
467        int rank ;
468        int msg=0 ;
469
470        MPI_Comm_rank(intraComm,&rank) ;
471        if (rank==0) 
472        {
473          MPI_Send(&msg,1,MPI_INT,0,5,interComm) ; // tags oasis_endded = 5
474        }
475
476      }
477    }
478
479    void CClient::finalize(void)
480    {
481     
482      MPI_Barrier(clientsComm_) ;
483      int commRank ;
484      MPI_Comm_rank(clientsComm_, &commRank) ;
485      if (commRank==0) CXios::getRessourcesManager()->finalize() ;
486     
487      CTimer::get("XIOS init/finalize",false).suspend() ;
488      CTimer::get("XIOS").suspend() ;
489     
490      CXios::finalizeDaemonsManager() ;
491
492      if (!is_MPI_Initialized)
493      {
494        if (CXios::usingOasis) oasis_finalize();
495        else MPI_Finalize() ;
496      }
497     
498      info(20) << "Client side context is finalized"<<endl ;
499      report(0) <<" Performance report : Whole time from XIOS init and finalize: "<< CTimer::get("XIOS init/finalize").getCumulatedTime()<<" s"<<endl ;
500      report(0) <<" Performance report : total time spent for XIOS : "<< CTimer::get("XIOS").getCumulatedTime()<<" s"<<endl ;
501      report(0)<< " Performance report : time spent for waiting free buffer : "<< CTimer::get("Blocking time").getCumulatedTime()<<" s"<<endl ;
502      report(0)<< " Performance report : Ratio : "<< CTimer::get("Blocking time").getCumulatedTime()/CTimer::get("XIOS init/finalize").getCumulatedTime()*100.<<" %"<<endl ;
503      report(0)<< " Performance report : This ratio must be close to zero. Otherwise it may be usefull to increase buffer size or numbers of server"<<endl ;
504//      report(0)<< " Memory report : Current buffer_size : "<<CXios::bufferSize<<endl ;
505      report(0)<< " Memory report : Minimum buffer size required : " << CClientBuffer::maxRequestSize << " bytes" << endl ;
506      report(0)<< " Memory report : increasing it by a factor will increase performance, depending of the volume of data wrote in file at each time step of the file"<<endl ;
507      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
508   
509      CWorkflowGraph::drawWorkFlowGraph_client();
510    }
511   
512
513    /*!
514    * Return global rank without oasis and current rank in model intraComm in case of oasis
515    */
516   int CClient::getRank()
517   {
518     return rank_;
519   }
520
521    /*!
522    * Open a file specified by a suffix and an extension and use it for the given file buffer.
523    * The file name will be suffix+rank+extension.
524    *
525    * \param fileName[in] protype file name
526    * \param ext [in] extension of the file
527    * \param fb [in/out] the file buffer
528    */
529    void CClient::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
530    {
531      StdStringStream fileNameClient;
532      int numDigit = 0;
533      int size = 0;
534      int rank;
535      MPI_Comm_size(CXios::getGlobalComm(), &size);
536      MPI_Comm_rank(CXios::getGlobalComm(),&rank);
537      while (size)
538      {
539        size /= 10;
540        ++numDigit;
541      }
542
543      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << rank << ext;
544
545      fb->open(fileNameClient.str().c_str(), std::ios::out);
546      if (!fb->is_open())
547        ERROR("void CClient::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
548              << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the client log(s).");
549    }
550
551    /*!
552    * \brief Open a file stream to write the info logs
553    * Open a file stream with a specific file name suffix+rank
554    * to write the info logs.
555    * \param fileName [in] protype file name
556    */
557    void CClient::openInfoStream(const StdString& fileName)
558    {
559      std::filebuf* fb = m_infoStream.rdbuf();
560      openStream(fileName, ".out", fb);
561
562      info.write2File(fb);
563      report.write2File(fb);
564    }
565
566    //! Write the info logs to standard output
567    void CClient::openInfoStream()
568    {
569      info.write2StdOut();
570      report.write2StdOut();
571    }
572
573    //! Close the info logs file if it opens
574    void CClient::closeInfoStream()
575    {
576      if (m_infoStream.is_open()) m_infoStream.close();
577    }
578
579    /*!
580    * \brief Open a file stream to write the error log
581    * Open a file stream with a specific file name suffix+rank
582    * to write the error log.
583    * \param fileName [in] protype file name
584    */
585    void CClient::openErrorStream(const StdString& fileName)
586    {
587      std::filebuf* fb = m_errorStream.rdbuf();
588      openStream(fileName, ".err", fb);
589
590      error.write2File(fb);
591    }
592
593    //! Write the error log to standard error output
594    void CClient::openErrorStream()
595    {
596      error.write2StdErr();
597    }
598
599    //! Close the error log file if it opens
600    void CClient::closeErrorStream()
601    {
602      if (m_errorStream.is_open()) m_errorStream.close();
603    }
604}
Note: See TracBrowser for help on using the repository browser.