source: XIOS3/trunk/src/server.cpp @ 2551

Last change on this file since 2551 was 2547, checked in by ymipsl, 10 months ago

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 25.7 KB
RevLine 
[490]1#include "globalScopeData.hpp"
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "server.hpp"
[983]5#include "client.hpp"
[300]6#include "type.hpp"
7#include "context.hpp"
[352]8#include "object_template.hpp"
[300]9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
[382]12#include "mpi.hpp"
[347]13#include "tracer.hpp"
14#include "timer.hpp"
[2418]15#include "mem_checker.hpp"
[492]16#include "event_scheduler.hpp"
[1587]17#include "string_tools.hpp"
[1761]18#include "ressources_manager.hpp"
19#include "services_manager.hpp"
20#include "contexts_manager.hpp"
21#include "servers_ressource.hpp"
[2333]22#include "services.hpp"
[2458]23#include "pool_node.hpp"
[1761]24#include <cstdio>
[2146]25#include "workflow_graph.hpp"
[2274]26#include "release_static_allocation.hpp"
[2547]27#include "thread_manager.hpp"
[2333]28#include <sys/stat.h>
29#include <unistd.h>
[300]30
[1761]31
[2274]32
[335]33namespace xios
[490]34{
[2332]35    MPI_Comm CServer::intraComm_ ;
[1761]36    MPI_Comm CServer::serversComm_ ;
[1639]37    std::list<MPI_Comm> CServer::interCommLeft ;
38    std::list<MPI_Comm> CServer::interCommRight ;
39    std::list<MPI_Comm> CServer::contextInterComms;
40    std::list<MPI_Comm> CServer::contextIntraComms;
[1021]41    int CServer::serverLevel = 0 ;
[1148]42    int CServer::nbContexts = 0;
[983]43    bool CServer::isRoot = false ;
[1077]44    int CServer::rank_ = INVALID_RANK;
[490]45    StdOFStream CServer::m_infoStream;
[523]46    StdOFStream CServer::m_errorStream;
[490]47    map<string,CContext*> CServer::contextList ;
[1152]48    vector<int> CServer::sndServerGlobalRanks;
[300]49    bool CServer::finished=false ;
50    bool CServer::is_MPI_Initialized ;
[597]51    CEventScheduler* CServer::eventScheduler = 0;
[1761]52    CServersRessource* CServer::serversRessource_=nullptr ;
[2335]53    CThirdPartyDriver* CServer::driver_ =nullptr ;
[983]54
[1765]55       
[1761]56    void CServer::initialize(void)
57    {
58     
59      MPI_Comm serverComm ;
60      int initialized ;
61      MPI_Initialized(&initialized) ;
62      if (initialized) is_MPI_Initialized=true ;
63      else is_MPI_Initialized=false ;
64      MPI_Comm globalComm=CXios::getGlobalComm() ;
65      /////////////////////////////////////////
66      ///////////// PART 1 ////////////////////
67      /////////////////////////////////////////
68      // don't use OASIS
69      if (!CXios::usingOasis)
70      {
[2547]71        if (!is_MPI_Initialized) 
72        {
73          int required = MPI_THREAD_SERIALIZED ;
74          int provided ;
75          MPI_Init_thread(NULL,NULL, required, &provided) ;
76        }
[1761]77       
78        // split the global communicator
79        // get hash from all model to attribute a unique color (int) and then split to get client communicator
80        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
81         
82        int commRank, commSize ;
83        MPI_Comm_rank(globalComm,&commRank) ;
84        MPI_Comm_size(globalComm,&commSize) ;
85
86        std::hash<string> hashString ;
87        size_t hashServer=hashString(CXios::xiosCodeId) ;
88         
89        size_t* hashAll = new size_t[commSize] ;
[2242]90        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
[1761]91         
92        int color=0 ;
[2242]93        map<size_t,int> listHash ;
94        for(int i=0 ; i<=commSize ; i++) 
95          if (listHash.count(hashAll[i])==0) 
[1761]96          {
[2242]97            listHash[hashAll[i]]=color ;
[1761]98            color=color+1 ;
99          }
[2242]100        color=listHash[hashServer] ;
[1761]101        delete[] hashAll ;
102
103        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
104      }
105      else // using OASIS
106      {
[2547]107       
108        if (!is_MPI_Initialized) 
109        {
110          int required = MPI_THREAD_SERIALIZED ;
111          int provided ;
112          MPI_Init_thread(NULL,NULL, required, &provided) ;
113        }
[1761]114
[2547]115        driver_ = new CThirdPartyDriver();
116
[2335]117        driver_->getComponentCommunicator( serverComm );
[1761]118      }
[2334]119      MPI_Comm_dup(serverComm, &intraComm_);
120     
[2290]121      CTimer::get("XIOS").resume() ;
[2437]122      CTimer::get("XIOS server").resume() ;
[2290]123      CTimer::get("XIOS initialize").resume() ;
[1761]124 
125      /////////////////////////////////////////
126      ///////////// PART 2 ////////////////////
127      /////////////////////////////////////////
128     
129
130      // Create the XIOS communicator for every process which is related
131      // to XIOS, as well on client side as on server side
132      MPI_Comm xiosGlobalComm ;
133      string strIds=CXios::getin<string>("clients_code_id","") ;
134      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
135      if (strIds.empty())
136      {
137        // no code Ids given, suppose XIOS initialisation is global           
138        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
139        MPI_Comm splitComm,interComm ;
140        MPI_Comm_rank(globalComm,&commGlobalRank) ;
141        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
142        MPI_Comm_rank(splitComm,&commRank) ;
143        if (commRank==0) serverLeader=commGlobalRank ;
144        else serverLeader=0 ;
145        clientLeader=0 ;
146        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
147        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
148        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
149        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
150        CXios::setXiosComm(xiosGlobalComm) ;
151      }
152      else
153      {
154
155        xiosGlobalCommByFileExchange(serverComm) ;
156
157      }
158     
159      /////////////////////////////////////////
160      ///////////// PART 4 ////////////////////
161      //  create servers intra communicator  //
162      /////////////////////////////////////////
163     
164      int commRank ;
165      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
166      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
167     
168      CXios::setUsingServer() ;
169
170      /////////////////////////////////////////
171      ///////////// PART 5 ////////////////////
172      //       redirect files output         //
173      /////////////////////////////////////////
174     
175      CServer::openInfoStream(CXios::serverFile);
176      CServer::openErrorStream(CXios::serverFile);
177
[2418]178      CMemChecker::logMem( "CServer::initialize" );
179
[1761]180      /////////////////////////////////////////
181      ///////////// PART 4 ////////////////////
182      /////////////////////////////////////////
183
184      CXios::launchDaemonsManager(true) ;
185     
186      /////////////////////////////////////////
187      ///////////// PART 5 ////////////////////
188      /////////////////////////////////////////
189
190      // create the services
191
192      auto ressourcesManager=CXios::getRessourcesManager() ;
193      auto servicesManager=CXios::getServicesManager() ;
194      auto contextsManager=CXios::getContextsManager() ;
195      auto daemonsManager=CXios::getDaemonsManager() ;
196      auto serversRessource=CServer::getServersRessource() ;
197
[2333]198      int rank;
199      MPI_Comm_rank(intraComm_, &rank) ;
200      if (rank==0) isRoot=true;
201      else isRoot=false;
202
[1761]203      if (serversRessource->isServerLeader())
204      {
[2458]205        // creating pool
206        CPoolNodeGroup::get("xios","pool_definition")->solveDescInheritance(true) ;
207        vector<CPoolNode*> pools = CPoolNodeGroup::get("xios","pool_definition")->getAllChildren();
208        for(auto& pool : pools) pool->allocateRessources() ;
209       
210        int nbRessources = ressourcesManager->getFreeRessourcesSize() ;
211        if (nbRessources>0)
[1761]212        {
[2458]213          if (!CXios::usingServer2)
214          {
215            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
[2547]216            if (CThreadManager::isUsingThreads()) 
217              while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 
218              {
219                daemonsManager->eventLoop() ;
220                CThreadManager::yield() ;
221              }
222            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
223         
[2458]224            servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ;
[2547]225            if (CThreadManager::isUsingThreads()) 
226              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,0)) 
227              {
228                daemonsManager->eventLoop() ;
229                CThreadManager::yield() ;
230              }
231            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
232           
[2458]233            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ;
[2547]234            if (CThreadManager::isUsingThreads()) 
235            {
236              daemonsManager->eventLoop() ;
237              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) CThreadManager::yield() ;
238            }
239            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
[2458]240          }
241          else
242          {
243            int nprocsServer = nbRessources*CXios::ratioServer2/100.;
244            int nprocsGatherer = nbRessources - nprocsServer ;
[1761]245         
[2458]246            int nbPoolsServer2 = CXios::nbPoolsServer2 ;
247            if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
248            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
[2547]249            if (CThreadManager::isUsingThreads()) 
250              while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 
251              {
252                daemonsManager->eventLoop() ;
253                CThreadManager::yield() ;
254              }
255            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ;
256
[2458]257            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
[2547]258            if (CThreadManager::isUsingThreads()) 
259              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultGathererId,0)) 
260              {
261                daemonsManager->eventLoop() ;
262                CThreadManager::yield() ;
263              }
264            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ;
265
[2458]266            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;
[2547]267            if (CThreadManager::isUsingThreads()) 
268              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) 
269              {
270                daemonsManager->eventLoop() ;
271                CThreadManager::yield() ;
272              }
273            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ;
274           
[2458]275            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;
[2547]276            if (CThreadManager::isUsingThreads())
277              for(int i=0; i<nbPoolsServer2; i++)
278                while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,i)) 
279                {
280                  daemonsManager->eventLoop() ;
281                  CThreadManager::yield() ;
282                }
283            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ;
[2458]284          }
[1761]285        }
[2407]286//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
[1761]287      }
[2523]288
289      MPI_Request req ;
290      MPI_Status status ;
291      MPI_Ibarrier(getServersRessource()->getCommunicator(),&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
292      int ok=false ;
293      while (!ok)
294      {
295        daemonsManager->eventLoop() ;
[2547]296        if (CThreadManager::isUsingThreads()) CThreadManager::yield();
[2523]297        MPI_Test(&req,&ok,&status) ;
298      }
299
300
[2547]301      //testingEventScheduler() ;
[2458]302/*
303      MPI_Request req ;
304      MPI_Status status ;
[2523]305      MPI_Ibarrier(CXios::getXiosComm(),&req) ; // be sure that all services are created now, could be remove later if more asynchronisity
[2458]306      int ok=false ;
307      while (!ok)
308      {
309        daemonsManager->eventLoop() ;
310        MPI_Test(&req,&ok,&status) ;
311      }
312*/
[2242]313      CTimer::get("XIOS initialize").suspend() ;
[1761]314
315      /////////////////////////////////////////
316      ///////////// PART 5 ////////////////////
317      /////////////////////////////////////////
318      // loop on event loop
319
320      bool finished=false ;
[2242]321      CTimer::get("XIOS event loop").resume() ;
322
[1761]323      while (!finished)
324      {
325        finished=daemonsManager->eventLoop() ;
[2547]326        if (CThreadManager::isUsingThreads()) CThreadManager::yield() ;
[1761]327      }
[2242]328      CTimer::get("XIOS event loop").suspend() ;
[2243]329
330      // Delete CContext
[2274]331      //CObjectTemplate<CContext>::cleanStaticDataStructure();
[1761]332    }
333
334
[2523]335    void CServer::testingEventScheduler(void)
336    {
337      CXios::getPoolRessource()->getEventScheduler()->registerEvent(1,10) ;
338      CXios::getPoolRessource()->getEventScheduler()->registerEvent(2,10) ;
339      if (CXios::getPoolRessource()->hasService(CXios::defaultGathererId,0))
340      {
341        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(1,100) ;
342        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(2,100) ;
343        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(3,100) ;
344      }
345      if (CXios::getPoolRessource()->hasService(CXios::defaultWriterId,0))
346      {
347        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(1,1000) ;
348        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(2,1000) ;
349      }
350      CXios::getPoolRessource()->getEventScheduler()->registerEvent(3,10) ;
351      CXios::getPoolRessource()->getEventScheduler()->registerEvent(4,10) ;
352     
353      if (CXios::getPoolRessource()->hasService(CXios::defaultGathererId,0))
354      {
355        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(4,100) ;
356        CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)-> getEventScheduler()->registerEvent(5,100) ;
357      }
358      if (CXios::getPoolRessource()->hasService(CXios::defaultWriterId,0))
359      {
360        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(3,1000) ;
361        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(4,1000) ;
362        CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)-> getEventScheduler()->registerEvent(5,1000) ;
363      }
364      CXios::getPoolRessource()->getEventScheduler()->registerEvent(5,10) ;
365      CXios::getPoolRessource()->getEventScheduler()->registerEvent(6,10) ;
366     
367      int numEvents=0 ;
368      int poolEvent=1 ;
369      int gatherEvent=1 ;
370      int writerEvent=1 ;
371      do
372      {
373        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(poolEvent,10))
374        {
375          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
376          MPI_Barrier(CXios::getPoolRessource()->getCommunicator());
377          poolEvent++ ;
378          numEvents++;
379        }
380       
381        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(gatherEvent,100))
382        {
383          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
384          MPI_Barrier(CXios::getPoolRessource()->getService(CXios::defaultGathererId,0)->getCommunicator());
385          gatherEvent++ ;
386          numEvents++;
387        }
[1761]388
[2523]389        if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(writerEvent,1000))
390        {
391          CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
392          MPI_Barrier(CXios::getPoolRessource()->getService(CXios::defaultWriterId,0)->getCommunicator());
393          writerEvent++ ;
394          numEvents++;
395        }
[1761]396
[2523]397       
398      } while (numEvents!=11) ;
[1761]399
[2523]400    }
401
402
[1761]403    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
404    {
405       
406      MPI_Comm globalComm=CXios::getGlobalComm() ;
407      MPI_Comm xiosGlobalComm ;
408     
409      string strIds=CXios::getin<string>("clients_code_id","") ;
410      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
411     
412      int commRank, globalRank ;
413      MPI_Comm_rank(serverComm, &commRank) ;
414      MPI_Comm_rank(globalComm, &globalRank) ;
415      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
416
417      if (commRank==0) // if root process publish name
418      { 
419        std::ofstream ofs (serverFileName, std::ofstream::out);
420        ofs<<globalRank ;
421        ofs.close();
422      }
423       
424      vector<int> clientsRank(clientsCodeId.size()) ;
425      for(int i=0;i<clientsRank.size();i++)
426      {
427        std::ifstream ifs ;
428        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
[2333]429        struct stat buffer;
430        do {
431        } while( stat(fileName.c_str(), &buffer) != 0 );
432        sleep(1);
433        ifs.open(fileName, ifstream::in) ;
[1761]434        ifs>>clientsRank[i] ;
[2333]435        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
[1761]436        ifs.close() ; 
437      }
438
439      MPI_Comm intraComm ;
440      MPI_Comm_dup(serverComm,&intraComm) ;
441      MPI_Comm interComm ;
442      for(int i=0 ; i<clientsRank.size(); i++)
443      { 
444        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
[2333]445        interCommLeft.push_back(interComm) ;
[1761]446        MPI_Comm_free(&intraComm) ;
447        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
448      }
449      xiosGlobalComm=intraComm ; 
450      MPI_Barrier(xiosGlobalComm);
451      if (commRank==0) std::remove(serverFileName.c_str()) ;
452      MPI_Barrier(xiosGlobalComm);
453
454      CXios::setXiosComm(xiosGlobalComm) ;
455     
456    }
457
458
459    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
460    {
461        // untested, need to be tested on a true MPI-2 compliant library
462
463        // try to discover other client/server
464/*
465        // publish server name
466        char portName[MPI_MAX_PORT_NAME];
467        int ierr ;
468        int commRank ;
469        MPI_Comm_rank(serverComm, &commRank) ;
470       
471        if (commRank==0) // if root process publish name
472        { 
473          MPI_Open_port(MPI_INFO_NULL, portName);
474          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
475        }
476
477        MPI_Comm intraComm=serverComm ;
478        MPI_Comm interComm ;
479        for(int i=0 ; i<clientsCodeId.size(); i++)
480        { 
481          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
482          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
483        }
484*/     
485    }
486
[2333]487   /*!
488    * Root process is listening for an order sent by client to call "oasis_enddef".
489    * The root client of a compound send the order (tag 5). It is probed and received.
490    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
491    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
492    */
493   
494     void CServer::listenOasisEnddef(void)
495     {
496        int flag ;
497        MPI_Status status ;
498        list<MPI_Comm>::iterator it;
499        int msg ;
500        static int nbCompound=0 ;
501        int size ;
502        static bool sent=false ;
503        static MPI_Request* allRequests ;
504        static MPI_Status* allStatus ;
[490]505
[2333]506
507        if (sent)
508        {
509          MPI_Comm_size(intraComm_,&size) ;
510          MPI_Testall(size,allRequests, &flag, allStatus) ;
511          if (flag==true)
512          {
513            delete [] allRequests ;
514            delete [] allStatus ;
515            sent=false ;
516          }
517        }
518       
519
520        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
521        {
522           MPI_Status status ;
523           traceOff() ;
524           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
525           traceOn() ;
526           if (flag==true)
527           {
528              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
529              nbCompound++ ;
530              if (nbCompound==interCommLeft.size())
531              {
532                MPI_Comm_size(intraComm_,&size) ;
533                allRequests= new MPI_Request[size] ;
534                allStatus= new MPI_Status[size] ;
535                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
536                sent=true ;
537              }
538           }
539        }
540}
541     
542   /*!
543    * Processes probes message from root process if oasis_enddef call must be done.
544    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
545    */
546     void CServer::listenRootOasisEnddef(void)
547     {
548       int flag ;
549       MPI_Status status ;
550       const int root=0 ;
551       int msg ;
552       static bool eventSent=false ;
553
554       if (eventSent)
555       {
556         boost::hash<string> hashString;
557         size_t hashId = hashString("oasis_enddef");
[2523]558
559         if (CXios::getPoolRessource()->getEventScheduler()->queryEvent(0,hashId))
[2333]560         {
[2523]561           CXios::getPoolRessource()->getEventScheduler()->popEvent() ;
[2335]562           driver_->endSynchronizedDefinition() ;
[2333]563           eventSent=false ;
564         }
565       }
566         
567       traceOff() ;
568       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
569       traceOn() ;
570       if (flag==true)
571       {
572           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
573           boost::hash<string> hashString;
574           size_t hashId = hashString("oasis_enddef");
[2523]575           CXios::getPoolRessource()->getEventScheduler()->registerEvent(0,hashId);
[2333]576           eventSent=true ;
577       }
578     }
579
[300]580    void CServer::finalize(void)
581    {
[361]582      CTimer::get("XIOS").suspend() ;
[2399]583      CTimer::get("XIOS server").suspend() ;
[492]584      delete eventScheduler ;
[655]585
[1639]586      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
587        MPI_Comm_free(&(*it));
[983]588
[1639]589      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
590        MPI_Comm_free(&(*it));
[1071]591
[1639]592        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
593          MPI_Comm_free(&(*it));
[992]594
[1761]595//      MPI_Comm_free(&intraComm);
[2266]596      CXios::finalizeDaemonsManager();
[2274]597      finalizeServersRessource();
[1764]598     
[2274]599      CContext::removeAllContexts() ; // free memory for related context
600         
[2310]601      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
602
[2420]603      CMemChecker::logMem( "CServer::finalize", true );
[300]604      if (!is_MPI_Initialized)
[490]605      {
[2335]606        if (CXios::usingOasis) delete driver_;
[1639]607        else MPI_Finalize() ;
[300]608      }
[347]609      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
610      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
611      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
[1158]612      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
[2535]613      if (CXios::reportMemory)
614      {
615        report(100)<<CMemChecker::getAllCumulatedMem()<<endl ;
616      }
[2274]617     
[2146]618      CWorkflowGraph::drawWorkFlowGraph_server();
[2274]619      xios::releaseStaticAllocation() ; // free memory from static allocation
[300]620    }
[490]621
[523]622    /*!
623    * Open a file specified by a suffix and an extension and use it for the given file buffer.
624    * The file name will be suffix+rank+extension.
625    *
626    * \param fileName[in] protype file name
627    * \param ext [in] extension of the file
628    * \param fb [in/out] the file buffer
629    */
630    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
631    {
[1761]632      StdStringStream fileNameServer;
[523]633      int numDigit = 0;
[1761]634      int commSize = 0;
635      int commRank ;
[1021]636      int id;
[1761]637     
638      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
639      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
640
641      while (commSize)
[523]642      {
[1761]643        commSize /= 10;
[523]644        ++numDigit;
645      }
[1761]646      id = commRank;
[497]647
[1761]648      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
649      fb->open(fileNameServer.str().c_str(), std::ios::out);
[523]650      if (!fb->is_open())
651        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
[1761]652              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
[523]653    }
[490]654
[523]655    /*!
656    * \brief Open a file stream to write the info logs
657    * Open a file stream with a specific file name suffix+rank
658    * to write the info logs.
659    * \param fileName [in] protype file name
660    */
661    void CServer::openInfoStream(const StdString& fileName)
662    {
663      std::filebuf* fb = m_infoStream.rdbuf();
664      openStream(fileName, ".out", fb);
[490]665
[523]666      info.write2File(fb);
667      report.write2File(fb);
668    }
[490]669
[523]670    //! Write the info logs to standard output
671    void CServer::openInfoStream()
672    {
673      info.write2StdOut();
674      report.write2StdOut();
675    }
[490]676
[523]677    //! Close the info logs file if it opens
678    void CServer::closeInfoStream()
679    {
680      if (m_infoStream.is_open()) m_infoStream.close();
681    }
682
683    /*!
684    * \brief Open a file stream to write the error log
685    * Open a file stream with a specific file name suffix+rank
686    * to write the error log.
687    * \param fileName [in] protype file name
688    */
689    void CServer::openErrorStream(const StdString& fileName)
690    {
691      std::filebuf* fb = m_errorStream.rdbuf();
692      openStream(fileName, ".err", fb);
693
694      error.write2File(fb);
695    }
696
697    //! Write the error log to standard error output
698    void CServer::openErrorStream()
699    {
700      error.write2StdErr();
701    }
702
703    //! Close the error log file if it opens
704    void CServer::closeErrorStream()
705    {
706      if (m_errorStream.is_open()) m_errorStream.close();
707    }
[1761]708
709    void CServer::launchServersRessource(MPI_Comm serverComm)
710    {
711      serversRessource_ = new CServersRessource(serverComm) ;
712    }
[2274]713
714    void  CServer::finalizeServersRessource(void) 
715    { 
716      delete serversRessource_; serversRessource_=nullptr ;
717    }
[300]718}
Note: See TracBrowser for help on using the repository browser.