source: XIOS/dev/dev_trunk_omp/src/server.cpp @ 1646

Last change on this file since 1646 was 1646, checked in by yushan, 5 years ago

branch merged with trunk @1645. arch file (ep&mpi) added for ADA

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 33.2 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "event_scheduler.hpp"
16#include "string_tools.hpp"
17#ifdef _usingEP
18using namespace ep_lib;
19#endif
20
21namespace xios
22{
23    MPI_Comm CServer::intraComm ;
24    std::list<MPI_Comm> CServer::interCommLeft ;
25    std::list<MPI_Comm> CServer::interCommRight ;
26    std::list<MPI_Comm> CServer::contextInterComms;
27    std::list<MPI_Comm> CServer::contextIntraComms;
28    int CServer::serverLevel = 0 ;
29    int CServer::nbContexts = 0;
30    bool CServer::isRoot = false ;
31    int CServer::rank_ = INVALID_RANK;
32    StdOFStream CServer::m_infoStream;
33    StdOFStream CServer::m_errorStream;
34    map<string,CContext*> CServer::contextList ;
35    vector<int> CServer::sndServerGlobalRanks;
36    bool CServer::finished=false ;
37    bool CServer::is_MPI_Initialized ;
38    CEventScheduler* CServer::eventScheduler = 0;
39
40//---------------------------------------------------------------
41/*!
42 * \fn void CServer::initialize(void)
43 * Creates intraComm for each possible type of servers (classical, primary or secondary).
44 * Creates interComm and stores them into the following lists:
45 *   classical server -- interCommLeft
46 *   primary server -- interCommLeft and interCommRight
47 *   secondary server -- interCommLeft for each pool.
48 *   IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead.
49 */
50    void CServer::initialize(void)
51    {
52      //int initialized ;
53      //MPI_Initialized(&initialized) ;
54      //if (initialized) is_MPI_Initialized=true ;
55      //else is_MPI_Initialized=false ;
56      int rank ;
57
58      // Not using OASIS
59      if (!CXios::usingOasis)
60      {
61
62        //if (!is_MPI_Initialized)
63        //{
64        //  MPI_Init(NULL, NULL);
65        //}
66        CTimer::get("XIOS").resume() ;
67
68        boost::hash<string> hashString ;
69        unsigned long hashServer = hashString(CXios::xiosCodeId);
70
71        unsigned long* hashAll ;
72        unsigned long* srvLevelAll ;
73
74        int size ;
75        int myColor ;
76        int i,c ;
77        MPI_Comm newComm;
78
79        MPI_Comm_size(CXios::globalComm, &size) ;
80        MPI_Comm_rank(CXios::globalComm, &rank_);
81
82        hashAll=new unsigned long[size] ;
83        MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;
84
85        map<unsigned long, int> colors ;
86        map<unsigned long, int> leaders ;
87        map<unsigned long, int>::iterator it ;
88
89        // (1) Establish client leaders, distribute processes between two server levels
90        std::vector<int> srvRanks;
91        for(i=0,c=0;i<size;i++)
92        {
93          if (colors.find(hashAll[i])==colors.end())
94          {
95            colors[hashAll[i]]=c ;
96            leaders[hashAll[i]]=i ;
97            c++ ;
98          }
99          if (CXios::usingServer2)
100            if (hashAll[i] == hashServer)
101              srvRanks.push_back(i);
102        }
103
104        if (CXios::usingServer2)
105        {
106          int reqNbProc = srvRanks.size()*CXios::ratioServer2/100.;
107          if (reqNbProc<1 || reqNbProc==srvRanks.size())
108          {
109            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
110                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
111                <<" to secondary server. XIOS will run in the classical server mode."<<endl;
112          }
113          else
114          {
115            if (CXios::nbPoolsServer2 == 0) CXios::nbPoolsServer2 = reqNbProc;
116            int firstSndSrvRank = srvRanks.size()*(100.-CXios::ratioServer2)/100. ;
117            int poolLeader = firstSndSrvRank;
118//*********** (1) Comment out the line below to set one process per pool
119            sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
120            int nbPools = CXios::nbPoolsServer2;
121            if ( nbPools > reqNbProc || nbPools < 1)
122            {
123              error(0)<<"WARNING: void CServer::initialize(void)"<<endl
124                  << "It is impossible to allocate the requested number of pools = "<<nbPools
125                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
126              nbPools = reqNbProc;
127            }
128            int remainder = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) % nbPools;
129            int procsPerPool = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) / nbPools;
130            for (i=0; i<srvRanks.size(); i++)
131            {
132              if (i >= firstSndSrvRank)
133              {
134                if (rank_ == srvRanks[i])
135                {
136                  serverLevel=2;
137                }
138                poolLeader += procsPerPool;
139                if (remainder != 0)
140                {
141                  ++poolLeader;
142                  --remainder;
143                }
144//*********** (2) Comment out the two lines below to set one process per pool
145                if (poolLeader < srvRanks.size())
146                  sndServerGlobalRanks.push_back(srvRanks[poolLeader]);
147//*********** (3) Uncomment the line below to set one process per pool
148//                sndServerGlobalRanks.push_back(srvRanks[i]);
149              }
150              else
151              {
152                if (rank_ == srvRanks[i]) serverLevel=1;
153              }
154            }
155            if (serverLevel==2)
156            {
157              #pragma omp critical (_output)
158              {
159                info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
160              }
161              for (i=0; i<sndServerGlobalRanks.size(); i++)
162              {
163                if (rank_>= sndServerGlobalRanks[i])
164                {
165                  if ( i == sndServerGlobalRanks.size()-1)
166                  {
167                    myColor = colors.size() + sndServerGlobalRanks[i];
168                  }
169                  else if (rank_< sndServerGlobalRanks[i+1])
170                  {
171                    myColor = colors.size() + sndServerGlobalRanks[i];
172                    break;
173                  }
174                }
175              }
176            }
177          }
178        }
179
180        // (2) Create intraComm
181        if (serverLevel != 2) myColor=colors[hashServer];
182        MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ;
183
184        // (3) Create interComm
185        if (serverLevel == 0)
186        {
187          int clientLeader;
188          for(it=leaders.begin();it!=leaders.end();it++)
189          {
190            if (it->first!=hashServer)
191            {
192              clientLeader=it->second ;
193              int intraCommSize, intraCommRank ;
194              MPI_Comm_size(intraComm,&intraCommSize) ;
195              MPI_Comm_rank(intraComm,&intraCommRank) ;
196
197              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
198              #pragma omp critical (_output)
199              {
200                info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize
201                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
202              }
203             
204              interCommLeft.push_back(newComm) ;
205            }
206          }
207        }
208        else if (serverLevel == 1)
209        {
210          int clientLeader, srvSndLeader;
211          int srvPrmLeader ;
212
213          for (it=leaders.begin();it!=leaders.end();it++)
214          {
215            if (it->first != hashServer)
216            {
217              clientLeader=it->second ;
218              int intraCommSize, intraCommRank ;
219              MPI_Comm_size(intraComm, &intraCommSize) ;
220              MPI_Comm_rank(intraComm, &intraCommRank) ;
221             
222              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
223              #pragma omp critical (_output)
224              {
225                info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
226                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
227              }
228              interCommLeft.push_back(newComm) ;
229            }
230          }
231
232          for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
233          {
234            int intraCommSize, intraCommRank ;
235            MPI_Comm_size(intraComm, &intraCommSize) ;
236            MPI_Comm_rank(intraComm, &intraCommRank) ;
237
238            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, sndServerGlobalRanks[i], 1, &newComm) ;
239            #pragma omp critical (_output)
240            {
241              info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize
242                <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< sndServerGlobalRanks[i]<<endl ;
243            }
244            interCommRight.push_back(newComm) ;
245          }
246        }
247        else
248        {
249          int clientLeader;
250          clientLeader = leaders[hashString(CXios::xiosCodeId)];
251          int intraCommSize, intraCommRank ;
252          MPI_Comm_size(intraComm, &intraCommSize) ;
253          MPI_Comm_rank(intraComm, &intraCommRank) ;
254
255          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ;
256          #pragma omp critical (_output)
257          {
258            info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize
259                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
260          }
261
262          interCommLeft.push_back(newComm) ;
263        }
264
265        delete [] hashAll ;
266
267      }
268      // using OASIS
269      else
270      {
271        int size;
272        int myColor;
273        int* srvGlobalRanks;
274        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
275
276        CTimer::get("XIOS").resume() ;
277        MPI_Comm localComm;
278        oasis_get_localcomm(localComm);
279        MPI_Comm_rank(localComm,&rank_) ;
280
281//      (1) Create server intraComm
282        if (!CXios::usingServer2)
283        {
284          MPI_Comm_dup(localComm, &intraComm);
285        }
286        else
287        {
288          int globalRank;
289          MPI_Comm_size(localComm,&size) ;
290          MPI_Comm_rank(CXios::globalComm,&globalRank) ;
291          srvGlobalRanks = new int[size] ;
292          MPI_Allgather(&globalRank, 1, MPI_INT, srvGlobalRanks, 1, MPI_INT, localComm) ;
293
294          int reqNbProc = size*CXios::ratioServer2/100.;
295          if (reqNbProc < 1 || reqNbProc == size)
296          {
297            error(0)<<"WARNING: void CServer::initialize(void)"<<endl
298                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc
299                <<" to secondary server. XIOS will run in the classical server mode."<<endl;
300            MPI_Comm_dup(localComm, &intraComm);
301          }
302          else
303          {
304            int firstSndSrvRank = size*(100.-CXios::ratioServer2)/100. ;
305            int poolLeader = firstSndSrvRank;
306//*********** (1) Comment out the line below to set one process per pool
307//            sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
308            int nbPools = CXios::nbPoolsServer2;
309            if ( nbPools > reqNbProc || nbPools < 1)
310            {
311              error(0)<<"WARNING: void CServer::initialize(void)"<<endl
312                  << "It is impossible to allocate the requested number of pools = "<<nbPools
313                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl;
314              nbPools = reqNbProc;
315            }
316            int remainder = ((int) (size*CXios::ratioServer2/100.)) % nbPools;
317            int procsPerPool = ((int) (size*CXios::ratioServer2/100.)) / nbPools;
318            for (int i=0; i<size; i++)
319            {
320              if (i >= firstSndSrvRank)
321              {
322                if (globalRank == srvGlobalRanks[i])
323                {
324                  serverLevel=2;
325                }
326                poolLeader += procsPerPool;
327                if (remainder != 0)
328                {
329                  ++poolLeader;
330                  --remainder;
331                }
332//*********** (2) Comment out the two lines below to set one process per pool
333//                if (poolLeader < size)
334//                  sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]);
335//*********** (3) Uncomment the line below to set one process per pool
336                sndServerGlobalRanks.push_back(srvGlobalRanks[i]);
337              }
338              else
339              {
340                if (globalRank == srvGlobalRanks[i]) serverLevel=1;
341              }
342            }
343            if (serverLevel==2)
344            {
345              #pragma omp critical (_output)
346              {
347                info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ;
348              }
349              for (int i=0; i<sndServerGlobalRanks.size(); i++)
350              {
351                if (globalRank>= sndServerGlobalRanks[i])
352                {
353                  if (i == sndServerGlobalRanks.size()-1)
354                  {
355                    myColor = sndServerGlobalRanks[i];
356                  }
357                  else if (globalRank< sndServerGlobalRanks[i+1])
358                  {
359                    myColor = sndServerGlobalRanks[i];
360                    break;
361                  }
362                }
363              }
364            }
365            if (serverLevel != 2) myColor=0;
366            MPI_Comm_split(localComm, myColor, rank_, &intraComm) ;
367          }
368        }
369
370        string codesId=CXios::getin<string>("oasis_codes_id") ;
371        vector<string> oasisCodeId=splitRegex(codesId,"\\s*,\\s*") ;
372 
373        vector<string>::iterator it ;
374
375        MPI_Comm newComm ;
376        int globalRank ;
377        MPI_Comm_rank(CXios::globalComm,&globalRank);
378
379//      (2) Create interComms with models
380        for(it=oasisCodeId.begin();it!=oasisCodeId.end();it++)
381        {
382          oasis_get_intercomm(newComm,*it) ;
383          if ( serverLevel == 0 || serverLevel == 1)
384          {
385            interCommLeft.push_back(newComm) ;
386            if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
387          }
388        }
389
390//      (3) Create interComms between primary and secondary servers
391        int intraCommSize, intraCommRank ;
392        MPI_Comm_size(intraComm,&intraCommSize) ;
393        MPI_Comm_rank(intraComm, &intraCommRank) ;
394
395        if (serverLevel == 1)
396        {
397          for (int i = 0; i < sndServerGlobalRanks.size(); ++i)
398          {
399            int srvSndLeader = sndServerGlobalRanks[i];
400            #pragma omp critical (_output)
401            {
402              info(50)<<"intercommCreate::client (server level 1) "<<globalRank<<" intraCommSize : "<<intraCommSize
403                <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvSndLeader<<endl ;
404            }
405            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ;
406            interCommRight.push_back(newComm) ;
407          }
408        }
409        else if (serverLevel == 2)
410        {
411          #pragma omp critical (_output)
412          {
413            info(50)<<"intercommCreate::server (server level 2)"<<globalRank<<" intraCommSize : "<<intraCommSize
414                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvGlobalRanks[0] <<endl ;
415          }
416          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvGlobalRanks[0], 0, &newComm) ;
417          interCommLeft.push_back(newComm) ;
418        }
419        if (CXios::usingServer2) delete [] srvGlobalRanks ;
420
421        bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ;
422        if (!oasisEnddef) oasis_enddef() ;
423      }
424
425
426      MPI_Comm_rank(intraComm, &rank) ;
427      if (rank==0) isRoot=true;
428      else isRoot=false;
429     
430      eventScheduler = new CEventScheduler(intraComm) ;
431    }
432
433    void CServer::finalize(void)
434    {
435      //CTimer::get("XIOS").suspend() ;
436     
437      delete eventScheduler ;
438
439      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
440        MPI_Comm_free(&(*it));
441
442      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
443        MPI_Comm_free(&(*it));
444
445        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
446          MPI_Comm_free(&(*it));
447
448      MPI_Comm_free(&intraComm);
449
450      if (!is_MPI_Initialized)
451      {
452        if (CXios::usingOasis) oasis_finalize();
453        //else MPI_Finalize() ;
454      }
455      #pragma omp critical (_output)
456      {
457        report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
458        report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
459        report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
460        report(100)<<CTimer::getAllCumulatedTime()<<endl ;
461      }
462   }
463
464
465
466     void CServer::eventLoop(void)
467     {
468       bool stop=false ;
469
470       CTimer::get("XIOS server").resume() ;
471       while(!stop)
472       {
473         if (isRoot)
474         {
475           listenContext();
476           listenRootContext();
477           listenOasisEnddef() ;
478           listenRootOasisEnddef() ;
479           if (!finished) listenFinalize() ;
480         }
481         else
482         {
483           listenRootContext();
484           listenRootOasisEnddef() ;
485           if (!finished) listenRootFinalize() ;
486         }
487
488         contextEventLoop() ;
489         if (finished && contextList.empty()) stop=true ;
490         eventScheduler->checkEvent() ;
491       }
492       CTimer::get("XIOS server").suspend() ;
493     }
494
495     void CServer::listenFinalize(void)
496     {
497        list<MPI_Comm>::iterator it, itr;
498        int msg ;
499        int flag ;
500
501        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
502        {
503           MPI_Status status ;
504           traceOff() ;
505           MPI_Iprobe(0,0,*it,&flag,&status) ;
506           traceOn() ;
507           if (flag==true)
508           {
509              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
510              #pragma omp critical (_output)
511              {
512                info(20)<<" CServer : Receive client finalize"<<endl ;
513              }
514              // Sending server finalize message to secondary servers (if any)
515              for(itr=interCommRight.begin();itr!=interCommRight.end();itr++)
516              {
517                MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
518              }
519              MPI_Comm_free(&(*it));
520              interCommLeft.erase(it) ;
521              break ;
522            }
523         }
524
525         if (interCommLeft.empty())
526         {
527           int i,size ;
528           MPI_Comm_size(intraComm,&size) ;
529           MPI_Request* requests= new MPI_Request[size-1] ;
530           MPI_Status* status= new MPI_Status[size-1] ;
531
532           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
533           MPI_Waitall(size-1,requests,status) ;
534
535           finished=true ;
536           delete [] requests ;
537           delete [] status ;
538         }
539     }
540
541
542     void CServer::listenRootFinalize()
543     {
544        int flag ;
545        MPI_Status status ;
546        int msg ;
547
548        traceOff() ;
549        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
550        traceOn() ;
551        if (flag==true)
552        {
553           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
554           finished=true ;
555        }
556      }
557
558
559   /*!
560    * Root process is listening for an order sent by client to call "oasis_enddef".
561    * The root client of a compound send the order (tag 5). It is probed and received.
562    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
563    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
564    */
565   
566     void CServer::listenOasisEnddef(void)
567     {
568        int flag ;
569        MPI_Status status ;
570        list<MPI_Comm>::iterator it;
571        int msg ;
572        static int nbCompound=0 ;
573        int size ;
574        static bool sent=false ;
575        static MPI_Request* allRequests ;
576        static MPI_Status* allStatus ;
577
578
579        if (sent)
580        {
581          MPI_Comm_size(intraComm,&size) ;
582          MPI_Testall(size,allRequests, &flag, allStatus) ;
583          if (flag==true)
584          {
585            delete [] allRequests ;
586            delete [] allStatus ;
587            sent=false ;
588          }
589        }
590       
591
592        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
593        {
594           MPI_Status status ;
595           traceOff() ;
596           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
597           traceOn() ;
598           if (flag==true)
599           {
600              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
601              nbCompound++ ;
602              if (nbCompound==interCommLeft.size())
603              {
604                for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
605                {
606                   MPI_Send(&msg,1,MPI_INT,0,5,*it) ; // tags oasis_endded = 5
607                }
608                MPI_Comm_size(intraComm,&size) ;
609                allRequests= new MPI_Request[size] ;
610                allStatus= new MPI_Status[size] ;
611                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm,&allRequests[i]) ; // tags oasis_endded = 5
612                sent=true ;
613              }
614           }
615        }
616     }
617     
618   /*!
619    * Processes probes message from root process if oasis_enddef call must be done.
620    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
621    */
622     void CServer::listenRootOasisEnddef(void)
623     {
624       int flag ;
625       MPI_Status status ;
626       const int root=0 ;
627       int msg ;
628       static bool eventSent=false ;
629
630       if (eventSent)
631       {
632         boost::hash<string> hashString;
633         size_t hashId = hashString("oasis_enddef");
634         if (eventScheduler->queryEvent(0,hashId))
635         {
636           oasis_enddef() ;
637           eventSent=false ;
638         }
639       }
640         
641       traceOff() ;
642       MPI_Iprobe(root,5,intraComm, &flag, &status) ;
643       traceOn() ;
644       if (flag==true)
645       {
646         MPI_Recv(&msg,1,MPI_INT,root,5,intraComm,&status) ; // tags oasis_endded = 5
647         boost::hash<string> hashString;
648         size_t hashId = hashString("oasis_enddef");
649         eventScheduler->registerEvent(0,hashId);
650         eventSent=true ;
651       }
652     }
653
654
655
656     
657
658     void CServer::listenContext(void)
659     {
660
661       MPI_Status status ;
662       int flag ;
663       static char* buffer ;
664       static MPI_Request request ;
665       static bool recept=false ;
666       int rank ;
667       int count ;
668
669       if (recept==false)
670       {
671         traceOff() ;
672         MPI_Iprobe(-2,1,CXios::globalComm, &flag, &status) ;
673         traceOn() ;
674         if (flag==true)
675         {
676           #ifdef _usingMPI
677           rank=status.MPI_SOURCE ;
678           #elif _usingEP
679           rank=status.ep_src;
680           #endif
681           MPI_Get_count(&status,MPI_CHAR,&count) ;
682           buffer=new char[count] ;
683           MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
684           recept=true ;
685         }
686       }
687       else
688       {
689         traceOff() ;
690         MPI_Test(&request,&flag,&status) ;
691         traceOn() ;
692         if (flag==true)
693         {
694           #ifdef _usingMPI
695           rank=status.MPI_SOURCE ;
696           #elif _usingEP
697           rank=status.ep_src;
698           #endif
699           MPI_Get_count(&status,MPI_CHAR,&count) ;
700           recvContextMessage((void*)buffer,count) ;
701           delete [] buffer ;
702           recept=false ;
703         }
704       }
705     }
706
707     void CServer::recvContextMessage(void* buff,int count)
708     {
709       static map<string,contextMessage> recvContextId;
710       map<string,contextMessage>::iterator it ;
711       CBufferIn buffer(buff,count) ;
712       string id ;
713       int clientLeader ;
714       int nbMessage ;
715
716       buffer>>id>>nbMessage>>clientLeader ;
717
718       it=recvContextId.find(id) ;
719       if (it==recvContextId.end())
720       {
721         contextMessage msg={0,0} ;
722         pair<map<string,contextMessage>::iterator,bool> ret ;
723         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
724         it=ret.first ;
725       }
726       it->second.nbRecv+=1 ;
727       it->second.leaderRank+=clientLeader ;
728
729       if (it->second.nbRecv==nbMessage)
730       {
731         int size ;
732         MPI_Comm_size(intraComm,&size) ;
733//         MPI_Request* requests= new MPI_Request[size-1] ;
734//         MPI_Status* status= new MPI_Status[size-1] ;
735         MPI_Request* requests= new MPI_Request[size] ;
736         MPI_Status* status= new MPI_Status[size] ;
737
738         CMessage msg ;
739         msg<<id<<it->second.leaderRank;
740         int messageSize=msg.size() ;
741         void * sendBuff = new char[messageSize] ;
742         CBufferOut sendBuffer(sendBuff,messageSize) ;
743         sendBuffer<<msg ;
744
745         // Include root itself in order not to have a divergence
746         for(int i=0; i<size; i++)
747         {
748           MPI_Isend(sendBuff,sendBuffer.count(),MPI_CHAR,i,2,intraComm,&requests[i]) ;
749         }
750
751         recvContextId.erase(it) ;
752         delete [] requests ;
753         delete [] status ;
754
755       }
756     }
757
758     void CServer::listenRootContext(void)
759     {
760       MPI_Status status ;
761       int flag ;
762       static std::vector<void*> buffers;
763       static std::vector<MPI_Request> requests ;
764       static std::vector<int> counts ;
765       static std::vector<bool> isEventRegistered ;
766       static std::vector<bool> isEventQueued ;
767       MPI_Request request;
768
769       int rank ;
770       const int root=0 ;
771       boost::hash<string> hashString;
772       size_t hashId = hashString("RegisterContext");
773
774       // (1) Receive context id from the root, save it into a buffer
775       traceOff() ;
776       MPI_Iprobe(root,2,intraComm, &flag, &status) ;
777       traceOn() ;
778       if (flag==true)
779       {
780         counts.push_back(0);
781         MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ;
782         buffers.push_back(new char[counts.back()]) ;
783         MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&request) ;
784         requests.push_back(request);
785         isEventRegistered.push_back(false);
786         isEventQueued.push_back(false);
787         nbContexts++;
788       }
789
790       for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ )
791       {
792         // (2) If context id is received, register an event
793         if(!isEventRegistered[ctxNb]) MPI_Test(&requests[ctxNb],&flag,&status) ;
794         if (flag==true && !isEventRegistered[ctxNb])
795         {
796           eventScheduler->registerEvent(ctxNb,hashId);
797           isEventRegistered[ctxNb] = true;
798         }
799         // (3) If event has been scheduled, call register context
800         if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb])
801         {
802           registerContext(buffers[ctxNb],counts[ctxNb]) ;
803           isEventQueued[ctxNb] = true;
804           delete [] buffers[ctxNb] ;
805         }
806       }
807
808     }
809
810     void CServer::registerContext(void* buff, int count, int leaderRank)
811     {
812       string contextId;
813       CBufferIn buffer(buff, count);
814//       buffer >> contextId;
815       buffer >> contextId>>leaderRank;
816       CContext* context;
817
818       #pragma omp critical (_output)
819       {
820         info(20) << "CServer : Register new Context : " << contextId << endl;
821       }
822
823       if (contextList.find(contextId) != contextList.end())
824         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
825               << "Context '" << contextId << "' has already been registred");
826
827       context=CContext::create(contextId);
828       contextList[contextId]=context;
829
830       // Primary or classical server: create communication channel with a client
831       // (1) create interComm (with a client)
832       // (2) initialize client and server (contextClient and contextServer)
833       MPI_Comm inter;
834       if (serverLevel < 2)
835       {
836         MPI_Comm contextInterComm;
837         MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm);
838         MPI_Intercomm_merge(contextInterComm,1,&inter);
839         MPI_Barrier(inter);
840         context->initServer(intraComm,contextInterComm);
841         contextInterComms.push_back(contextInterComm);
842
843         MPI_Comm_free(&inter);
844       }
845       // Secondary server: create communication channel with a primary server
846       // (1) duplicate interComm with a primary server
847       // (2) initialize client and server (contextClient and contextServer)
848       // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create,
849       //         because interComm of CContext is defined on the same processes as the interComm of CServer.
850       //         So just duplicate it.
851       else if (serverLevel == 2)
852       {
853         MPI_Comm_dup(interCommLeft.front(), &inter);
854         contextInterComms.push_back(inter);
855         context->initServer(intraComm, contextInterComms.back());
856       }
857
858       // Primary server:
859       // (1) send create context message to secondary servers
860       // (2) initialize communication channels with secondary servers (create contextClient and contextServer)
861       if (serverLevel == 1)
862       {
863         int i = 0, size;
864         MPI_Comm_size(intraComm, &size) ;
865         for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i)
866         {
867           StdString str = contextId +"_server_" + boost::lexical_cast<string>(i);
868           CMessage msg;
869           int messageSize;
870           msg<<str<<size<<rank_ ;
871           messageSize = msg.size() ;
872           buff = new char[messageSize] ;
873           CBufferOut buffer(buff,messageSize) ;
874           buffer<<msg ;
875           MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGlobalRanks[i], 1, CXios::globalComm) ;
876           MPI_Comm_dup(*it, &inter);
877           contextInterComms.push_back(inter);
878           MPI_Comm_dup(intraComm, &inter);
879           contextIntraComms.push_back(inter);
880           context->initClient(contextIntraComms.back(), contextInterComms.back()) ;
881           delete [] buff ;
882         }
883       }
884     }
885
886     void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/)
887     {
888       bool isFinalized ;
889       map<string,CContext*>::iterator it ;
890
891       for(it=contextList.begin();it!=contextList.end();it++)
892       {
893         isFinalized=it->second->isFinalized();
894         if (isFinalized)
895         {
896           contextList.erase(it) ;
897           break ;
898         }
899         else
900           it->second->checkBuffersAndListen(enableEventsProcessing);
901       }
902     }
903
904     //! Get rank of the current process in the intraComm
905     int CServer::getRank()
906     {
907       int rank;
908       MPI_Comm_rank(intraComm,&rank);
909       return rank;
910     }
911
912     vector<int>& CServer::getSecondaryServerGlobalRanks()
913     {
914       return sndServerGlobalRanks;
915     }
916
917    /*!
918    * Open a file specified by a suffix and an extension and use it for the given file buffer.
919    * The file name will be suffix+rank+extension.
920    *
921    * \param fileName[in] protype file name
922    * \param ext [in] extension of the file
923    * \param fb [in/out] the file buffer
924    */
925    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
926    {
927      StdStringStream fileNameClient;
928      int numDigit = 0;
929      int size = 0;
930      int id;
931      MPI_Comm_size(CXios::globalComm, &size);
932      while (size)
933      {
934        size /= 10;
935        ++numDigit;
936      }
937      id = rank_; //getRank();
938
939      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
940      fb->open(fileNameClient.str().c_str(), std::ios::out);
941      if (!fb->is_open())
942        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
943              << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the server log(s).");
944    }
945
946    /*!
947    * \brief Open a file stream to write the info logs
948    * Open a file stream with a specific file name suffix+rank
949    * to write the info logs.
950    * \param fileName [in] protype file name
951    */
952    void CServer::openInfoStream(const StdString& fileName)
953    {
954      std::filebuf* fb = m_infoStream.rdbuf();
955      openStream(fileName, ".out", fb);
956
957      info.write2File(fb);
958      report.write2File(fb);
959    }
960
961    //! Write the info logs to standard output
962    void CServer::openInfoStream()
963    {
964      info.write2StdOut();
965      report.write2StdOut();
966    }
967
968    //! Close the info logs file if it opens
969    void CServer::closeInfoStream()
970    {
971      if (m_infoStream.is_open()) m_infoStream.close();
972    }
973
974    /*!
975    * \brief Open a file stream to write the error log
976    * Open a file stream with a specific file name suffix+rank
977    * to write the error log.
978    * \param fileName [in] protype file name
979    */
980    void CServer::openErrorStream(const StdString& fileName)
981    {
982      std::filebuf* fb = m_errorStream.rdbuf();
983      openStream(fileName, ".err", fb);
984
985      error.write2File(fb);
986    }
987
988    //! Write the error log to standard error output
989    void CServer::openErrorStream()
990    {
991      error.write2StdErr();
992    }
993
994    //! Close the error log file if it opens
995    void CServer::closeErrorStream()
996    {
997      if (m_errorStream.is_open()) m_errorStream.close();
998    }
999}
Note: See TracBrowser for help on using the repository browser.