source: XIOS3/dev/mhedley/buildCompilationPatchesA/src/server.cpp @ 2698

Last change on this file since 2698 was 2693, checked in by jderouillat, 7 weeks ago

When 2 levels servers are used, instanciate first writer, then gatherers. It avoids a deadlock situation in the case of services managed through a notification manager embedded on a server.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 30.2 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include "mpi.hpp"
11#include "tracer.hpp"
12#include "timer.hpp"
13#include "mem_checker.hpp"
14#include "event_scheduler.hpp"
15#include "string_tools.hpp"
16#include "ressources_manager.hpp"
17#include "services_manager.hpp"
18#include "contexts_manager.hpp"
19#include "servers_ressource.hpp"
20#include "services.hpp"
21#include "pool_node.hpp"
22#include <cstdio>
23#include "workflow_graph.hpp"
24#include "release_static_allocation.hpp"
25#include "thread_manager.hpp"
26#include <sys/stat.h>
27#include <unistd.h>
28
29
30
31namespace xios
32{
33    MPI_Comm CServer::intraComm_ ;
34    MPI_Comm CServer::serversComm_ ;
35    std::list<MPI_Comm> CServer::interCommLeft ;
36    std::list<MPI_Comm> CServer::interCommRight ;
37    std::list<MPI_Comm> CServer::contextInterComms;
38    std::list<MPI_Comm> CServer::contextIntraComms;
39    int CServer::serverLevel = 0 ;
40    int CServer::nbContexts = 0;
41    bool CServer::isRoot = false ;
42    int CServer::rank_ = INVALID_RANK;
43    StdOFStream CServer::m_infoStream;
44    StdOFStream CServer::m_errorStream;
45    map<string,CContext*> CServer::contextList ;
46    vector<int> CServer::sndServerGlobalRanks;
47    bool CServer::finished=false ;
48    bool CServer::is_MPI_Initialized ;
49    CEventScheduler* CServer::eventScheduler = 0;
50    CServersRessource* CServer::serversRessource_=nullptr ;
51    CThirdPartyDriver* CServer::driver_ =nullptr ;
52    extern CLogType logTimers ;
53       
54    void CServer::initialize(void)
55    {
56     
57      MPI_Comm serverComm ;
58      int initialized ;
59      MPI_Initialized(&initialized) ;
60      if (initialized) is_MPI_Initialized=true ;
61      else is_MPI_Initialized=false ;
62      MPI_Comm globalComm=CXios::getGlobalComm() ;
63      /////////////////////////////////////////
64      ///////////// PART 1 ////////////////////
65      /////////////////////////////////////////
66      // don't use OASIS
67      if (!CXios::usingOasis)
68      {
69        if (!is_MPI_Initialized) 
70        {
71          int required = MPI_THREAD_SERIALIZED ;
72          int provided ;
73          MPI_Init_thread(NULL,NULL, required, &provided) ;
74        }
75       
76        // split the global communicator
77        // get hash from all model to attribute a unique color (int) and then split to get client communicator
78        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
79         
80        int commRank, commSize ;
81        MPI_Comm_rank(globalComm,&commRank) ;
82        MPI_Comm_size(globalComm,&commSize) ;
83
84        std::hash<string> hashString ;
85        size_t hashServer=hashString(CXios::xiosCodeId) ;
86         
87        size_t* hashAll = new size_t[commSize] ;
88        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
89         
90        int color=0 ;
91        map<size_t,int> listHash ;
92        for(int i=0 ; i<=commSize ; i++) 
93          if (listHash.count(hashAll[i])==0) 
94          {
95            listHash[hashAll[i]]=color ;
96            color=color+1 ;
97          }
98        color=listHash[hashServer] ;
99        delete[] hashAll ;
100
101        xios::MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
102        CXios::getMpiGarbageCollector().registerCommunicator(serverComm) ;
103
104      }
105      else // using OASIS
106      {
107       
108        if (!is_MPI_Initialized) 
109        {
110          int required = MPI_THREAD_SERIALIZED ;
111          int provided ;
112          MPI_Init_thread(NULL,NULL, required, &provided) ;
113        }
114
115        driver_ = new CThirdPartyDriver();
116
117        driver_->getComponentCommunicator( serverComm );
118      }
119      xios::MPI_Comm_dup(serverComm, &intraComm_);
120      CXios::getMpiGarbageCollector().registerCommunicator(intraComm_) ;
121     
122      CTimer::get("XIOS").resume() ;
123      CTimer::get("XIOS server").resume() ;
124      CTimer::get("XIOS initialize").resume() ;
125 
126      /////////////////////////////////////////
127      ///////////// PART 2 ////////////////////
128      /////////////////////////////////////////
129     
130
131      // Create the XIOS communicator for every process which is related
132      // to XIOS, as well on client side as on server side
133      MPI_Comm xiosGlobalComm ;
134      string strIds=CXios::getin<string>("clients_code_id","") ;
135      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
136      if (strIds.empty())
137      {
138        // no code Ids given, suppose XIOS initialisation is global           
139        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
140        MPI_Comm splitComm,interComm ;
141        MPI_Comm_rank(globalComm,&commGlobalRank) ;
142        xios::MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
143        MPI_Comm_rank(splitComm,&commRank) ;
144        if (commRank==0) serverLeader=commGlobalRank ;
145        else serverLeader=0 ;
146        clientLeader=0 ;
147        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
148        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
149        xios::MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
150        xios::MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
151        CXios::setXiosComm(xiosGlobalComm) ;
152
153        xios::MPI_Comm_free( &interComm );
154        xios::MPI_Comm_free( &splitComm );
155      }
156      else
157      {
158
159        xiosGlobalCommByFileExchange(serverComm) ;
160
161      }
162     
163      /////////////////////////////////////////
164      ///////////// PART 4 ////////////////////
165      //  create servers intra communicator  //
166      /////////////////////////////////////////
167     
168      int commRank ;
169      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
170      xios::MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
171      CXios::getMpiGarbageCollector().registerCommunicator(serversComm_) ;
172     
173      CXios::setUsingServer() ;
174
175      /////////////////////////////////////////
176      ///////////// PART 5 ////////////////////
177      //       redirect files output         //
178      /////////////////////////////////////////
179     
180      CServer::openInfoStream(CXios::serverFile);
181      CServer::openErrorStream(CXios::serverFile);
182
183      CMemChecker::logMem( "CServer::initialize" );
184
185      /////////////////////////////////////////
186      ///////////// PART 4 ////////////////////
187      /////////////////////////////////////////
188
189      CXios::launchDaemonsManager(true) ;
190     
191      /////////////////////////////////////////
192      ///////////// PART 5 ////////////////////
193      /////////////////////////////////////////
194
195      // create the services
196
197      auto ressourcesManager=CXios::getRessourcesManager() ;
198      auto servicesManager=CXios::getServicesManager() ;
199      auto contextsManager=CXios::getContextsManager() ;
200      auto daemonsManager=CXios::getDaemonsManager() ;
201      auto serversRessource=CServer::getServersRessource() ;
202
203      int rank;
204      MPI_Comm_rank(intraComm_, &rank) ;
205      if (rank==0) isRoot=true;
206      else isRoot=false;
207
208      if (serversRessource->isServerLeader())
209      {
210        // creating pool
211        CPoolNodeGroup::get("xios","pool_definition")->solveDescInheritance(true) ;
212        vector<CPoolNode*> pools = CPoolNodeGroup::get("xios","pool_definition")->getAllChildren();
213        for(auto& pool : pools) pool->allocateRessources() ;
214       
215        int nbRessources = ressourcesManager->getFreeRessourcesSize() ;
216        if (nbRessources>0)
217        {
218          if (!CXios::usingServer2)
219          {
220            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
221            if (CThreadManager::isUsingThreads()) 
222              while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 
223              {
224                daemonsManager->eventLoop() ;
225                CThreadManager::yield() ;
226              }
227            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
228         
229            servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ;
230            if (CThreadManager::isUsingThreads()) 
231              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,0)) 
232              {
233                daemonsManager->eventLoop() ;
234                CThreadManager::yield() ;
235              }
236            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
237           
238            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ;
239            if (CThreadManager::isUsingThreads()) 
240            {
241              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0))
242              { 
243                daemonsManager->eventLoop() ;
244                CThreadManager::yield() ;
245              }
246            }
247            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
248          }
249          else
250          {
251            int nprocsServer = nbRessources*CXios::ratioServer2/100.;
252            int nprocsGatherer = nbRessources - nprocsServer ;
253         
254            int nbPoolsServer2 = CXios::nbPoolsServer2 ;
255            if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
256            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
257            if (CThreadManager::isUsingThreads()) 
258              while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 
259              {
260                daemonsManager->eventLoop() ;
261                CThreadManager::yield() ;
262              }
263            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
264           
265            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;
266            if (CThreadManager::isUsingThreads())
267              for(int i=0; i<nbPoolsServer2; i++)
268                while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,i)) 
269                {
270                  daemonsManager->eventLoop() ;
271                  CThreadManager::yield() ;
272                }
273            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
274
275            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
276            if (CThreadManager::isUsingThreads()) 
277              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultGathererId,0)) 
278              {
279                daemonsManager->eventLoop() ;
280                CThreadManager::yield() ;
281              }
282            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ;
283
284            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;
285            if (CThreadManager::isUsingThreads()) 
286              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) 
287              {
288                daemonsManager->eventLoop() ;
289                CThreadManager::yield() ;
290              }
291            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
292          }
293        }
294//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
295      }
296
297      MPI_Request req ;
298      MPI_Status status ;
299      MPI_Ibarrier(getServersRessource()->getCommunicator(),&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
300      int ok=false ;
301      while (!ok)
302      {
303        daemonsManager->eventLoop() ;
304        if (CThreadManager::isUsingThreads()) CThreadManager::yield();
305        MPI_Test(&req,&ok,&status) ;
306      }
307
308
309      //testingEventScheduler() ;
310/*
311      MPI_Request req ;
312      MPI_Status status ;
313      MPI_Ibarrier(CXios::getXiosComm(),&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
314      int ok=false ;
315      while (!ok)
316      {
317        daemonsManager->eventLoop() ;
318        MPI_Test(&req,&ok,&status) ;
319      }
320*/
321      CTimer::get("XIOS initialize").suspend() ;
322
323      /////////////////////////////////////////
324      ///////////// PART 5 ////////////////////
325      /////////////////////////////////////////
326      // loop on event loop
327
328      bool finished=false ;
329      CTimer::get("XIOS event loop").resume() ;
330
331      while (!finished)
332      {
333        finished=daemonsManager->eventLoop() ;
334        if (CThreadManager::isUsingThreads()) CThreadManager::yield() ;
335      }
336      CTimer::get("XIOS event loop").suspend() ;
337
338      // Delete CContext
339      //CObjectTemplate<CContext>::cleanStaticDataStructure();
340    }
341
342
343    void CServer::testingEventScheduler(void)
344    {
345      CXios::getPoolRessource()->getEventScheduler()->registerEvent(1,10) ;
346      CXios::getPoolRessource()->getEventScheduler()->registerEvent(2,10) ;
347      if (CXios::getPoolRessource()->hasService(CXios::defaultGathererId,0))
348      {
349        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(1,100) ;
350        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(2,100) ;
351        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(3,100) ;
352      }
353      if (CXios::getPoolRessource()->hasService(CXios::defaultWriterId,0))
354      {
355        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(1,1000) ;
356        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(2,1000) ;
357      }
358      CXios::getPoolRessource()->getEventScheduler()->registerEvent(3,10) ;
359      CXios::getPoolRessource()->getEventScheduler()->registerEvent(4,10) ;
360     
361      if (CXios::getPoolRessource()->hasService(CXios::defaultGathererId,0))
362      {
363        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(4,100) ;
364        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(5,100) ;
365      }
366      if (CXios::getPoolRessource()->hasService(CXios::defaultWriterId,0))
367      {
368        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(3,1000) ;
369        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(4,1000) ;
370        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(5,1000) ;
371      }
372      CXios::getPoolRessource()->getEventScheduler()->registerEvent(5,10) ;
373      CXios::getPoolRessource()->getEventScheduler()->registerEvent(6,10) ;
374     
375      int numEvents=0 ;
376      int poolEvent=1 ;
377      int gatherEvent=1 ;
378      int writerEvent=1 ;
379      do
380      {
381        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(poolEvent,10))
382        {
383          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
384          MPI_Barrier(CXios::getPoolRessource()->getCommunicator());
385          poolEvent++ ;
386          numEvents++;
387        }
388       
389        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(gatherEvent,100))
390        {
391          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
392          MPI_Barrier(CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)->getCommunicator());
393          gatherEvent++ ;
394          numEvents++;
395        }
396
397        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(writerEvent,1000))
398        {
399          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
400          MPI_Barrier(CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)->getCommunicator());
401          writerEvent++ ;
402          numEvents++;
403        }
404
405       
406      } while (numEvents!=11) ;
407
408    }
409
410
411    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
412    {
413       
414      MPI_Comm globalComm=CXios::getGlobalComm() ;
415      MPI_Comm xiosGlobalComm ;
416     
417      string strIds=CXios::getin<string>("clients_code_id","") ;
418      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
419     
420      int commRank, globalRank ;
421      MPI_Comm_rank(serverComm, &commRank) ;
422      MPI_Comm_rank(globalComm, &globalRank) ;
423      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
424
425      if (commRank==0) // if root process publish name
426      { 
427        std::ofstream ofs (serverFileName, std::ofstream::out);
428        ofs<<globalRank ;
429        ofs.close();
430      }
431       
432      vector<int> clientsRank(clientsCodeId.size()) ;
433      for(int i=0;i<clientsRank.size();i++)
434      {
435        std::ifstream ifs ;
436        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
437        struct stat buffer;
438        do {
439        } while( stat(fileName.c_str(), &buffer) != 0 );
440        sleep(1);
441        ifs.open(fileName, ifstream::in) ;
442        ifs>>clientsRank[i] ;
443        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
444        ifs.close() ; 
445      }
446
447      MPI_Comm intraComm ;
448      xios::MPI_Comm_dup(serverComm,&intraComm) ;
449      MPI_Comm interComm ;
450      for(int i=0 ; i<clientsRank.size(); i++)
451      { 
452        xios::MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
453        CXios::getMpiGarbageCollector().registerCommunicator(interComm) ;
454        interCommLeft.push_back(interComm) ;
455        xios::MPI_Comm_free(&intraComm) ;
456        xios::MPI_Intercomm_merge(interComm,false, &intraComm ) ;
457      }
458      xiosGlobalComm=intraComm ; 
459      MPI_Barrier(xiosGlobalComm);
460      if (commRank==0) std::remove(serverFileName.c_str()) ;
461      MPI_Barrier(xiosGlobalComm);
462
463      CXios::setXiosComm(xiosGlobalComm) ;
464     
465    }
466
467
468    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
469    {
470        // untested, need to be tested on a true MPI-2 compliant library
471
472        // try to discover other client/server
473/*
474        // publish server name
475        char portName[MPI_MAX_PORT_NAME];
476        int ierr ;
477        int commRank ;
478        MPI_Comm_rank(serverComm, &commRank) ;
479       
480        if (commRank==0) // if root process publish name
481        { 
482          MPI_Open_port(MPI_INFO_NULL, portName);
483          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
484        }
485
486        MPI_Comm intraComm=serverComm ;
487        MPI_Comm interComm ;
488        for(int i=0 ; i<clientsCodeId.size(); i++)
489        { 
490          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
491          xios::MPI_Intercomm_merge(interComm,false, &intraComm ) ;
492        }
493*/     
494    }
495
496   /*!
497    * Root process is listening for an order sent by client to call "oasis_enddef".
498    * The root client of a compound send the order (tag 5). It is probed and received.
499    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
500    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
501    */
502   
503     void CServer::listenOasisEnddef(void)
504     {
505        int flag ;
506        MPI_Status status ;
507        list<MPI_Comm>::iterator it;
508        int msg ;
509        static int nbCompound=0 ;
510        int size ;
511        static bool sent=false ;
512        static MPI_Request* allRequests ;
513        static MPI_Status* allStatus ;
514
515
516        if (sent)
517        {
518          MPI_Comm_size(intraComm_,&size) ;
519          MPI_Testall(size,allRequests, &flag, allStatus) ;
520          if (flag==true)
521          {
522            delete [] allRequests ;
523            delete [] allStatus ;
524            sent=false ;
525          }
526        }
527       
528
529        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
530        {
531           MPI_Status status ;
532           traceOff() ;
533           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
534           traceOn() ;
535           if (flag==true)
536           {
537              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
538              nbCompound++ ;
539              if (nbCompound==interCommLeft.size())
540              {
541                MPI_Comm_size(intraComm_,&size) ;
542                allRequests= new MPI_Request[size] ;
543                allStatus= new MPI_Status[size] ;
544                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
545                sent=true ;
546              }
547           }
548        }
549}
550     
551   /*!
552    * Processes probes message from root process if oasis_enddef call must be done.
553    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
554    */
555     void CServer::listenRootOasisEnddef(void)
556     {
557       int flag ;
558       MPI_Status status ;
559       const int root=0 ;
560       int msg ;
561       static bool eventSent=false ;
562
563       if (eventSent)
564       {
565         std::hash<string> hashString;
566         size_t hashId = hashString("oasis_enddef");
567
568         if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(0,hashId))
569         {
570           CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
571           driver_->endSynchronizedDefinition() ;
572           eventSent=false ;
573         }
574       }
575         
576       traceOff() ;
577       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
578       traceOn() ;
579       if (flag==true)
580       {
581           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
582           std::hash<string> hashString;
583           size_t hashId = hashString("oasis_enddef");
584           CXios::getPoolRessource()->getEventScheduler()->registerEvent(0,hashId);
585           eventSent=true ;
586       }
587     }
588
589    void CServer::finalize(void)
590    {
591      CTimer::get("XIOS").suspend() ;
592      CTimer::get("XIOS server").suspend() ;
593      delete eventScheduler ;
594
595      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
596        xios::MPI_Comm_free(&(*it));
597
598      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
599        xios::MPI_Comm_free(&(*it));
600
601        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
602          xios::MPI_Comm_free(&(*it));
603
604//      xios::MPI_Comm_free(&intraComm);
605      CXios::finalizeDaemonsManager();
606      finalizeServersRessource();
607     
608      CContext::removeAllContexts() ; // free memory for related context
609     
610      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
611      MPI_Comm xiosComm=CXios::getXiosComm() ;
612      xios::MPI_Comm_free(&xiosComm) ;
613      CMemChecker::logMem( "CServer::finalize", true );
614     
615      CCommTrack::dumpComm() ;
616
617      if (!is_MPI_Initialized)
618      {
619        if (CXios::usingOasis) delete driver_;
620        MPI_Finalize() ;
621      }
622      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
623      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
624      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
625      report(0)<< "lock exclusive : accumulated time : "<<CTimer::get("lock exclusive").getCumulatedTime()
626                                                        <<" --- call number : "<<CTimer::get("lock exclusive").getNumCall()
627                                                        <<" --- average time : "<<CTimer::get("lock exclusive").getAverageTime()<<endl ;
628      report(0)<< "lock shared : accumulated time : "<<CTimer::get("lock shared").getCumulatedTime()
629                                                        <<" --- call number : "<<CTimer::get("lock shared").getNumCall()
630                                                        <<" --- average time : "<<CTimer::get("lock shared").getAverageTime()<<endl ;
631      report(0)<< "unlock exclusive : accumulated time : "<<CTimer::get("unlock exclusive").getCumulatedTime()
632                                                        <<" --- call number : "<<CTimer::get("unlock exclusive").getNumCall()
633                                                        <<" --- average time : "<<CTimer::get("unlock exclusive").getAverageTime()<<endl ;
634      report(0)<< "unlock shared : accumulated time : "<<CTimer::get("unlock shared").getCumulatedTime()
635                                                        <<" --- call number : "<<CTimer::get("unlock shared").getNumCall()
636                                                        <<" --- average time : "<<CTimer::get("unlock shared").getAverageTime()<<endl ;
637
638      if (info.isActive(logProfile))
639      {
640        printProfile();
641      }
642     
643      if (info.isActive(logTimers)) report(0)<<"\n"<<CTimer::getAllCumulatedTime()<<endl ;
644      if (CXios::reportMemory)
645      {
646        report(100)<<CMemChecker::getAllCumulatedMem()<<endl ;
647      }
648     
649      CWorkflowGraph::drawWorkFlowGraph_server();
650      xios::releaseStaticAllocation() ; // free memory from static allocation
651    }
652   
653    void CServer::printProfile()
654    {
655      list< pair<string,int> > timer_name;
656      timer_name.push_back({"XIOS server",0});
657      timer_name.push_back({"XIOS initialize",0});
658      timer_name.push_back({"XIOS event loop",0});
659      //timer_name.push_back({"Recv event loop (p2p)",1});      // timer concerned by yield and thread (if reader embedded)
660      //timer_name.push_back({"Recv event loop (legacy)",1});   // timer concerned by yield and thread
661      timer_name.push_back({"Process events",2});
662      timer_name.push_back({"Context : close definition",3});
663      timer_name.push_back({"Reader workflow data entry",3});
664      timer_name.push_back({"Files : reading data",4});
665      //timer_name.push_back({"Field : send data (read)",4});   // timer concerned by yield and thread
666      timer_name.push_back({"Server workflow data entry",3});
667      timer_name.push_back({"Server workflow",3});
668      timer_name.push_back({"Applying filters",4});
669      timer_name.push_back({"Transformation transfers",5});
670      timer_name.push_back({"Transformation MPI",6});
671      timer_name.push_back({"Temporal filters",5});
672      timer_name.push_back({"Field : send data",4});
673      //timer_name.push_back({"Scatter event",5});              // timer concerned by yield and thread
674      //timer_name.push_back({"Blocking time",6});              // timer concerned by yield and thread
675      timer_name.push_back({"Files : create headers",4});
676      timer_name.push_back({"Files : writing data",4});
677      timer_name.push_back({"Context finalize",3});             // timer concerned by yield and thread
678      timer_name.push_back({"Files : close",4});
679     
680      report(0)<< endl;
681      double total_time = CTimer::get("Process events").getCumulatedTime();
682      for(auto it_timer_name = timer_name.begin(); it_timer_name != timer_name.end(); it_timer_name++)
683      {
684        double timer_time = CTimer::get(it_timer_name->first).getCumulatedTime();
685        if ( timer_time / total_time > 0.001 )
686        {
687          ostringstream printed_line;
688          printed_line << setprecision(3) << std::fixed;
689          for(int itab=0;itab<it_timer_name->second;itab++)
690              printed_line << "  ";
691          printed_line << it_timer_name->first << " : " << timer_time <<endl;
692          string string_line = printed_line.str();
693          report(0)<< string_line;
694        }
695      }
696    }
697
698    /*!
699    * Open a file specified by a suffix and an extension and use it for the given file buffer.
700    * The file name will be suffix+rank+extension.
701    *
702    * \param fileName[in] protype file name
703    * \param ext [in] extension of the file
704    * \param fb [in/out] the file buffer
705    */
706    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
707    {
708      StdStringStream fileNameServer;
709      int numDigit = 0;
710      int commSize = 0;
711      int commRank ;
712      int id;
713     
714      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
715      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
716
717      while (commSize)
718      {
719        commSize /= 10;
720        ++numDigit;
721      }
722      id = commRank;
723
724      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
725      fb->open(fileNameServer.str().c_str(), std::ios::out);
726      if (!fb->is_open())
727        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
728              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
729    }
730
731    /*!
732    * \brief Open a file stream to write the info logs
733    * Open a file stream with a specific file name suffix+rank
734    * to write the info logs.
735    * \param fileName [in] protype file name
736    */
737    void CServer::openInfoStream(const StdString& fileName)
738    {
739      std::filebuf* fb = m_infoStream.rdbuf();
740      openStream(fileName, ".out", fb);
741
742      info.write2File(fb);
743      report.write2File(fb);
744    }
745
746    //! Write the info logs to standard output
747    void CServer::openInfoStream()
748    {
749      info.write2StdOut();
750      report.write2StdOut();
751    }
752
753    //! Close the info logs file if it opens
754    void CServer::closeInfoStream()
755    {
756      if (m_infoStream.is_open()) m_infoStream.close();
757    }
758
759    /*!
760    * \brief Open a file stream to write the error log
761    * Open a file stream with a specific file name suffix+rank
762    * to write the error log.
763    * \param fileName [in] protype file name
764    */
765    void CServer::openErrorStream(const StdString& fileName)
766    {
767      std::filebuf* fb = m_errorStream.rdbuf();
768      openStream(fileName, ".err", fb);
769
770      error.write2File(fb);
771    }
772
773    //! Write the error log to standard error output
774    void CServer::openErrorStream()
775    {
776      error.write2StdErr();
777    }
778
779    //! Close the error log file if it opens
780    void CServer::closeErrorStream()
781    {
782      if (m_errorStream.is_open()) m_errorStream.close();
783    }
784
785    void CServer::launchServersRessource(MPI_Comm serverComm)
786    {
787      serversRessource_ = new CServersRessource(serverComm) ;
788    }
789
790    void  CServer::finalizeServersRessource(void) 
791    { 
792      delete serversRessource_; serversRessource_=nullptr ;
793    }
794}
Note: See TracBrowser for help on using the repository browser.