source: XIOS1/branches/xios-1.0/src/server.cpp @ 2520

Last change on this file since 2520 was 548, checked in by rlacroix, 10 years ago

Backport r523 into the stable branch.

Improve the message error handling by mimicking the behavior of the info/report logs.

Output the error messages to the standart error message until the context is correctly initialized. Then, output the error messages to a file if the user has set "print_file" parameter to "true".

  • Fix: Errors that occured before MPI was initialized (e.g. during the config file parsing) caused a MPI error on top of the original error.
  • Fix: The error file could sometimes be misnamed if the error happened before the context was completely known.
  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.4 KB
Line 
1#include "globalScopeData.hpp"
2#include "xmlioserver_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include "mpi.hpp"
12#include "tracer.hpp"
13#include "timer.hpp"
14#include "event_scheduler.hpp"
15
16namespace xios
17{
18    MPI_Comm CServer::intraComm ;
19    list<MPI_Comm> CServer::interComm ;
20    bool CServer::isRoot ;
21    int CServer::rank = INVALID_RANK;
22    StdOFStream CServer::m_infoStream;
23    StdOFStream CServer::m_errorStream;
24    map<string,CContext*> CServer::contextList ;
25    bool CServer::finished=false ;
26    bool CServer::is_MPI_Initialized ;
27    CEventScheduler* CServer::eventScheduler ;
28
29    void CServer::initialize(void)
30    {
31      int initialized ;
32      MPI_Initialized(&initialized) ;
33      if (initialized) is_MPI_Initialized=true ;
34      else is_MPI_Initialized=false ;
35
36      // Not using OASIS
37      if (!CXios::usingOasis)
38      {
39
40        if (!is_MPI_Initialized)
41        {
42          int argc=0;
43          char** argv=NULL;
44          MPI_Init(&argc,&argv) ;
45        }
46        CTimer::get("XIOS").resume() ;
47
48        boost::hash<string> hashString ;
49
50        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
51        unsigned long* hashAll ;
52
53//        int rank ;
54        int size ;
55        int myColor ;
56        int i,c ;
57        MPI_Comm newComm ;
58
59        MPI_Comm_size(CXios::globalComm,&size) ;
60        MPI_Comm_rank(CXios::globalComm,&rank);
61        hashAll=new unsigned long[size] ;
62
63        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
64
65        map<unsigned long, int> colors ;
66        map<unsigned long, int> leaders ;
67        map<unsigned long, int>::iterator it ;
68
69        for(i=0,c=0;i<size;i++)
70        {
71          if (colors.find(hashAll[i])==colors.end())
72          {
73            colors[hashAll[i]]=c ;
74            leaders[hashAll[i]]=i ;
75            c++ ;
76          }
77        }
78
79        myColor=colors[hashServer] ;
80        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
81
82        int serverLeader=leaders[hashServer] ;
83        int clientLeader;
84
85         serverLeader=leaders[hashServer] ;
86         for(it=leaders.begin();it!=leaders.end();it++)
87         {
88           if (it->first!=hashServer)
89           {
90             clientLeader=it->second ;
91             int intraCommSize, intraCommRank ;
92             MPI_Comm_size(intraComm,&intraCommSize) ;
93             MPI_Comm_rank(intraComm,&intraCommRank) ;
94             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
95                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
96
97             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
98             interComm.push_back(newComm) ;
99           }
100         }
101
102         delete [] hashAll ;
103      }
104      // using OASIS
105      else
106      {
107//        int rank ,size;
108        int size;
109        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
110
111        CTimer::get("XIOS").resume() ;
112        oasis_get_localcomm(intraComm) ;
113        MPI_Comm_rank(intraComm,&rank) ;
114        MPI_Comm_size(intraComm,&size) ;
115        string codesId=CXios::getin<string>("oasis_codes_id") ;
116
117        vector<string> splitted ;
118        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
119        vector<string>::iterator it ;
120
121        MPI_Comm newComm ;
122        int globalRank ;
123        MPI_Comm_rank(CXios::globalComm,&globalRank);
124
125        for(it=splitted.begin();it!=splitted.end();it++)
126        {
127          oasis_get_intercomm(newComm,*it) ;
128          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
129          MPI_Comm_remote_size(newComm,&size);
130          interComm.push_back(newComm) ;
131        }
132              oasis_enddef() ;
133      }
134
135//      int rank;
136      MPI_Comm_rank(intraComm,&rank) ;
137      if (rank==0) isRoot=true;
138      else isRoot=false;
139     
140      eventScheduler = new CEventScheduler(intraComm) ;
141    }
142
143    void CServer::finalize(void)
144    {
145      CTimer::get("XIOS").suspend() ;
146     
147      delete eventScheduler ;
148     
149      if (!is_MPI_Initialized)
150      {
151        if (CXios::usingOasis) oasis_finalize();
152        else MPI_Finalize() ;
153      }
154      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
155      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
156      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
157    }
158
159     void CServer::eventLoop(void)
160     {
161       bool stop=false ;
162
163       CTimer::get("XIOS server").resume() ;
164       while(!stop)
165       {
166         if (isRoot)
167         {
168           listenContext();
169           if (!finished) listenFinalize() ;
170         }
171         else
172         {
173           listenRootContext();
174           if (!finished) listenRootFinalize() ;
175         }
176
177         contextEventLoop() ;
178         if (finished && contextList.empty()) stop=true ;
179         if (! CXios::isServer) eventScheduler->checkEvent() ;
180       }
181       CTimer::get("XIOS server").suspend() ;
182     }
183
184     void CServer::listenFinalize(void)
185     {
186        list<MPI_Comm>::iterator it;
187        int msg ;
188        int flag ;
189
190        for(it=interComm.begin();it!=interComm.end();it++)
191        {
192           MPI_Status status ;
193           traceOff() ;
194           MPI_Iprobe(0,0,*it,&flag,&status) ;
195           traceOn() ;
196           if (flag==true)
197           {
198              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
199              info(20)<<" CServer : Receive client finalize"<<endl ;
200              interComm.erase(it) ;
201              break ;
202            }
203         }
204
205         if (interComm.empty())
206         {
207           int i,size ;
208           MPI_Comm_size(intraComm,&size) ;
209           MPI_Request* requests= new MPI_Request[size-1] ;
210           MPI_Status* status= new MPI_Status[size-1] ;
211
212           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
213           MPI_Waitall(size-1,requests,status) ;
214
215           finished=true ;
216           delete [] requests ;
217           delete [] status ;
218         }
219     }
220
221
222     void CServer::listenRootFinalize()
223     {
224        int flag ;
225        MPI_Status status ;
226        int msg ;
227
228        traceOff() ;
229        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
230        traceOn() ;
231        if (flag==true)
232        {
233           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
234           finished=true ;
235        }
236      }
237
238     void CServer::listenContext(void)
239     {
240
241       MPI_Status status ;
242       int flag ;
243       static void* buffer ;
244       static MPI_Request request ;
245       static bool recept=false ;
246       int rank ;
247       int count ;
248
249       if (recept==false)
250       {
251         traceOff() ;
252         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
253         traceOn() ;
254         if (flag==true)
255         {
256           rank=status.MPI_SOURCE ;
257           MPI_Get_count(&status,MPI_CHAR,&count) ;
258           buffer=new char[count] ;
259           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
260           recept=true ;
261         }
262       }
263       else
264       {
265         traceOff() ;
266         MPI_Test(&request,&flag,&status) ;
267         traceOn() ;
268         if (flag==true)
269         {
270           rank=status.MPI_SOURCE ;
271           MPI_Get_count(&status,MPI_CHAR,&count) ;
272           recvContextMessage(buffer,count) ;
273           delete [] buffer ;
274           recept=false ;
275         }
276       }
277     }
278
279     void CServer::recvContextMessage(void* buff,int count)
280     {
281       static map<string,contextMessage> recvContextId ;
282       map<string,contextMessage>::iterator it ;
283
284       CBufferIn buffer(buff,count) ;
285       string id ;
286       int clientLeader ;
287       int nbMessage ;
288
289       buffer>>id>>nbMessage>>clientLeader ;
290
291       it=recvContextId.find(id) ;
292       if (it==recvContextId.end())
293       {
294         contextMessage msg={0,0} ;
295         pair<map<string,contextMessage>::iterator,bool> ret ;
296         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
297         it=ret.first ;
298       }
299       it->second.nbRecv+=1 ;
300       it->second.leaderRank+=clientLeader ;
301
302       if (it->second.nbRecv==nbMessage)
303       {
304         int size ;
305         MPI_Comm_size(intraComm,&size) ;
306         MPI_Request* requests= new MPI_Request[size-1] ;
307         MPI_Status* status= new MPI_Status[size-1] ;
308
309         for(int i=1;i<size;i++)
310         {
311            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
312         }
313         MPI_Waitall(size-1,requests,status) ;
314         registerContext(buff,count,it->second.leaderRank) ;
315
316         recvContextId.erase(it) ;
317         delete [] requests ;
318         delete [] status ;
319
320       }
321     }
322
323     void CServer::listenRootContext(void)
324     {
325
326       MPI_Status status ;
327       int flag ;
328       static void* buffer ;
329       static MPI_Request request ;
330       static bool recept=false ;
331       int rank ;
332       int count ;
333       const int root=0 ;
334
335       if (recept==false)
336       {
337         traceOff() ;
338         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
339         traceOn() ;
340         if (flag==true)
341         {
342           MPI_Get_count(&status,MPI_CHAR,&count) ;
343           buffer=new char[count] ;
344           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
345           recept=true ;
346         }
347       }
348       else
349       {
350         MPI_Test(&request,&flag,&status) ;
351         if (flag==true)
352         {
353           MPI_Get_count(&status,MPI_CHAR,&count) ;
354           registerContext(buffer,count) ;
355           delete [] buffer ;
356           recept=false ;
357         }
358       }
359     }
360
361
362
363     void CServer::registerContext(void* buff,int count, int leaderRank)
364     {
365
366       string contextId;
367       CBufferIn buffer(buff,count) ;
368
369       buffer>>contextId ;
370       MPI_Comm contextIntercomm ;
371       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
372
373       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
374       MPI_Comm inter ;
375       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
376       MPI_Barrier(inter) ;
377       if (contextList.find(contextId)!=contextList.end())
378        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
379              <<"Context has already been registred") ;
380
381      CContext* context=CContext::create(contextId) ;
382      contextList[contextId]=context ;
383      context->initServer(intraComm,contextIntercomm) ;
384
385     }
386
387
388     void CServer::contextEventLoop(void)
389     {
390       bool finished ;
391       map<string,CContext*>::iterator it ;
392       for(it=contextList.begin();it!=contextList.end();it++)
393       {
394         finished=it->second->eventLoop() ;
395         if (finished)
396         {
397           contextList.erase(it) ;
398           break ;
399         }
400       }
401
402     }
403
404     //! Get rank of the current process
405     int CServer::getRank()
406     {
407       return rank;
408     }
409
410    /*!
411    * Open a file specified by a suffix and an extension and use it for the given file buffer.
412    * The file name will be suffix+rank+extension.
413    *
414    * \param fileName[in] protype file name
415    * \param ext [in] extension of the file
416    * \param fb [in/out] the file buffer
417    */
418    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
419    {
420      StdStringStream fileNameClient;
421      int numDigit = 0;
422      int size = 0;
423      MPI_Comm_size(CXios::globalComm, &size);
424      while (size)
425      {
426        size /= 10;
427        ++numDigit;
428      }
429
430      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
431      fb->open(fileNameClient.str().c_str(), std::ios::out);
432      if (!fb->is_open())
433        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
434              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
435    }
436
437    /*!
438    * \brief Open a file stream to write the info logs
439    * Open a file stream with a specific file name suffix+rank
440    * to write the info logs.
441    * \param fileName [in] protype file name
442    */
443    void CServer::openInfoStream(const StdString& fileName)
444    {
445      std::filebuf* fb = m_infoStream.rdbuf();
446      openStream(fileName, ".out", fb);
447
448      info.write2File(fb);
449      report.write2File(fb);
450    }
451
452    //! Write the info logs to standard output
453    void CServer::openInfoStream()
454    {
455      info.write2StdOut();
456      report.write2StdOut();
457    }
458
459    //! Close the info logs file if it opens
460    void CServer::closeInfoStream()
461    {
462      if (m_infoStream.is_open()) m_infoStream.close();
463    }
464
465    /*!
466    * \brief Open a file stream to write the error log
467    * Open a file stream with a specific file name suffix+rank
468    * to write the error log.
469    * \param fileName [in] protype file name
470    */
471    void CServer::openErrorStream(const StdString& fileName)
472    {
473      std::filebuf* fb = m_errorStream.rdbuf();
474      openStream(fileName, ".err", fb);
475
476      error.write2File(fb);
477    }
478
479    //! Write the error log to standard error output
480    void CServer::openErrorStream()
481    {
482      error.write2StdErr();
483    }
484
485    //! Close the error log file if it opens
486    void CServer::closeErrorStream()
487    {
488      if (m_errorStream.is_open()) m_errorStream.close();
489    }
490}
Note: See TracBrowser for help on using the repository browser.