source: XIOS/dev/dev_ym/XIOS_SERVICES/src/client.cpp @ 1764

Last change on this file since 1764 was 1764, checked in by ymipsl, 5 years ago

Some Update on XIOS services
Seems to work on Irène for :

  • first level of servers
  • fisrt + second level of servers
  • attached mode

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 29.4 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "client.hpp"
5#include <boost/functional/hash.hpp>
6#include "type.hpp"
7#include "context.hpp"
8#include "context_client.hpp"
9#include "oasis_cinterface.hpp"
10#include "mpi.hpp"
11#include "timer.hpp"
12#include "buffer_client.hpp"
13#include "string_tools.hpp"
14#include "ressources_manager.hpp"
15#include "services_manager.hpp"
16#include <functional>
17#include <cstdio>
18
19
20namespace xios
21{
22
23    const double serverPublishDefaultTimeout=10;
24
25    MPI_Comm CClient::intraComm ;
26    MPI_Comm CClient::interComm ;
27    MPI_Comm CClient::clientsComm_ ;
28
29    std::list<MPI_Comm> CClient::contextInterComms;
30    int CClient::serverLeader ;
31    bool CClient::is_MPI_Initialized ;
32    int CClient::rank_ = INVALID_RANK;
33    StdOFStream CClient::m_infoStream;
34    StdOFStream CClient::m_errorStream;
35    CPoolRessource* CClient::poolRessource_=nullptr ;
36
37    MPI_Comm& CClient::getInterComm(void)   { return (interComm); }
38     
39///---------------------------------------------------------------
40/*!
41 * \fn void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
42 * Function creates intraComm (CClient::intraComm) for client group with id=codeId and interComm (CClient::interComm) between client and server groups.
43 * \param [in] codeId identity of context.
44 * \param [in/out] localComm local communicator.
45 * \param [in/out] returnComm (intra)communicator of client group.
46 */
47
48    void CClient::initRessources(void)
49    {
50
51 /*     
52      int commRank;
53      MPI_Comm_rank(CXios::globalComm,&commRank) ;
54      if (commRank==0)
55      {
56        ressources.createPool("ioserver1",ressources.getRessourcesSize()/2) ;
57      }
58      else if (commRank==1)
59      {
60        ressources.createPool("ioserver2",ressources.getRessourcesSize()/2) ;
61      }
62  */
63    }
64
65    void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
66    {
67   
68       MPI_Comm clientComm ;
69      // initialize MPI if not initialized
70      int initialized ;
71      MPI_Initialized(&initialized) ;
72      if (initialized) is_MPI_Initialized=true ;
73      else is_MPI_Initialized=false ;
74     
75      MPI_Comm globalComm=CXios::getGlobalComm() ;
76
77      /////////////////////////////////////////
78      ///////////// PART 1 ////////////////////
79      /////////////////////////////////////////
80     
81
82      // localComm isn't given
83      if (localComm == MPI_COMM_NULL)
84      {
85         
86        // don't use OASIS
87        if (!CXios::usingOasis)
88        {
89
90          if (!is_MPI_Initialized)
91          {
92            MPI_Init(NULL, NULL);
93          }
94          CTimer::get("XIOS").resume() ;
95          CTimer::get("XIOS init/finalize",false).resume() ;
96         
97          // split the global communicator
98          // get hash from all model to attribute a unique color (int) and then split to get client communicator
99          // every mpi process of globalComm (MPI_COMM_WORLD) must participate
100
101          int commRank, commSize ;
102          MPI_Comm_rank(globalComm,&commRank) ;
103          MPI_Comm_size(globalComm,&commSize) ;
104
105          std::hash<string> hashString ;
106          size_t hashClient=hashString(codeId) ;
107         
108          size_t* hashAll = new size_t[commSize] ;
109          MPI_Allgather(&hashClient,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ;
110         
111          int color=0 ;
112          set<size_t> listHash ;
113          for(int i=0 ; i<=commRank ; i++) 
114            if (listHash.count(hashAll[i])==0)
115            {
116              listHash.insert(hashAll[i]) ;
117              color=color+1 ;
118            }
119          delete[] hashAll ;
120
121          MPI_Comm_split(globalComm, color, commRank, &clientComm) ;
122        }
123        else // using oasis to split communicator
124        {
125          if (!is_MPI_Initialized) oasis_init(codeId) ;
126          oasis_get_localcomm(clientComm) ;
127        }
128      }
129      else // localComm is given
130      {
131        MPI_Comm_dup(localComm,&clientComm) ;
132      }
133     
134     
135      /////////////////////////////////////////
136      ///////////// PART 2 ////////////////////
137      /////////////////////////////////////////
138     
139
140      // Create the XIOS communicator for every process which is related
141      // to XIOS, as well on client side as on server side
142     
143      MPI_Comm xiosGlobalComm ;
144      string strIds=CXios::getin<string>("clients_code_id","") ;
145      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
146      if (strIds.empty())
147      {
148         // no code Ids given, suppose XIOS initialisation is global           
149         int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
150         MPI_Comm splitComm,interComm ;
151         MPI_Comm_rank(globalComm,&commGlobalRank) ;
152         MPI_Comm_split(globalComm, 0, commGlobalRank, &splitComm) ;
153         int splitCommSize, globalCommSize ;
154       
155         MPI_Comm_size(splitComm,&splitCommSize) ;
156         MPI_Comm_size(globalComm,&globalCommSize) ;
157         if (splitCommSize==globalCommSize) // no server
158         {
159           MPI_Comm_dup(globalComm,&xiosGlobalComm) ;
160           CXios::setXiosComm(xiosGlobalComm) ;
161         }
162         else
163         {
164           MPI_Comm_rank(splitComm,&commRank) ;
165           if (commRank==0) clientLeader=commGlobalRank ;
166           else clientLeader=0 ;
167           serverLeader=0 ;
168           MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
169           MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
170           MPI_Intercomm_create(splitComm, 0, globalComm, serverRemoteLeader,1341,&interComm) ;
171           MPI_Intercomm_merge(interComm,true,&xiosGlobalComm) ;
172           CXios::setXiosComm(xiosGlobalComm) ;
173         }
174      }
175      else
176      {
177
178        xiosGlobalCommByFileExchange(clientComm, codeId) ;
179     
180      }
181
182      int commRank ;
183      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
184      MPI_Comm_split(CXios::getXiosComm(),false,commRank, &clientsComm_) ;
185     
186      // is using server or not ?
187      int xiosCommSize, clientsCommSize ; 
188      MPI_Comm_size(CXios::getXiosComm(), &xiosCommSize) ;
189      MPI_Comm_size(clientsComm_, &clientsCommSize) ;
190      if (xiosCommSize==clientsCommSize) CXios::setUsingServer() ;
191      else CXios::setNotUsingServer() ;
192
193
194      CXios::setGlobalRegistry(new CRegistry(clientsComm_)) ;
195      /////////////////////////////////////////
196      ///////////// PART 3 ////////////////////
197      /////////////////////////////////////////
198     
199      CXios::launchDaemonsManager(false) ;
200      poolRessource_ = new CPoolRessource(clientComm, codeId) ;
201
202      /////////////////////////////////////////
203      ///////////// PART 4 ////////////////////
204      /////////////////////////////////////////     
205     
206      // create the services
207/*
208      int commRank ;
209      MPI_Comm_rank(clientComm,&commRank) ;
210      auto contextsManager=CXios::getContextsManager() ;
211   
212      if (commRank==0)
213      {
214        contextsManager->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, codeId) ;
215      }
216     
217      MPI_Comm interComm ;
218
219      contextsManager->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, codeId, clientComm, interComm) ;
220*/ 
221/*      while (true)
222      {
223
224      }
225*/     
226      returnComm = clientComm ;
227    }
228
229
230    void CClient::xiosGlobalCommByFileExchange(MPI_Comm clientComm, const string& codeId)
231    {
232 
233      MPI_Comm globalComm=CXios::getGlobalComm() ;
234      MPI_Comm xiosGlobalComm ;
235
236      string strIds=CXios::getin<string>("clients_code_id","") ;
237      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
238
239      int commRank, globalRank, clientRank, serverRank ;
240      MPI_Comm_rank(clientComm, &commRank) ;
241      MPI_Comm_rank(globalComm, &globalRank) ;
242      string clientFileName("__xios_publisher::"+codeId+"__to_remove__") ;
243           
244      int error ;
245
246      if (commRank==0) // if root process publish name
247      { 
248        std::ofstream ofs (clientFileName, std::ofstream::out);
249        ofs<<globalRank ;
250        ofs.close();
251       
252  // get server root rank
253
254        std::ifstream ifs ;
255        string fileName=("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
256     
257        double timeout = CXios::getin<double>("server_puplish_timeout",serverPublishDefaultTimeout) ;
258        double time ;
259         
260        do
261        {
262          CTimer::get("server_publish_timeout").resume() ; 
263          ifs.clear() ;
264          ifs.open(fileName, std::ifstream::in) ;
265          CTimer::get("server_publish_timeout").suspend() ;
266        } while (ifs.fail() && CTimer::get("server_publish_timeout").getCumulatedTime()<timeout) ;
267       
268        if (CTimer::get("server_publish_timeout").getCumulatedTime()>=timeout || ifs.fail())
269        {
270          ifs.clear() ;
271          ifs.close() ;
272          ifs.clear() ;
273          error=true ;           
274        }
275        else 
276        {
277          ifs>>serverRank ;
278          ifs.close() ;
279          error=false ;
280        } 
281
282      } 
283     
284      MPI_Bcast(&error,1,MPI_INT,0,clientComm) ;
285     
286      if (error==false)  // you have a server
287      {
288        MPI_Comm intraComm ;
289        MPI_Comm_dup(clientComm,&intraComm) ;
290        MPI_Comm interComm ;
291       
292        int pos=0 ;
293        for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ;
294
295        bool high=true ;
296        for(int i=pos ; i<clientsCodeId.size(); i++)
297        { 
298          MPI_Intercomm_create(intraComm, 0, globalComm, serverRank, 3141, &interComm);
299          MPI_Comm_free(&intraComm) ;
300          MPI_Intercomm_merge(interComm,high, &intraComm ) ;
301          high=false ;
302        }
303        xiosGlobalComm=intraComm ;
304      }
305      else  // no server detected
306      {
307        vector<int> clientsRank(clientsCodeId.size()) ;
308       
309        if (commRank==0)
310        { 
311          for(int i=0;i<clientsRank.size();i++)
312          {
313            std::ifstream ifs ;
314            string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
315            do
316            {
317              ifs.clear() ;
318              ifs.open(fileName, std::ifstream::in) ;
319            } while (ifs.fail()) ;
320            ifs>>clientsRank[i] ;
321            ifs.close() ;
322          }
323        }
324         
325        int client ;
326        MPI_Comm intraComm ;
327        MPI_Comm_dup(clientComm,&intraComm) ;
328        MPI_Comm interComm ;
329       
330        int pos=0 ;
331        for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ;
332       
333        bool high=true ;
334        for(int i=pos+1 ; i<clientsCodeId.size(); i++)
335        { 
336          if (codeId==clientsCodeId[0])   // first model play the server rule
337          {         
338            MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
339            MPI_Intercomm_merge(interComm,false, &intraComm ) ;
340          }
341          else
342          {         
343            MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[0], 3141, &interComm);
344            MPI_Intercomm_merge(interComm,high, &intraComm ) ;
345            high=false ;
346          }
347        }
348        xiosGlobalComm=intraComm ;
349      }
350
351      MPI_Barrier(xiosGlobalComm);
352      if (commRank==0) std::remove(clientFileName.c_str()) ;         
353      MPI_Barrier(xiosGlobalComm);
354 
355      CXios::setXiosComm(xiosGlobalComm) ;
356
357      MPI_Comm commUnfree ;
358      MPI_Comm_dup(clientComm, &commUnfree ) ;
359 
360    }
361
362    void CClient::xiosGlobalCommByPublishing(MPI_Comm clientComm, const string& codeId)
363    {
364
365      // untested. need to be developped an a true MPI compliant library
366
367/*
368        // try to discover other client/server
369        // do you have a xios server ?
370        char portName[MPI_MAX_PORT_NAME];
371        int ierr ;
372        int commRank ;
373        MPI_Comm_rank(clientComm,&commRank) ;
374
375        MPI_Barrier(globalComm) ;
376        if (commRank==0)
377        {
378             
379          MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN );
380          const char* serviceName=CXios::xiosCodeId.c_str() ;
381          ierr=MPI_Lookup_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
382          MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL );
383        }
384        ierr=MPI_SUCCESS ;
385        MPI_Bcast(&ierr,1,MPI_INT,0,clientComm) ;
386
387        if (ierr==MPI_SUCCESS) // you have a server
388        { 
389          MPI_Comm intraComm=clientComm ;
390          MPI_Comm interComm ;
391          for(int i=0 ; i<clientsCodeId.size(); i++)
392          { 
393            MPI_Comm_connect(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
394            MPI_Intercomm_merge(interComm, true, &intraComm ) ;
395          }
396          xiosGlobalComm=intraComm ;
397        }
398        else  // you don't have any server
399        {
400          if (codeId==clientsCodeId[0]) // first code will publish his name
401          {
402
403            if (commRank==0) // if root process publish name
404            { 
405              MPI_Open_port(MPI_INFO_NULL, portName);
406              MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
407            }
408
409            MPI_Comm intraComm=clientComm ;
410            MPI_Comm interComm ;
411            for(int i=0 ; i<clientsCodeId.size()-1; i++)
412            { 
413              MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
414              MPI_Intercomm_merge(interComm,false, &intraComm ) ;
415            }
416          }
417          else  // other clients are connecting to the first one
418          {
419            if (commRank==0)
420            {
421
422              MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN );
423              ierr=MPI_Lookup_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
424              MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL );
425             }
426
427            MPI_Bcast(&ierr,1,MPI_INT,0,clientComm) ;
428
429            if (ierr==MPI_SUCCESS) // you can connect
430            { 
431              MPI_Comm intraComm=clientComm ;
432              MPI_Comm interComm ;
433              for(int i=0 ; i<clientsCodeId.size()-1; i++)
434              { 
435                MPI_Comm_connect(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
436                MPI_Intercomm_merge(interComm, true, &intraComm ) ;
437              }
438              xiosGlobalComm=intraComm ;
439            }
440          }
441        } 
442      */
443    }
444
445    void CClient::initialize_old(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
446    {
447      int initialized ;
448      MPI_Initialized(&initialized) ;
449      if (initialized) is_MPI_Initialized=true ;
450      else is_MPI_Initialized=false ;
451      int rank ;
452     
453      CXios::launchRessourcesManager(false) ;
454      CXios::launchServicesManager( false) ;
455      CXios::launchContextsManager(false) ;
456
457      initRessources() ;
458// don't use OASIS
459      if (!CXios::usingOasis)
460      {
461// localComm isn't given
462        if (localComm == MPI_COMM_NULL)
463        {
464          if (!is_MPI_Initialized)
465          {
466            MPI_Init(NULL, NULL);
467          }
468          CTimer::get("XIOS").resume() ;
469          CTimer::get("XIOS init/finalize",false).resume() ;
470          boost::hash<string> hashString ;
471
472          unsigned long hashClient=hashString(codeId) ;
473          unsigned long hashServer=hashString(CXios::xiosCodeId) ;
474          unsigned long* hashAll ;
475          int size ;
476          int myColor ;
477          int i,c ;
478          MPI_Comm newComm ;
479
480          MPI_Comm_size(CXios::globalComm,&size) ;
481          MPI_Comm_rank(CXios::globalComm,&rank_);
482
483          hashAll=new unsigned long[size] ;
484
485          MPI_Allgather(&hashClient,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
486
487          map<unsigned long, int> colors ;
488          map<unsigned long, int> leaders ;
489
490          for(i=0,c=0;i<size;i++)
491          {
492            if (colors.find(hashAll[i])==colors.end())
493            {
494              colors[hashAll[i]] =c ;
495              leaders[hashAll[i]]=i ;
496              c++ ;
497            }
498          }
499
500          // Verify whether we are on server mode or not
501          CXios::setNotUsingServer();
502          for (i=0; i < size; ++i)
503          {
504            if (hashServer == hashAll[i])
505            {
506              CXios::setUsingServer();
507              break;
508            }
509          }
510
511          myColor=colors[hashClient];
512          MPI_Comm_split(CXios::globalComm,myColor,rank_,&intraComm) ;
513
514          if (CXios::usingServer)
515          {
516            int clientLeader=leaders[hashClient] ;
517            serverLeader=leaders[hashServer] ;
518            int intraCommSize, intraCommRank ;
519            MPI_Comm_size(intraComm,&intraCommSize) ;
520            MPI_Comm_rank(intraComm,&intraCommRank) ;
521            info(50)<<"intercommCreate::client "<<rank_<<" intraCommSize : "<<intraCommSize
522                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< serverLeader<<endl ;
523             MPI_Intercomm_create(intraComm, 0, CXios::globalComm, serverLeader, 0, &interComm) ;
524             //rank_ = intraCommRank;
525          }
526          else
527          {
528            MPI_Comm_dup(intraComm,&interComm) ;
529          }
530          delete [] hashAll ;
531        }
532        // localComm argument is given
533        else
534        {
535          if (CXios::usingServer)
536          {
537            //ERROR("void CClient::initialize(const string& codeId,MPI_Comm& localComm,MPI_Comm& returnComm)", << " giving a local communictor is not compatible with using server mode") ;
538          }
539          else
540          {
541            MPI_Comm_dup(localComm,&intraComm) ;
542            MPI_Comm_dup(intraComm,&interComm) ;
543          }
544        }
545      }
546      // using OASIS
547      else
548      {
549        // localComm isn't given
550        if (localComm == MPI_COMM_NULL)
551        {
552          if (!is_MPI_Initialized) oasis_init(codeId) ;
553          oasis_get_localcomm(localComm) ;
554        }
555        MPI_Comm_dup(localComm,&intraComm) ;
556
557        CTimer::get("XIOS").resume() ;
558        CTimer::get("XIOS init/finalize",false).resume() ;
559
560        if (CXios::usingServer)
561        {
562          MPI_Status status ;
563          MPI_Comm_rank(intraComm,&rank_) ;
564
565          oasis_get_intercomm(interComm,CXios::xiosCodeId) ;
566          if (rank_==0) MPI_Recv(&serverLeader,1, MPI_INT, 0, 0, interComm, &status) ;
567          MPI_Bcast(&serverLeader,1,MPI_INT,0,intraComm) ;
568        }
569        else MPI_Comm_dup(intraComm,&interComm) ;
570      }
571
572      MPI_Comm_dup(intraComm,&returnComm) ;
573    }
574
575
576
577    void CClient::registerContext(const string& id, MPI_Comm contextComm)
578    {
579      int commRank, commSize ;
580      MPI_Comm_rank(contextComm,&commRank) ;
581      MPI_Comm_size(contextComm,&commSize) ;
582
583      getPoolRessource()->createService(contextComm, id, 0, CServicesManager::CLIENT, 1) ;
584      getPoolRessource()->createService(contextComm, CXios::defaultServerId, 0, CServicesManager::IO_SERVER, 1) ;
585
586      if (commRank==0) while (!CXios::getServicesManager()->hasService(getPoolRessource()->getId(), id, 0)) { CXios::getDaemonsManager()->eventLoop();}
587
588      if (commRank==0) CXios::getContextsManager()->createServerContext(getPoolRessource()->getId(), id, 0, id) ;
589      int type=CServicesManager::CLIENT ;
590      string name = CXios::getContextsManager()->getServerContextName(getPoolRessource()->getId(), id, 0, type, id) ;
591      while (!CXios::getContextsManager()->hasContext(name, contextComm) )
592      {
593        CXios::getDaemonsManager()->eventLoop() ;
594      }
595
596/*     
597
598      CContext::setCurrent(id) ;
599      CContext* context=CContext::create(id);
600     
601      // register the new client side context to the contexts manager
602      if (commRank==0)
603      {
604        MPI_Comm_rank(CXios::getXiosComm(),&commRank) ;
605        SRegisterContextInfo contextInfo ;
606        contextInfo.serviceType=CServicesManager::CLIENT ;
607        contextInfo.partitionId=0 ;
608        contextInfo.leader=commRank ;
609        contextInfo.size=commSize ;
610        CXios::getContextsManager()->registerContext(id, contextInfo) ;
611      }
612      context->initClient(contextComm) ;
613*/ 
614    }
615
616
617///---------------------------------------------------------------
618/*!
619 * \fn void CClient::registerContext(const string& id, MPI_Comm contextComm)
620 * \brief Sends a request to create a context to server. Creates client/server contexts.
621 * \param [in] id id of context.
622 * \param [in] contextComm.
623 * Function is only called by client.
624 */
625    void CClient::registerContext_old(const string& id, MPI_Comm contextComm)
626    {
627      CContext::setCurrent(id) ;
628      CContext* context=CContext::create(id);
629      StdString idServer(id);
630      idServer += "_server";
631
632      if (CXios::isServer && !context->hasServer)
633      // Attached mode
634      {
635        MPI_Comm contextInterComm ;
636        MPI_Comm_dup(contextComm,&contextInterComm) ;
637        CContext* contextServer = CContext::create(idServer);
638
639        // Firstly, initialize context on client side
640        context->initClient(contextComm,contextInterComm, contextServer);
641
642        // Secondly, initialize context on server side
643        contextServer->initServer(contextComm,contextInterComm, context);
644
645        // Finally, we should return current context to context client
646        CContext::setCurrent(id);
647
648        contextInterComms.push_back(contextInterComm);
649      }
650      else
651      {
652        int size,rank,globalRank ;
653        size_t message_size ;
654        int leaderRank ;
655        MPI_Comm contextInterComm ;
656
657        MPI_Comm_size(contextComm,&size) ;
658        MPI_Comm_rank(contextComm,&rank) ;
659        MPI_Comm_rank(CXios::globalComm,&globalRank) ;
660        if (rank!=0) globalRank=0 ;
661
662        CMessage msg ;
663        msg<<idServer<<size<<globalRank ;
664//        msg<<id<<size<<globalRank ;
665
666        int messageSize=msg.size() ;
667        char * buff = new char[messageSize] ;
668        CBufferOut buffer((void*)buff,messageSize) ;
669        buffer<<msg ;
670
671        MPI_Send((void*)buff,buffer.count(),MPI_CHAR,serverLeader,1,CXios::globalComm) ;
672
673        MPI_Intercomm_create(contextComm,0,CXios::globalComm,serverLeader,10+globalRank,&contextInterComm) ;
674        info(10)<<"Register new Context : "<<id<<endl ;
675        MPI_Comm inter ;
676        MPI_Intercomm_merge(contextInterComm,0,&inter) ;
677        MPI_Barrier(inter) ;
678
679        context->initClient(contextComm,contextInterComm) ;
680
681        contextInterComms.push_back(contextInterComm);
682        MPI_Comm_free(&inter);
683        delete [] buff ;
684
685      }
686    }
687
688/*!
689 * \fn void CClient::callOasisEnddef(void)
690 * \brief Send the order to the servers to call "oasis_enddef". It must be done by each compound of models before calling oasis_enddef on client side
691 * Function is only called by client.
692 */
693    void CClient::callOasisEnddef(void)
694    {
695      bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ;
696      if (!oasisEnddef) ERROR("void CClient::callOasisEnddef(void)", <<"Function xios_oasis_enddef called but variable <call_oasis_enddef> is set to false."<<endl
697                                                                     <<"Variable <call_oasis_enddef> must be set to true"<<endl) ;
698      if (CXios::isServer)
699      // Attached mode
700      {
701        // nothing to do   
702      }
703      else
704      {
705        int rank ;
706        int msg=0 ;
707
708        MPI_Comm_rank(intraComm,&rank) ;
709        if (rank==0) 
710        {
711          MPI_Send(&msg,1,MPI_INT,0,5,interComm) ; // tags oasis_endded = 5
712        }
713
714      }
715    }
716
717    void CClient::finalize(void)
718    {
719     
720      MPI_Barrier(clientsComm_) ;
721      int commRank ;
722      MPI_Comm_rank(clientsComm_, &commRank) ;
723      if (commRank==0) CXios::getRessourcesManager()->finalize() ;
724     
725      auto globalRegistry=CXios::getGlobalRegistry() ;
726      globalRegistry->hierarchicalGatherRegistry() ;
727
728      if (commRank==0)
729      {
730        info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
731        globalRegistry->toFile("xios_registry.bin") ;
732      }
733      delete globalRegistry ;
734
735      CTimer::get("XIOS init/finalize",false).suspend() ;
736      CTimer::get("XIOS").suspend() ;
737     
738      CXios::finalizeDaemonsManager() ;
739
740      if (!is_MPI_Initialized)
741      {
742        if (CXios::usingOasis) oasis_finalize();
743        else MPI_Finalize() ;
744      }
745     
746      info(20) << "Client side context is finalized"<<endl ;
747      report(0) <<" Performance report : Whole time from XIOS init and finalize: "<< CTimer::get("XIOS init/finalize").getCumulatedTime()<<" s"<<endl ;
748      report(0) <<" Performance report : total time spent for XIOS : "<< CTimer::get("XIOS").getCumulatedTime()<<" s"<<endl ;
749      report(0)<< " Performance report : time spent for waiting free buffer : "<< CTimer::get("Blocking time").getCumulatedTime()<<" s"<<endl ;
750      report(0)<< " Performance report : Ratio : "<< CTimer::get("Blocking time").getCumulatedTime()/CTimer::get("XIOS init/finalize").getCumulatedTime()*100.<<" %"<<endl ;
751      report(0)<< " Performance report : This ratio must be close to zero. Otherwise it may be usefull to increase buffer size or numbers of server"<<endl ;
752//      report(0)<< " Memory report : Current buffer_size : "<<CXios::bufferSize<<endl ;
753      report(0)<< " Memory report : Minimum buffer size required : " << CClientBuffer::maxRequestSize << " bytes" << endl ;
754      report(0)<< " Memory report : increasing it by a factor will increase performance, depending of the volume of data wrote in file at each time step of the file"<<endl ;
755      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
756   
757     
758    }
759   
760
761    void CClient::finalize_old(void)
762    {
763      int rank ;
764      int msg=0 ;
765
766      MPI_Comm_rank(intraComm,&rank) ;
767 
768      if (!CXios::isServer)
769      {
770        MPI_Comm_rank(intraComm,&rank) ;
771        if (rank==0)
772        {
773          MPI_Send(&msg,1,MPI_INT,0,0,interComm) ;
774        }
775      }
776
777      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
778        MPI_Comm_free(&(*it));
779      MPI_Comm_free(&interComm);
780      MPI_Comm_free(&intraComm);
781
782      CTimer::get("XIOS init/finalize",false).suspend() ;
783      CTimer::get("XIOS").suspend() ;
784
785      if (!is_MPI_Initialized)
786      {
787        if (CXios::usingOasis) oasis_finalize();
788        else MPI_Finalize() ;
789      }
790     
791      info(20) << "Client side context is finalized"<<endl ;
792      report(0) <<" Performance report : Whole time from XIOS init and finalize: "<< CTimer::get("XIOS init/finalize").getCumulatedTime()<<" s"<<endl ;
793      report(0) <<" Performance report : total time spent for XIOS : "<< CTimer::get("XIOS").getCumulatedTime()<<" s"<<endl ;
794      report(0)<< " Performance report : time spent for waiting free buffer : "<< CTimer::get("Blocking time").getCumulatedTime()<<" s"<<endl ;
795      report(0)<< " Performance report : Ratio : "<< CTimer::get("Blocking time").getCumulatedTime()/CTimer::get("XIOS init/finalize").getCumulatedTime()*100.<<" %"<<endl ;
796      report(0)<< " Performance report : This ratio must be close to zero. Otherwise it may be usefull to increase buffer size or numbers of server"<<endl ;
797//      report(0)<< " Memory report : Current buffer_size : "<<CXios::bufferSize<<endl ;
798      report(0)<< " Memory report : Minimum buffer size required : " << CClientBuffer::maxRequestSize << " bytes" << endl ;
799      report(0)<< " Memory report : increasing it by a factor will increase performance, depending of the volume of data wrote in file at each time step of the file"<<endl ;
800      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
801   }
802
803    /*!
804    * Return global rank without oasis and current rank in model intraComm in case of oasis
805    */
806   int CClient::getRank()
807   {
808     return rank_;
809   }
810
811    /*!
812    * Open a file specified by a suffix and an extension and use it for the given file buffer.
813    * The file name will be suffix+rank+extension.
814    *
815    * \param fileName[in] protype file name
816    * \param ext [in] extension of the file
817    * \param fb [in/out] the file buffer
818    */
819    void CClient::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
820    {
821      StdStringStream fileNameClient;
822      int numDigit = 0;
823      int size = 0;
824      int rank;
825      MPI_Comm_size(CXios::getGlobalComm(), &size);
826      MPI_Comm_rank(CXios::getGlobalComm(),&rank);
827      while (size)
828      {
829        size /= 10;
830        ++numDigit;
831      }
832
833      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << rank << ext;
834
835      fb->open(fileNameClient.str().c_str(), std::ios::out);
836      if (!fb->is_open())
837        ERROR("void CClient::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
838              << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the client log(s).");
839    }
840
841    /*!
842    * \brief Open a file stream to write the info logs
843    * Open a file stream with a specific file name suffix+rank
844    * to write the info logs.
845    * \param fileName [in] protype file name
846    */
847    void CClient::openInfoStream(const StdString& fileName)
848    {
849      std::filebuf* fb = m_infoStream.rdbuf();
850      openStream(fileName, ".out", fb);
851
852      info.write2File(fb);
853      report.write2File(fb);
854    }
855
856    //! Write the info logs to standard output
857    void CClient::openInfoStream()
858    {
859      info.write2StdOut();
860      report.write2StdOut();
861    }
862
863    //! Close the info logs file if it opens
864    void CClient::closeInfoStream()
865    {
866      if (m_infoStream.is_open()) m_infoStream.close();
867    }
868
869    /*!
870    * \brief Open a file stream to write the error log
871    * Open a file stream with a specific file name suffix+rank
872    * to write the error log.
873    * \param fileName [in] protype file name
874    */
875    void CClient::openErrorStream(const StdString& fileName)
876    {
877      std::filebuf* fb = m_errorStream.rdbuf();
878      openStream(fileName, ".err", fb);
879
880      error.write2File(fb);
881    }
882
883    //! Write the error log to standard error output
884    void CClient::openErrorStream()
885    {
886      error.write2StdErr();
887    }
888
889    //! Close the error log file if it opens
890    void CClient::closeErrorStream()
891    {
892      if (m_errorStream.is_open()) m_errorStream.close();
893    }
894}
Note: See TracBrowser for help on using the repository browser.