Ignore:
Timestamp:
11/06/19 11:03:38 (5 years ago)
Author:
ymipsl
Message:

Some cleaning On XIOS services branch

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_SERVICES/src/server.cpp

    r1764 r1765  
    4444    CServersRessource* CServer::serversRessource_=nullptr ; 
    4545 
    46     void CServer::initRessources(void) 
    47     {  
    48       auto ressourcesManager=CXios::getRessourcesManager() ; 
    49       auto servicesManager=CXios::getServicesManager() ; 
    50       auto contextsManager=CXios::getContextsManager() ; 
    51       auto daemonsManager=CXios::getDaemonsManager() ; 
    52       auto serversRessource=CServer::getServersRessource() ; 
    53  
    54       if (serversRessource->isServerLeader()) 
    55       { 
    56  //        ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()/2) ; 
    57  //        ressourcesManager->createPool("NEMO",ressourcesManager->getRessourcesSize()/2) ; 
    58           ressourcesManager->createPool("LMDZ",ressourcesManager->getRessourcesSize()) ; 
    59           servicesManager->createServices("LMDZ", "ioserver", CServicesManager::IO_SERVER, 8, 5) ; 
    60           for(int i=0 ; i<5;i++) 
    61           { 
    62             contextsManager->createServerContext("LMDZ","ioserver",i,"lmdz") ; 
    63           } 
    64       } 
    65  
    66  
    67  
    68       while (true) 
    69       { 
    70         daemonsManager->eventLoop() ; 
    71       } 
    72  
    73  
    74     } 
    75      
     46        
    7647    void CServer::initialize(void) 
    7748    { 
     
    318289    } 
    319290 
    320 //--------------------------------------------------------------- 
    321 /*! 
    322  * \fn void CServer::initialize(void) 
    323  * Creates intraComm for each possible type of servers (classical, primary or secondary). 
    324  * Creates interComm and stores them into the following lists: 
    325  *   classical server -- interCommLeft 
    326  *   primary server -- interCommLeft and interCommRight 
    327  *   secondary server -- interCommLeft for each pool. 
    328  *   IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead. 
    329  */ 
    330     void CServer::initialize_old(void) 
    331     { 
    332       int initialized ; 
    333       MPI_Initialized(&initialized) ; 
    334       if (initialized) is_MPI_Initialized=true ; 
    335       else is_MPI_Initialized=false ; 
    336       int rank ; 
    337  
    338       CXios::launchRessourcesManager(true) ; 
    339       CXios::launchServicesManager(true) ; 
    340       CXios::launchContextsManager(true) ; 
    341        
    342       initRessources() ; 
    343       // Not using OASIS 
    344       if (!CXios::usingOasis) 
    345       { 
    346  
    347         if (!is_MPI_Initialized) 
    348         { 
    349           MPI_Init(NULL, NULL); 
    350         } 
    351         CTimer::get("XIOS").resume() ; 
    352  
    353         boost::hash<string> hashString ; 
    354         unsigned long hashServer = hashString(CXios::xiosCodeId); 
    355  
    356         unsigned long* hashAll ; 
    357         unsigned long* srvLevelAll ; 
    358  
    359         int size ; 
    360         int myColor ; 
    361         int i,c ; 
    362         MPI_Comm newComm; 
    363  
    364         MPI_Comm_size(CXios::globalComm, &size) ; 
    365         MPI_Comm_rank(CXios::globalComm, &rank_); 
    366  
    367         hashAll=new unsigned long[size] ; 
    368         MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ; 
    369  
    370         map<unsigned long, int> colors ; 
    371         map<unsigned long, int> leaders ; 
    372         map<unsigned long, int>::iterator it ; 
    373  
    374         // (1) Establish client leaders, distribute processes between two server levels 
    375         std::vector<int> srvRanks; 
    376         for(i=0,c=0;i<size;i++) 
    377         { 
    378           if (colors.find(hashAll[i])==colors.end()) 
    379           { 
    380             colors[hashAll[i]]=c ; 
    381             leaders[hashAll[i]]=i ; 
    382             c++ ; 
    383           } 
    384           if (CXios::usingServer2) 
    385             if (hashAll[i] == hashServer) 
    386               srvRanks.push_back(i); 
    387         } 
    388  
    389         if (CXios::usingServer2) 
    390         { 
    391           int reqNbProc = srvRanks.size()*CXios::ratioServer2/100.; 
    392           if (reqNbProc<1 || reqNbProc==srvRanks.size()) 
    393           { 
    394             error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
    395                 << "It is impossible to dedicate the requested number of processes = "<<reqNbProc 
    396                 <<" to secondary server. XIOS will run in the classical server mode."<<endl; 
    397           } 
    398           else 
    399           { 
    400             if (CXios::nbPoolsServer2 == 0) CXios::nbPoolsServer2 = reqNbProc; 
    401             int firstSndSrvRank = srvRanks.size()*(100.-CXios::ratioServer2)/100. ; 
    402             int poolLeader = firstSndSrvRank; 
    403 //*********** (1) Comment out the line below to set one process per pool 
    404             sndServerGlobalRanks.push_back(srvRanks[poolLeader]); 
    405             int nbPools = CXios::nbPoolsServer2; 
    406             if ( nbPools > reqNbProc || nbPools < 1) 
    407             { 
    408               error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
    409                   << "It is impossible to allocate the requested number of pools = "<<nbPools 
    410                   <<" on the secondary server. It will be set so that there is one process per pool."<<endl; 
    411               nbPools = reqNbProc; 
    412             } 
    413             int remainder = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) % nbPools; 
    414             int procsPerPool = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) / nbPools; 
    415             for (i=0; i<srvRanks.size(); i++) 
    416             { 
    417               if (i >= firstSndSrvRank) 
    418               { 
    419                 if (rank_ == srvRanks[i]) 
    420                 { 
    421                   serverLevel=2; 
    422                 } 
    423                 poolLeader += procsPerPool; 
    424                 if (remainder != 0) 
    425                 { 
    426                   ++poolLeader; 
    427                   --remainder; 
    428                 } 
    429 //*********** (2) Comment out the two lines below to set one process per pool 
    430                 if (poolLeader < srvRanks.size()) 
    431                   sndServerGlobalRanks.push_back(srvRanks[poolLeader]); 
    432 //*********** (3) Uncomment the line below to set one process per pool 
    433 //                sndServerGlobalRanks.push_back(srvRanks[i]); 
    434               } 
    435               else 
    436               { 
    437                 if (rank_ == srvRanks[i]) serverLevel=1; 
    438               } 
    439             } 
    440             if (serverLevel==2) 
    441             { 
    442               info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ; 
    443               for (i=0; i<sndServerGlobalRanks.size(); i++) 
    444               { 
    445                 if (rank_>= sndServerGlobalRanks[i]) 
    446                 { 
    447                   if ( i == sndServerGlobalRanks.size()-1) 
    448                   { 
    449                     myColor = colors.size() + sndServerGlobalRanks[i]; 
    450                   } 
    451                   else if (rank_< sndServerGlobalRanks[i+1]) 
    452                   { 
    453                     myColor = colors.size() + sndServerGlobalRanks[i]; 
    454                     break; 
    455                   } 
    456                 } 
    457               } 
    458             } 
    459           } 
    460         } 
    461  
    462         // (2) Create intraComm 
    463         if (serverLevel != 2) myColor=colors[hashServer]; 
    464         MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ; 
    465  
    466         // (3) Create interComm 
    467         if (serverLevel == 0) 
    468         { 
    469           int clientLeader; 
    470           for(it=leaders.begin();it!=leaders.end();it++) 
    471           { 
    472             if (it->first!=hashServer) 
    473             { 
    474               clientLeader=it->second ; 
    475               int intraCommSize, intraCommRank ; 
    476               MPI_Comm_size(intraComm,&intraCommSize) ; 
    477               MPI_Comm_rank(intraComm,&intraCommRank) ; 
    478               info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize 
    479                        <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ; 
    480  
    481               MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ; 
    482               interCommLeft.push_back(newComm) ; 
    483             } 
    484           } 
    485         } 
    486         else if (serverLevel == 1) 
    487         { 
    488           int clientLeader, srvSndLeader; 
    489           int srvPrmLeader ; 
    490  
    491           for (it=leaders.begin();it!=leaders.end();it++) 
    492           { 
    493             if (it->first != hashServer) 
    494             { 
    495               clientLeader=it->second ; 
    496               int intraCommSize, intraCommRank ; 
    497               MPI_Comm_size(intraComm, &intraCommSize) ; 
    498               MPI_Comm_rank(intraComm, &intraCommRank) ; 
    499               info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize 
    500                        <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ; 
    501               MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ; 
    502               interCommLeft.push_back(newComm) ; 
    503             } 
    504           } 
    505  
    506           for (int i = 0; i < sndServerGlobalRanks.size(); ++i) 
    507           { 
    508             int intraCommSize, intraCommRank ; 
    509             MPI_Comm_size(intraComm, &intraCommSize) ; 
    510             MPI_Comm_rank(intraComm, &intraCommRank) ; 
    511             info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize 
    512                 <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< sndServerGlobalRanks[i]<<endl ; 
    513             MPI_Intercomm_create(intraComm, 0, CXios::globalComm, sndServerGlobalRanks[i], 1, &newComm) ; 
    514             interCommRight.push_back(newComm) ; 
    515           } 
    516         } 
    517         else 
    518         { 
    519           int clientLeader; 
    520           clientLeader = leaders[hashString(CXios::xiosCodeId)]; 
    521           int intraCommSize, intraCommRank ; 
    522           MPI_Comm_size(intraComm, &intraCommSize) ; 
    523           MPI_Comm_rank(intraComm, &intraCommRank) ; 
    524           info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize 
    525                    <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ; 
    526  
    527           MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ; 
    528           interCommLeft.push_back(newComm) ; 
    529         } 
    530  
    531         delete [] hashAll ; 
    532  
    533       } 
    534       // using OASIS 
    535       else 
    536       { 
    537         int size; 
    538         int myColor; 
    539         int* srvGlobalRanks; 
    540         if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId); 
    541  
    542         CTimer::get("XIOS").resume() ; 
    543         MPI_Comm localComm; 
    544         oasis_get_localcomm(localComm); 
    545         MPI_Comm_rank(localComm,&rank_) ; 
    546  
    547 //      (1) Create server intraComm 
    548         if (!CXios::usingServer2) 
    549         { 
    550           MPI_Comm_dup(localComm, &intraComm); 
    551         } 
    552         else 
    553         { 
    554           int globalRank; 
    555           MPI_Comm_size(localComm,&size) ; 
    556           MPI_Comm_rank(CXios::globalComm,&globalRank) ; 
    557           srvGlobalRanks = new int[size] ; 
    558           MPI_Allgather(&globalRank, 1, MPI_INT, srvGlobalRanks, 1, MPI_INT, localComm) ; 
    559  
    560           int reqNbProc = size*CXios::ratioServer2/100.; 
    561           if (reqNbProc < 1 || reqNbProc == size) 
    562           { 
    563             error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
    564                 << "It is impossible to dedicate the requested number of processes = "<<reqNbProc 
    565                 <<" to secondary server. XIOS will run in the classical server mode."<<endl; 
    566             MPI_Comm_dup(localComm, &intraComm); 
    567           } 
    568           else 
    569           { 
    570             int firstSndSrvRank = size*(100.-CXios::ratioServer2)/100. ; 
    571             int poolLeader = firstSndSrvRank; 
    572 //*********** (1) Comment out the line below to set one process per pool 
    573 //            sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]); 
    574             int nbPools = CXios::nbPoolsServer2; 
    575             if ( nbPools > reqNbProc || nbPools < 1) 
    576             { 
    577               error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
    578                   << "It is impossible to allocate the requested number of pools = "<<nbPools 
    579                   <<" on the secondary server. It will be set so that there is one process per pool."<<endl; 
    580               nbPools = reqNbProc; 
    581             } 
    582             int remainder = ((int) (size*CXios::ratioServer2/100.)) % nbPools; 
    583             int procsPerPool = ((int) (size*CXios::ratioServer2/100.)) / nbPools; 
    584             for (int i=0; i<size; i++) 
    585             { 
    586               if (i >= firstSndSrvRank) 
    587               { 
    588                 if (globalRank == srvGlobalRanks[i]) 
    589                 { 
    590                   serverLevel=2; 
    591                 } 
    592                 poolLeader += procsPerPool; 
    593                 if (remainder != 0) 
    594                 { 
    595                   ++poolLeader; 
    596                   --remainder; 
    597                 } 
    598 //*********** (2) Comment out the two lines below to set one process per pool 
    599 //                if (poolLeader < size) 
    600 //                  sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]); 
    601 //*********** (3) Uncomment the line below to set one process per pool 
    602                 sndServerGlobalRanks.push_back(srvGlobalRanks[i]); 
    603               } 
    604               else 
    605               { 
    606                 if (globalRank == srvGlobalRanks[i]) serverLevel=1; 
    607               } 
    608             } 
    609             if (serverLevel==2) 
    610             { 
    611               info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ; 
    612               for (int i=0; i<sndServerGlobalRanks.size(); i++) 
    613               { 
    614                 if (globalRank>= sndServerGlobalRanks[i]) 
    615                 { 
    616                   if (i == sndServerGlobalRanks.size()-1) 
    617                   { 
    618                     myColor = sndServerGlobalRanks[i]; 
    619                   } 
    620                   else if (globalRank< sndServerGlobalRanks[i+1]) 
    621                   { 
    622                     myColor = sndServerGlobalRanks[i]; 
    623                     break; 
    624                   } 
    625                 } 
    626               } 
    627             } 
    628             if (serverLevel != 2) myColor=0; 
    629             MPI_Comm_split(localComm, myColor, rank_, &intraComm) ; 
    630           } 
    631         } 
    632  
    633         string codesId=CXios::getin<string>("oasis_codes_id") ; 
    634         vector<string> oasisCodeId=splitRegex(codesId,"\\s*,\\s*") ; 
    635   
    636         vector<string>::iterator it ; 
    637  
    638         MPI_Comm newComm ; 
    639         int globalRank ; 
    640         MPI_Comm_rank(CXios::globalComm,&globalRank); 
    641  
    642 //      (2) Create interComms with models 
    643         for(it=oasisCodeId.begin();it!=oasisCodeId.end();it++) 
    644         { 
    645           oasis_get_intercomm(newComm,*it) ; 
    646           if ( serverLevel == 0 || serverLevel == 1) 
    647           { 
    648             interCommLeft.push_back(newComm) ; 
    649             if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ; 
    650           } 
    651         } 
    652  
    653 //      (3) Create interComms between primary and secondary servers 
    654         int intraCommSize, intraCommRank ; 
    655         MPI_Comm_size(intraComm,&intraCommSize) ; 
    656         MPI_Comm_rank(intraComm, &intraCommRank) ; 
    657  
    658         if (serverLevel == 1) 
    659         { 
    660           for (int i = 0; i < sndServerGlobalRanks.size(); ++i) 
    661           { 
    662             int srvSndLeader = sndServerGlobalRanks[i]; 
    663             info(50)<<"intercommCreate::client (server level 1) "<<globalRank<<" intraCommSize : "<<intraCommSize 
    664                 <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvSndLeader<<endl ; 
    665             MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ; 
    666             interCommRight.push_back(newComm) ; 
    667           } 
    668         } 
    669         else if (serverLevel == 2) 
    670         { 
    671           info(50)<<"intercommCreate::server (server level 2)"<<globalRank<<" intraCommSize : "<<intraCommSize 
    672                    <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvGlobalRanks[0] <<endl ; 
    673           MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvGlobalRanks[0], 0, &newComm) ; 
    674           interCommLeft.push_back(newComm) ; 
    675         } 
    676         if (CXios::usingServer2) delete [] srvGlobalRanks ; 
    677  
    678         bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ; 
    679         if (!oasisEnddef) oasis_enddef() ; 
    680       } 
    681  
    682  
    683       MPI_Comm_rank(intraComm, &rank) ; 
    684       if (rank==0) isRoot=true; 
    685       else isRoot=false; 
    686        
    687       eventScheduler = new CEventScheduler(intraComm) ; 
    688     } 
    689291 
    690292    void CServer::finalize(void) 
     
    699301      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++) 
    700302        MPI_Comm_free(&(*it)); 
    701  
    702 //      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++) 
    703 //        MPI_Comm_free(&(*it)); 
    704  
    705 //        for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++) 
    706 //          MPI_Comm_free(&(*it)); 
    707303 
    708304        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++) 
     
    723319      report(100)<<CTimer::getAllCumulatedTime()<<endl ; 
    724320    } 
    725  
    726      void CServer::eventLoop(void) 
    727      { 
    728        bool stop=false ; 
    729  
    730        CTimer::get("XIOS server").resume() ; 
    731        while(!stop) 
    732        { 
    733          if (isRoot) 
    734          { 
    735            listenContext(); 
    736            listenRootContext(); 
    737            listenOasisEnddef() ; 
    738            listenRootOasisEnddef() ; 
    739            if (!finished) listenFinalize() ; 
    740          } 
    741          else 
    742          { 
    743            listenRootContext(); 
    744            listenRootOasisEnddef() ; 
    745            if (!finished) listenRootFinalize() ; 
    746          } 
    747  
    748          contextEventLoop() ; 
    749          if (finished && contextList.empty()) stop=true ; 
    750          eventScheduler->checkEvent() ; 
    751        } 
    752        CTimer::get("XIOS server").suspend() ; 
    753      } 
    754  
    755      void CServer::listenFinalize(void) 
    756      { 
    757         list<MPI_Comm>::iterator it, itr; 
    758         int msg ; 
    759         int flag ; 
    760  
    761         for(it=interCommLeft.begin();it!=interCommLeft.end();it++) 
    762         { 
    763            MPI_Status status ; 
    764            traceOff() ; 
    765            MPI_Iprobe(0,0,*it,&flag,&status) ; 
    766            traceOn() ; 
    767            if (flag==true) 
    768            { 
    769               MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ; 
    770               info(20)<<" CServer : Receive client finalize"<<endl ; 
    771               // Sending server finalize message to secondary servers (if any) 
    772               for(itr=interCommRight.begin();itr!=interCommRight.end();itr++) 
    773               { 
    774                 MPI_Send(&msg,1,MPI_INT,0,0,*itr) ; 
    775               } 
    776               MPI_Comm_free(&(*it)); 
    777               interCommLeft.erase(it) ; 
    778               break ; 
    779             } 
    780          } 
    781  
    782          if (interCommLeft.empty()) 
    783          { 
    784            int i,size ; 
    785            MPI_Comm_size(intraComm,&size) ; 
    786            MPI_Request* requests= new MPI_Request[size-1] ; 
    787            MPI_Status* status= new MPI_Status[size-1] ; 
    788  
    789            for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ; 
    790            MPI_Waitall(size-1,requests,status) ; 
    791  
    792            finished=true ; 
    793            delete [] requests ; 
    794            delete [] status ; 
    795          } 
    796      } 
    797  
    798  
    799      void CServer::listenRootFinalize() 
    800      { 
    801         int flag ; 
    802         MPI_Status status ; 
    803         int msg ; 
    804  
    805         traceOff() ; 
    806         MPI_Iprobe(0,4,intraComm, &flag, &status) ; 
    807         traceOn() ; 
    808         if (flag==true) 
    809         { 
    810            MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ; 
    811            finished=true ; 
    812         } 
    813       } 
    814  
    815  
    816    /*! 
    817     * Root process is listening for an order sent by client to call "oasis_enddef". 
    818     * The root client of a compound send the order (tag 5). It is probed and received. 
    819     * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any). 
    820     * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done 
    821     */ 
    822      
    823      void CServer::listenOasisEnddef(void) 
    824      { 
    825         int flag ; 
    826         MPI_Status status ; 
    827         list<MPI_Comm>::iterator it; 
    828         int msg ; 
    829         static int nbCompound=0 ; 
    830         int size ; 
    831         static bool sent=false ; 
    832         static MPI_Request* allRequests ; 
    833         static MPI_Status* allStatus ; 
    834  
    835  
    836         if (sent) 
    837         { 
    838           MPI_Comm_size(intraComm,&size) ; 
    839           MPI_Testall(size,allRequests, &flag, allStatus) ; 
    840           if (flag==true) 
    841           { 
    842             delete [] allRequests ; 
    843             delete [] allStatus ; 
    844             sent=false ; 
    845           } 
    846         } 
    847          
    848  
    849         for(it=interCommLeft.begin();it!=interCommLeft.end();it++) 
    850         { 
    851            MPI_Status status ; 
    852            traceOff() ; 
    853            MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5 
    854            traceOn() ; 
    855            if (flag==true) 
    856            { 
    857               MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5 
    858               nbCompound++ ; 
    859               if (nbCompound==interCommLeft.size()) 
    860               { 
    861                 for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++) 
    862                 { 
    863                    MPI_Send(&msg,1,MPI_INT,0,5,*it) ; // tags oasis_endded = 5 
    864                 } 
    865                 MPI_Comm_size(intraComm,&size) ; 
    866                 allRequests= new MPI_Request[size] ; 
    867                 allStatus= new MPI_Status[size] ; 
    868                 for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm,&allRequests[i]) ; // tags oasis_endded = 5 
    869                 sent=true ; 
    870               } 
    871            } 
    872         } 
    873      } 
    874       
    875    /*! 
    876     * Processes probes message from root process if oasis_enddef call must be done. 
    877     * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator 
    878     */ 
    879      void CServer::listenRootOasisEnddef(void) 
    880      { 
    881        int flag ; 
    882        MPI_Status status ; 
    883        const int root=0 ; 
    884        int msg ; 
    885        static bool eventSent=false ; 
    886  
    887        if (eventSent) 
    888        { 
    889          boost::hash<string> hashString; 
    890          size_t hashId = hashString("oasis_enddef"); 
    891          if (eventScheduler->queryEvent(0,hashId)) 
    892          { 
    893            oasis_enddef() ; 
    894            eventSent=false ; 
    895          } 
    896        } 
    897           
    898        traceOff() ; 
    899        MPI_Iprobe(root,5,intraComm, &flag, &status) ; 
    900        traceOn() ; 
    901        if (flag==true) 
    902        { 
    903          MPI_Recv(&msg,1,MPI_INT,root,5,intraComm,&status) ; // tags oasis_endded = 5 
    904          boost::hash<string> hashString; 
    905          size_t hashId = hashString("oasis_enddef"); 
    906          eventScheduler->registerEvent(0,hashId); 
    907          eventSent=true ; 
    908        } 
    909      } 
    910  
    911  
    912  
    913       
    914  
    915      void CServer::listenContext(void) 
    916      { 
    917  
    918        MPI_Status status ; 
    919        int flag ; 
    920        static char* buffer ; 
    921        static MPI_Request request ; 
    922        static bool recept=false ; 
    923        int rank ; 
    924        int count ; 
    925  
    926        if (recept==false) 
    927        { 
    928          traceOff() ; 
    929          MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ; 
    930          traceOn() ; 
    931          if (flag==true) 
    932          { 
    933            rank=status.MPI_SOURCE ; 
    934            MPI_Get_count(&status,MPI_CHAR,&count) ; 
    935            buffer=new char[count] ; 
    936            MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ; 
    937            recept=true ; 
    938          } 
    939        } 
    940        else 
    941        { 
    942          traceOff() ; 
    943          MPI_Test(&request,&flag,&status) ; 
    944          traceOn() ; 
    945          if (flag==true) 
    946          { 
    947            rank=status.MPI_SOURCE ; 
    948            MPI_Get_count(&status,MPI_CHAR,&count) ; 
    949            recvContextMessage((void*)buffer,count) ; 
    950            delete [] buffer ; 
    951            recept=false ; 
    952          } 
    953        } 
    954      } 
    955  
    956      void CServer::recvContextMessage(void* buff,int count) 
    957      { 
    958        static map<string,contextMessage> recvContextId; 
    959        map<string,contextMessage>::iterator it ; 
    960        CBufferIn buffer(buff,count) ; 
    961        string id ; 
    962        int clientLeader ; 
    963        int nbMessage ; 
    964  
    965        buffer>>id>>nbMessage>>clientLeader ; 
    966  
    967        it=recvContextId.find(id) ; 
    968        if (it==recvContextId.end()) 
    969        { 
    970          contextMessage msg={0,0} ; 
    971          pair<map<string,contextMessage>::iterator,bool> ret ; 
    972          ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ; 
    973          it=ret.first ; 
    974        } 
    975        it->second.nbRecv+=1 ; 
    976        it->second.leaderRank+=clientLeader ; 
    977  
    978        if (it->second.nbRecv==nbMessage) 
    979        { 
    980          int size ; 
    981          MPI_Comm_size(intraComm,&size) ; 
    982 //         MPI_Request* requests= new MPI_Request[size-1] ; 
    983 //         MPI_Status* status= new MPI_Status[size-1] ; 
    984          MPI_Request* requests= new MPI_Request[size] ; 
    985          MPI_Status* status= new MPI_Status[size] ; 
    986  
    987          CMessage msg ; 
    988          msg<<id<<it->second.leaderRank; 
    989          int messageSize=msg.size() ; 
    990          void * sendBuff = new char[messageSize] ; 
    991          CBufferOut sendBuffer(sendBuff,messageSize) ; 
    992          sendBuffer<<msg ; 
    993  
    994          // Include root itself in order not to have a divergence 
    995          for(int i=0; i<size; i++) 
    996          { 
    997            MPI_Isend(sendBuff,sendBuffer.count(),MPI_CHAR,i,2,intraComm,&requests[i]) ; 
    998          } 
    999  
    1000          recvContextId.erase(it) ; 
    1001          delete [] requests ; 
    1002          delete [] status ; 
    1003  
    1004        } 
    1005      } 
    1006  
    1007      void CServer::listenRootContext(void) 
    1008      { 
    1009        MPI_Status status ; 
    1010        int flag ; 
    1011        static std::vector<void*> buffers; 
    1012        static std::vector<MPI_Request> requests ; 
    1013        static std::vector<int> counts ; 
    1014        static std::vector<bool> isEventRegistered ; 
    1015        static std::vector<bool> isEventQueued ; 
    1016        MPI_Request request; 
    1017  
    1018        int rank ; 
    1019        const int root=0 ; 
    1020        boost::hash<string> hashString; 
    1021        size_t hashId = hashString("RegisterContext"); 
    1022  
    1023        // (1) Receive context id from the root, save it into a buffer 
    1024        traceOff() ; 
    1025        MPI_Iprobe(root,2,intraComm, &flag, &status) ; 
    1026        traceOn() ; 
    1027        if (flag==true) 
    1028        { 
    1029          counts.push_back(0); 
    1030          MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ; 
    1031          buffers.push_back(new char[counts.back()]) ; 
    1032          requests.push_back(request); 
    1033          MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&(requests.back())) ; 
    1034          isEventRegistered.push_back(false); 
    1035          isEventQueued.push_back(false); 
    1036          nbContexts++; 
    1037        } 
    1038  
    1039        for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ ) 
    1040        { 
    1041          // (2) If context id is received, register an event 
    1042          MPI_Test(&requests[ctxNb],&flag,&status) ; 
    1043          if (flag==true && !isEventRegistered[ctxNb]) 
    1044          { 
    1045            eventScheduler->registerEvent(ctxNb,hashId); 
    1046            isEventRegistered[ctxNb] = true; 
    1047          } 
    1048          // (3) If event has been scheduled, call register context 
    1049          if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb]) 
    1050          { 
    1051            registerContext(buffers[ctxNb],counts[ctxNb]) ; 
    1052            isEventQueued[ctxNb] = true; 
    1053            delete [] buffers[ctxNb] ; 
    1054          } 
    1055        } 
    1056  
    1057      } 
    1058  
    1059      void CServer::registerContext(void* buff, int count, int leaderRank) 
    1060      { 
    1061        string contextId; 
    1062        CBufferIn buffer(buff, count); 
    1063 //       buffer >> contextId; 
    1064        buffer >> contextId>>leaderRank; 
    1065        CContext* context; 
    1066  
    1067        info(20) << "CServer : Register new Context : " << contextId << endl; 
    1068  
    1069        if (contextList.find(contextId) != contextList.end()) 
    1070          ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)", 
    1071                << "Context '" << contextId << "' has already been registred"); 
    1072  
    1073        context=CContext::create(contextId); 
    1074        contextList[contextId]=context; 
    1075  
    1076        // Primary or classical server: create communication channel with a client 
    1077        // (1) create interComm (with a client) 
    1078        // (2) initialize client and server (contextClient and contextServer) 
    1079        MPI_Comm inter; 
    1080        if (serverLevel < 2) 
    1081        { 
    1082          MPI_Comm contextInterComm; 
    1083          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm); 
    1084          MPI_Intercomm_merge(contextInterComm,1,&inter); 
    1085          MPI_Barrier(inter); 
    1086          MPI_Comm_free(&inter); 
    1087          context->initServer(intraComm,contextInterComm); 
    1088          contextInterComms.push_back(contextInterComm); 
    1089  
    1090        } 
    1091        // Secondary server: create communication channel with a primary server 
    1092        // (1) duplicate interComm with a primary server 
    1093        // (2) initialize client and server (contextClient and contextServer) 
    1094        // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create, 
    1095        //         because interComm of CContext is defined on the same processes as the interComm of CServer. 
    1096        //         So just duplicate it. 
    1097        else if (serverLevel == 2) 
    1098        { 
    1099          MPI_Comm_dup(interCommLeft.front(), &inter); 
    1100          contextInterComms.push_back(inter); 
    1101          context->initServer(intraComm, contextInterComms.back()); 
    1102        } 
    1103  
    1104        // Primary server: 
    1105        // (1) send create context message to secondary servers 
    1106        // (2) initialize communication channels with secondary servers (create contextClient and contextServer) 
    1107        if (serverLevel == 1) 
    1108        { 
    1109          int i = 0, size; 
    1110          MPI_Comm_size(intraComm, &size) ; 
    1111          for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i) 
    1112          { 
    1113            StdString str = contextId +"_server_" + boost::lexical_cast<string>(i); 
    1114            CMessage msg; 
    1115            int messageSize; 
    1116            msg<<str<<size<<rank_ ; 
    1117            messageSize = msg.size() ; 
    1118            buff = new char[messageSize] ; 
    1119            CBufferOut buffer(buff,messageSize) ; 
    1120            buffer<<msg ; 
    1121            MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGlobalRanks[i], 1, CXios::globalComm) ; 
    1122            MPI_Comm_dup(*it, &inter); 
    1123            contextInterComms.push_back(inter); 
    1124            MPI_Comm_dup(intraComm, &inter); 
    1125            contextIntraComms.push_back(inter); 
    1126            context->initClient(contextIntraComms.back(), contextInterComms.back()) ; 
    1127            delete [] buff ; 
    1128          } 
    1129        } 
    1130      } 
    1131  
    1132      void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/) 
    1133      { 
    1134        bool isFinalized ; 
    1135        map<string,CContext*>::iterator it ; 
    1136  
    1137        for(it=contextList.begin();it!=contextList.end();it++) 
    1138        { 
    1139          isFinalized=it->second->isFinalized(); 
    1140          if (isFinalized) 
    1141          { 
    1142            contextList.erase(it) ; 
    1143            break ; 
    1144          } 
    1145          else 
    1146           it->second->eventLoop(enableEventsProcessing); 
    1147 //ym          it->second->checkBuffersAndListen(enableEventsProcessing); 
    1148        } 
    1149      } 
    1150  
    1151      //! Get rank of the current process in the intraComm 
    1152      int CServer::getRank() 
    1153      { 
    1154        int rank; 
    1155        MPI_Comm_rank(intraComm,&rank); 
    1156        return rank; 
    1157      } 
    1158  
    1159      vector<int>& CServer::getSecondaryServerGlobalRanks() 
    1160      { 
    1161        return sndServerGlobalRanks; 
    1162      } 
    1163321 
    1164322    /*! 
Note: See TracChangeset for help on using the changeset viewer.