Changeset 1869 for XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp
- Timestamp:
- 04/15/20 13:23:39 (4 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp
r1853 r1869 730 730 731 731 // Distribute files between secondary servers according to the data size 732 distributeFiles( );732 distributeFiles(this->enabledWriteModeFiles); 733 733 734 734 // Check grid and calculate its distribution … … 852 852 and the active fields (fields will be written onto active files) 853 853 */ 854 855 void CContext::closeDefinition(void) 854 void CContext::closeDefinition(void) 856 855 TRY 857 856 { 858 857 CTimer::get("Context : close definition").resume() ; 858 859 // create intercommunicator with servers. 860 // not sure it is the good place to be called here 861 createServerInterComm() ; 862 863 864 // After xml is parsed, there are some more works with post processing 865 // postProcessing(); 866 867 // Make sure the calendar was correctly created 868 if (!calendar) 869 ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"") 870 else if (calendar->getTimeStep() == NoneDu) 871 ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"") 872 // Calendar first update to set the current date equals to the start date 873 calendar->update(0); 874 875 // Résolution des héritages descendants (cà d des héritages de groupes) 876 // pour chacun des contextes. 877 solveDescInheritance(true); 878 879 // Solve inheritance for field to know if enabled or not. 880 for (auto field : CField::getAll()) field->solveRefInheritance(); 881 882 // Check if some axis, domains or grids are eligible to for compressed indexed output. 883 // Warning: This must be done after solving the inheritance and before the rest of post-processing 884 // --> later ???? checkAxisDomainsGridsEligibilityForCompressedOutput(); 885 886 // Check if some automatic time series should be generated 887 // Warning: This must be done after solving the inheritance and before the rest of post-processing 888 889 // The timeseries should only be prepared in client 890 prepareTimeseries(); 891 892 //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir. 893 findEnabledFiles(); 894 findEnabledWriteModeFiles(); 895 findEnabledReadModeFiles(); 896 findEnabledCouplerIn(); 897 findEnabledCouplerOut(); 898 createCouplerInterCommunicator() ; 899 900 // Find all enabled fields of each file 901 const vector<CField*>&& fileOutField = findAllEnabledFieldsInFileOut(this->enabledWriteModeFiles); 902 const vector<CField*>&& fileInField = findAllEnabledFieldsInFileIn(this->enabledReadModeFiles); 903 const vector<CField*>&& CouplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut); 904 const vector<CField*>&& CouplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn); 905 findFieldsWithReadAccess(); 906 const vector<CField*>& fieldWithReadAccess = fieldsWithReadAccess_ ; 907 908 // find all field potentially at workflow end 909 vector<CField*> endWorkflowFields ; 910 endWorkflowFields.reserve(fileOutField.size()+CouplerOutField.size()+fieldWithReadAccess.size()) ; 911 endWorkflowFields.insert(endWorkflowFields.end(),fileOutField.begin(), fileOutField.end()) ; 912 endWorkflowFields.insert(endWorkflowFields.end(),CouplerOutField.begin(), CouplerOutField.end()) ; 913 endWorkflowFields.insert(endWorkflowFields.end(),fieldWithReadAccess.begin(), fieldWithReadAccess.end()) ; 914 915 for(auto endWorkflowField : endWorkflowFields) endWorkflowField->buildWorkflowGraph(garbageCollector) ; 859 916 860 // 861 postProcessingGlobalAttributes(); 917 // Distribute files between secondary servers according to the data size => assign a context to a file 918 if (serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles); 919 else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 920 921 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) 922 { 923 for(auto field : fileOutField) 924 { 925 field->connectToFileServer(garbageCollector) ; // connect the field to server filter 926 field->computeGridIndexToFileServer() ; // compute grid index for transfer to the server context 927 } 928 } 929 930 931 932 933 934 // For now, only read files with client and only one level server 935 // if (hasClient && !hasServer) findEnabledReadModeFiles(); 936 937 // Find all enabled fields of each file 938 findAllEnabledFieldsInFiles(this->enabledWriteModeFiles); 939 findAllEnabledFieldsInFiles(this->enabledReadModeFiles); 940 941 // For now, only read files with client and only one level server 942 // if (hasClient && !hasServer) 943 // findAllEnabledFieldsInFiles(this->enabledReadModeFiles); 944 945 if (serviceType_==CServicesManager::CLIENT) 946 { 947 initReadFiles(); 948 // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis) 949 this->readAttributesOfEnabledFieldsInReadModeFiles(); 950 } 951 952 // Only search and rebuild all reference objects of enable fields, don't transform 953 this->solveOnlyRefOfEnabledFields(); 954 955 // Search and rebuild all reference object of enabled fields, and transform 956 this->solveAllRefOfEnabledFieldsAndTransform(); 957 958 // Find all fields with read access from the public API 959 if (serviceType_==CServicesManager::CLIENT) findFieldsWithReadAccess(); 960 // and solve the all reference for them 961 if (serviceType_==CServicesManager::CLIENT) solveAllRefOfFieldsWithReadAccess(); 962 963 isPostProcessed = true; 964 965 966 967 // Distribute files between secondary servers according to the data size 968 distributeFiles(this->enabledWriteModeFiles); 969 970 // Check grid and calculate its distribution 971 checkGridEnabledFields(); 972 973 setClientServerBuffer(client, (serviceType_==CServicesManager::CLIENT) ) ; 974 for (int i = 0; i < clientPrimServer.size(); ++i) 975 setClientServerBuffer(clientPrimServer[i], true); 976 977 978 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) 979 { 980 if (serviceType_==CServicesManager::GATHERER) 981 { 982 for (auto it=clientPrimServer.begin(); it!=clientPrimServer.end();++it) 983 { 984 this->sendAllAttributesToServer(*it); // Send all attributes of current context to server 985 CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(*it); // Send all attributes of current calendar 986 } 987 } 988 else 989 { 990 this->sendAllAttributesToServer(client); // Send all attributes of current context to server 991 CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(client); // Send all attributes of current calendar 992 } 993 994 // We have enough information to send to server 995 // First of all, send all enabled files 996 sendEnabledFiles(this->enabledWriteModeFiles); 997 // We only use server-level 1 (for now) to read data 998 if (serviceType_==CServicesManager::CLIENT) sendEnabledFiles(this->enabledReadModeFiles); 999 1000 // Then, send all enabled fields 1001 sendEnabledFieldsInFiles(this->enabledWriteModeFiles); 1002 1003 if (serviceType_==CServicesManager::CLIENT) sendEnabledFieldsInFiles(this->enabledReadModeFiles); 1004 1005 // Then, check whether we have domain_ref, axis_ref or scalar_ref attached to the enabled fields 1006 // If any, so send them to server 1007 sendRefDomainsAxisScalars(this->enabledWriteModeFiles); 1008 1009 if (serviceType_==CServicesManager::CLIENT) sendRefDomainsAxisScalars(this->enabledReadModeFiles); 1010 1011 // Check whether enabled fields have grid_ref, if any, send this info to server 1012 sendRefGrid(this->enabledFiles); 1013 // This code may be useful in the future when we want to seperate completely read and write 1014 // sendRefGrid(this->enabledWriteModeFiles); 1015 // if (!hasServer) 1016 // sendRefGrid(this->enabledReadModeFiles); 1017 1018 // A grid of enabled fields composed of several components which must be checked then their 1019 // checked attributes should be sent to server 1020 sendGridComponentEnabledFieldsInFiles(this->enabledFiles); // This code can be seperated in two (one for reading, another for writing) 1021 1022 // We have a xml tree on the server side and now, it should be also processed 1023 sendPostProcessing(); 1024 1025 // Finally, we send information of grid itself to server 1026 sendGridEnabledFieldsInFiles(this->enabledWriteModeFiles); 1027 1028 if (serviceType_==CServicesManager::CLIENT) sendGridEnabledFieldsInFiles(this->enabledReadModeFiles); 1029 } 1030 allProcessed = true; 1031 862 1032 863 1033 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) sendPostProcessingGlobalAttributes(); … … 884 1054 885 1055 CTimer::get("Context : close definition").suspend() ; 886 } 887 CATCH_DUMP_ATTR 888 889 void CContext::findAllEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles) 890 TRY 891 { 1056 } 1057 CATCH_DUMP_ATTR 1058 1059 1060 void CContext::closeDefinition_old(void) 1061 TRY 1062 { 1063 CTimer::get("Context : close definition").resume() ; 1064 1065 // 1066 postProcessingGlobalAttributes(); 1067 1068 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) sendPostProcessingGlobalAttributes(); 1069 1070 // There are some processings that should be done after all of above. For example: check mask or index 1071 this->buildFilterGraphOfEnabledFields(); 1072 1073 if (serviceType_==CServicesManager::CLIENT) 1074 { 1075 buildFilterGraphOfFieldsWithReadAccess(); 1076 postProcessFilterGraph(); // For coupling in, modify this later 1077 } 1078 1079 checkGridEnabledFields(); 1080 1081 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) this->sendProcessingGridOfEnabledFields(); 1082 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) this->sendCloseDefinition(); 1083 1084 // Nettoyage de l'arborescence 1085 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) CleanTree(); // Only on client side?? 1086 1087 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) sendCreateFileHeader(); 1088 if (serviceType_==CServicesManager::CLIENT) startPrefetchingOfEnabledReadModeFiles(); 1089 1090 CTimer::get("Context : close definition").suspend() ; 1091 } 1092 CATCH_DUMP_ATTR 1093 1094 vector<CField*> CContext::findAllEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles) 1095 TRY 1096 { 1097 vector<CField*> fields ; 892 1098 for (unsigned int i = 0; i < activeFiles.size(); i++) 893 (void)activeFiles[i]->getEnabledFields(); 894 } 895 CATCH_DUMP_ATTR 1099 { 1100 const vector<CField*>&& field=activeFiles[i]->getEnabledFields() ; 1101 fields.insert(fields.end(),field.begin(),field.end()); 1102 } 1103 return fields ; 1104 } 1105 CATCH_DUMP_ATTR 1106 1107 vector<CField*> CContext::findAllEnabledFieldsInFileOut(const std::vector<CFile*>& activeFiles) 1108 TRY 1109 { 1110 vector<CField*> fields ; 1111 for(auto file : activeFiles) 1112 { 1113 const vector<CField*>&& fieldList=file->getEnabledFields() ; 1114 for(auto field : fieldList) field->setFileOut(file) ; 1115 fields.insert(fields.end(),fieldList.begin(),fieldList.end()); 1116 } 1117 return fields ; 1118 } 1119 CATCH_DUMP_ATTR 1120 1121 vector<CField*> CContext::findAllEnabledFieldsInFileIn(const std::vector<CFile*>& activeFiles) 1122 TRY 1123 { 1124 vector<CField*> fields ; 1125 for(auto file : activeFiles) 1126 { 1127 const vector<CField*>&& fieldList=file->getEnabledFields() ; 1128 for(auto field : fieldList) field->setFileIn(file) ; 1129 fields.insert(fields.end(),fieldList.begin(),fieldList.end()); 1130 } 1131 return fields ; 1132 } 1133 CATCH_DUMP_ATTR 1134 1135 vector<CField*> CContext::findAllEnabledFieldsCouplerOut(const std::vector<CCouplerOut*>& activeCouplerOut) 1136 TRY 1137 { 1138 vector<CField*> fields ; 1139 for (auto couplerOut :activeCouplerOut) 1140 { 1141 const vector<CField*>&& fieldList=couplerOut->getEnabledFields() ; 1142 for(auto field : fieldList) field->setCouplerOut(couplerOut) ; 1143 fields.insert(fields.end(),fieldList.begin(),fieldList.end()); 1144 } 1145 return fields ; 1146 } 1147 CATCH_DUMP_ATTR 1148 1149 vector<CField*> CContext::findAllEnabledFieldsCouplerIn(const std::vector<CCouplerIn*>& activeCouplerIn) 1150 TRY 1151 { 1152 vector<CField*> fields ; 1153 for (auto couplerIn :activeCouplerIn) 1154 { 1155 const vector<CField*>&& fieldList=couplerIn->getEnabledFields() ; 1156 for(auto field : fieldList) field->setCouplerIn(couplerIn) ; 1157 fields.insert(fields.end(),fieldList.begin(),fieldList.end()); 1158 } 1159 return fields ; 1160 } 1161 CATCH_DUMP_ATTR 1162 1163 896 1164 897 1165 void CContext::readAttributesOfEnabledFieldsInReadModeFiles() … … 1081 1349 TRY 1082 1350 { 1083 fieldsWithReadAccess .clear();1351 fieldsWithReadAccess_.clear(); 1084 1352 const vector<CField*> allFields = CField::getAll(); 1085 1353 for (size_t i = 0; i < allFields.size(); ++i) 1086 1354 { 1087 1355 CField* field = allFields[i]; 1088 1089 if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)1090 field ->read_access = true;1091 else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))1092 fieldsWithReadAccess.push_back(field);1356 if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled)) 1357 { 1358 fieldsWithReadAccess_.push_back(field); 1359 field->setModelOut() ; 1360 } 1093 1361 } 1094 1362 } … … 1098 1366 TRY 1099 1367 { 1100 for (size_t i = 0; i < fieldsWithReadAccess .size(); ++i)1101 fieldsWithReadAccess [i]->solveAllReferenceEnabledField(false);1368 for (size_t i = 0; i < fieldsWithReadAccess_.size(); ++i) 1369 fieldsWithReadAccess_[i]->solveAllReferenceEnabledField(false); 1102 1370 } 1103 1371 CATCH_DUMP_ATTR … … 1106 1374 TRY 1107 1375 { 1108 for (size_t i = 0; i < fieldsWithReadAccess .size(); ++i)1109 fieldsWithReadAccess [i]->buildFilterGraph(garbageCollector, true);1376 for (size_t i = 0; i < fieldsWithReadAccess_.size(); ++i) 1377 fieldsWithReadAccess_[i]->buildFilterGraph(garbageCollector, true); 1110 1378 } 1111 1379 CATCH_DUMP_ATTR … … 1139 1407 unsigned int i = 0; 1140 1408 for (i = 0; i < vecSize; ++i) 1141 allGrids[i]->solve DomainAxisRefInheritance(apply);1409 allGrids[i]->solveElementsRefInheritance(apply); 1142 1410 1143 1411 } … … 1227 1495 1228 1496 1229 void CContext::distributeFiles( void)1497 void CContext::distributeFiles(const vector<CFile*>& files) 1230 1498 TRY 1231 1499 { … … 1233 1501 distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory); 1234 1502 1235 if (distFileMemory) distributeFileOverMemoryBandwith( ) ;1236 else distributeFileOverBandwith( ) ;1237 } 1238 CATCH_DUMP_ATTR 1239 1240 void CContext::distributeFileOverBandwith( void)1503 if (distFileMemory) distributeFileOverMemoryBandwith(files) ; 1504 else distributeFileOverBandwith(files) ; 1505 } 1506 CATCH_DUMP_ATTR 1507 1508 void CContext::distributeFileOverBandwith(const vector<CFile*>& files) 1241 1509 TRY 1242 1510 { 1243 1511 double eps=std::numeric_limits<double>::epsilon()*10 ; 1244 1512 1245 if (serviceType_==CServicesManager::GATHERER) 1246 { 1247 std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 1248 int nbPools = clientPrimServer.size(); 1249 1250 // (1) Find all enabled files in write mode 1251 // for (int i = 0; i < this->enabledFiles.size(); ++i) 1252 // { 1253 // if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write )) 1254 // enabledWriteModeFiles.push_back(enabledFiles[i]); 1255 // } 1256 1257 // (2) Estimate the data volume for each file 1258 int size = this->enabledWriteModeFiles.size(); 1259 std::vector<std::pair<double, CFile*> > dataSizeMap; 1260 double dataPerPool = 0; 1261 int nfield=0 ; 1262 ofs<<size<<endl ; 1263 for (size_t i = 0; i < size; ++i) 1513 std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 1514 int nbPools = clientPrimServer.size(); 1515 1516 // (1) Find all enabled files in write mode 1517 // for (int i = 0; i < this->enabledFiles.size(); ++i) 1518 // { 1519 // if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write )) 1520 // enabledWriteModeFiles.push_back(enabledFiles[i]); 1521 // } 1522 1523 // (2) Estimate the data volume for each file 1524 int size = files.size(); 1525 std::vector<std::pair<double, CFile*> > dataSizeMap; 1526 double dataPerPool = 0; 1527 int nfield=0 ; 1528 ofs<<size<<endl ; 1529 for (size_t i = 0; i < size; ++i) 1530 { 1531 CFile* file = files[i]; 1532 ofs<<file->getId()<<endl ; 1533 StdSize dataSize=0; 1534 std::vector<CField*> enabledFields = file->getEnabledFields(); 1535 size_t numEnabledFields = enabledFields.size(); 1536 ofs<<numEnabledFields<<endl ; 1537 for (size_t j = 0; j < numEnabledFields; ++j) 1264 1538 { 1265 CFile* file = this->enabledWriteModeFiles[i]; 1266 ofs<<file->getId()<<endl ; 1267 StdSize dataSize=0; 1268 std::vector<CField*> enabledFields = file->getEnabledFields(); 1269 size_t numEnabledFields = enabledFields.size(); 1270 ofs<<numEnabledFields<<endl ; 1271 for (size_t j = 0; j < numEnabledFields; ++j) 1539 dataSize += enabledFields[j]->getGlobalWrittenSize() ; 1540 ofs<<enabledFields[j]->getGrid()->getId()<<endl ; 1541 ofs<<enabledFields[j]->getGlobalWrittenSize()<<endl ; 1542 } 1543 double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; 1544 double dataSizeSec= dataSize/ outFreqSec; 1545 ofs<<dataSizeSec<<endl ; 1546 nfield++ ; 1547 // add epsilon*nField to dataSizeSec in order to preserve reproductive ordering when sorting 1548 dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file)); 1549 dataPerPool += dataSizeSec; 1550 } 1551 dataPerPool /= nbPools; 1552 std::sort(dataSizeMap.begin(), dataSizeMap.end()); 1553 1554 // (3) Assign contextClient to each enabled file 1555 1556 std::multimap<double,int> poolDataSize ; 1557 // multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11 1558 1559 int j; 1560 double dataSize ; 1561 for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ; 1562 1563 for (int i = dataSizeMap.size()-1; i >= 0; --i) 1564 { 1565 dataSize=(*poolDataSize.begin()).first ; 1566 j=(*poolDataSize.begin()).second ; 1567 dataSizeMap[i].second->setContextClient(clientPrimServer[j]); 1568 dataSize+=dataSizeMap[i].first; 1569 poolDataSize.erase(poolDataSize.begin()) ; 1570 poolDataSize.insert(std::pair<double,int>(dataSize,j)) ; 1571 } 1572 1573 for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) info(30)<<"Load Balancing for servers (perfect=1) : "<<it->second<<" : ratio "<<it->first*1./dataPerPool<<endl ; 1574 } 1575 CATCH_DUMP_ATTR 1576 1577 void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList) 1578 TRY 1579 { 1580 int nbPools = clientPrimServer.size(); 1581 double ratio=0.5 ; 1582 ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); 1583 1584 int nFiles = filesList.size(); 1585 vector<SDistFile> files(nFiles); 1586 vector<SDistGrid> grids; 1587 map<string,int> gridMap ; 1588 string gridId; 1589 int gridIndex=0 ; 1590 1591 for (size_t i = 0; i < nFiles; ++i) 1592 { 1593 StdSize dataSize=0; 1594 CFile* file = filesList[i]; 1595 std::vector<CField*> enabledFields = file->getEnabledFields(); 1596 size_t numEnabledFields = enabledFields.size(); 1597 1598 files[i].id_=file->getId() ; 1599 files[i].nbGrids_=numEnabledFields; 1600 files[i].assignedGrid_ = new int[files[i].nbGrids_] ; 1601 1602 for (size_t j = 0; j < numEnabledFields; ++j) 1603 { 1604 gridId=enabledFields[j]->getGrid()->getId() ; 1605 if (gridMap.find(gridId)==gridMap.end()) 1272 1606 { 1273 dataSize += enabledFields[j]->getGlobalWrittenSize() ; 1274 ofs<<enabledFields[j]->grid->getId()<<endl ; 1275 ofs<<enabledFields[j]->getGlobalWrittenSize()<<endl ; 1607 gridMap[gridId]=gridIndex ; 1608 SDistGrid newGrid; 1609 grids.push_back(newGrid) ; 1610 gridIndex++ ; 1276 1611 } 1277 double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; 1278 double dataSizeSec= dataSize/ outFreqSec; 1279 ofs<<dataSizeSec<<endl ; 1280 nfield++ ; 1281 // add epsilon*nField to dataSizeSec in order to preserve reproductive ordering when sorting 1282 dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file)); 1283 dataPerPool += dataSizeSec; 1612 files[i].assignedGrid_[j]=gridMap[gridId] ; 1613 grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ; 1614 dataSize += enabledFields[j]->getGlobalWrittenSize() ; // usefull 1284 1615 } 1285 dataPerPool /= nbPools; 1286 std::sort(dataSizeMap.begin(), dataSizeMap.end()); 1287 1288 // (3) Assign contextClient to each enabled file 1289 1290 std::multimap<double,int> poolDataSize ; 1291 // multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11 1292 1293 int j; 1294 double dataSize ; 1295 for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ; 1296 1297 for (int i = dataSizeMap.size()-1; i >= 0; --i) 1616 double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; 1617 files[i].bandwith_= dataSize/ outFreqSec ; 1618 } 1619 1620 double bandwith=0 ; 1621 double memory=0 ; 1622 1623 for(int i=0; i<nFiles; i++) bandwith+=files[i].bandwith_ ; 1624 for(int i=0; i<nFiles; i++) files[i].bandwith_ = files[i].bandwith_/bandwith * ratio ; 1625 1626 for(int i=0; i<grids.size(); i++) memory+=grids[i].size_ ; 1627 for(int i=0; i<grids.size(); i++) grids[i].size_ = grids[i].size_ / memory * (1.0-ratio) ; 1628 1629 distributeFileOverServer2(nbPools, grids.size(), &grids[0], nFiles, &files[0]) ; 1630 1631 vector<double> memorySize(nbPools,0.) ; 1632 vector< set<int> > serverGrids(nbPools) ; 1633 vector<double> bandwithSize(nbPools,0.) ; 1634 1635 for (size_t i = 0; i < nFiles; ++i) 1636 { 1637 bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ; 1638 for(int j=0 ; j<files[i].nbGrids_;j++) 1298 1639 { 1299 dataSize=(*poolDataSize.begin()).first ; 1300 j=(*poolDataSize.begin()).second ; 1301 dataSizeMap[i].second->setContextClient(clientPrimServer[j]); 1302 dataSize+=dataSizeMap[i].first; 1303 poolDataSize.erase(poolDataSize.begin()) ; 1304 poolDataSize.insert(std::pair<double,int>(dataSize,j)) ; 1640 if (serverGrids[files[i].assignedServer_].find(files[i].assignedGrid_[j]) == serverGrids[files[i].assignedServer_].end()) 1641 { 1642 memorySize[files[i].assignedServer_]+= grids[files[i].assignedGrid_[j]].size_ * memory / (1.0-ratio); 1643 serverGrids[files[i].assignedServer_].insert(files[i].assignedGrid_[j]) ; 1644 } 1305 1645 } 1306 1307 for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) info(30)<<"Load Balancing for servers (perfect=1) : "<<it->second<<" : ratio "<<it->first*1./dataPerPool<<endl ; 1308 1309 for (int i = 0; i < this->enabledReadModeFiles.size(); ++i) 1310 { 1311 enabledReadModeFiles[i]->setContextClient(client); 1312 } 1313 } 1314 else 1315 { 1316 for (int i = 0; i < this->enabledFiles.size(); ++i) 1317 enabledFiles[i]->setContextClient(client); 1318 } 1319 } 1320 CATCH_DUMP_ATTR 1321 1322 void CContext::distributeFileOverMemoryBandwith(void) 1323 TRY 1324 { 1325 if (serviceType_==CServicesManager::GATHERER) 1326 { 1327 int nbPools = clientPrimServer.size(); 1328 double ratio=0.5 ; 1329 ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); 1330 1331 int nFiles = this->enabledWriteModeFiles.size(); 1332 vector<SDistFile> files(nFiles); 1333 vector<SDistGrid> grids; 1334 map<string,int> gridMap ; 1335 string gridId; 1336 int gridIndex=0 ; 1337 1338 for (size_t i = 0; i < nFiles; ++i) 1339 { 1340 StdSize dataSize=0; 1341 CFile* file = this->enabledWriteModeFiles[i]; 1342 std::vector<CField*> enabledFields = file->getEnabledFields(); 1343 size_t numEnabledFields = enabledFields.size(); 1344 1345 files[i].id_=file->getId() ; 1346 files[i].nbGrids_=numEnabledFields; 1347 files[i].assignedGrid_ = new int[files[i].nbGrids_] ; 1348 1349 for (size_t j = 0; j < numEnabledFields; ++j) 1350 { 1351 gridId=enabledFields[j]->grid->getId() ; 1352 if (gridMap.find(gridId)==gridMap.end()) 1353 { 1354 gridMap[gridId]=gridIndex ; 1355 SDistGrid newGrid; 1356 grids.push_back(newGrid) ; 1357 gridIndex++ ; 1358 } 1359 files[i].assignedGrid_[j]=gridMap[gridId] ; 1360 grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ; 1361 dataSize += enabledFields[j]->getGlobalWrittenSize() ; // usefull 1362 } 1363 double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; 1364 files[i].bandwith_= dataSize/ outFreqSec ; 1365 } 1366 1367 double bandwith=0 ; 1368 double memory=0 ; 1369 1370 for(int i=0; i<nFiles; i++) bandwith+=files[i].bandwith_ ; 1371 for(int i=0; i<nFiles; i++) files[i].bandwith_ = files[i].bandwith_/bandwith * ratio ; 1372 1373 for(int i=0; i<grids.size(); i++) memory+=grids[i].size_ ; 1374 for(int i=0; i<grids.size(); i++) grids[i].size_ = grids[i].size_ / memory * (1.0-ratio) ; 1375 1376 distributeFileOverServer2(nbPools, grids.size(), &grids[0], nFiles, &files[0]) ; 1377 1378 vector<double> memorySize(nbPools,0.) ; 1379 vector< set<int> > serverGrids(nbPools) ; 1380 vector<double> bandwithSize(nbPools,0.) ; 1381 1382 for (size_t i = 0; i < nFiles; ++i) 1383 { 1384 bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ; 1385 for(int j=0 ; j<files[i].nbGrids_;j++) 1386 { 1387 if (serverGrids[files[i].assignedServer_].find(files[i].assignedGrid_[j]) == serverGrids[files[i].assignedServer_].end()) 1388 { 1389 memorySize[files[i].assignedServer_]+= grids[files[i].assignedGrid_[j]].size_ * memory / (1.0-ratio); 1390 serverGrids[files[i].assignedServer_].insert(files[i].assignedGrid_[j]) ; 1391 } 1392 } 1393 enabledWriteModeFiles[i]->setContextClient(clientPrimServer[files[i].assignedServer_]) ; 1394 delete [] files[i].assignedGrid_ ; 1395 } 1396 1397 for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<" assigned file bandwith "<<bandwithSize[i]*86400.*4./1024/1024.<<" Mb / days"<<endl ; 1398 for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<" assigned grid memory "<<memorySize[i]*100/1024./1024.<<" Mb"<<endl ; 1399 1400 1401 for (int i = 0; i < this->enabledReadModeFiles.size(); ++i) 1402 { 1403 enabledReadModeFiles[i]->setContextClient(client); 1404 } 1405 1406 } 1407 else 1408 { 1409 for (int i = 0; i < this->enabledFiles.size(); ++i) 1410 enabledFiles[i]->setContextClient(client); 1411 } 1412 } 1646 filesList[i]->setContextClient(clientPrimServer[files[i].assignedServer_]) ; 1647 delete [] files[i].assignedGrid_ ; 1648 } 1649 1650 for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<" assigned file bandwith "<<bandwithSize[i]*86400.*4./1024/1024.<<" Mb / days"<<endl ; 1651 for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<" assigned grid memory "<<memorySize[i]*100/1024./1024.<<" Mb"<<endl ; 1652 1653 } 1413 1654 CATCH_DUMP_ATTR 1414 1655 … … 1759 2000 // Check if some axis, domains or grids are eligible to for compressed indexed output. 1760 2001 // Warning: This must be done after solving the inheritance and before the rest of post-processing 1761 checkAxisDomainsGridsEligibilityForCompressedOutput(); 2002 checkAxisDomainsGridsEligibilityForCompressedOutput(); // only for field written on IO_SERVER service ???? 1762 2003 1763 2004 // Check if some automatic time series should be generated 1764 2005 // Warning: This must be done after solving the inheritance and before the rest of post-processing 1765 1766 // The timeseries should only be prepared in client 1767 1768 if (serviceType_==CServicesManager::CLIENT) prepareTimeseries(); 2006 prepareTimeseries(); 1769 2007 1770 2008 //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir. … … 1774 2012 findEnabledCouplerIn(); 1775 2013 findEnabledCouplerOut(); 1776 1777 2014 createCouplerInterCommunicator() ; 2015 2016 // Find all enabled fields of each file 2017 const vector<CField*>&& fileOutField = findAllEnabledFieldsInFiles(this->enabledWriteModeFiles); 2018 const vector<CField*>&& fileInField = findAllEnabledFieldsInFiles(this->enabledReadModeFiles); 2019 const vector<CField*>&& CouplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut); 2020 const vector<CField*>&& CouplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn); 2021 2022 1778 2023 1779 2024 // For now, only read files with client and only one level server 1780 2025 // if (hasClient && !hasServer) findEnabledReadModeFiles(); 1781 2026 1782 // Find all enabled fields of each file1783 findAllEnabledFieldsInFiles(this->enabledWriteModeFiles);1784 findAllEnabledFieldsInFiles(this->enabledReadModeFiles);1785 2027 1786 2028 // For now, only read files with client and only one level server … … 1974 2216 TRY 1975 2217 { 1976 if (!(serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER)) return;1977 1978 2218 const std::vector<CFile*> allFiles = CFile::getAll(); 1979 2219 for (size_t i = 0; i < allFiles.size(); i++)
Note: See TracChangeset
for help on using the changeset viewer.