Ignore:
Timestamp:
01/03/23 19:06:42 (18 months ago)
Author:
ymipsl
Message:

Implementation of files service on dev branch

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/dev/XIOS_FILE_SERVICES/src/node/context.cpp

    r2441 r2453  
    577577    if (commRank==0) 
    578578    { 
    579       CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions) ; 
     579      CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ; 
    580580      for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ; 
    581581    } 
     
    588588      parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 
    589589      int type ;  
    590       if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type) ; 
     590      if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ; 
    591591      MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 
    592592      string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ; 
     
    610610  CATCH_DUMP_ATTR 
    611611   
     612   
     613  // obsolete 
    612614  void CContext::createServerInterComm(void)  
    613615  TRY 
     
    645647  } 
    646648  CATCH_DUMP_ATTR 
     649 
     650   
     651  void CContext::getServerInterComm(const string& poolId, const string& serviceId,  vector<pair<CContextClient*,CContextServer*>>& clientServers) 
     652  { 
     653    vector<pair<string, pair<CContextClient*,CContextServer*>>> retClientServers ; 
     654 
     655    auto it=serversMap_.find(make_pair(poolId,serviceId)) ; 
     656    if (it!=serversMap_.end()) clientServers=it->second ; 
     657    else 
     658    { 
     659      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ; 
     660      else createServerInterComm(poolId, serviceId, retClientServers) ; 
     661      for(auto& retClientServer : retClientServers)  clientServers.push_back(retClientServer.second) ; 
     662       
     663      int serviceType ; 
     664      if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType) ; 
     665      MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ; 
     666       
     667      for(auto& clientServer : clientServers)      
     668      { 
     669        if (serviceType==CServicesManager::WRITER) { writerClientOut_.push_back(clientServer.first)      ; writerServerOut_.push_back(clientServer.second) ; } 
     670        else if (serviceType==CServicesManager::READER) { readerClientOut_.push_back(clientServer.first) ; readerServerOut_.push_back(clientServer.second) ; } 
     671        else if (serviceType==CServicesManager::GATHERER) { writerClientOut_.push_back(clientServer.first) ; writerServerOut_.push_back(clientServer.second) ; } 
     672      } 
     673      serversMap_.insert(make_pair(make_pair(poolId,serviceId),clientServers)) ; 
     674    } 
     675 
     676  } 
     677 
     678  vector<CContextClient*> CContext::getContextClient(const string& poolId, const string& serviceId) 
     679  { 
     680    vector<pair<CContextClient*,CContextServer*>> clientServers ; 
     681    getServerInterComm(poolId, serviceId, clientServers ) ; 
     682    vector<CContextClient*> ret ; 
     683    for(auto& clientServer : clientServers) ret.push_back(clientServer.first) ; 
     684    return ret ; 
     685  } 
     686 
    647687 
    648688  void CContext::globalEventLoop(void) 
     
    783823        CContextServer* server ; 
    784824 
     825        /* 
    785826        if (writerClientOut_.size()!=0) 
    786827        { 
     
    803844          info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 
    804845        } 
     846        */ 
     847 
     848        for(int n=0; n<writerClientOut_.size() ; n++) 
     849        { 
     850          client=writerClientOut_[n] ; 
     851          server=writerServerOut_[n] ; 
     852 
     853          info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to writer"<<endl ; 
     854          client->finalize(); 
     855          info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to writer"<<endl ; 
     856          bool bufferReleased; 
     857          do 
     858          { 
     859            client->eventLoop(); 
     860            bufferReleased = !client->havePendingRequests(); 
     861          } while (!bufferReleased); 
     862          info(100)<<"DEBUG: context "<<getId()<<" no pending request on writer ok"<<endl ; 
     863 
     864          bool notifiedFinalized=false ; 
     865          do 
     866          { 
     867            notifiedFinalized=client->isNotifiedFinalized() ; 
     868          } while (!notifiedFinalized) ; 
     869          server->releaseBuffers(); 
     870          client->releaseBuffers(); 
     871          info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 
     872        } 
     873         
    805874 
    806875        if (readerClientOut_.size()!=0) 
     
    879948 
    880949    
     950   void CContext::setDefaultServices(void) 
     951   { 
     952     defaultPoolWriterId_ = CXios::defaultPoolId ; 
     953     defaultPoolReaderId_ = CXios::defaultPoolId ; 
     954     defaultPoolGathererId_ = CXios::defaultPoolId ; 
     955     defaultWriterId_ = CXios::defaultWriterId ; 
     956     defaultReaderId_ = CXios::defaultReaderId ; 
     957     defaultGathererId_ = CXios::defaultGathererId ; 
     958     defaultUsingServer2_ = CXios::usingServer2 ; 
     959      
     960     if (!default_pool.isEmpty())  defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 
     961     if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 
     962     if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 
     963     if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 
     964     if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 
     965     if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 
     966     if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 
     967     if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 
     968   } 
     969 
    881970   /*! 
    882971   \brief Close all the context defintion and do processing data 
     
    894983 
    895984     CTimer::get("Context : close definition").resume() ; 
    896       
     985           
    897986     // create intercommunicator with servers.  
    898987     // not sure it is the good place to be called here  
    899      createServerInterComm() ; 
     988     //createServerInterComm() ; 
    900989 
    901990 
     
    9161005    // pour chacun des contextes. 
    9171006    solveDescInheritance(true); 
    918   
     1007    setDefaultServices() ; 
    9191008    // Check if some axis, domains or grids are eligible to for compressed indexed output. 
    9201009    // Warning: This must be done after solving the inheritance and before the rest of post-processing 
     
    10231112 
    10241113    // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields 
    1025     if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles); 
    1026     //else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 
     1114     
     1115    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles) ; 
     1116    /* 
     1117    if (serviceType_==CServicesManager::CLIENT ) 
     1118    {    
     1119      if (CXios::usingServer2) distributeFiles(this->enabledWriteModeFiles, defaultPoolGathererId_, defaultGathererId_); 
     1120      else distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 
     1121    } 
     1122    if (serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 
     1123    */ 
    10271124 
    10281125    // client side, assign context for file reading 
    1029     if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 
    1030      
     1126//    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 
     1127    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles)  
     1128    { 
     1129      string poolReaderId ; 
     1130      string readerId ; 
     1131      file->getReaderServicesId(defaultPoolReaderId_, defaultReaderId_, poolReaderId, readerId) ; 
     1132      file->setContextClient(poolReaderId, readerId, 0) ; 
     1133    } 
     1134 
    10311135    // server side, assign context where to send file data read 
    10321136    if (serviceType_==CServicesManager::READER) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientIn_[0]) ; 
     
    11631267    // fix size for each context client 
    11641268    for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ; 
     1269 
    11651270 
    11661271     CTimer::get("Context : close definition").suspend() ; 
     
    14361541   TRY 
    14371542   { 
     1543     map< pair<string,string>, vector<CFile*>> fileMaps ; 
     1544     for(auto& file : files) 
     1545     { 
     1546       string poolWriterId ; 
     1547       string poolGathererId ; 
     1548       string writerId  ; 
     1549       string gathererId  ; 
     1550       bool usingServer2 ; 
     1551 
     1552       file->getWriterServicesId(defaultUsingServer2_, defaultPoolWriterId_, defaultWriterId_, defaultPoolGathererId_, defaultGathererId_, 
     1553                                 usingServer2, poolWriterId, writerId, poolGathererId, gathererId) ; 
     1554       if (serviceType_==CServicesManager::CLIENT && usingServer2) fileMaps[make_pair(poolGathererId,gathererId)].push_back(file) ; 
     1555       else fileMaps[make_pair(poolWriterId,writerId)].push_back(file) ; 
     1556     } 
     1557     for(auto& it : fileMaps) distributeFilesOnSameService(it.second, it.first.first, it.first.second) ; 
     1558   } 
     1559   CATCH_DUMP_ATTR 
     1560 
     1561 
     1562   void CContext::distributeFilesOnSameService(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
     1563   TRY 
     1564   { 
    14381565     bool distFileMemory=false ; 
    14391566     distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory); 
    14401567 
    1441      int nbPools = writerClientOut_.size(); 
    1442      if (nbPools==1) distributeFileOverOne(files) ; 
    1443      else if (distFileMemory) distributeFileOverMemoryBandwith(files) ; 
    1444      else distributeFileOverBandwith(files) ; 
    1445    } 
    1446    CATCH_DUMP_ATTR 
    1447  
    1448    void CContext::distributeFileOverOne(const vector<CFile*>& files) 
    1449    TRY 
    1450    { 
    1451     for(auto& file : files) file->setContextClient(writerClientOut_[0]) ; 
    1452    } 
    1453    CATCH_DUMP_ATTR 
    1454  
    1455    void CContext::distributeFileOverBandwith(const vector<CFile*>& files) 
     1568     auto writers = getContextClient(poolId, serviceId) ; 
     1569     int  nbPools = writers.size() ; 
     1570      
     1571     if (nbPools==1) distributeFileOverOne(files, poolId, serviceId) ; 
     1572     else if (distFileMemory) distributeFileOverMemoryBandwith(files, poolId, serviceId) ; 
     1573     else distributeFileOverBandwith(files, poolId, serviceId) ; 
     1574   } 
     1575   CATCH_DUMP_ATTR 
     1576 
     1577   void CContext::distributeFileOverOne(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
     1578   TRY 
     1579   { 
     1580     for(auto& file : files) file->setContextClient(poolId, serviceId,0) ; 
     1581   } 
     1582   CATCH_DUMP_ATTR 
     1583 
     1584   void CContext::distributeFileOverBandwith(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
    14561585   TRY 
    14571586   { 
     
    14591588      
    14601589     std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 
    1461      int nbPools = writerClientOut_.size(); 
     1590     auto writers = getContextClient(poolId, serviceId) ; 
     1591     int nbPools = writers.size(); 
     1592     //int nbPools = writerClientOut_.size(); 
    14621593 
    14631594     // (1) Find all enabled files in write mode 
     
    15121643       dataSize=(*poolDataSize.begin()).first ; 
    15131644       j=(*poolDataSize.begin()).second ; 
    1514        dataSizeMap[i].second->setContextClient(writerClientOut_[j]); 
     1645       dataSizeMap[i].second->setContextClient(poolId, serviceId, j); 
    15151646       dataSize+=dataSizeMap[i].first; 
    15161647       poolDataSize.erase(poolDataSize.begin()) ; 
     
    15221653   CATCH_DUMP_ATTR 
    15231654 
    1524    void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList) 
    1525    TRY 
    1526    { 
    1527      int nbPools = writerClientOut_.size(); 
     1655   void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList, const string& poolId, const string& serviceId) 
     1656   TRY 
     1657   { 
     1658     auto writers = getContextClient(poolId, serviceId) ; 
     1659     int nbPools = writers.size(); 
     1660     
    15281661     double ratio=0.5 ; 
    15291662     ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); 
     
    15911724         } 
    15921725       } 
    1593        filesList[i]->setContextClient(writerClientOut_[files[i].assignedServer_]) ; 
     1726       filesList[i]->setContextClient(poolId, serviceId, files[i].assignedServer_) ; 
    15941727       delete [] files[i].assignedGrid_ ; 
    15951728     } 
Note: See TracChangeset for help on using the changeset viewer.