source: XIOS/dev/branch_yushan/src/server.cpp @ 1037

Last change on this file since 1037 was 1037, checked in by yushan, 7 years ago

initialize the branch

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.6 KB
RevLine 
[490]1#include "globalScopeData.hpp"
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "server.hpp"
[300]5#include "type.hpp"
6#include "context.hpp"
[352]7#include "object_template.hpp"
[300]8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
[1037]11//#include "mpi.hpp"
[347]12#include "tracer.hpp"
13#include "timer.hpp"
[492]14#include "event_scheduler.hpp"
[300]15
[335]16namespace xios
[490]17{
[300]18    MPI_Comm CServer::intraComm ;
19    list<MPI_Comm> CServer::interComm ;
[655]20    std::list<MPI_Comm> CServer::contextInterComms;
[300]21    bool CServer::isRoot ;
[490]22    int CServer::rank = INVALID_RANK;
23    StdOFStream CServer::m_infoStream;
[523]24    StdOFStream CServer::m_errorStream;
[490]25    map<string,CContext*> CServer::contextList ;
[300]26    bool CServer::finished=false ;
27    bool CServer::is_MPI_Initialized ;
[1037]28
29   
[597]30    CEventScheduler* CServer::eventScheduler = 0;
[697]31   
[300]32    void CServer::initialize(void)
33    {
34      // Not using OASIS
35      if (!CXios::usingOasis)
36      {
[490]37
[359]38        CTimer::get("XIOS").resume() ;
[490]39
40        boost::hash<string> hashString ;
41
[300]42        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
43        unsigned long* hashAll ;
[490]44
[1037]45
[300]46        int size ;
47        int myColor ;
48        int i,c ;
49        MPI_Comm newComm ;
[490]50
[300]51        MPI_Comm_size(CXios::globalComm,&size) ;
52        MPI_Comm_rank(CXios::globalComm,&rank);
53        hashAll=new unsigned long[size] ;
[490]54
[300]55        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
56
57        map<unsigned long, int> colors ;
58        map<unsigned long, int> leaders ;
59        map<unsigned long, int>::iterator it ;
[490]60
[300]61        for(i=0,c=0;i<size;i++)
62        {
63          if (colors.find(hashAll[i])==colors.end())
64          {
65            colors[hashAll[i]]=c ;
66            leaders[hashAll[i]]=i ;
67            c++ ;
68          }
69        }
[490]70
[300]71        myColor=colors[hashServer] ;
72
[1037]73
74        MPI_Comm_split(CXios::globalComm,myColor,rank,&intraComm) ;
75
76       
[300]77        int serverLeader=leaders[hashServer] ;
78        int clientLeader;
[490]79
[300]80         serverLeader=leaders[hashServer] ;
81         for(it=leaders.begin();it!=leaders.end();it++)
82         {
83           if (it->first!=hashServer)
84           {
85             clientLeader=it->second ;
[492]86             int intraCommSize, intraCommRank ;
87             MPI_Comm_size(intraComm,&intraCommSize) ;
88             MPI_Comm_rank(intraComm,&intraCommRank) ;
[493]89             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
90                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
[490]91
[300]92             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
93             interComm.push_back(newComm) ;
[1037]94             printf("after inter create, interComm.size = %lu\n", interComm.size());
[300]95           }
96         }
97
98         delete [] hashAll ;
99      }
100      // using OASIS
101      else
102      {
[490]103        int size;
104        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
105
[359]106        CTimer::get("XIOS").resume() ;
[655]107        MPI_Comm localComm;
108        oasis_get_localcomm(localComm);
109        MPI_Comm_dup(localComm, &intraComm);
110
[300]111        MPI_Comm_rank(intraComm,&rank) ;
112        MPI_Comm_size(intraComm,&size) ;
113        string codesId=CXios::getin<string>("oasis_codes_id") ;
[490]114
[300]115        vector<string> splitted ;
[483]116        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
[300]117        vector<string>::iterator it ;
118
119        MPI_Comm newComm ;
120        int globalRank ;
121        MPI_Comm_rank(CXios::globalComm,&globalRank);
[490]122
[300]123        for(it=splitted.begin();it!=splitted.end();it++)
124        {
125          oasis_get_intercomm(newComm,*it) ;
126          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
127          MPI_Comm_remote_size(newComm,&size);
128          interComm.push_back(newComm) ;
129        }
[492]130              oasis_enddef() ;
[300]131      }
[490]132
133//      int rank;
[300]134      MPI_Comm_rank(intraComm,&rank) ;
135      if (rank==0) isRoot=true;
[490]136      else isRoot=false;
[492]137     
138      eventScheduler = new CEventScheduler(intraComm) ;
[300]139    }
[490]140
[300]141    void CServer::finalize(void)
142    {
[361]143      CTimer::get("XIOS").suspend() ;
[697]144     
[492]145      delete eventScheduler ;
[1037]146     
147     
[655]148
149      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
150        MPI_Comm_free(&(*it));
[1037]151
[655]152      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
153        MPI_Comm_free(&(*it));
[1037]154
[655]155      MPI_Comm_free(&intraComm);
156
[300]157      if (!is_MPI_Initialized)
[490]158      {
[300]159        if (CXios::usingOasis) oasis_finalize();
[1037]160        else  {MPI_Finalize() ; printf("CServer::finalize called MPI_finalize\n");}
[300]161      }
[1037]162
163     
[347]164      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
165      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
166      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
[300]167    }
[490]168
[300]169     void CServer::eventLoop(void)
170     {
171       bool stop=false ;
[490]172
[347]173       CTimer::get("XIOS server").resume() ;
[300]174       while(!stop)
175       {
176         if (isRoot)
177         {
[1037]178           listenContext(); 
179           if (!finished) listenFinalize() ; 
[300]180         }
181         else
182         {
[1037]183           listenRootContext(); 
184           if (!finished) 
185           {
186             listenRootFinalize() ; 
187           }
[300]188         }
[1037]189         
[300]190         contextEventLoop() ;
191         if (finished && contextList.empty()) stop=true ;
[1037]192         
[956]193         eventScheduler->checkEvent() ;
[300]194       }
[1037]195       
196       
[347]197       CTimer::get("XIOS server").suspend() ;
[300]198     }
[490]199
[300]200     void CServer::listenFinalize(void)
201     {
202        list<MPI_Comm>::iterator it;
203        int msg ;
204        int flag ;
[1037]205       
[490]206
[300]207        for(it=interComm.begin();it!=interComm.end();it++)
208        {
209           MPI_Status status ;
[347]210           traceOff() ;
[300]211           MPI_Iprobe(0,0,*it,&flag,&status) ;
[347]212           traceOn() ;
[300]213           if (flag==true)
214           {
215              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
[1037]216              printf(" CServer : Receive client finalize\n");
[300]217              info(20)<<" CServer : Receive client finalize"<<endl ;
[1037]218
[655]219              MPI_Comm_free(&(*it));
[300]220              interComm.erase(it) ;
221              break ;
222            }
223         }
[490]224
[300]225         if (interComm.empty())
226         {
227           int i,size ;
228           MPI_Comm_size(intraComm,&size) ;
229           MPI_Request* requests= new MPI_Request[size-1] ;
230           MPI_Status* status= new MPI_Status[size-1] ;
[490]231
[300]232           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
233           MPI_Waitall(size-1,requests,status) ;
234
235           finished=true ;
236           delete [] requests ;
237           delete [] status ;
238         }
239     }
[490]240
241
[300]242     void CServer::listenRootFinalize()
243     {
244        int flag ;
245        MPI_Status status ;
246        int msg ;
[1037]247       
[347]248        traceOff() ;
[300]249        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
[347]250        traceOn() ;
[300]251        if (flag==true)
252        {
253           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
254           finished=true ;
255        }
256      }
[490]257
[300]258     void CServer::listenContext(void)
259     {
[490]260
[300]261       MPI_Status status ;
[1037]262       int flag = false ;
263       static void* buffer ;
[300]264       static MPI_Request request ;
265       static bool recept=false ;
266       int rank ;
[1037]267       int count ; 
[490]268
[300]269       if (recept==false)
[1037]270       {     
[347]271         traceOff() ;
[300]272         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
[347]273         traceOn() ;
[1037]274         
[490]275         if (flag==true)
[300]276         {
[1037]277           #ifdef _usingMPI
[300]278           rank=status.MPI_SOURCE ;
[1037]279           #elif _usingEP
280           rank= status.ep_src ;
281           #endif
[300]282           MPI_Get_count(&status,MPI_CHAR,&count) ;
283           buffer=new char[count] ;
[1037]284           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
[490]285           recept=true ;
[300]286         }
[1037]287         
[300]288       }
289       else
290       {
[347]291         traceOff() ;
[300]292         MPI_Test(&request,&flag,&status) ;
[347]293         traceOn() ;
[300]294         if (flag==true)
295         {
[1037]296           #ifdef _usingMPI
[300]297           rank=status.MPI_SOURCE ;
[1037]298           #elif _usingEP
299           rank= status.ep_src ;
300           #endif
[300]301           MPI_Get_count(&status,MPI_CHAR,&count) ;
[1037]302           recvContextMessage(buffer,count) ;
303           printf("listerContext register context OK, interComm size = %lu\n", interComm.size());
304           
[300]305           delete [] buffer ;
[490]306           recept=false ;
[300]307         }
308       }
309     }
[490]310
[300]311     void CServer::recvContextMessage(void* buff,int count)
312     {
313       static map<string,contextMessage> recvContextId ;
314       map<string,contextMessage>::iterator it ;
[490]315
[300]316       CBufferIn buffer(buff,count) ;
317       string id ;
318       int clientLeader ;
319       int nbMessage ;
320
321       buffer>>id>>nbMessage>>clientLeader ;
[490]322
[300]323       it=recvContextId.find(id) ;
324       if (it==recvContextId.end())
[490]325       {
[300]326         contextMessage msg={0,0} ;
327         pair<map<string,contextMessage>::iterator,bool> ret ;
328         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
329         it=ret.first ;
[490]330       }
[300]331       it->second.nbRecv+=1 ;
332       it->second.leaderRank+=clientLeader ;
[490]333
[300]334       if (it->second.nbRecv==nbMessage)
[490]335       {
[300]336         int size ;
337         MPI_Comm_size(intraComm,&size) ;
338         MPI_Request* requests= new MPI_Request[size-1] ;
339         MPI_Status* status= new MPI_Status[size-1] ;
[490]340
[300]341         for(int i=1;i<size;i++)
342         {
343            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
344         }
345         MPI_Waitall(size-1,requests,status) ;
346         registerContext(buff,count,it->second.leaderRank) ;
[1037]347         printf("recvContextMessage register context OK\n");
[300]348
349         recvContextId.erase(it) ;
350         delete [] requests ;
351         delete [] status ;
352
353       }
[490]354     }
355
[300]356     void CServer::listenRootContext(void)
357     {
[490]358
[300]359       MPI_Status status ;
360       int flag ;
[1037]361       static void* buffer ;
[300]362       static MPI_Request request ;
363       static bool recept=false ;
364       int rank ;
365       int count ;
366       const int root=0 ;
[1037]367       
[490]368
[300]369       if (recept==false)
370       {
[347]371         traceOff() ;
[300]372         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
[347]373         traceOn() ;
[490]374         if (flag==true)
[300]375         {
376           MPI_Get_count(&status,MPI_CHAR,&count) ;
377           buffer=new char[count] ;
[1037]378           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
[490]379           recept=true ;
[300]380         }
381       }
382       else
383       {
384         MPI_Test(&request,&flag,&status) ;
385         if (flag==true)
386         {
387           MPI_Get_count(&status,MPI_CHAR,&count) ;
[1037]388           registerContext(buffer,count) ;
389           printf("listenRootContext register context OK, interComm size = %lu\n", interComm.size());
[300]390           delete [] buffer ;
[490]391           recept=false ;
[300]392         }
393       }
[490]394     }
395
[655]396     void CServer::registerContext(void* buff, int count, int leaderRank)
[300]397     {
398       string contextId;
[655]399       CBufferIn buffer(buff, count);
400       buffer >> contextId;
[300]401
[680]402       info(20) << "CServer : Register new Context : " << contextId << endl;
[490]403
[680]404       if (contextList.find(contextId) != contextList.end())
405         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
406               << "Context '" << contextId << "' has already been registred");
[490]407
[655]408       MPI_Comm contextIntercomm;
[1037]409       
[655]410       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm);
[490]411
[655]412       MPI_Comm inter;
413       MPI_Intercomm_merge(contextIntercomm,1,&inter);
414       MPI_Barrier(inter);
[1037]415       
[655]416
417       CContext* context=CContext::create(contextId);
418       contextList[contextId]=context;
419       context->initServer(intraComm,contextIntercomm);
[1037]420       
421       
[655]422
423       contextInterComms.push_back(contextIntercomm);
[1037]424       
425       
[655]426       MPI_Comm_free(&inter);
[1037]427       
428       printf(" ****   server: register context OK\n");
[490]429     }
430
[300]431     void CServer::contextEventLoop(void)
432     {
433       bool finished ;
434       map<string,CContext*>::iterator it ;
[490]435       for(it=contextList.begin();it!=contextList.end();it++)
[300]436       {
[597]437         finished=it->second->checkBuffersAndListen();
[300]438         if (finished)
439         {
440           contextList.erase(it) ;
441           break ;
442         }
443       }
[490]444
[300]445     }
[490]446
447     //! Get rank of the current process
448     int CServer::getRank()
449     {
450       return rank;
451     }
452
[523]453    /*!
454    * Open a file specified by a suffix and an extension and use it for the given file buffer.
455    * The file name will be suffix+rank+extension.
456    *
457    * \param fileName[in] protype file name
458    * \param ext [in] extension of the file
459    * \param fb [in/out] the file buffer
460    */
461    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
462    {
463      StdStringStream fileNameClient;
464      int numDigit = 0;
465      int size = 0;
466      MPI_Comm_size(CXios::globalComm, &size);
467      while (size)
468      {
469        size /= 10;
470        ++numDigit;
471      }
[497]472
[523]473      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
474      fb->open(fileNameClient.str().c_str(), std::ios::out);
475      if (!fb->is_open())
476        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
477              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
478    }
[490]479
[523]480    /*!
481    * \brief Open a file stream to write the info logs
482    * Open a file stream with a specific file name suffix+rank
483    * to write the info logs.
484    * \param fileName [in] protype file name
485    */
486    void CServer::openInfoStream(const StdString& fileName)
487    {
488      std::filebuf* fb = m_infoStream.rdbuf();
489      openStream(fileName, ".out", fb);
[490]490
[523]491      info.write2File(fb);
492      report.write2File(fb);
493    }
[490]494
[523]495    //! Write the info logs to standard output
496    void CServer::openInfoStream()
497    {
498      info.write2StdOut();
499      report.write2StdOut();
500    }
[490]501
[523]502    //! Close the info logs file if it opens
503    void CServer::closeInfoStream()
504    {
505      if (m_infoStream.is_open()) m_infoStream.close();
506    }
507
508    /*!
509    * \brief Open a file stream to write the error log
510    * Open a file stream with a specific file name suffix+rank
511    * to write the error log.
512    * \param fileName [in] protype file name
513    */
514    void CServer::openErrorStream(const StdString& fileName)
515    {
516      std::filebuf* fb = m_errorStream.rdbuf();
517      openStream(fileName, ".err", fb);
518
519      error.write2File(fb);
520    }
521
522    //! Write the error log to standard error output
523    void CServer::openErrorStream()
524    {
525      error.write2StdErr();
526    }
527
528    //! Close the error log file if it opens
529    void CServer::closeErrorStream()
530    {
531      if (m_errorStream.is_open()) m_errorStream.close();
532    }
[300]533}
Note: See TracBrowser for help on using the repository browser.