Changeset 2458 for XIOS3/trunk/src/node/context.cpp
- Timestamp:
- 01/25/23 16:59:46 (17 months ago)
- Location:
- XIOS3/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk
-
Property
svn:mergeinfo
set to
False
/XIOS3/dev/XIOS_FILE_SERVICES merged eligible
-
Property
svn:mergeinfo
set to
False
-
XIOS3/trunk/src/node/context.cpp
r2441 r2458 577 577 if (commRank==0) 578 578 { 579 CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions ) ;579 CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ; 580 580 for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ; 581 581 } … … 588 588 parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 589 589 int type ; 590 if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type ) ;590 if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ; 591 591 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 592 592 string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ; … … 610 610 CATCH_DUMP_ATTR 611 611 612 613 // obsolete 612 614 void CContext::createServerInterComm(void) 613 615 TRY … … 645 647 } 646 648 CATCH_DUMP_ATTR 649 650 651 void CContext::getServerInterComm(const string& poolId, const string& serviceId, vector<pair<CContextClient*,CContextServer*>>& clientServers) 652 { 653 vector<pair<string, pair<CContextClient*,CContextServer*>>> retClientServers ; 654 655 auto it=serversMap_.find(make_pair(poolId,serviceId)) ; 656 if (it!=serversMap_.end()) clientServers=it->second ; 657 else 658 { 659 if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ; 660 else createServerInterComm(poolId, serviceId, retClientServers) ; 661 for(auto& retClientServer : retClientServers) clientServers.push_back(retClientServer.second) ; 662 663 int serviceType ; 664 if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType) ; 665 MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ; 666 667 for(auto& clientServer : clientServers) 668 { 669 if (serviceType==CServicesManager::WRITER) { writerClientOut_.push_back(clientServer.first) ; writerServerOut_.push_back(clientServer.second) ; } 670 else if (serviceType==CServicesManager::READER) { readerClientOut_.push_back(clientServer.first) ; readerServerOut_.push_back(clientServer.second) ; } 671 else if (serviceType==CServicesManager::GATHERER) { writerClientOut_.push_back(clientServer.first) ; writerServerOut_.push_back(clientServer.second) ; } 672 } 673 serversMap_.insert(make_pair(make_pair(poolId,serviceId),clientServers)) ; 674 } 675 676 } 677 678 vector<CContextClient*> CContext::getContextClient(const string& poolId, const string& serviceId) 679 { 680 vector<pair<CContextClient*,CContextServer*>> clientServers ; 681 getServerInterComm(poolId, serviceId, clientServers ) ; 682 vector<CContextClient*> ret ; 683 for(auto& clientServer : clientServers) ret.push_back(clientServer.first) ; 684 return ret ; 685 } 686 647 687 648 688 void CContext::globalEventLoop(void) … … 698 738 for(auto couplerOut : couplerOutClient_) couplerOut.second->eventLoop(); 699 739 for(auto couplerIn : couplerInClient_) couplerIn.second->eventLoop(); 700 for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 701 for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 740 //for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 741 //for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 742 for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(); 743 for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(); 702 744 } 703 745 setCurrent(getId()) ; … … 783 825 CContextServer* server ; 784 826 827 /* 785 828 if (writerClientOut_.size()!=0) 786 829 { … … 803 846 info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 804 847 } 848 */ 849 850 for(int n=0; n<writerClientOut_.size() ; n++) 851 { 852 client=writerClientOut_[n] ; 853 server=writerServerOut_[n] ; 854 855 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to writer"<<endl ; 856 client->finalize(); 857 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to writer"<<endl ; 858 bool bufferReleased; 859 do 860 { 861 client->eventLoop(); 862 bufferReleased = !client->havePendingRequests(); 863 } while (!bufferReleased); 864 info(100)<<"DEBUG: context "<<getId()<<" no pending request on writer ok"<<endl ; 865 866 bool notifiedFinalized=false ; 867 do 868 { 869 notifiedFinalized=client->isNotifiedFinalized() ; 870 } while (!notifiedFinalized) ; 871 server->releaseBuffers(); 872 client->releaseBuffers(); 873 info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 874 } 875 805 876 806 877 if (readerClientOut_.size()!=0) … … 879 950 880 951 952 void CContext::setDefaultServices(void) 953 { 954 defaultPoolWriterId_ = CXios::defaultPoolId ; 955 defaultPoolReaderId_ = CXios::defaultPoolId ; 956 defaultPoolGathererId_ = CXios::defaultPoolId ; 957 defaultWriterId_ = CXios::defaultWriterId ; 958 defaultReaderId_ = CXios::defaultReaderId ; 959 defaultGathererId_ = CXios::defaultGathererId ; 960 defaultUsingServer2_ = CXios::usingServer2 ; 961 962 if (!default_pool.isEmpty()) defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 963 if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 964 if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 965 if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 966 if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 967 if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 968 if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 969 if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 970 } 971 881 972 /*! 882 973 \brief Close all the context defintion and do processing data … … 894 985 895 986 CTimer::get("Context : close definition").resume() ; 896 987 897 988 // create intercommunicator with servers. 898 989 // not sure it is the good place to be called here 899 createServerInterComm() ;990 //createServerInterComm() ; 900 991 901 992 … … 916 1007 // pour chacun des contextes. 917 1008 solveDescInheritance(true); 918 1009 setDefaultServices() ; 919 1010 // Check if some axis, domains or grids are eligible to for compressed indexed output. 920 1011 // Warning: This must be done after solving the inheritance and before the rest of post-processing … … 1023 1114 1024 1115 // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields 1025 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles); 1026 //else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 1116 1117 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles) ; 1118 /* 1119 if (serviceType_==CServicesManager::CLIENT ) 1120 { 1121 if (CXios::usingServer2) distributeFiles(this->enabledWriteModeFiles, defaultPoolGathererId_, defaultGathererId_); 1122 else distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 1123 } 1124 if (serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 1125 */ 1027 1126 1028 1127 // client side, assign context for file reading 1029 if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 1030 1128 // if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 1129 if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) 1130 { 1131 string poolReaderId ; 1132 string readerId ; 1133 file->getReaderServicesId(defaultPoolReaderId_, defaultReaderId_, poolReaderId, readerId) ; 1134 file->setContextClient(poolReaderId, readerId, 0) ; 1135 } 1136 1031 1137 // server side, assign context where to send file data read 1032 1138 if (serviceType_==CServicesManager::READER) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientIn_[0]) ; … … 1163 1269 // fix size for each context client 1164 1270 for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ; 1271 1165 1272 1166 1273 CTimer::get("Context : close definition").suspend() ; … … 1436 1543 TRY 1437 1544 { 1545 map< pair<string,string>, vector<CFile*>> fileMaps ; 1546 for(auto& file : files) 1547 { 1548 string poolWriterId ; 1549 string poolGathererId ; 1550 string writerId ; 1551 string gathererId ; 1552 bool usingServer2 ; 1553 1554 file->getWriterServicesId(defaultUsingServer2_, defaultPoolWriterId_, defaultWriterId_, defaultPoolGathererId_, defaultGathererId_, 1555 usingServer2, poolWriterId, writerId, poolGathererId, gathererId) ; 1556 if (serviceType_==CServicesManager::CLIENT && usingServer2) fileMaps[make_pair(poolGathererId,gathererId)].push_back(file) ; 1557 else fileMaps[make_pair(poolWriterId,writerId)].push_back(file) ; 1558 } 1559 for(auto& it : fileMaps) distributeFilesOnSameService(it.second, it.first.first, it.first.second) ; 1560 } 1561 CATCH_DUMP_ATTR 1562 1563 1564 void CContext::distributeFilesOnSameService(const vector<CFile*>& files, const string& poolId, const string& serviceId) 1565 TRY 1566 { 1438 1567 bool distFileMemory=false ; 1439 1568 distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory); 1440 1569 1441 int nbPools = writerClientOut_.size(); 1442 if (nbPools==1) distributeFileOverOne(files) ; 1443 else if (distFileMemory) distributeFileOverMemoryBandwith(files) ; 1444 else distributeFileOverBandwith(files) ; 1445 } 1446 CATCH_DUMP_ATTR 1447 1448 void CContext::distributeFileOverOne(const vector<CFile*>& files) 1449 TRY 1450 { 1451 for(auto& file : files) file->setContextClient(writerClientOut_[0]) ; 1452 } 1453 CATCH_DUMP_ATTR 1454 1455 void CContext::distributeFileOverBandwith(const vector<CFile*>& files) 1570 auto writers = getContextClient(poolId, serviceId) ; 1571 int nbPools = writers.size() ; 1572 1573 if (nbPools==1) distributeFileOverOne(files, poolId, serviceId) ; 1574 else if (distFileMemory) distributeFileOverMemoryBandwith(files, poolId, serviceId) ; 1575 else distributeFileOverBandwith(files, poolId, serviceId) ; 1576 } 1577 CATCH_DUMP_ATTR 1578 1579 void CContext::distributeFileOverOne(const vector<CFile*>& files, const string& poolId, const string& serviceId) 1580 TRY 1581 { 1582 for(auto& file : files) file->setContextClient(poolId, serviceId,0) ; 1583 } 1584 CATCH_DUMP_ATTR 1585 1586 void CContext::distributeFileOverBandwith(const vector<CFile*>& files, const string& poolId, const string& serviceId) 1456 1587 TRY 1457 1588 { … … 1459 1590 1460 1591 std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 1461 int nbPools = writerClientOut_.size(); 1592 auto writers = getContextClient(poolId, serviceId) ; 1593 int nbPools = writers.size(); 1594 //int nbPools = writerClientOut_.size(); 1462 1595 1463 1596 // (1) Find all enabled files in write mode … … 1512 1645 dataSize=(*poolDataSize.begin()).first ; 1513 1646 j=(*poolDataSize.begin()).second ; 1514 dataSizeMap[i].second->setContextClient( writerClientOut_[j]);1647 dataSizeMap[i].second->setContextClient(poolId, serviceId, j); 1515 1648 dataSize+=dataSizeMap[i].first; 1516 1649 poolDataSize.erase(poolDataSize.begin()) ; … … 1522 1655 CATCH_DUMP_ATTR 1523 1656 1524 void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList) 1525 TRY 1526 { 1527 int nbPools = writerClientOut_.size(); 1657 void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList, const string& poolId, const string& serviceId) 1658 TRY 1659 { 1660 auto writers = getContextClient(poolId, serviceId) ; 1661 int nbPools = writers.size(); 1662 1528 1663 double ratio=0.5 ; 1529 1664 ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); … … 1591 1726 } 1592 1727 } 1593 filesList[i]->setContextClient( writerClientOut_[files[i].assignedServer_]) ;1728 filesList[i]->setContextClient(poolId, serviceId, files[i].assignedServer_) ; 1594 1729 delete [] files[i].assignedGrid_ ; 1595 1730 }
Note: See TracChangeset
for help on using the changeset viewer.