Ignore:
Timestamp:
01/25/23 16:59:46 (17 months ago)
Author:
ymipsl
Message:

Merge XIOS_FILE_SERVICE dev branch into trunk

YM

Location:
XIOS3/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk

  • XIOS3/trunk/src/node/context.cpp

    r2441 r2458  
    577577    if (commRank==0) 
    578578    { 
    579       CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions) ; 
     579      CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ; 
    580580      for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ; 
    581581    } 
     
    588588      parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 
    589589      int type ;  
    590       if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type) ; 
     590      if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ; 
    591591      MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 
    592592      string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ; 
     
    610610  CATCH_DUMP_ATTR 
    611611   
     612   
     613  // obsolete 
    612614  void CContext::createServerInterComm(void)  
    613615  TRY 
     
    645647  } 
    646648  CATCH_DUMP_ATTR 
     649 
     650   
     651  void CContext::getServerInterComm(const string& poolId, const string& serviceId,  vector<pair<CContextClient*,CContextServer*>>& clientServers) 
     652  { 
     653    vector<pair<string, pair<CContextClient*,CContextServer*>>> retClientServers ; 
     654 
     655    auto it=serversMap_.find(make_pair(poolId,serviceId)) ; 
     656    if (it!=serversMap_.end()) clientServers=it->second ; 
     657    else 
     658    { 
     659      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ; 
     660      else createServerInterComm(poolId, serviceId, retClientServers) ; 
     661      for(auto& retClientServer : retClientServers)  clientServers.push_back(retClientServer.second) ; 
     662       
     663      int serviceType ; 
     664      if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType) ; 
     665      MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ; 
     666       
     667      for(auto& clientServer : clientServers)      
     668      { 
     669        if (serviceType==CServicesManager::WRITER) { writerClientOut_.push_back(clientServer.first)      ; writerServerOut_.push_back(clientServer.second) ; } 
     670        else if (serviceType==CServicesManager::READER) { readerClientOut_.push_back(clientServer.first) ; readerServerOut_.push_back(clientServer.second) ; } 
     671        else if (serviceType==CServicesManager::GATHERER) { writerClientOut_.push_back(clientServer.first) ; writerServerOut_.push_back(clientServer.second) ; } 
     672      } 
     673      serversMap_.insert(make_pair(make_pair(poolId,serviceId),clientServers)) ; 
     674    } 
     675 
     676  } 
     677 
     678  vector<CContextClient*> CContext::getContextClient(const string& poolId, const string& serviceId) 
     679  { 
     680    vector<pair<CContextClient*,CContextServer*>> clientServers ; 
     681    getServerInterComm(poolId, serviceId, clientServers ) ; 
     682    vector<CContextClient*> ret ; 
     683    for(auto& clientServer : clientServers) ret.push_back(clientServer.first) ; 
     684    return ret ; 
     685  } 
     686 
    647687 
    648688  void CContext::globalEventLoop(void) 
     
    698738      for(auto couplerOut : couplerOutClient_) couplerOut.second->eventLoop(); 
    699739      for(auto couplerIn : couplerInClient_) couplerIn.second->eventLoop(); 
    700       for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 
    701       for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 
     740      //for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 
     741      //for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 
     742      for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(); 
     743      for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(); 
    702744    } 
    703745    setCurrent(getId()) ; 
     
    783825        CContextServer* server ; 
    784826 
     827        /* 
    785828        if (writerClientOut_.size()!=0) 
    786829        { 
     
    803846          info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 
    804847        } 
     848        */ 
     849 
     850        for(int n=0; n<writerClientOut_.size() ; n++) 
     851        { 
     852          client=writerClientOut_[n] ; 
     853          server=writerServerOut_[n] ; 
     854 
     855          info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to writer"<<endl ; 
     856          client->finalize(); 
     857          info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to writer"<<endl ; 
     858          bool bufferReleased; 
     859          do 
     860          { 
     861            client->eventLoop(); 
     862            bufferReleased = !client->havePendingRequests(); 
     863          } while (!bufferReleased); 
     864          info(100)<<"DEBUG: context "<<getId()<<" no pending request on writer ok"<<endl ; 
     865 
     866          bool notifiedFinalized=false ; 
     867          do 
     868          { 
     869            notifiedFinalized=client->isNotifiedFinalized() ; 
     870          } while (!notifiedFinalized) ; 
     871          server->releaseBuffers(); 
     872          client->releaseBuffers(); 
     873          info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 
     874        } 
     875         
    805876 
    806877        if (readerClientOut_.size()!=0) 
     
    879950 
    880951    
     952   void CContext::setDefaultServices(void) 
     953   { 
     954     defaultPoolWriterId_ = CXios::defaultPoolId ; 
     955     defaultPoolReaderId_ = CXios::defaultPoolId ; 
     956     defaultPoolGathererId_ = CXios::defaultPoolId ; 
     957     defaultWriterId_ = CXios::defaultWriterId ; 
     958     defaultReaderId_ = CXios::defaultReaderId ; 
     959     defaultGathererId_ = CXios::defaultGathererId ; 
     960     defaultUsingServer2_ = CXios::usingServer2 ; 
     961      
     962     if (!default_pool.isEmpty())  defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 
     963     if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 
     964     if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 
     965     if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 
     966     if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 
     967     if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 
     968     if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 
     969     if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 
     970   } 
     971 
    881972   /*! 
    882973   \brief Close all the context defintion and do processing data 
     
    894985 
    895986     CTimer::get("Context : close definition").resume() ; 
    896       
     987           
    897988     // create intercommunicator with servers.  
    898989     // not sure it is the good place to be called here  
    899      createServerInterComm() ; 
     990     //createServerInterComm() ; 
    900991 
    901992 
     
    9161007    // pour chacun des contextes. 
    9171008    solveDescInheritance(true); 
    918   
     1009    setDefaultServices() ; 
    9191010    // Check if some axis, domains or grids are eligible to for compressed indexed output. 
    9201011    // Warning: This must be done after solving the inheritance and before the rest of post-processing 
     
    10231114 
    10241115    // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields 
    1025     if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles); 
    1026     //else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 
     1116     
     1117    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles) ; 
     1118    /* 
     1119    if (serviceType_==CServicesManager::CLIENT ) 
     1120    {    
     1121      if (CXios::usingServer2) distributeFiles(this->enabledWriteModeFiles, defaultPoolGathererId_, defaultGathererId_); 
     1122      else distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 
     1123    } 
     1124    if (serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 
     1125    */ 
    10271126 
    10281127    // client side, assign context for file reading 
    1029     if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 
    1030      
     1128//    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 
     1129    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles)  
     1130    { 
     1131      string poolReaderId ; 
     1132      string readerId ; 
     1133      file->getReaderServicesId(defaultPoolReaderId_, defaultReaderId_, poolReaderId, readerId) ; 
     1134      file->setContextClient(poolReaderId, readerId, 0) ; 
     1135    } 
     1136 
    10311137    // server side, assign context where to send file data read 
    10321138    if (serviceType_==CServicesManager::READER) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientIn_[0]) ; 
     
    11631269    // fix size for each context client 
    11641270    for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ; 
     1271 
    11651272 
    11661273     CTimer::get("Context : close definition").suspend() ; 
     
    14361543   TRY 
    14371544   { 
     1545     map< pair<string,string>, vector<CFile*>> fileMaps ; 
     1546     for(auto& file : files) 
     1547     { 
     1548       string poolWriterId ; 
     1549       string poolGathererId ; 
     1550       string writerId  ; 
     1551       string gathererId  ; 
     1552       bool usingServer2 ; 
     1553 
     1554       file->getWriterServicesId(defaultUsingServer2_, defaultPoolWriterId_, defaultWriterId_, defaultPoolGathererId_, defaultGathererId_, 
     1555                                 usingServer2, poolWriterId, writerId, poolGathererId, gathererId) ; 
     1556       if (serviceType_==CServicesManager::CLIENT && usingServer2) fileMaps[make_pair(poolGathererId,gathererId)].push_back(file) ; 
     1557       else fileMaps[make_pair(poolWriterId,writerId)].push_back(file) ; 
     1558     } 
     1559     for(auto& it : fileMaps) distributeFilesOnSameService(it.second, it.first.first, it.first.second) ; 
     1560   } 
     1561   CATCH_DUMP_ATTR 
     1562 
     1563 
     1564   void CContext::distributeFilesOnSameService(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
     1565   TRY 
     1566   { 
    14381567     bool distFileMemory=false ; 
    14391568     distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory); 
    14401569 
    1441      int nbPools = writerClientOut_.size(); 
    1442      if (nbPools==1) distributeFileOverOne(files) ; 
    1443      else if (distFileMemory) distributeFileOverMemoryBandwith(files) ; 
    1444      else distributeFileOverBandwith(files) ; 
    1445    } 
    1446    CATCH_DUMP_ATTR 
    1447  
    1448    void CContext::distributeFileOverOne(const vector<CFile*>& files) 
    1449    TRY 
    1450    { 
    1451     for(auto& file : files) file->setContextClient(writerClientOut_[0]) ; 
    1452    } 
    1453    CATCH_DUMP_ATTR 
    1454  
    1455    void CContext::distributeFileOverBandwith(const vector<CFile*>& files) 
     1570     auto writers = getContextClient(poolId, serviceId) ; 
     1571     int  nbPools = writers.size() ; 
     1572      
     1573     if (nbPools==1) distributeFileOverOne(files, poolId, serviceId) ; 
     1574     else if (distFileMemory) distributeFileOverMemoryBandwith(files, poolId, serviceId) ; 
     1575     else distributeFileOverBandwith(files, poolId, serviceId) ; 
     1576   } 
     1577   CATCH_DUMP_ATTR 
     1578 
     1579   void CContext::distributeFileOverOne(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
     1580   TRY 
     1581   { 
     1582     for(auto& file : files) file->setContextClient(poolId, serviceId,0) ; 
     1583   } 
     1584   CATCH_DUMP_ATTR 
     1585 
     1586   void CContext::distributeFileOverBandwith(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
    14561587   TRY 
    14571588   { 
     
    14591590      
    14601591     std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 
    1461      int nbPools = writerClientOut_.size(); 
     1592     auto writers = getContextClient(poolId, serviceId) ; 
     1593     int nbPools = writers.size(); 
     1594     //int nbPools = writerClientOut_.size(); 
    14621595 
    14631596     // (1) Find all enabled files in write mode 
     
    15121645       dataSize=(*poolDataSize.begin()).first ; 
    15131646       j=(*poolDataSize.begin()).second ; 
    1514        dataSizeMap[i].second->setContextClient(writerClientOut_[j]); 
     1647       dataSizeMap[i].second->setContextClient(poolId, serviceId, j); 
    15151648       dataSize+=dataSizeMap[i].first; 
    15161649       poolDataSize.erase(poolDataSize.begin()) ; 
     
    15221655   CATCH_DUMP_ATTR 
    15231656 
    1524    void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList) 
    1525    TRY 
    1526    { 
    1527      int nbPools = writerClientOut_.size(); 
     1657   void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList, const string& poolId, const string& serviceId) 
     1658   TRY 
     1659   { 
     1660     auto writers = getContextClient(poolId, serviceId) ; 
     1661     int nbPools = writers.size(); 
     1662     
    15281663     double ratio=0.5 ; 
    15291664     ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); 
     
    15911726         } 
    15921727       } 
    1593        filesList[i]->setContextClient(writerClientOut_[files[i].assignedServer_]) ; 
     1728       filesList[i]->setContextClient(poolId, serviceId, files[i].assignedServer_) ; 
    15941729       delete [] files[i].assignedGrid_ ; 
    15951730     } 
Note: See TracChangeset for help on using the changeset viewer.