Changeset 1870 for XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp
- Timestamp:
- 04/17/20 18:55:28 (4 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp
r1869 r1870 271 271 272 272 273 void CContext::setClientServerBuffer(vector<CField*>& fields, bool bufferForWriting) 274 TRY 275 { 276 // Estimated minimum event size for small events (20 is an arbitrary constant just for safety) 277 const size_t minEventSize = CEventClient::headerSize + 20 * sizeof(int); 278 // Ensure there is at least some room for 20 of such events in the buffers 279 size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize); 280 281 #define DECLARE_NODE(Name_, name_) \ 282 if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition); 283 #define DECLARE_NODE_PAR(Name_, name_) 284 #include "node_type.conf" 285 #undef DECLARE_NODE 286 #undef DECLARE_NODE_PAR 287 288 289 map<CContextClient*,map<int,size_t>> dataSize ; 290 map<CContextClient*,map<int,size_t>> maxEventSize ; 291 map<CContextClient*,map<int,size_t>> attributesSize ; 292 293 for(auto field : fields) 294 { 295 field->setContextClientDataBufferSize(dataSize, maxEventSize, bufferForWriting) ; 296 field->setContextClientAttributesBufferSize(attributesSize, maxEventSize, bufferForWriting) ; 297 } 298 299 300 for(auto& it : attributesSize) 301 { 302 auto contextClient = it.first ; 303 auto& contextDataSize = dataSize[contextClient] ; 304 auto& contextAttributesSize = attributesSize[contextClient] ; 305 auto& contextMaxEventSize = maxEventSize[contextClient] ; 306 307 for (auto& it : contextAttributesSize) 308 { 309 auto serverRank=it.first ; 310 auto& buffer = contextAttributesSize[serverRank] ; 311 if (contextDataSize[serverRank] > buffer) buffer=contextDataSize[serverRank] ; 312 buffer *= CXios::bufferSizeFactor; 313 if (buffer < minBufferSize) buffer = minBufferSize; 314 if (buffer > CXios::maxBufferSize ) buffer = CXios::maxBufferSize; 315 } 316 317 // Leaders will have to send some control events so ensure there is some room for those in the buffers 318 if (contextClient->isServerLeader()) 319 for(auto& rank : contextClient->getRanksServerLeader()) 320 if (!contextAttributesSize.count(rank)) 321 { 322 contextAttributesSize[rank] = minBufferSize; 323 contextMaxEventSize[rank] = minEventSize; 324 } 325 326 contextClient->setBufferSize(contextAttributesSize, contextMaxEventSize); 327 } 328 } 329 CATCH_DUMP_ATTR 330 331 273 332 /*! 274 333 Sets client buffers. … … 277 336 This flag is only true for client and server-1 for communication with server-2 278 337 */ 338 // ym obsolete to be removed 279 339 void CContext::setClientServerBuffer(CContextClient* contextClient, bool bufferForWriting) 280 340 TRY … … 327 387 } 328 388 CATCH_DUMP_ATTR 389 390 /*! 391 * Compute the required buffer size to send the fields data. 392 * \param maxEventSize [in/out] the size of the bigger event for each connected server 393 * \param [in] contextClient 394 * \param [in] bufferForWriting True if buffers are used for sending data for writing 395 This flag is only true for client and server-1 for communication with server-2 396 */ 397 std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize, 398 CContextClient* contextClient, bool bufferForWriting /*= "false"*/) 399 TRY 400 { 401 std::map<int, StdSize> dataSize; 402 403 // Find all reference domain and axis of all active fields 404 std::vector<CFile*>& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles; 405 size_t numEnabledFiles = fileList.size(); 406 for (size_t i = 0; i < numEnabledFiles; ++i) 407 { 408 CFile* file = fileList[i]; 409 if (file->getContextClient() == contextClient) 410 { 411 std::vector<CField*> enabledFields = file->getEnabledFields(); 412 size_t numEnabledFields = enabledFields.size(); 413 for (size_t j = 0; j < numEnabledFields; ++j) 414 { 415 // const std::vector<std::map<int, StdSize> > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient); 416 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting); 417 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 418 for (; it != itE; ++it) 419 { 420 // If dataSize[it->first] does not exist, it will be zero-initialized 421 // so we can use it safely without checking for its existance 422 if (CXios::isOptPerformance) 423 dataSize[it->first] += it->second; 424 else if (dataSize[it->first] < it->second) 425 dataSize[it->first] = it->second; 426 427 if (maxEventSize[it->first] < it->second) 428 maxEventSize[it->first] = it->second; 429 } 430 } 431 } 432 } 433 return dataSize; 434 } 435 CATCH_DUMP_ATTR 436 437 /*! 438 * Compute the required buffer size to send the attributes (mostly those grid related). 439 * \param maxEventSize [in/out] the size of the bigger event for each connected server 440 * \param [in] contextClient 441 * \param [in] bufferForWriting True if buffers are used for sending data for writing 442 This flag is only true for client and server-1 for communication with server-2 443 */ 444 std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize, 445 CContextClient* contextClient, bool bufferForWriting /*= "false"*/) 446 TRY 447 { 448 // As calendar attributes are sent even if there are no active files or fields, maps are initialized according the size of calendar attributes 449 std::map<int, StdSize> attributesSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); 450 maxEventSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); 451 452 std::vector<CFile*>& fileList = this->enabledFiles; 453 size_t numEnabledFiles = fileList.size(); 454 for (size_t i = 0; i < numEnabledFiles; ++i) 455 { 456 // CFile* file = this->enabledWriteModeFiles[i]; 457 CFile* file = fileList[i]; 458 std::vector<CField*> enabledFields = file->getEnabledFields(); 459 size_t numEnabledFields = enabledFields.size(); 460 for (size_t j = 0; j < numEnabledFields; ++j) 461 { 462 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting); 463 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 464 for (; it != itE; ++it) 465 { 466 // If attributesSize[it->first] does not exist, it will be zero-initialized 467 // so we can use it safely without checking for its existence 468 if (attributesSize[it->first] < it->second) 469 attributesSize[it->first] = it->second; 470 471 if (maxEventSize[it->first] < it->second) 472 maxEventSize[it->first] = it->second; 473 } 474 } 475 } 476 return attributesSize; 477 } 478 CATCH_DUMP_ATTR 479 480 329 481 330 482 //! Verify whether a context is initialized … … 899 1051 900 1052 // Find all enabled fields of each file 901 constvector<CField*>&& fileOutField = findAllEnabledFieldsInFileOut(this->enabledWriteModeFiles);902 constvector<CField*>&& fileInField = findAllEnabledFieldsInFileIn(this->enabledReadModeFiles);903 constvector<CField*>&& CouplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut);904 constvector<CField*>&& CouplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn);1053 vector<CField*>&& fileOutField = findAllEnabledFieldsInFileOut(this->enabledWriteModeFiles); 1054 vector<CField*>&& fileInField = findAllEnabledFieldsInFileIn(this->enabledReadModeFiles); 1055 vector<CField*>&& CouplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut); 1056 vector<CField*>&& CouplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn); 905 1057 findFieldsWithReadAccess(); 906 const vector<CField*>& fieldWithReadAccess = fieldsWithReadAccess_ ; 1058 vector<CField*>& fieldWithReadAccess = fieldsWithReadAccess_ ; 1059 vector<CField*> fieldModelIn ; // fields potentially from model 907 1060 908 1061 // find all field potentially at workflow end … … 915 1068 for(auto endWorkflowField : endWorkflowFields) endWorkflowField->buildWorkflowGraph(garbageCollector) ; 916 1069 917 // Distribute files between secondary servers according to the data size => assign a context to a file 1070 // get all field coming potentially from model 1071 for (auto field : CField::getAll() ) if (field->getModelIn()) fieldModelIn.push_back(field) ; 1072 1073 // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields 918 1074 if (serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles); 919 1075 else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 920 1076 1077 1078 // workflow endpoint => sent to IO/SERVER 921 1079 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) 922 1080 { … … 926 1084 field->computeGridIndexToFileServer() ; // compute grid index for transfer to the server context 927 1085 } 1086 setClientServerBuffer(fileOutField, true) ; // set context 1087 for(auto field : fileOutField) field->sendFieldToFileServer() ; 928 1088 } 929 930 931 932 933 1089 1090 1091 // workflow startpoint => data from model 1092 if (serviceType_==CServicesManager::CLIENT) 1093 { 1094 for(auto field : fieldModelIn) 1095 { 1096 field->connectToModelInput(garbageCollector) ; // connect the field to server filter 1097 // grid index will be computed on the fly 1098 } 1099 } 1100 1101 1102 1103 1104 return ; 934 1105 // For now, only read files with client and only one level server 935 1106 // if (hasClient && !hasServer) findEnabledReadModeFiles(); … … 1057 1228 CATCH_DUMP_ATTR 1058 1229 1059 1230 /*! 1231 * Send context attribute and calendar to file server, it must be done once by context file server 1232 * \param[in] client : context client to send 1233 */ 1234 void CContext::sendContextToFileServer(CContextClient* client) 1235 { 1236 if (sendToFileServer_done_.count(client)!=0) return ; 1237 else sendToFileServer_done_.insert(client) ; 1238 1239 this->sendAllAttributesToServer(client); // Send all attributes of current context to server 1240 CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(client); // Send all attributes of current cale 1241 } 1242 1243 // ym obsolete now to be removed 1060 1244 void CContext::closeDefinition_old(void) 1061 1245 TRY … … 2068 2252 CATCH_DUMP_ATTR 2069 2253 2070 /*! 2071 * Compute the required buffer size to send the attributes (mostly those grid related). 2072 * \param maxEventSize [in/out] the size of the bigger event for each connected server 2073 * \param [in] contextClient 2074 * \param [in] bufferForWriting True if buffers are used for sending data for writing 2075 This flag is only true for client and server-1 for communication with server-2 2076 */ 2077 std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize, 2078 CContextClient* contextClient, bool bufferForWriting /*= "false"*/) 2079 TRY 2080 { 2081 // As calendar attributes are sent even if there are no active files or fields, maps are initialized according the size of calendar attributes 2082 std::map<int, StdSize> attributesSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); 2083 maxEventSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); 2084 2085 std::vector<CFile*>& fileList = this->enabledFiles; 2086 size_t numEnabledFiles = fileList.size(); 2087 for (size_t i = 0; i < numEnabledFiles; ++i) 2088 { 2089 // CFile* file = this->enabledWriteModeFiles[i]; 2090 CFile* file = fileList[i]; 2091 std::vector<CField*> enabledFields = file->getEnabledFields(); 2092 size_t numEnabledFields = enabledFields.size(); 2093 for (size_t j = 0; j < numEnabledFields; ++j) 2094 { 2095 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting); 2096 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 2097 for (; it != itE; ++it) 2098 { 2099 // If attributesSize[it->first] does not exist, it will be zero-initialized 2100 // so we can use it safely without checking for its existence 2101 if (attributesSize[it->first] < it->second) 2102 attributesSize[it->first] = it->second; 2103 2104 if (maxEventSize[it->first] < it->second) 2105 maxEventSize[it->first] = it->second; 2106 } 2107 } 2108 } 2109 return attributesSize; 2110 } 2111 CATCH_DUMP_ATTR 2112 2113 /*! 2114 * Compute the required buffer size to send the fields data. 2115 * \param maxEventSize [in/out] the size of the bigger event for each connected server 2116 * \param [in] contextClient 2117 * \param [in] bufferForWriting True if buffers are used for sending data for writing 2118 This flag is only true for client and server-1 for communication with server-2 2119 */ 2120 std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize, 2121 CContextClient* contextClient, bool bufferForWriting /*= "false"*/) 2122 TRY 2123 { 2124 std::map<int, StdSize> dataSize; 2125 2126 // Find all reference domain and axis of all active fields 2127 std::vector<CFile*>& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles; 2128 size_t numEnabledFiles = fileList.size(); 2129 for (size_t i = 0; i < numEnabledFiles; ++i) 2130 { 2131 CFile* file = fileList[i]; 2132 if (file->getContextClient() == contextClient) 2133 { 2134 std::vector<CField*> enabledFields = file->getEnabledFields(); 2135 size_t numEnabledFields = enabledFields.size(); 2136 for (size_t j = 0; j < numEnabledFields; ++j) 2137 { 2138 // const std::vector<std::map<int, StdSize> > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient); 2139 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting); 2140 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 2141 for (; it != itE; ++it) 2142 { 2143 // If dataSize[it->first] does not exist, it will be zero-initialized 2144 // so we can use it safely without checking for its existance 2145 if (CXios::isOptPerformance) 2146 dataSize[it->first] += it->second; 2147 else if (dataSize[it->first] < it->second) 2148 dataSize[it->first] = it->second; 2149 2150 if (maxEventSize[it->first] < it->second) 2151 maxEventSize[it->first] = it->second; 2152 } 2153 } 2154 } 2155 } 2156 return dataSize; 2157 } 2158 CATCH_DUMP_ATTR 2159 2160 //! Client side: Send infomation of active files (files are enabled to write out) 2254 2255 //! Client side: Send infomation of active files (files are enabled to write out) 2161 2256 void CContext::sendEnabledFiles(const std::vector<CFile*>& activeFiles) 2162 2257 TRY
Note: See TracChangeset
for help on using the changeset viewer.