Changeset 2453 for XIOS3/dev/XIOS_FILE_SERVICES/src/node/context.cpp
- Timestamp:
- 01/03/23 19:06:42 (18 months ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/dev/XIOS_FILE_SERVICES/src/node/context.cpp
r2441 r2453 577 577 if (commRank==0) 578 578 { 579 CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions ) ;579 CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ; 580 580 for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ; 581 581 } … … 588 588 parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 589 589 int type ; 590 if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type ) ;590 if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ; 591 591 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 592 592 string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ; … … 610 610 CATCH_DUMP_ATTR 611 611 612 613 // obsolete 612 614 void CContext::createServerInterComm(void) 613 615 TRY … … 645 647 } 646 648 CATCH_DUMP_ATTR 649 650 651 void CContext::getServerInterComm(const string& poolId, const string& serviceId, vector<pair<CContextClient*,CContextServer*>>& clientServers) 652 { 653 vector<pair<string, pair<CContextClient*,CContextServer*>>> retClientServers ; 654 655 auto it=serversMap_.find(make_pair(poolId,serviceId)) ; 656 if (it!=serversMap_.end()) clientServers=it->second ; 657 else 658 { 659 if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ; 660 else createServerInterComm(poolId, serviceId, retClientServers) ; 661 for(auto& retClientServer : retClientServers) clientServers.push_back(retClientServer.second) ; 662 663 int serviceType ; 664 if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType) ; 665 MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ; 666 667 for(auto& clientServer : clientServers) 668 { 669 if (serviceType==CServicesManager::WRITER) { writerClientOut_.push_back(clientServer.first) ; writerServerOut_.push_back(clientServer.second) ; } 670 else if (serviceType==CServicesManager::READER) { readerClientOut_.push_back(clientServer.first) ; readerServerOut_.push_back(clientServer.second) ; } 671 else if (serviceType==CServicesManager::GATHERER) { writerClientOut_.push_back(clientServer.first) ; writerServerOut_.push_back(clientServer.second) ; } 672 } 673 serversMap_.insert(make_pair(make_pair(poolId,serviceId),clientServers)) ; 674 } 675 676 } 677 678 vector<CContextClient*> CContext::getContextClient(const string& poolId, const string& serviceId) 679 { 680 vector<pair<CContextClient*,CContextServer*>> clientServers ; 681 getServerInterComm(poolId, serviceId, clientServers ) ; 682 vector<CContextClient*> ret ; 683 for(auto& clientServer : clientServers) ret.push_back(clientServer.first) ; 684 return ret ; 685 } 686 647 687 648 688 void CContext::globalEventLoop(void) … … 783 823 CContextServer* server ; 784 824 825 /* 785 826 if (writerClientOut_.size()!=0) 786 827 { … … 803 844 info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 804 845 } 846 */ 847 848 for(int n=0; n<writerClientOut_.size() ; n++) 849 { 850 client=writerClientOut_[n] ; 851 server=writerServerOut_[n] ; 852 853 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to writer"<<endl ; 854 client->finalize(); 855 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to writer"<<endl ; 856 bool bufferReleased; 857 do 858 { 859 client->eventLoop(); 860 bufferReleased = !client->havePendingRequests(); 861 } while (!bufferReleased); 862 info(100)<<"DEBUG: context "<<getId()<<" no pending request on writer ok"<<endl ; 863 864 bool notifiedFinalized=false ; 865 do 866 { 867 notifiedFinalized=client->isNotifiedFinalized() ; 868 } while (!notifiedFinalized) ; 869 server->releaseBuffers(); 870 client->releaseBuffers(); 871 info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 872 } 873 805 874 806 875 if (readerClientOut_.size()!=0) … … 879 948 880 949 950 void CContext::setDefaultServices(void) 951 { 952 defaultPoolWriterId_ = CXios::defaultPoolId ; 953 defaultPoolReaderId_ = CXios::defaultPoolId ; 954 defaultPoolGathererId_ = CXios::defaultPoolId ; 955 defaultWriterId_ = CXios::defaultWriterId ; 956 defaultReaderId_ = CXios::defaultReaderId ; 957 defaultGathererId_ = CXios::defaultGathererId ; 958 defaultUsingServer2_ = CXios::usingServer2 ; 959 960 if (!default_pool.isEmpty()) defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 961 if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 962 if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 963 if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 964 if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 965 if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 966 if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 967 if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 968 } 969 881 970 /*! 882 971 \brief Close all the context defintion and do processing data … … 894 983 895 984 CTimer::get("Context : close definition").resume() ; 896 985 897 986 // create intercommunicator with servers. 898 987 // not sure it is the good place to be called here 899 createServerInterComm() ;988 //createServerInterComm() ; 900 989 901 990 … … 916 1005 // pour chacun des contextes. 917 1006 solveDescInheritance(true); 918 1007 setDefaultServices() ; 919 1008 // Check if some axis, domains or grids are eligible to for compressed indexed output. 920 1009 // Warning: This must be done after solving the inheritance and before the rest of post-processing … … 1023 1112 1024 1113 // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields 1025 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles); 1026 //else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 1114 1115 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles) ; 1116 /* 1117 if (serviceType_==CServicesManager::CLIENT ) 1118 { 1119 if (CXios::usingServer2) distributeFiles(this->enabledWriteModeFiles, defaultPoolGathererId_, defaultGathererId_); 1120 else distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 1121 } 1122 if (serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 1123 */ 1027 1124 1028 1125 // client side, assign context for file reading 1029 if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 1030 1126 // if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 1127 if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) 1128 { 1129 string poolReaderId ; 1130 string readerId ; 1131 file->getReaderServicesId(defaultPoolReaderId_, defaultReaderId_, poolReaderId, readerId) ; 1132 file->setContextClient(poolReaderId, readerId, 0) ; 1133 } 1134 1031 1135 // server side, assign context where to send file data read 1032 1136 if (serviceType_==CServicesManager::READER) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientIn_[0]) ; … … 1163 1267 // fix size for each context client 1164 1268 for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ; 1269 1165 1270 1166 1271 CTimer::get("Context : close definition").suspend() ; … … 1436 1541 TRY 1437 1542 { 1543 map< pair<string,string>, vector<CFile*>> fileMaps ; 1544 for(auto& file : files) 1545 { 1546 string poolWriterId ; 1547 string poolGathererId ; 1548 string writerId ; 1549 string gathererId ; 1550 bool usingServer2 ; 1551 1552 file->getWriterServicesId(defaultUsingServer2_, defaultPoolWriterId_, defaultWriterId_, defaultPoolGathererId_, defaultGathererId_, 1553 usingServer2, poolWriterId, writerId, poolGathererId, gathererId) ; 1554 if (serviceType_==CServicesManager::CLIENT && usingServer2) fileMaps[make_pair(poolGathererId,gathererId)].push_back(file) ; 1555 else fileMaps[make_pair(poolWriterId,writerId)].push_back(file) ; 1556 } 1557 for(auto& it : fileMaps) distributeFilesOnSameService(it.second, it.first.first, it.first.second) ; 1558 } 1559 CATCH_DUMP_ATTR 1560 1561 1562 void CContext::distributeFilesOnSameService(const vector<CFile*>& files, const string& poolId, const string& serviceId) 1563 TRY 1564 { 1438 1565 bool distFileMemory=false ; 1439 1566 distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory); 1440 1567 1441 int nbPools = writerClientOut_.size(); 1442 if (nbPools==1) distributeFileOverOne(files) ; 1443 else if (distFileMemory) distributeFileOverMemoryBandwith(files) ; 1444 else distributeFileOverBandwith(files) ; 1445 } 1446 CATCH_DUMP_ATTR 1447 1448 void CContext::distributeFileOverOne(const vector<CFile*>& files) 1449 TRY 1450 { 1451 for(auto& file : files) file->setContextClient(writerClientOut_[0]) ; 1452 } 1453 CATCH_DUMP_ATTR 1454 1455 void CContext::distributeFileOverBandwith(const vector<CFile*>& files) 1568 auto writers = getContextClient(poolId, serviceId) ; 1569 int nbPools = writers.size() ; 1570 1571 if (nbPools==1) distributeFileOverOne(files, poolId, serviceId) ; 1572 else if (distFileMemory) distributeFileOverMemoryBandwith(files, poolId, serviceId) ; 1573 else distributeFileOverBandwith(files, poolId, serviceId) ; 1574 } 1575 CATCH_DUMP_ATTR 1576 1577 void CContext::distributeFileOverOne(const vector<CFile*>& files, const string& poolId, const string& serviceId) 1578 TRY 1579 { 1580 for(auto& file : files) file->setContextClient(poolId, serviceId,0) ; 1581 } 1582 CATCH_DUMP_ATTR 1583 1584 void CContext::distributeFileOverBandwith(const vector<CFile*>& files, const string& poolId, const string& serviceId) 1456 1585 TRY 1457 1586 { … … 1459 1588 1460 1589 std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 1461 int nbPools = writerClientOut_.size(); 1590 auto writers = getContextClient(poolId, serviceId) ; 1591 int nbPools = writers.size(); 1592 //int nbPools = writerClientOut_.size(); 1462 1593 1463 1594 // (1) Find all enabled files in write mode … … 1512 1643 dataSize=(*poolDataSize.begin()).first ; 1513 1644 j=(*poolDataSize.begin()).second ; 1514 dataSizeMap[i].second->setContextClient( writerClientOut_[j]);1645 dataSizeMap[i].second->setContextClient(poolId, serviceId, j); 1515 1646 dataSize+=dataSizeMap[i].first; 1516 1647 poolDataSize.erase(poolDataSize.begin()) ; … … 1522 1653 CATCH_DUMP_ATTR 1523 1654 1524 void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList) 1525 TRY 1526 { 1527 int nbPools = writerClientOut_.size(); 1655 void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList, const string& poolId, const string& serviceId) 1656 TRY 1657 { 1658 auto writers = getContextClient(poolId, serviceId) ; 1659 int nbPools = writers.size(); 1660 1528 1661 double ratio=0.5 ; 1529 1662 ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); … … 1591 1724 } 1592 1725 } 1593 filesList[i]->setContextClient( writerClientOut_[files[i].assignedServer_]) ;1726 filesList[i]->setContextClient(poolId, serviceId, files[i].assignedServer_) ; 1594 1727 delete [] files[i].assignedGrid_ ; 1595 1728 }
Note: See TracChangeset
for help on using the changeset viewer.