source: XIOS2/dev/hshepherd/reduce_output_log/src/server.cpp @ 2704

Last change on this file since 2704 was 2704, checked in by hshepherd, 3 weeks ago

Port across changes from the patching - Not yet tested

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 35.6 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "mem_checker.hpp"
16#include "timestats.hpp"
17#include "event_scheduler.hpp"
18#include "string_tools.hpp"
19
20namespace xios
21{
22    MPI_Comm CServer::intraComm ;
23    std::list<MPI_Comm> CServer::interCommLeft ;
24    std::list<MPI_Comm> CServer::interCommRight ;
25    std::list<MPI_Comm> CServer::contextInterComms;
26    std::list<MPI_Comm> CServer::contextIntraComms;
27    int CServer::serverLevel = 0 ;
28    int CServer::nbContexts = 0;
29    bool CServer::isRoot = false ;
30    int CServer::rank_ = INVALID_RANK;
31    StdOFStream CServer::m_infoStream;
32    StdOFStream CServer::m_errorStream;
33    map<string,CContext*> CServer::contextList ;
34    vector<int> CServer::sndServerGlobalRanks;
35    bool CServer::finished=false ;
36    bool CServer::is_MPI_Initialized ;
37    bool CServer::writeLogFromRank;
38    CEventScheduler* CServer::eventScheduler = 0;
39
40//---------------------------------------------------------------
41/*!
42 * \fn void CServer::initialize(void)
43 * Creates intraComm for each possible type of servers (classical, primary or secondary).
44 * Creates interComm and stores them into the following lists:
45 *   classical server -- interCommLeft
46 *   primary server -- interCommLeft and interCommRight
47 *   secondary server -- interCommLeft for each pool.
48 *   IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead.
49 */
50    void CServer::initialize(void)
51    {
52      int initialized ;
53      MPI_Initialized(&initialized) ;
54      if (initialized) is_MPI_Initialized=true ;
55      else is_MPI_Initialized=false ;
56      int rank ;
57
58      // Not using OASIS
59      if (!CXios::usingOasis)
60      {
61
62        if (!is_MPI_Initialized)
63        {
64          MPI_Init(NULL, NULL);
65        }
66        CTimer::get("XIOS").resume() ;
67
68        boost::hash<string> hashString ;
69        unsigned long hashServer = hashString(CXios::xiosCodeId);
70
71        unsigned long* hashAll ;
72        unsigned long* srvLevelAll ;
73
74        int size ;
75        int myColor ;
76        int i,c ;
77        MPI_Comm newComm;
78
79        MPI_Comm_size(CXios::globalComm, &size) ;
80        MPI_Comm_rank(CXios::globalComm, &rank_);
81
82        hashAll=new unsigned long[size] ;
83        MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;
84
85        map<unsigned long, int> colors ;
86        map<unsigned long, int> leaders ;
87        map<unsigned long, int>::iterator it ;
88
89        // (1) Establish client leaders, distribute processes between two server levels
90        std::vector<int> srvRanks;
91        for(i=0,c=0;i<size;i++)
92        {
93          if (colors.find(hashAll[i])==colors.end())
94          {
95            colors[hashAll[i]]=c ;
96            leaders[hashAll[i]]=i ;
97            c++ ;
98          }
99          if (CXios::usingServer2)
100            if (hashAll[i] == hashServer)
101              srvRanks.push_back(i);
102        }
103
104        if (CXios::usingServer2)
105        {
106          int reqNbProc = srvRanks.size()*CXios::ratioServer2/100.;
107          if (reqNbProc<1 || reqNbProc==srvRanks.size())
108          {
109            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
110                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
111                <<" to secondary server. XIOS will run in the classical server mode."<<endl;
112          }
113          else
114          {
115            if (CXios::nbPoolsServer2 == 0) CXios::nbPoolsServer2 = reqNbProc;
116            int firstSndSrvRank = srvRanks.size()*(100.-CXios::ratioServer2)/100. ;
117            int poolLeader = firstSndSrvRank;
118//*********** (1) Comment out the line below to set one process per pool
119            sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
120            int nbPools = CXios::nbPoolsServer2;
121            if ( nbPools > reqNbProc || nbPools < 1)
122            {
123              error(0)<<"WARNING: void CServer::initialize(void)"<<endl
124                  << "It is impossible to allocate the requested number of pools = "<<nbPools
125                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
126              nbPools = reqNbProc;
127            }
128            int remainder = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) % nbPools;
129            int procsPerPool = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) / nbPools;
130            for (i=0; i<srvRanks.size(); i++)
131            {
132              if (i >= firstSndSrvRank)
133              {
134                if (rank_ == srvRanks[i])
135                {
136                  serverLevel=2;
137                }
138                poolLeader += procsPerPool;
139                if (remainder != 0)
140                {
141                  ++poolLeader;
142                  --remainder;
143                }
144//*********** (2) Comment out the two lines below to set one process per pool
145                if (poolLeader < srvRanks.size())
146                  sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
147//*********** (3) Uncomment the line below to set one process per pool
148//                sndServerGlobalRanks.push_back(srvRanks[i]);
149              }
150              else
151              {
152                if (rank_ == srvRanks[i]) serverLevel=1;
153              }
154            }
155            if (serverLevel==2)
156            {
157              info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
158              for (i=0; i<sndServerGlobalRanks.size(); i++)
159              {
160                if (rank_>= sndServerGlobalRanks[i])
161                {
162                  if ( i == sndServerGlobalRanks.size()-1)
163                  {
164                    myColor = colors.size() + sndServerGlobalRanks[i];
165                  }
166                  else if (rank_< sndServerGlobalRanks[i+1])
167                  {
168                    myColor = colors.size() + sndServerGlobalRanks[i];
169                    break;
170                  }
171                }
172              }
173            }
174          }
175        }
176
177        // (2) Create intraComm
178        if (serverLevel != 2) myColor=colors[hashServer];
179        MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ;
180
181        // (3) Create interComm
182        if (serverLevel == 0)
183        {
184          int clientLeader;
185          for(it=leaders.begin();it!=leaders.end();it++)
186          {
187            if (it->first!=hashServer)
188            {
189              clientLeader=it->second ;
190              int intraCommSize, intraCommRank ;
191              MPI_Comm_size(intraComm,&intraCommSize) ;
192              MPI_Comm_rank(intraComm,&intraCommRank) ;
193              info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize
194                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
195
196              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
197              interCommLeft.push_back(newComm) ;
198            }
199          }
200        }
201        else if (serverLevel == 1)
202        {
203          int clientLeader, srvSndLeader;
204          int srvPrmLeader ;
205
206          for (it=leaders.begin();it!=leaders.end();it++)
207          {
208            if (it->first != hashServer)
209            {
210              clientLeader=it->second ;
211              int intraCommSize, intraCommRank ;
212              MPI_Comm_size(intraComm, &intraCommSize) ;
213              MPI_Comm_rank(intraComm, &intraCommRank) ;
214              info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
215                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
216              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
217              interCommLeft.push_back(newComm) ;
218            }
219          }
220
221          for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
222          {
223            int intraCommSize, intraCommRank ;
224            MPI_Comm_size(intraComm, &intraCommSize) ;
225            MPI_Comm_rank(intraComm, &intraCommRank) ;
226            info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
227                <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< sndServerGlobalRanks[i]<<endl ;
228            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, sndServerGlobalRanks[i], 1, &newComm) ;
229            interCommRight.push_back(newComm) ;
230          }
231        }
232        else
233        {
234          int clientLeader;
235          clientLeader = leaders[hashString(CXios::xiosCodeId)];
236          int intraCommSize, intraCommRank ;
237          MPI_Comm_size(intraComm, &intraCommSize) ;
238          MPI_Comm_rank(intraComm, &intraCommRank) ;
239          info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize
240                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
241
242          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ;
243          interCommLeft.push_back(newComm) ;
244        }
245
246        delete [] hashAll ;
247
248      }
249      // using OASIS
250      else
251      {
252        int size;
253        int myColor;
254        int* srvGlobalRanks;
255        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
256
257        CTimer::get("XIOS").resume() ;
258        MPI_Comm localComm;
259        oasis_get_localcomm(localComm);
260        MPI_Comm_rank(localComm,&rank_) ;
261
262//      (1) Create server intraComm
263        if (!CXios::usingServer2)
264        {
265          MPI_Comm_dup(localComm, &intraComm);
266        }
267        else
268        {
269          int globalRank;
270          MPI_Comm_size(localComm,&size) ;
271          MPI_Comm_rank(CXios::globalComm,&globalRank) ;
272          srvGlobalRanks = new int[size] ;
273          MPI_Allgather(&globalRank, 1, MPI_INT, srvGlobalRanks, 1, MPI_INT, localComm) ;
274
275          int reqNbProc = size*CXios::ratioServer2/100.;
276          if (reqNbProc < 1 || reqNbProc == size)
277          {
278            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
279                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
280                <<" to secondary server. XIOS will run in the classical server mode."<<endl;
281            MPI_Comm_dup(localComm, &intraComm);
282          }
283          else
284          {
285            int firstSndSrvRank = size*(100.-CXios::ratioServer2)/100. ;
286            int poolLeader = firstSndSrvRank;
287//*********** (1) Comment out the line below to set one process per pool
288//            sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
289            int nbPools = CXios::nbPoolsServer2;
290            if ( nbPools > reqNbProc || nbPools < 1)
291            {
292              error(0)<<"WARNING: void CServer::initialize(void)"<<endl
293                  << "It is impossible to allocate the requested number of pools = "<<nbPools
294                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
295              nbPools = reqNbProc;
296            }
297            int remainder = ((int) (size*CXios::ratioServer2/100.)) % nbPools;
298            int procsPerPool = ((int) (size*CXios::ratioServer2/100.)) / nbPools;
299            for (int i=0; i<size; i++)
300            {
301              if (i >= firstSndSrvRank)
302              {
303                if (globalRank == srvGlobalRanks[i])
304                {
305                  serverLevel=2;
306                }
307                poolLeader += procsPerPool;
308                if (remainder != 0)
309                {
310                  ++poolLeader;
311                  --remainder;
312                }
313//*********** (2) Comment out the two lines below to set one process per pool
314//                if (poolLeader < size)
315//                  sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
316//*********** (3) Uncomment the line below to set one process per pool
317                sndServerGlobalRanks.push_back(srvGlobalRanks[i]);
318              }
319              else
320              {
321                if (globalRank == srvGlobalRanks[i]) serverLevel=1;
322              }
323            }
324            if (serverLevel==2)
325            {
326              info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
327              for (int i=0; i<sndServerGlobalRanks.size(); i++)
328              {
329                if (globalRank>= sndServerGlobalRanks[i])
330                {
331                  if (i == sndServerGlobalRanks.size()-1)
332                  {
333                    myColor = sndServerGlobalRanks[i];
334                  }
335                  else if (globalRank< sndServerGlobalRanks[i+1])
336                  {
337                    myColor = sndServerGlobalRanks[i];
338                    break;
339                  }
340                }
341              }
342            }
343            if (serverLevel != 2) myColor=0;
344            MPI_Comm_split(localComm, myColor, rank_, &intraComm) ;
345          }
346        }
347
348        string codesId=CXios::getin<string>("oasis_codes_id") ;
349        vector<string> oasisCodeId=splitRegex(codesId,"\\s*,\\s*") ;
350 
351        vector<string>::iterator it ;
352
353        MPI_Comm newComm ;
354        int globalRank ;
355        MPI_Comm_rank(CXios::globalComm,&globalRank);
356
357//      (2) Create interComms with models
358        for(it=oasisCodeId.begin();it!=oasisCodeId.end();it++)
359        {
360          oasis_get_intercomm(newComm,*it) ;
361          if ( serverLevel == 0 || serverLevel == 1)
362          {
363            interCommLeft.push_back(newComm) ;
364            if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
365          }
366        }
367
368//      (3) Create interComms between primary and secondary servers
369        int intraCommSize, intraCommRank ;
370        MPI_Comm_size(intraComm,&intraCommSize) ;
371        MPI_Comm_rank(intraComm, &intraCommRank) ;
372
373        if (serverLevel == 1)
374        {
375          for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
376          {
377            int srvSndLeader = sndServerGlobalRanks[i];
378            info(50)<<"intercommCreate::client (server level 1) "<<globalRank<<" intraCommSize : "<<intraCommSize
379                <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvSndLeader<<endl ;
380            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ;
381            interCommRight.push_back(newComm) ;
382          }
383        }
384        else if (serverLevel == 2)
385        {
386          info(50)<<"intercommCreate::server (server level 2)"<<globalRank<<" intraCommSize : "<<intraCommSize
387                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvGlobalRanks[0] <<endl ;
388          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvGlobalRanks[0], 0, &newComm) ;
389          interCommLeft.push_back(newComm) ;
390        }
391        if (CXios::usingServer2) delete [] srvGlobalRanks ;
392
393        bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ;
394        if (!oasisEnddef) oasis_enddef() ;
395      }
396
397
398      MPI_Comm_rank(intraComm, &rank) ;
399      if (rank==0) isRoot=true;
400      else isRoot=false;
401
402      writeLogFromRank = determineWriteLogFromRank();
403     
404      eventScheduler = new CEventScheduler(intraComm) ;
405    }
406
407    std::vector<double> CServer::collate_timings(std::string timing_param)
408    {
409      int myrank, comm_size;
410      MPI_Comm_rank(intraComm, &myrank);
411      MPI_Comm_size(intraComm, &comm_size);
412
413      std::vector<double> collated_results;
414      MPI_Barrier(intraComm);
415      if (myrank == 0) {
416        double recv_val = 0.;
417        collated_results.push_back(CTimer::get(timing_param).getCumulatedTime());
418        for (int i = 1; i < comm_size; i++) {
419          MPI_Recv(&recv_val, 1, MPI_DOUBLE, i, 0, intraComm, MPI_STATUS_IGNORE);
420          collated_results.push_back(recv_val);
421        }
422      } else {
423        double snd_val = CTimer::get(timing_param).getCumulatedTime();
424        MPI_Send(&snd_val, 1, MPI_DOUBLE, 0, 0, intraComm);
425      }
426      MPI_Barrier(intraComm);
427      return collated_results;
428    }
429
430    void CServer::present_collated_timings(void)
431    {
432      std::vector<double> processing_times = collate_timings("Process events");
433      std::vector<double> xios_server_times = collate_timings("XIOS server");
434      std::vector<double> ratio = percentage_ratio_vec_double(processing_times, xios_server_times);
435      // writeLogFromRank gives the lead rank for L1 servers and server pools
436      if (getRank() == 0) {
437        int n_ranks;
438        MPI_Comm_size(intraComm, &n_ranks);
439        report(0) << " Performance metrics across the intraComm communicator for this rank." << endl;
440        report(0) << "     There are " << n_ranks << " ranks in this intraComm" << endl;
441        write_summary_timings(processing_times, "Processing events");
442        write_summary_timings(ratio, "Ratio (Percentage)");
443      }
444    }
445     
446
447    void CServer::write_summary_timings(std::vector<double>& collated_results,
448                                        std::string results_label)
449    {
450      report(0) << "  " << results_label << " average " << calc_mean_double(collated_results) << endl;
451      report(0) << "  " << results_label << " std dev " << calc_std_double(collated_results) << endl;
452      report(0) << "  " << results_label << " min " << calc_min_double(collated_results) << endl;
453      report(0) << "  " << results_label << " max " << calc_max_double(collated_results) << endl;
454    }
455 
456
457    void CServer::finalize(void)
458    {
459      present_collated_timings();
460      MPI_Barrier(intraComm);
461
462      CTimer::get("XIOS").suspend() ;
463     
464      delete eventScheduler ;
465
466      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
467        /* MPI_Comm_free(&(*it)) */; // WARNING remove freeing communicator !! --> deadlock raised, to be checked
468
469      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
470        MPI_Comm_free(&(*it));
471
472//      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
473//        MPI_Comm_free(&(*it));
474
475//        for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
476//          MPI_Comm_free(&(*it));
477
478        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
479          /* MPI_Comm_free(&(*it)) */ ; // WARNING remove freeing communicator !! --> deadlock raised, to be checked
480
481      MPI_Comm_free(&intraComm);
482
483      CMemChecker::logMem( "CServer::finalize", true );
484      if (!is_MPI_Initialized)
485      {
486        if (CXios::usingOasis) oasis_finalize();
487        else MPI_Finalize() ;
488      }
489
490      if (CXios::reduceLogFiles) report(0) << "Performance summary from the first rank in this intraComm." << endl;
491
492      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
493      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
494      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
495      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
496      if (CXios::reportMemory)
497      {
498        report(100)<<CMemChecker::getAllCumulatedMem()<<endl ;
499      }
500
501    }
502
503     void CServer::eventLoop(void)
504     {
505       bool stop=false ;
506
507       CTimer::get("XIOS server").resume() ;
508       while(!stop)
509       {
510         if (isRoot)
511         {
512           listenContext();
513           listenRootContext();
514           listenOasisEnddef() ;
515           listenRootOasisEnddef() ;
516           if (!finished) listenFinalize() ;
517         }
518         else
519         {
520           listenRootContext();
521           listenRootOasisEnddef() ;
522           if (!finished) listenRootFinalize() ;
523         }
524
525         contextEventLoop() ;
526         if (finished && contextList.empty()) stop=true ;
527         eventScheduler->checkEvent() ;
528       }
529       CTimer::get("XIOS server").suspend() ;
530     }
531
532     void CServer::listenFinalize(void)
533     {
534        list<MPI_Comm>::iterator it, itr;
535        int msg ;
536        int flag ;
537
538        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
539        {
540           MPI_Status status ;
541           traceOff() ;
542           MPI_Iprobe(0,0,*it,&flag,&status) ;
543           traceOn() ;
544           if (flag==true)
545           {
546              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
547              info(20)<<" CServer : Receive client finalize"<<endl ;
548              // Sending server finalize message to secondary servers (if any)
549              for(itr=interCommRight.begin();itr!=interCommRight.end();itr++)
550              {
551                MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
552              }
553              /* MPI_Comm_free(&(*it)); */ // WARNING remove freeing communicator !! --> deadlock raised, to be checked
554              interCommLeft.erase(it) ;
555              break ;
556            }
557         }
558
559         if (interCommLeft.empty())
560         {
561           int i,size ;
562           MPI_Comm_size(intraComm,&size) ;
563           MPI_Request* requests= new MPI_Request[size-1] ;
564           MPI_Status* status= new MPI_Status[size-1] ;
565
566           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
567           MPI_Waitall(size-1,requests,status) ;
568
569           finished=true ;
570           delete [] requests ;
571           delete [] status ;
572         }
573     }
574
575
576     void CServer::listenRootFinalize()
577     {
578        int flag ;
579        MPI_Status status ;
580        int msg ;
581
582        traceOff() ;
583        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
584        traceOn() ;
585        if (flag==true)
586        {
587           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
588           finished=true ;
589        }
590      }
591
592
593   /*!
594    * Root process is listening for an order sent by client to call "oasis_enddef".
595    * The root client of a compound send the order (tag 5). It is probed and received.
596    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
597    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
598    */
599   
600     void CServer::listenOasisEnddef(void)
601     {
602        int flag ;
603        MPI_Status status ;
604        list<MPI_Comm>::iterator it;
605        int msg ;
606        static int nbCompound=0 ;
607        int size ;
608        static bool sent=false ;
609        static MPI_Request* allRequests ;
610        static MPI_Status* allStatus ;
611
612
613        if (sent)
614        {
615          MPI_Comm_size(intraComm,&size) ;
616          MPI_Testall(size,allRequests, &flag, allStatus) ;
617          if (flag==true)
618          {
619            delete [] allRequests ;
620            delete [] allStatus ;
621            sent=false ;
622          }
623        }
624       
625
626        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
627        {
628           MPI_Status status ;
629           traceOff() ;
630           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
631           traceOn() ;
632           if (flag==true)
633           {
634              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
635              nbCompound++ ;
636              if (nbCompound==interCommLeft.size())
637              {
638                for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
639                {
640                   MPI_Send(&msg,1,MPI_INT,0,5,*it) ; // tags oasis_endded = 5
641                }
642                MPI_Comm_size(intraComm,&size) ;
643                allRequests= new MPI_Request[size] ;
644                allStatus= new MPI_Status[size] ;
645                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm,&allRequests[i]) ; // tags oasis_endded = 5
646                sent=true ;
647              }
648           }
649        }
650     }
651     
652   /*!
653    * Processes probes message from root process if oasis_enddef call must be done.
654    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
655    */
656     void CServer::listenRootOasisEnddef(void)
657     {
658       int flag ;
659       MPI_Status status ;
660       const int root=0 ;
661       int msg ;
662       static bool eventSent=false ;
663
664       if (eventSent)
665       {
666         boost::hash<string> hashString;
667         size_t hashId = hashString("oasis_enddef");
668         if (eventScheduler->queryEvent(0,hashId))
669         {
670           oasis_enddef() ;
671           eventSent=false ;
672         }
673       }
674         
675       traceOff() ;
676       MPI_Iprobe(root,5,intraComm, &flag, &status) ;
677       traceOn() ;
678       if (flag==true)
679       {
680         MPI_Recv(&msg,1,MPI_INT,root,5,intraComm,&status) ; // tags oasis_endded = 5
681         boost::hash<string> hashString;
682         size_t hashId = hashString("oasis_enddef");
683         eventScheduler->registerEvent(0,hashId);
684         eventSent=true ;
685       }
686     }
687
688
689
690     
691
692     void CServer::listenContext(void)
693     {
694
695       MPI_Status status ;
696       int flag ;
697       static char* buffer ;
698       static MPI_Request request ;
699       static bool recept=false ;
700       int rank ;
701       int count ;
702
703       if (recept==false)
704       {
705         traceOff() ;
706         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
707         traceOn() ;
708         if (flag==true)
709         {
710           rank=status.MPI_SOURCE ;
711           MPI_Get_count(&status,MPI_CHAR,&count) ;
712           buffer=new char[count] ;
713           MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
714           recept=true ;
715         }
716       }
717       else
718       {
719         traceOff() ;
720         MPI_Test(&request,&flag,&status) ;
721         traceOn() ;
722         if (flag==true)
723         {
724           rank=status.MPI_SOURCE ;
725           MPI_Get_count(&status,MPI_CHAR,&count) ;
726           recvContextMessage((void*)buffer,count) ;
727           delete [] buffer ;
728           recept=false ;
729         }
730       }
731     }
732
733     void CServer::recvContextMessage(void* buff,int count)
734     {
735       static map<string,contextMessage> recvContextId;
736       map<string,contextMessage>::iterator it ;
737       CBufferIn buffer(buff,count) ;
738       string id ;
739       int clientLeader ;
740       int nbMessage ;
741
742       buffer>>id>>nbMessage>>clientLeader ;
743
744       it=recvContextId.find(id) ;
745       if (it==recvContextId.end())
746       {
747         contextMessage msg={0,0} ;
748         pair<map<string,contextMessage>::iterator,bool> ret ;
749         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
750         it=ret.first ;
751       }
752       it->second.nbRecv+=1 ;
753       it->second.leaderRank+=clientLeader ;
754
755       if (it->second.nbRecv==nbMessage)
756       {
757         int size ;
758         MPI_Comm_size(intraComm,&size) ;
759//         MPI_Request* requests= new MPI_Request[size-1] ;
760//         MPI_Status* status= new MPI_Status[size-1] ;
761         MPI_Request* requests= new MPI_Request[size] ;
762         MPI_Status* status= new MPI_Status[size] ;
763
764         CMessage msg ;
765         msg<<id<<it->second.leaderRank;
766         int messageSize=msg.size() ;
767         void * sendBuff = new char[messageSize] ;
768         CBufferOut sendBuffer(sendBuff,messageSize) ;
769         sendBuffer<<msg ;
770
771         // Include root itself in order not to have a divergence
772         for(int i=0; i<size; i++)
773         {
774           MPI_Isend(sendBuff,sendBuffer.count(),MPI_CHAR,i,2,intraComm,&requests[i]) ;
775         }
776
777         recvContextId.erase(it) ;
778         delete [] requests ;
779         delete [] status ;
780
781       }
782     }
783
784     void CServer::listenRootContext(void)
785     {
786       MPI_Status status ;
787       int flag ;
788       static std::vector<void*> buffers;
789       static std::vector<MPI_Request> requests ;
790       static std::vector<int> counts ;
791       static std::vector<bool> isEventRegistered ;
792       static std::vector<bool> isEventQueued ;
793       MPI_Request request;
794
795       int rank ;
796       const int root=0 ;
797       boost::hash<string> hashString;
798       size_t hashId = hashString("RegisterContext");
799
800       // (1) Receive context id from the root, save it into a buffer
801       traceOff() ;
802       MPI_Iprobe(root,2,intraComm, &flag, &status) ;
803       traceOn() ;
804       if (flag==true)
805       {
806         counts.push_back(0);
807         MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ;
808         buffers.push_back(new char[counts.back()]) ;
809         requests.push_back(request);
810         MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&(requests.back())) ;
811         isEventRegistered.push_back(false);
812         isEventQueued.push_back(false);
813         nbContexts++;
814       }
815
816       for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ )
817       {
818         // (2) If context id is received, register an event
819         MPI_Test(&requests[ctxNb],&flag,&status) ;
820         if (flag==true && !isEventRegistered[ctxNb])
821         {
822           eventScheduler->registerEvent(ctxNb,hashId);
823           isEventRegistered[ctxNb] = true;
824         }
825         // (3) If event has been scheduled, call register context
826         if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb])
827         {
828           registerContext(buffers[ctxNb],counts[ctxNb]) ;
829           isEventQueued[ctxNb] = true;
830           delete [] buffers[ctxNb] ;
831         }
832       }
833
834     }
835
836     void CServer::registerContext(void* buff, int count, int leaderRank)
837     {
838       string contextId;
839       CBufferIn buffer(buff, count);
840//       buffer >> contextId;
841       buffer >> contextId>>leaderRank;
842       CContext* context;
843
844       info(20) << "CServer : Register new Context : " << contextId << endl;
845
846       if (contextList.find(contextId) != contextList.end())
847         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
848               << "Context '" << contextId << "' has already been registred");
849
850       context=CContext::create(contextId);
851       contextList[contextId]=context;
852
853       // Primary or classical server: create communication channel with a client
854       // (1) create interComm (with a client)
855       // (2) initialize client and server (contextClient and contextServer)
856       MPI_Comm inter;
857       if (serverLevel < 2)
858       {
859         MPI_Comm contextInterComm;
860         MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm);
861         MPI_Intercomm_merge(contextInterComm,1,&inter);
862         MPI_Barrier(inter);
863         MPI_Comm_free(&inter);
864         context->initServer(intraComm,contextInterComm);
865         contextInterComms.push_back(contextInterComm);
866
867       }
868       // Secondary server: create communication channel with a primary server
869       // (1) duplicate interComm with a primary server
870       // (2) initialize client and server (contextClient and contextServer)
871       // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create,
872       //         because interComm of CContext is defined on the same processes as the interComm of CServer.
873       //         So just duplicate it.
874       else if (serverLevel == 2)
875       {
876         MPI_Comm_dup(interCommLeft.front(), &inter);
877         contextInterComms.push_back(inter);
878         context->initServer(intraComm, contextInterComms.back());
879       }
880
881       // Primary server:
882       // (1) send create context message to secondary servers
883       // (2) initialize communication channels with secondary servers (create contextClient and contextServer)
884       if (serverLevel == 1)
885       {
886         int i = 0, size;
887         MPI_Comm_size(intraComm, &size) ;
888         for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i)
889         {
890           StdString str = contextId +"_server_" + boost::lexical_cast<string>(i);
891           CMessage msg;
892           int messageSize;
893           msg<<str<<size<<rank_ ;
894           messageSize = msg.size() ;
895           buff = new char[messageSize] ;
896           CBufferOut buffer(buff,messageSize) ;
897           buffer<<msg ;
898           MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGlobalRanks[i], 1, CXios::globalComm) ;
899           MPI_Comm_dup(*it, &inter);
900           contextInterComms.push_back(inter);
901           MPI_Comm_dup(intraComm, &inter);
902           contextIntraComms.push_back(inter);
903           context->initClient(contextIntraComms.back(), contextInterComms.back()) ;
904           delete [] buff ;
905         }
906       }
907     }
908
909     void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/)
910     {
911       bool isFinalized ;
912       map<string,CContext*>::iterator it ;
913
914       for(it=contextList.begin();it!=contextList.end();it++)
915       {
916         isFinalized=it->second->isFinalized();
917         if (isFinalized)
918         {
919           contextList.erase(it) ;
920           break ;
921         }
922         else
923           it->second->checkBuffersAndListen(enableEventsProcessing);
924       }
925     }
926
927     //! Get rank of the current process in the intraComm
928     int CServer::getRank()
929     {
930       int rank;
931       MPI_Comm_rank(intraComm,&rank);
932       return rank;
933     }
934
935     vector<int>& CServer::getSecondaryServerGlobalRanks()
936     {
937       return sndServerGlobalRanks;
938     }
939
940    /*!
941    * Open a file specified by a suffix and an extension and use it for the given file buffer.
942    * The file name will be suffix+rank+extension.
943    *
944    * \param fileName[in] protype file name
945    * \param ext [in] extension of the file
946    * \param fb [in/out] the file buffer
947    */
948    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
949    {
950      StdStringStream fileNameClient;
951      int numDigit = 0;
952      int size = 0;
953      int id;
954      MPI_Comm_size(CXios::globalComm, &size);
955      while (size)
956      {
957        size /= 10;
958        ++numDigit;
959      }
960      id = rank_; //getRank();
961
962      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
963      fb->open(fileNameClient.str().c_str(), std::ios::out);
964      if (!fb->is_open())
965        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
966              << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the server log(s).");
967    }
968
969    bool CServer::determineWriteLogFromRank()
970    {
971      // Write from rank zero of each intracomm, which will be the rank of
972      // the lead level one and two servers
973      if (!CXios::reduceLogFiles) return true;
974      if (getRank() == 0) return true;
975      return false;
976    }
977
978    /*!
979    * \brief Open a file stream to write the info logs
980    * Open a file stream with a specific file name suffix+rank
981    * to write the info logs.
982    * \param fileName [in] protype file name
983    */
984    void CServer::openInfoStream(const StdString& fileName)
985    {
986      if (writeLogFromRank)
987      {
988        std::filebuf* fb = m_infoStream.rdbuf();
989        openStream(fileName, ".out", fb);
990
991        info.write2File(fb);
992        report.write2File(fb);
993      }
994    }
995
996    //! Write the info logs to standard output
997    void CServer::openInfoStream()
998    {
999      if (writeLogFromRank)
1000      {
1001        info.write2StdOut();
1002        report.write2StdOut();
1003      }
1004    }
1005
1006    //! Close the info logs file if it opens
1007    void CServer::closeInfoStream()
1008    {
1009      if (writeLogFromRank)
1010      {
1011        if (m_infoStream.is_open()) m_infoStream.close();
1012      }
1013    }
1014
1015    /*!
1016    * \brief Open a file stream to write the error log
1017    * Open a file stream with a specific file name suffix+rank
1018    * to write the error log.
1019    * \param fileName [in] protype file name
1020    */
1021    void CServer::openErrorStream(const StdString& fileName)
1022    {
1023      if (writeLogFromRank)
1024      {
1025        std::filebuf* fb = m_errorStream.rdbuf();
1026        openStream(fileName, ".err", fb);
1027
1028        error.write2File(fb);
1029      }
1030    }
1031
1032    //! Write the error log to standard error output
1033    void CServer::openErrorStream()
1034    {
1035      if (writeLogFromRank)
1036      {
1037        error.write2StdErr();
1038      }
1039    }
1040
1041    //! Close the error log file if it opens
1042    void CServer::closeErrorStream()
1043    {
1044      if (writeLogFromRank)
1045      {
1046        if (m_errorStream.is_open()) m_errorStream.close();
1047      }
1048    }
1049}
Note: See TracBrowser for help on using the repository browser.