Ignore:
Timestamp:
01/22/21 12:00:29 (3 years ago)
Author:
yushan
Message:

Graph intermedia commit to a tmp branch

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_trunk_graph/src/server.cpp

    r1590 r2019  
    1515#include "event_scheduler.hpp" 
    1616#include "string_tools.hpp" 
     17#include "ressources_manager.hpp" 
     18#include "services_manager.hpp" 
     19#include "contexts_manager.hpp" 
     20#include "servers_ressource.hpp" 
     21#include <cstdio> 
     22#include "workflow_graph.hpp" 
     23 
    1724 
    1825namespace xios 
    1926{ 
    2027    MPI_Comm CServer::intraComm ; 
     28    MPI_Comm CServer::serversComm_ ; 
    2129    std::list<MPI_Comm> CServer::interCommLeft ; 
    2230    std::list<MPI_Comm> CServer::interCommRight ; 
     
    3442    bool CServer::is_MPI_Initialized ; 
    3543    CEventScheduler* CServer::eventScheduler = 0; 
    36  
    37 //--------------------------------------------------------------- 
    38 /*! 
    39  * \fn void CServer::initialize(void) 
    40  * Creates intraComm for each possible type of servers (classical, primary or secondary). 
    41  * Creates interComm and stores them into the following lists: 
    42  *   classical server -- interCommLeft 
    43  *   primary server -- interCommLeft and interCommRight 
    44  *   secondary server -- interCommLeft for each pool. 
    45  *   IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead. 
    46  */ 
     44    CServersRessource* CServer::serversRessource_=nullptr ; 
     45 
     46        
    4747    void CServer::initialize(void) 
    4848    { 
     49       
     50      MPI_Comm serverComm ; 
    4951      int initialized ; 
    5052      MPI_Initialized(&initialized) ; 
    5153      if (initialized) is_MPI_Initialized=true ; 
    5254      else is_MPI_Initialized=false ; 
    53       int rank ; 
    54  
    55       // Not using OASIS 
     55      MPI_Comm globalComm=CXios::getGlobalComm() ; 
     56 
     57      ///////////////////////////////////////// 
     58      ///////////// PART 1 //////////////////// 
     59      ///////////////////////////////////////// 
     60 
     61      // don't use OASIS 
    5662      if (!CXios::usingOasis) 
    5763      { 
    58  
    59         if (!is_MPI_Initialized) 
     64        if (!is_MPI_Initialized) MPI_Init(NULL, NULL); 
     65        
     66        // split the global communicator 
     67        // get hash from all model to attribute a unique color (int) and then split to get client communicator 
     68        // every mpi process of globalComm (MPI_COMM_WORLD) must participate 
     69          
     70        int commRank, commSize ; 
     71        MPI_Comm_rank(globalComm,&commRank) ; 
     72        MPI_Comm_size(globalComm,&commSize) ; 
     73 
     74        std::hash<string> hashString ; 
     75        size_t hashServer=hashString(CXios::xiosCodeId) ; 
     76           
     77        size_t* hashAll = new size_t[commSize] ; 
     78        MPI_Allgather(&hashServer,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ; 
     79           
     80        int color=0 ; 
     81        set<size_t> listHash ; 
     82        for(int i=0 ; i<=commRank ; i++)  
     83          if (listHash.count(hashAll[i])==1) 
     84          { 
     85            listHash.insert(hashAll[i]) ; 
     86            color=color+1 ; 
     87          } 
     88        delete[] hashAll ; 
     89 
     90        MPI_Comm_split(globalComm, color, commRank, &serverComm) ; 
     91      } 
     92      else // using OASIS 
     93      { 
     94        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId); 
     95 
     96        CTimer::get("XIOS").resume() ; 
     97        oasis_get_localcomm(serverComm); 
     98      } 
     99  
     100      ///////////////////////////////////////// 
     101      ///////////// PART 2 //////////////////// 
     102      ///////////////////////////////////////// 
     103       
     104 
     105      // Create the XIOS communicator for every process which is related 
     106      // to XIOS, as well on client side as on server side 
     107      MPI_Comm xiosGlobalComm ; 
     108      string strIds=CXios::getin<string>("clients_code_id","") ; 
     109      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 
     110      if (strIds.empty()) 
     111      { 
     112        // no code Ids given, suppose XIOS initialisation is global             
     113        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ; 
     114        MPI_Comm splitComm,interComm ; 
     115        MPI_Comm_rank(globalComm,&commGlobalRank) ; 
     116        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ; 
     117        MPI_Comm_rank(splitComm,&commRank) ; 
     118        if (commRank==0) serverLeader=commGlobalRank ; 
     119        else serverLeader=0 ; 
     120        clientLeader=0 ; 
     121        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 
     122        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ; 
     123        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ; 
     124        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ; 
     125        CXios::setXiosComm(xiosGlobalComm) ; 
     126      } 
     127      else 
     128      { 
     129 
     130        xiosGlobalCommByFileExchange(serverComm) ; 
     131 
     132      } 
     133       
     134      ///////////////////////////////////////// 
     135      ///////////// PART 4 //////////////////// 
     136      //  create servers intra communicator  // 
     137      /////////////////////////////////////////  
     138       
     139      int commRank ; 
     140      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ; 
     141      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ; 
     142       
     143      CXios::setUsingServer() ; 
     144 
     145      ///////////////////////////////////////// 
     146      ///////////// PART 5 //////////////////// 
     147      //       redirect files output         // 
     148      /////////////////////////////////////////  
     149       
     150      CServer::openInfoStream(CXios::serverFile); 
     151      CServer::openErrorStream(CXios::serverFile); 
     152 
     153      ///////////////////////////////////////// 
     154      ///////////// PART 4 //////////////////// 
     155      ///////////////////////////////////////// 
     156 
     157      CXios::launchDaemonsManager(true) ; 
     158      
     159      ///////////////////////////////////////// 
     160      ///////////// PART 5 //////////////////// 
     161      ///////////////////////////////////////// 
     162 
     163      // create the services 
     164 
     165      auto ressourcesManager=CXios::getRessourcesManager() ; 
     166      auto servicesManager=CXios::getServicesManager() ; 
     167      auto contextsManager=CXios::getContextsManager() ; 
     168      auto daemonsManager=CXios::getDaemonsManager() ; 
     169      auto serversRessource=CServer::getServersRessource() ; 
     170 
     171      if (serversRessource->isServerLeader()) 
     172      { 
     173        int nbRessources = ressourcesManager->getRessourcesSize() ; 
     174        if (!CXios::usingServer2) 
    60175        { 
    61           MPI_Init(NULL, NULL); 
    62         } 
    63         CTimer::get("XIOS").resume() ; 
    64  
    65         boost::hash<string> hashString ; 
    66         unsigned long hashServer = hashString(CXios::xiosCodeId); 
    67  
    68         unsigned long* hashAll ; 
    69         unsigned long* srvLevelAll ; 
    70  
    71         int size ; 
    72         int myColor ; 
    73         int i,c ; 
    74         MPI_Comm newComm; 
    75  
    76         MPI_Comm_size(CXios::globalComm, &size) ; 
    77         MPI_Comm_rank(CXios::globalComm, &rank_); 
    78  
    79         hashAll=new unsigned long[size] ; 
    80         MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ; 
    81  
    82         map<unsigned long, int> colors ; 
    83         map<unsigned long, int> leaders ; 
    84         map<unsigned long, int>::iterator it ; 
    85  
    86         // (1) Establish client leaders, distribute processes between two server levels 
    87         std::vector<int> srvRanks; 
    88         for(i=0,c=0;i<size;i++) 
    89         { 
    90           if (colors.find(hashAll[i])==colors.end()) 
    91           { 
    92             colors[hashAll[i]]=c ; 
    93             leaders[hashAll[i]]=i ; 
    94             c++ ; 
    95           } 
    96           if (CXios::usingServer2) 
    97             if (hashAll[i] == hashServer) 
    98               srvRanks.push_back(i); 
    99         } 
    100  
    101         if (CXios::usingServer2) 
    102         { 
    103           int reqNbProc = srvRanks.size()*CXios::ratioServer2/100.; 
    104           if (reqNbProc<1 || reqNbProc==srvRanks.size()) 
    105           { 
    106             error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
    107                 << "It is impossible to dedicate the requested number of processes = "<<reqNbProc 
    108                 <<" to secondary server. XIOS will run in the classical server mode."<<endl; 
    109           } 
    110           else 
    111           { 
    112             if (CXios::nbPoolsServer2 == 0) CXios::nbPoolsServer2 = reqNbProc; 
    113             int firstSndSrvRank = srvRanks.size()*(100.-CXios::ratioServer2)/100. ; 
    114             int poolLeader = firstSndSrvRank; 
    115 //*********** (1) Comment out the line below to set one process per pool 
    116             sndServerGlobalRanks.push_back(srvRanks[poolLeader]); 
    117             int nbPools = CXios::nbPoolsServer2; 
    118             if ( nbPools > reqNbProc || nbPools < 1) 
    119             { 
    120               error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
    121                   << "It is impossible to allocate the requested number of pools = "<<nbPools 
    122                   <<" on the secondary server. It will be set so that there is one process per pool."<<endl; 
    123               nbPools = reqNbProc; 
    124             } 
    125             int remainder = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) % nbPools; 
    126             int procsPerPool = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) / nbPools; 
    127             for (i=0; i<srvRanks.size(); i++) 
    128             { 
    129               if (i >= firstSndSrvRank) 
    130               { 
    131                 if (rank_ == srvRanks[i]) 
    132                 { 
    133                   serverLevel=2; 
    134                 } 
    135                 poolLeader += procsPerPool; 
    136                 if (remainder != 0) 
    137                 { 
    138                   ++poolLeader; 
    139                   --remainder; 
    140                 } 
    141 //*********** (2) Comment out the two lines below to set one process per pool 
    142                 if (poolLeader < srvRanks.size()) 
    143                   sndServerGlobalRanks.push_back(srvRanks[poolLeader]); 
    144 //*********** (3) Uncomment the line below to set one process per pool 
    145 //                sndServerGlobalRanks.push_back(srvRanks[i]); 
    146               } 
    147               else 
    148               { 
    149                 if (rank_ == srvRanks[i]) serverLevel=1; 
    150               } 
    151             } 
    152             if (serverLevel==2) 
    153             { 
    154               info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ; 
    155               for (i=0; i<sndServerGlobalRanks.size(); i++) 
    156               { 
    157                 if (rank_>= sndServerGlobalRanks[i]) 
    158                 { 
    159                   if ( i == sndServerGlobalRanks.size()-1) 
    160                   { 
    161                     myColor = colors.size() + sndServerGlobalRanks[i]; 
    162                   } 
    163                   else if (rank_< sndServerGlobalRanks[i+1]) 
    164                   { 
    165                     myColor = colors.size() + sndServerGlobalRanks[i]; 
    166                     break; 
    167                   } 
    168                 } 
    169               } 
    170             } 
    171           } 
    172         } 
    173  
    174         // (2) Create intraComm 
    175         if (serverLevel != 2) myColor=colors[hashServer]; 
    176         MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ; 
    177  
    178         // (3) Create interComm 
    179         if (serverLevel == 0) 
    180         { 
    181           int clientLeader; 
    182           for(it=leaders.begin();it!=leaders.end();it++) 
    183           { 
    184             if (it->first!=hashServer) 
    185             { 
    186               clientLeader=it->second ; 
    187               int intraCommSize, intraCommRank ; 
    188               MPI_Comm_size(intraComm,&intraCommSize) ; 
    189               MPI_Comm_rank(intraComm,&intraCommRank) ; 
    190               info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize 
    191                        <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ; 
    192  
    193               MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ; 
    194               interCommLeft.push_back(newComm) ; 
    195             } 
    196           } 
    197         } 
    198         else if (serverLevel == 1) 
    199         { 
    200           int clientLeader, srvSndLeader; 
    201           int srvPrmLeader ; 
    202  
    203           for (it=leaders.begin();it!=leaders.end();it++) 
    204           { 
    205             if (it->first != hashServer) 
    206             { 
    207               clientLeader=it->second ; 
    208               int intraCommSize, intraCommRank ; 
    209               MPI_Comm_size(intraComm, &intraCommSize) ; 
    210               MPI_Comm_rank(intraComm, &intraCommRank) ; 
    211               info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize 
    212                        <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ; 
    213               MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ; 
    214               interCommLeft.push_back(newComm) ; 
    215             } 
    216           } 
    217  
    218           for (int i = 0; i < sndServerGlobalRanks.size(); ++i) 
    219           { 
    220             int intraCommSize, intraCommRank ; 
    221             MPI_Comm_size(intraComm, &intraCommSize) ; 
    222             MPI_Comm_rank(intraComm, &intraCommRank) ; 
    223             info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize 
    224                 <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< sndServerGlobalRanks[i]<<endl ; 
    225             MPI_Intercomm_create(intraComm, 0, CXios::globalComm, sndServerGlobalRanks[i], 1, &newComm) ; 
    226             interCommRight.push_back(newComm) ; 
    227           } 
     176          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
     177          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ; 
    228178        } 
    229179        else 
    230180        { 
    231           int clientLeader; 
    232           clientLeader = leaders[hashString(CXios::xiosCodeId)]; 
    233           int intraCommSize, intraCommRank ; 
    234           MPI_Comm_size(intraComm, &intraCommSize) ; 
    235           MPI_Comm_rank(intraComm, &intraCommRank) ; 
    236           info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize 
    237                    <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ; 
    238  
    239           MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ; 
    240           interCommLeft.push_back(newComm) ; 
     181          int nprocsServer = nbRessources*CXios::ratioServer2/100.; 
     182          int nprocsGatherer = nbRessources - nprocsServer ; 
     183           
     184          int nbPoolsServer2 = CXios::nbPoolsServer2 ; 
     185          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer; 
     186          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
     187          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 
     188          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ; 
    241189        } 
    242  
    243         delete [] hashAll ; 
    244  
    245       } 
    246       // using OASIS 
    247       else 
    248       { 
    249         int size; 
    250         int myColor; 
    251         int* srvGlobalRanks; 
    252         if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId); 
    253  
    254         CTimer::get("XIOS").resume() ; 
    255         MPI_Comm localComm; 
    256         oasis_get_localcomm(localComm); 
    257         MPI_Comm_rank(localComm,&rank_) ; 
    258  
    259 //      (1) Create server intraComm 
    260         if (!CXios::usingServer2) 
     190      } 
     191 
     192      ///////////////////////////////////////// 
     193      ///////////// PART 5 //////////////////// 
     194      ///////////////////////////////////////// 
     195      // loop on event loop 
     196 
     197      bool finished=false ; 
     198      while (!finished) 
     199      { 
     200        finished=daemonsManager->eventLoop() ; 
     201      } 
     202 
     203    } 
     204 
     205 
     206 
     207 
     208 
     209    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm) 
     210    { 
     211         
     212      MPI_Comm globalComm=CXios::getGlobalComm() ; 
     213      MPI_Comm xiosGlobalComm ; 
     214       
     215      string strIds=CXios::getin<string>("clients_code_id","") ; 
     216      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; 
     217       
     218      int commRank, globalRank ; 
     219      MPI_Comm_rank(serverComm, &commRank) ; 
     220      MPI_Comm_rank(globalComm, &globalRank) ; 
     221      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ; 
     222 
     223      if (commRank==0) // if root process publish name 
     224      {   
     225        std::ofstream ofs (serverFileName, std::ofstream::out); 
     226        ofs<<globalRank ; 
     227        ofs.close(); 
     228      } 
     229         
     230      vector<int> clientsRank(clientsCodeId.size()) ; 
     231      for(int i=0;i<clientsRank.size();i++) 
     232      { 
     233        std::ifstream ifs ; 
     234        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ; 
     235        do 
    261236        { 
    262           MPI_Comm_dup(localComm, &intraComm); 
     237          ifs.clear() ; 
     238          ifs.open(fileName, std::ifstream::in) ; 
     239        } while (ifs.fail()) ; 
     240        ifs>>clientsRank[i] ; 
     241        ifs.close() ;  
     242      } 
     243 
     244      MPI_Comm intraComm ; 
     245      MPI_Comm_dup(serverComm,&intraComm) ; 
     246      MPI_Comm interComm ; 
     247      for(int i=0 ; i<clientsRank.size(); i++) 
     248      {   
     249        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm); 
     250        MPI_Comm_free(&intraComm) ; 
     251        MPI_Intercomm_merge(interComm,false, &intraComm ) ; 
     252      } 
     253      xiosGlobalComm=intraComm ;   
     254      MPI_Barrier(xiosGlobalComm); 
     255      if (commRank==0) std::remove(serverFileName.c_str()) ; 
     256      MPI_Barrier(xiosGlobalComm); 
     257 
     258      CXios::setXiosComm(xiosGlobalComm) ; 
     259       
     260    } 
     261 
     262 
     263    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm) 
     264    { 
     265        // untested, need to be tested on a true MPI-2 compliant library 
     266 
     267        // try to discover other client/server 
     268/* 
     269        // publish server name 
     270        char portName[MPI_MAX_PORT_NAME]; 
     271        int ierr ; 
     272        int commRank ; 
     273        MPI_Comm_rank(serverComm, &commRank) ; 
     274         
     275        if (commRank==0) // if root process publish name 
     276        {   
     277          MPI_Open_port(MPI_INFO_NULL, portName); 
     278          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName); 
    263279        } 
    264         else 
    265         { 
    266           int globalRank; 
    267           MPI_Comm_size(localComm,&size) ; 
    268           MPI_Comm_rank(CXios::globalComm,&globalRank) ; 
    269           srvGlobalRanks = new int[size] ; 
    270           MPI_Allgather(&globalRank, 1, MPI_INT, srvGlobalRanks, 1, MPI_INT, localComm) ; 
    271  
    272           int reqNbProc = size*CXios::ratioServer2/100.; 
    273           if (reqNbProc < 1 || reqNbProc == size) 
    274           { 
    275             error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
    276                 << "It is impossible to dedicate the requested number of processes = "<<reqNbProc 
    277                 <<" to secondary server. XIOS will run in the classical server mode."<<endl; 
    278             MPI_Comm_dup(localComm, &intraComm); 
    279           } 
    280           else 
    281           { 
    282             int firstSndSrvRank = size*(100.-CXios::ratioServer2)/100. ; 
    283             int poolLeader = firstSndSrvRank; 
    284 //*********** (1) Comment out the line below to set one process per pool 
    285 //            sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]); 
    286             int nbPools = CXios::nbPoolsServer2; 
    287             if ( nbPools > reqNbProc || nbPools < 1) 
    288             { 
    289               error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
    290                   << "It is impossible to allocate the requested number of pools = "<<nbPools 
    291                   <<" on the secondary server. It will be set so that there is one process per pool."<<endl; 
    292               nbPools = reqNbProc; 
    293             } 
    294             int remainder = ((int) (size*CXios::ratioServer2/100.)) % nbPools; 
    295             int procsPerPool = ((int) (size*CXios::ratioServer2/100.)) / nbPools; 
    296             for (int i=0; i<size; i++) 
    297             { 
    298               if (i >= firstSndSrvRank) 
    299               { 
    300                 if (globalRank == srvGlobalRanks[i]) 
    301                 { 
    302                   serverLevel=2; 
    303                 } 
    304                 poolLeader += procsPerPool; 
    305                 if (remainder != 0) 
    306                 { 
    307                   ++poolLeader; 
    308                   --remainder; 
    309                 } 
    310 //*********** (2) Comment out the two lines below to set one process per pool 
    311 //                if (poolLeader < size) 
    312 //                  sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]); 
    313 //*********** (3) Uncomment the line below to set one process per pool 
    314                 sndServerGlobalRanks.push_back(srvGlobalRanks[i]); 
    315               } 
    316               else 
    317               { 
    318                 if (globalRank == srvGlobalRanks[i]) serverLevel=1; 
    319               } 
    320             } 
    321             if (serverLevel==2) 
    322             { 
    323               info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ; 
    324               for (int i=0; i<sndServerGlobalRanks.size(); i++) 
    325               { 
    326                 if (globalRank>= sndServerGlobalRanks[i]) 
    327                 { 
    328                   if (i == sndServerGlobalRanks.size()-1) 
    329                   { 
    330                     myColor = sndServerGlobalRanks[i]; 
    331                   } 
    332                   else if (globalRank< sndServerGlobalRanks[i+1]) 
    333                   { 
    334                     myColor = sndServerGlobalRanks[i]; 
    335                     break; 
    336                   } 
    337                 } 
    338               } 
    339             } 
    340             if (serverLevel != 2) myColor=0; 
    341             MPI_Comm_split(localComm, myColor, rank_, &intraComm) ; 
    342           } 
     280 
     281        MPI_Comm intraComm=serverComm ; 
     282        MPI_Comm interComm ; 
     283        for(int i=0 ; i<clientsCodeId.size(); i++) 
     284        {   
     285          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm); 
     286          MPI_Intercomm_merge(interComm,false, &intraComm ) ; 
    343287        } 
    344  
    345         string codesId=CXios::getin<string>("oasis_codes_id") ; 
    346         vector<string> oasisCodeId=splitRegex(codesId,"\\s*,\\s*") ; 
    347   
    348         vector<string>::iterator it ; 
    349  
    350         MPI_Comm newComm ; 
    351         int globalRank ; 
    352         MPI_Comm_rank(CXios::globalComm,&globalRank); 
    353  
    354 //      (2) Create interComms with models 
    355         for(it=oasisCodeId.begin();it!=oasisCodeId.end();it++) 
    356         { 
    357           oasis_get_intercomm(newComm,*it) ; 
    358           if ( serverLevel == 0 || serverLevel == 1) 
    359           { 
    360             interCommLeft.push_back(newComm) ; 
    361             if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ; 
    362           } 
    363         } 
    364  
    365 //      (3) Create interComms between primary and secondary servers 
    366         int intraCommSize, intraCommRank ; 
    367         MPI_Comm_size(intraComm,&intraCommSize) ; 
    368         MPI_Comm_rank(intraComm, &intraCommRank) ; 
    369  
    370         if (serverLevel == 1) 
    371         { 
    372           for (int i = 0; i < sndServerGlobalRanks.size(); ++i) 
    373           { 
    374             int srvSndLeader = sndServerGlobalRanks[i]; 
    375             info(50)<<"intercommCreate::client (server level 1) "<<globalRank<<" intraCommSize : "<<intraCommSize 
    376                 <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvSndLeader<<endl ; 
    377             MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ; 
    378             interCommRight.push_back(newComm) ; 
    379           } 
    380         } 
    381         else if (serverLevel == 2) 
    382         { 
    383           info(50)<<"intercommCreate::server (server level 2)"<<globalRank<<" intraCommSize : "<<intraCommSize 
    384                    <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvGlobalRanks[0] <<endl ; 
    385           MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvGlobalRanks[0], 0, &newComm) ; 
    386           interCommLeft.push_back(newComm) ; 
    387         } 
    388         if (CXios::usingServer2) delete [] srvGlobalRanks ; 
    389  
    390         bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ; 
    391         if (!oasisEnddef) oasis_enddef() ; 
    392       } 
    393  
    394  
    395       MPI_Comm_rank(intraComm, &rank) ; 
    396       if (rank==0) isRoot=true; 
    397       else isRoot=false; 
    398        
    399       eventScheduler = new CEventScheduler(intraComm) ; 
    400     } 
     288*/       
     289    } 
     290 
    401291 
    402292    void CServer::finalize(void) 
     
    412302        MPI_Comm_free(&(*it)); 
    413303 
    414 //      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++) 
    415 //        MPI_Comm_free(&(*it)); 
    416  
    417 //        for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++) 
    418 //          MPI_Comm_free(&(*it)); 
    419  
    420304        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++) 
    421305          MPI_Comm_free(&(*it)); 
    422306 
    423       MPI_Comm_free(&intraComm); 
    424  
     307//      MPI_Comm_free(&intraComm); 
     308 
     309      CXios::finalizeDaemonsManager(); 
     310       
    425311      if (!is_MPI_Initialized) 
    426312      { 
     
    432318      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ; 
    433319      report(100)<<CTimer::getAllCumulatedTime()<<endl ; 
    434     } 
    435  
    436      void CServer::eventLoop(void) 
    437      { 
    438        bool stop=false ; 
    439  
    440        CTimer::get("XIOS server").resume() ; 
    441        while(!stop) 
    442        { 
    443          if (isRoot) 
    444          { 
    445            listenContext(); 
    446            listenRootContext(); 
    447            listenOasisEnddef() ; 
    448            listenRootOasisEnddef() ; 
    449            if (!finished) listenFinalize() ; 
    450          } 
    451          else 
    452          { 
    453            listenRootContext(); 
    454            listenRootOasisEnddef() ; 
    455            if (!finished) listenRootFinalize() ; 
    456          } 
    457  
    458          contextEventLoop() ; 
    459          if (finished && contextList.empty()) stop=true ; 
    460          eventScheduler->checkEvent() ; 
    461        } 
    462        CTimer::get("XIOS server").suspend() ; 
    463      } 
    464  
    465      void CServer::listenFinalize(void) 
    466      { 
    467         list<MPI_Comm>::iterator it, itr; 
    468         int msg ; 
    469         int flag ; 
    470  
    471         for(it=interCommLeft.begin();it!=interCommLeft.end();it++) 
    472         { 
    473            MPI_Status status ; 
    474            traceOff() ; 
    475            MPI_Iprobe(0,0,*it,&flag,&status) ; 
    476            traceOn() ; 
    477            if (flag==true) 
    478            { 
    479               MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ; 
    480               info(20)<<" CServer : Receive client finalize"<<endl ; 
    481               // Sending server finalize message to secondary servers (if any) 
    482               for(itr=interCommRight.begin();itr!=interCommRight.end();itr++) 
    483               { 
    484                 MPI_Send(&msg,1,MPI_INT,0,0,*itr) ; 
    485               } 
    486               MPI_Comm_free(&(*it)); 
    487               interCommLeft.erase(it) ; 
    488               break ; 
    489             } 
    490          } 
    491  
    492          if (interCommLeft.empty()) 
    493          { 
    494            int i,size ; 
    495            MPI_Comm_size(intraComm,&size) ; 
    496            MPI_Request* requests= new MPI_Request[size-1] ; 
    497            MPI_Status* status= new MPI_Status[size-1] ; 
    498  
    499            for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ; 
    500            MPI_Waitall(size-1,requests,status) ; 
    501  
    502            finished=true ; 
    503            delete [] requests ; 
    504            delete [] status ; 
    505          } 
    506      } 
    507  
    508  
    509      void CServer::listenRootFinalize() 
    510      { 
    511         int flag ; 
    512         MPI_Status status ; 
    513         int msg ; 
    514  
    515         traceOff() ; 
    516         MPI_Iprobe(0,4,intraComm, &flag, &status) ; 
    517         traceOn() ; 
    518         if (flag==true) 
    519         { 
    520            MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ; 
    521            finished=true ; 
    522         } 
    523       } 
    524  
    525  
    526    /*! 
    527     * Root process is listening for an order sent by client to call "oasis_enddef". 
    528     * The root client of a compound send the order (tag 5). It is probed and received. 
    529     * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any). 
    530     * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done 
    531     */ 
    532      
    533      void CServer::listenOasisEnddef(void) 
    534      { 
    535         int flag ; 
    536         MPI_Status status ; 
    537         list<MPI_Comm>::iterator it; 
    538         int msg ; 
    539         static int nbCompound=0 ; 
    540         int size ; 
    541         static bool sent=false ; 
    542         static MPI_Request* allRequests ; 
    543         static MPI_Status* allStatus ; 
    544  
    545  
    546         if (sent) 
    547         { 
    548           MPI_Comm_size(intraComm,&size) ; 
    549           MPI_Testall(size,allRequests, &flag, allStatus) ; 
    550           if (flag==true) 
    551           { 
    552             delete [] allRequests ; 
    553             delete [] allStatus ; 
    554             sent=false ; 
    555           } 
    556         } 
    557          
    558  
    559         for(it=interCommLeft.begin();it!=interCommLeft.end();it++) 
    560         { 
    561            MPI_Status status ; 
    562            traceOff() ; 
    563            MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5 
    564            traceOn() ; 
    565            if (flag==true) 
    566            { 
    567               MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5 
    568               nbCompound++ ; 
    569               if (nbCompound==interCommLeft.size()) 
    570               { 
    571                 for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++) 
    572                 { 
    573                    MPI_Send(&msg,1,MPI_INT,0,5,*it) ; // tags oasis_endded = 5 
    574                 } 
    575                 MPI_Comm_size(intraComm,&size) ; 
    576                 allRequests= new MPI_Request[size] ; 
    577                 allStatus= new MPI_Status[size] ; 
    578                 for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm,&allRequests[i]) ; // tags oasis_endded = 5 
    579                 sent=true ; 
    580               } 
    581            } 
    582         } 
    583      } 
    584       
    585    /*! 
    586     * Processes probes message from root process if oasis_enddef call must be done. 
    587     * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator 
    588     */ 
    589      void CServer::listenRootOasisEnddef(void) 
    590      { 
    591        int flag ; 
    592        MPI_Status status ; 
    593        const int root=0 ; 
    594        int msg ; 
    595        static bool eventSent=false ; 
    596  
    597        if (eventSent) 
    598        { 
    599          boost::hash<string> hashString; 
    600          size_t hashId = hashString("oasis_enddef"); 
    601          if (eventScheduler->queryEvent(0,hashId)) 
    602          { 
    603            oasis_enddef() ; 
    604            eventSent=false ; 
    605          } 
    606        } 
    607           
    608        traceOff() ; 
    609        MPI_Iprobe(root,5,intraComm, &flag, &status) ; 
    610        traceOn() ; 
    611        if (flag==true) 
    612        { 
    613          MPI_Recv(&msg,1,MPI_INT,root,5,intraComm,&status) ; // tags oasis_endded = 5 
    614          boost::hash<string> hashString; 
    615          size_t hashId = hashString("oasis_enddef"); 
    616          eventScheduler->registerEvent(0,hashId); 
    617          eventSent=true ; 
    618        } 
    619      } 
    620  
    621  
    622  
    623       
    624  
    625      void CServer::listenContext(void) 
    626      { 
    627  
    628        MPI_Status status ; 
    629        int flag ; 
    630        static char* buffer ; 
    631        static MPI_Request request ; 
    632        static bool recept=false ; 
    633        int rank ; 
    634        int count ; 
    635  
    636        if (recept==false) 
    637        { 
    638          traceOff() ; 
    639          MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ; 
    640          traceOn() ; 
    641          if (flag==true) 
    642          { 
    643            rank=status.MPI_SOURCE ; 
    644            MPI_Get_count(&status,MPI_CHAR,&count) ; 
    645            buffer=new char[count] ; 
    646            MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ; 
    647            recept=true ; 
    648          } 
    649        } 
    650        else 
    651        { 
    652          traceOff() ; 
    653          MPI_Test(&request,&flag,&status) ; 
    654          traceOn() ; 
    655          if (flag==true) 
    656          { 
    657            rank=status.MPI_SOURCE ; 
    658            MPI_Get_count(&status,MPI_CHAR,&count) ; 
    659            recvContextMessage((void*)buffer,count) ; 
    660            delete [] buffer ; 
    661            recept=false ; 
    662          } 
    663        } 
    664      } 
    665  
    666      void CServer::recvContextMessage(void* buff,int count) 
    667      { 
    668        static map<string,contextMessage> recvContextId; 
    669        map<string,contextMessage>::iterator it ; 
    670        CBufferIn buffer(buff,count) ; 
    671        string id ; 
    672        int clientLeader ; 
    673        int nbMessage ; 
    674  
    675        buffer>>id>>nbMessage>>clientLeader ; 
    676  
    677        it=recvContextId.find(id) ; 
    678        if (it==recvContextId.end()) 
    679        { 
    680          contextMessage msg={0,0} ; 
    681          pair<map<string,contextMessage>::iterator,bool> ret ; 
    682          ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ; 
    683          it=ret.first ; 
    684        } 
    685        it->second.nbRecv+=1 ; 
    686        it->second.leaderRank+=clientLeader ; 
    687  
    688        if (it->second.nbRecv==nbMessage) 
    689        { 
    690          int size ; 
    691          MPI_Comm_size(intraComm,&size) ; 
    692 //         MPI_Request* requests= new MPI_Request[size-1] ; 
    693 //         MPI_Status* status= new MPI_Status[size-1] ; 
    694          MPI_Request* requests= new MPI_Request[size] ; 
    695          MPI_Status* status= new MPI_Status[size] ; 
    696  
    697          CMessage msg ; 
    698          msg<<id<<it->second.leaderRank; 
    699          int messageSize=msg.size() ; 
    700          void * sendBuff = new char[messageSize] ; 
    701          CBufferOut sendBuffer(sendBuff,messageSize) ; 
    702          sendBuffer<<msg ; 
    703  
    704          // Include root itself in order not to have a divergence 
    705          for(int i=0; i<size; i++) 
    706          { 
    707            MPI_Isend(sendBuff,sendBuffer.count(),MPI_CHAR,i,2,intraComm,&requests[i]) ; 
    708          } 
    709  
    710          recvContextId.erase(it) ; 
    711          delete [] requests ; 
    712          delete [] status ; 
    713  
    714        } 
    715      } 
    716  
    717      void CServer::listenRootContext(void) 
    718      { 
    719        MPI_Status status ; 
    720        int flag ; 
    721        static std::vector<void*> buffers; 
    722        static std::vector<MPI_Request> requests ; 
    723        static std::vector<int> counts ; 
    724        static std::vector<bool> isEventRegistered ; 
    725        static std::vector<bool> isEventQueued ; 
    726        MPI_Request request; 
    727  
    728        int rank ; 
    729        const int root=0 ; 
    730        boost::hash<string> hashString; 
    731        size_t hashId = hashString("RegisterContext"); 
    732  
    733        // (1) Receive context id from the root, save it into a buffer 
    734        traceOff() ; 
    735        MPI_Iprobe(root,2,intraComm, &flag, &status) ; 
    736        traceOn() ; 
    737        if (flag==true) 
    738        { 
    739          counts.push_back(0); 
    740          MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ; 
    741          buffers.push_back(new char[counts.back()]) ; 
    742          requests.push_back(request); 
    743          MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&(requests.back())) ; 
    744          isEventRegistered.push_back(false); 
    745          isEventQueued.push_back(false); 
    746          nbContexts++; 
    747        } 
    748  
    749        for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ ) 
    750        { 
    751          // (2) If context id is received, register an event 
    752          MPI_Test(&requests[ctxNb],&flag,&status) ; 
    753          if (flag==true && !isEventRegistered[ctxNb]) 
    754          { 
    755            eventScheduler->registerEvent(ctxNb,hashId); 
    756            isEventRegistered[ctxNb] = true; 
    757          } 
    758          // (3) If event has been scheduled, call register context 
    759          if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb]) 
    760          { 
    761            registerContext(buffers[ctxNb],counts[ctxNb]) ; 
    762            isEventQueued[ctxNb] = true; 
    763            delete [] buffers[ctxNb] ; 
    764          } 
    765        } 
    766  
    767      } 
    768  
    769      void CServer::registerContext(void* buff, int count, int leaderRank) 
    770      { 
    771        string contextId; 
    772        CBufferIn buffer(buff, count); 
    773 //       buffer >> contextId; 
    774        buffer >> contextId>>leaderRank; 
    775        CContext* context; 
    776  
    777        info(20) << "CServer : Register new Context : " << contextId << endl; 
    778  
    779        if (contextList.find(contextId) != contextList.end()) 
    780          ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)", 
    781                << "Context '" << contextId << "' has already been registred"); 
    782  
    783        context=CContext::create(contextId); 
    784        contextList[contextId]=context; 
    785  
    786        // Primary or classical server: create communication channel with a client 
    787        // (1) create interComm (with a client) 
    788        // (2) initialize client and server (contextClient and contextServer) 
    789        MPI_Comm inter; 
    790        if (serverLevel < 2) 
    791        { 
    792          MPI_Comm contextInterComm; 
    793          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm); 
    794          MPI_Intercomm_merge(contextInterComm,1,&inter); 
    795          MPI_Barrier(inter); 
    796          MPI_Comm_free(&inter); 
    797          context->initServer(intraComm,contextInterComm); 
    798          contextInterComms.push_back(contextInterComm); 
    799  
    800        } 
    801        // Secondary server: create communication channel with a primary server 
    802        // (1) duplicate interComm with a primary server 
    803        // (2) initialize client and server (contextClient and contextServer) 
    804        // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create, 
    805        //         because interComm of CContext is defined on the same processes as the interComm of CServer. 
    806        //         So just duplicate it. 
    807        else if (serverLevel == 2) 
    808        { 
    809          MPI_Comm_dup(interCommLeft.front(), &inter); 
    810          contextInterComms.push_back(inter); 
    811          context->initServer(intraComm, contextInterComms.back()); 
    812        } 
    813  
    814        // Primary server: 
    815        // (1) send create context message to secondary servers 
    816        // (2) initialize communication channels with secondary servers (create contextClient and contextServer) 
    817        if (serverLevel == 1) 
    818        { 
    819          int i = 0, size; 
    820          MPI_Comm_size(intraComm, &size) ; 
    821          for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i) 
    822          { 
    823            StdString str = contextId +"_server_" + boost::lexical_cast<string>(i); 
    824            CMessage msg; 
    825            int messageSize; 
    826            msg<<str<<size<<rank_ ; 
    827            messageSize = msg.size() ; 
    828            buff = new char[messageSize] ; 
    829            CBufferOut buffer(buff,messageSize) ; 
    830            buffer<<msg ; 
    831            MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGlobalRanks[i], 1, CXios::globalComm) ; 
    832            MPI_Comm_dup(*it, &inter); 
    833            contextInterComms.push_back(inter); 
    834            MPI_Comm_dup(intraComm, &inter); 
    835            contextIntraComms.push_back(inter); 
    836            context->initClient(contextIntraComms.back(), contextInterComms.back()) ; 
    837            delete [] buff ; 
    838          } 
    839        } 
    840      } 
    841  
    842      void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/) 
    843      { 
    844        bool isFinalized ; 
    845        map<string,CContext*>::iterator it ; 
    846  
    847        for(it=contextList.begin();it!=contextList.end();it++) 
    848        { 
    849          isFinalized=it->second->isFinalized(); 
    850          if (isFinalized) 
    851          { 
    852            contextList.erase(it) ; 
    853            break ; 
    854          } 
    855          else 
    856            it->second->checkBuffersAndListen(enableEventsProcessing); 
    857        } 
    858      } 
    859  
    860      //! Get rank of the current process in the intraComm 
    861      int CServer::getRank() 
    862      { 
    863        int rank; 
    864        MPI_Comm_rank(intraComm,&rank); 
    865        return rank; 
    866      } 
    867  
    868      vector<int>& CServer::getSecondaryServerGlobalRanks() 
    869      { 
    870        return sndServerGlobalRanks; 
    871      } 
     320 
     321      CWorkflowGraph::drawWorkFlowGraph_server(); 
     322    } 
    872323 
    873324    /*! 
     
    881332    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb) 
    882333    { 
    883       StdStringStream fileNameClient; 
     334      StdStringStream fileNameServer; 
    884335      int numDigit = 0; 
    885       int size = 0; 
     336      int commSize = 0; 
     337      int commRank ; 
    886338      int id; 
    887       MPI_Comm_size(CXios::globalComm, &size); 
    888       while (size) 
    889       { 
    890         size /= 10; 
     339       
     340      MPI_Comm_size(CXios::getGlobalComm(), &commSize); 
     341      MPI_Comm_rank(CXios::getGlobalComm(), &commRank); 
     342 
     343      while (commSize) 
     344      { 
     345        commSize /= 10; 
    891346        ++numDigit; 
    892347      } 
    893       id = rank_; //getRank(); 
    894  
    895       fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext; 
    896       fb->open(fileNameClient.str().c_str(), std::ios::out); 
     348      id = commRank; 
     349 
     350      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext; 
     351      fb->open(fileNameServer.str().c_str(), std::ios::out); 
    897352      if (!fb->is_open()) 
    898353        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)", 
    899               << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the server log(s)."); 
     354              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s)."); 
    900355    } 
    901356 
     
    953408      if (m_errorStream.is_open()) m_errorStream.close(); 
    954409    } 
     410 
     411    void CServer::launchServersRessource(MPI_Comm serverComm) 
     412    { 
     413      serversRessource_ = new CServersRessource(serverComm) ; 
     414    } 
    955415} 
Note: See TracChangeset for help on using the changeset viewer.