Changeset 2019 for XIOS/dev/dev_trunk_graph/src/node/context.cpp
- Timestamp:
- 01/22/21 12:00:29 (3 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_trunk_graph/src/node/context.cpp
r1612 r2019 20 20 #include "server.hpp" 21 21 #include "distribute_file_server2.hpp" 22 #include "services_manager.hpp" 23 #include "contexts_manager.hpp" 24 #include "cxios.hpp" 25 #include "client.hpp" 26 #include "coupler_in.hpp" 27 #include "coupler_out.hpp" 22 28 23 29 namespace xios { … … 31 37 , calendar(), hasClient(false), hasServer(false) 32 38 , isPostProcessed(false), finalized(false) 33 , idServer_(), client(0), server(0)34 , allProcessed(false), countChildC tx_(0)39 , client(nullptr), server(nullptr) 40 , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false) 35 41 36 42 { /* Ne rien faire de plus */ } … … 40 46 , calendar(), hasClient(false), hasServer(false) 41 47 , isPostProcessed(false), finalized(false) 42 , idServer_(), client(0), server(0)43 , allProcessed(false), countChildC tx_(0)48 , client(nullptr), server(nullptr) 49 , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false) 44 50 { /* Ne rien faire de plus */ } 45 51 … … 264 270 ///--------------------------------------------------------------- 265 271 266 //! Initialize client side 267 void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/) 268 TRY 269 { 270 271 hasClient = true; 272 MPI_Comm intraCommServer, interCommServer; 272 273 void CContext::setClientServerBuffer(vector<CField*>& fields, bool bufferForWriting) 274 TRY 275 { 276 // Estimated minimum event size for small events (20 is an arbitrary constant just for safety) 277 const size_t minEventSize = CEventClient::headerSize + 20 * sizeof(int); 278 // Ensure there is at least some room for 20 of such events in the buffers 279 size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize); 280 281 #define DECLARE_NODE(Name_, name_) \ 282 if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition); 283 #define DECLARE_NODE_PAR(Name_, name_) 284 #include "node_type.conf" 285 #undef DECLARE_NODE 286 #undef DECLARE_NODE_PAR 287 288 289 map<CContextClient*,map<int,size_t>> dataSize ; 290 map<CContextClient*,map<int,size_t>> maxEventSize ; 291 map<CContextClient*,map<int,size_t>> attributesSize ; 292 293 for(auto field : fields) 294 { 295 field->setContextClientDataBufferSize(dataSize, maxEventSize, bufferForWriting) ; 296 field->setContextClientAttributesBufferSize(attributesSize, maxEventSize, bufferForWriting) ; 297 } 273 298 274 299 275 if (CServer::serverLevel != 1) 276 // initClient is called by client 277 { 278 client = new CContextClient(this, intraComm, interComm, cxtServer); 279 if (cxtServer) // Attached mode 300 for(auto& it : attributesSize) 301 { 302 auto contextClient = it.first ; 303 auto& contextDataSize = dataSize[contextClient] ; 304 auto& contextAttributesSize = attributesSize[contextClient] ; 305 auto& contextMaxEventSize = maxEventSize[contextClient] ; 306 307 for (auto& it : contextAttributesSize) 280 308 { 281 intraCommServer = intraComm; 282 interCommServer = interComm; 309 auto serverRank=it.first ; 310 auto& buffer = contextAttributesSize[serverRank] ; 311 if (contextDataSize[serverRank] > buffer) buffer=contextDataSize[serverRank] ; 312 buffer *= CXios::bufferSizeFactor; 313 if (buffer < minBufferSize) buffer = minBufferSize; 314 if (buffer > CXios::maxBufferSize ) buffer = CXios::maxBufferSize; 283 315 } 284 else 285 { 286 MPI_Comm_dup(intraComm, &intraCommServer); 287 comms.push_back(intraCommServer); 288 MPI_Comm_dup(interComm, &interCommServer); 289 comms.push_back(interCommServer); 290 } 291 /* for registry take the id of client context */ 292 /* for servers, supress the _server_ from id */ 293 string contextRegistryId=getId() ; 294 size_t pos=contextRegistryId.find("_server_") ; 295 if (pos!=std::string::npos) contextRegistryId=contextRegistryId.substr(0,pos) ; 296 297 registryIn=new CRegistry(intraComm); 298 registryIn->setPath(contextRegistryId) ; 299 if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ; 300 registryIn->bcastRegistry() ; 301 registryOut=new CRegistry(intraComm) ; 302 303 registryOut->setPath(contextRegistryId) ; 304 305 server = new CContextServer(this, intraCommServer, interCommServer); 306 } 307 else 308 // initClient is called by primary server 309 { 310 clientPrimServer.push_back(new CContextClient(this, intraComm, interComm)); 311 MPI_Comm_dup(intraComm, &intraCommServer); 312 comms.push_back(intraCommServer); 313 MPI_Comm_dup(interComm, &interCommServer); 314 comms.push_back(interCommServer); 315 serverPrimServer.push_back(new CContextServer(this, intraCommServer, interCommServer)); 316 } 317 } 318 CATCH_DUMP_ATTR 319 320 /*! 316 317 // Leaders will have to send some control events so ensure there is some room for those in the buffers 318 if (contextClient->isServerLeader()) 319 for(auto& rank : contextClient->getRanksServerLeader()) 320 if (!contextAttributesSize.count(rank)) 321 { 322 contextAttributesSize[rank] = minBufferSize; 323 contextMaxEventSize[rank] = minEventSize; 324 } 325 326 contextClient->setBufferSize(contextAttributesSize, contextMaxEventSize); 327 } 328 } 329 CATCH_DUMP_ATTR 330 331 332 /*! 321 333 Sets client buffers. 322 334 \param [in] contextClient … … 324 336 This flag is only true for client and server-1 for communication with server-2 325 337 */ 338 // ym obsolete to be removed 326 339 void CContext::setClientServerBuffer(CContextClient* contextClient, bool bufferForWriting) 327 340 TRY 328 341 { 329 // Estimated minimum event size for small events ( 10 is an arbitrary constant just for safety)330 const size_t minEventSize = CEventClient::headerSize + getIdServer().size() + 10 * sizeof(int);342 // Estimated minimum event size for small events (20 is an arbitrary constant just for safety) 343 const size_t minEventSize = CEventClient::headerSize + 20 * sizeof(int); 331 344 332 345 // Ensure there is at least some room for 20 of such events in the buffers … … 375 388 CATCH_DUMP_ATTR 376 389 390 /*! 391 * Compute the required buffer size to send the fields data. 392 * \param maxEventSize [in/out] the size of the bigger event for each connected server 393 * \param [in] contextClient 394 * \param [in] bufferForWriting True if buffers are used for sending data for writing 395 This flag is only true for client and server-1 for communication with server-2 396 */ 397 std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize, 398 CContextClient* contextClient, bool bufferForWriting /*= "false"*/) 399 TRY 400 { 401 std::map<int, StdSize> dataSize; 402 403 // Find all reference domain and axis of all active fields 404 std::vector<CFile*>& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles; 405 size_t numEnabledFiles = fileList.size(); 406 for (size_t i = 0; i < numEnabledFiles; ++i) 407 { 408 CFile* file = fileList[i]; 409 if (file->getContextClient() == contextClient) 410 { 411 std::vector<CField*> enabledFields = file->getEnabledFields(); 412 size_t numEnabledFields = enabledFields.size(); 413 for (size_t j = 0; j < numEnabledFields; ++j) 414 { 415 // const std::vector<std::map<int, StdSize> > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient); 416 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting); 417 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 418 for (; it != itE; ++it) 419 { 420 // If dataSize[it->first] does not exist, it will be zero-initialized 421 // so we can use it safely without checking for its existance 422 if (CXios::isOptPerformance) 423 dataSize[it->first] += it->second; 424 else if (dataSize[it->first] < it->second) 425 dataSize[it->first] = it->second; 426 427 if (maxEventSize[it->first] < it->second) 428 maxEventSize[it->first] = it->second; 429 } 430 } 431 } 432 } 433 return dataSize; 434 } 435 CATCH_DUMP_ATTR 436 437 /*! 438 * Compute the required buffer size to send the attributes (mostly those grid related). 439 * \param maxEventSize [in/out] the size of the bigger event for each connected server 440 * \param [in] contextClient 441 * \param [in] bufferForWriting True if buffers are used for sending data for writing 442 This flag is only true for client and server-1 for communication with server-2 443 */ 444 std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize, 445 CContextClient* contextClient, bool bufferForWriting /*= "false"*/) 446 TRY 447 { 448 // As calendar attributes are sent even if there are no active files or fields, maps are initialized according the size of calendar attributes 449 std::map<int, StdSize> attributesSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); 450 maxEventSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); 451 452 std::vector<CFile*>& fileList = this->enabledFiles; 453 size_t numEnabledFiles = fileList.size(); 454 for (size_t i = 0; i < numEnabledFiles; ++i) 455 { 456 // CFile* file = this->enabledWriteModeFiles[i]; 457 CFile* file = fileList[i]; 458 std::vector<CField*> enabledFields = file->getEnabledFields(); 459 size_t numEnabledFields = enabledFields.size(); 460 for (size_t j = 0; j < numEnabledFields; ++j) 461 { 462 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting); 463 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 464 for (; it != itE; ++it) 465 { 466 // If attributesSize[it->first] does not exist, it will be zero-initialized 467 // so we can use it safely without checking for its existence 468 if (attributesSize[it->first] < it->second) 469 attributesSize[it->first] = it->second; 470 471 if (maxEventSize[it->first] < it->second) 472 maxEventSize[it->first] = it->second; 473 } 474 } 475 } 476 return attributesSize; 477 } 478 CATCH_DUMP_ATTR 479 480 481 377 482 //! Verify whether a context is initialized 378 483 bool CContext::isInitialized(void) … … 383 488 CATCH_DUMP_ATTR 384 489 385 void CContext::initServer(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtClient /*= 0*/) 490 491 void CContext::init(CServerContext* parentServerContext, MPI_Comm intraComm, int serviceType) 492 TRY 493 { 494 parentServerContext_ = parentServerContext ; 495 if (serviceType==CServicesManager::CLIENT) 496 initClient(intraComm, serviceType) ; 497 else 498 initServer(intraComm, serviceType) ; 499 } 500 CATCH_DUMP_ATTR 501 502 503 504 //! Initialize client side 505 void CContext::initClient(MPI_Comm intraComm, int serviceType) 506 TRY 507 { 508 intraComm_=intraComm ; 509 MPI_Comm_rank(intraComm_, &intraCommRank_) ; 510 MPI_Comm_size(intraComm_, &intraCommSize_) ; 511 512 serviceType_ = CServicesManager::CLIENT ; 513 if (serviceType_==CServicesManager::CLIENT) 514 { 515 hasClient=true ; 516 hasServer=false ; 517 } 518 contextId_ = getId() ; 519 520 attached_mode=true ; 521 if (!CXios::isUsingServer()) attached_mode=false ; 522 523 524 string contextRegistryId=getId() ; 525 registryIn=new CRegistry(intraComm); 526 registryIn->setPath(contextRegistryId) ; 527 528 int commRank ; 529 MPI_Comm_rank(intraComm_,&commRank) ; 530 if (commRank==0) registryIn->fromFile("xios_registry.bin") ; 531 registryIn->bcastRegistry() ; 532 registryOut=new CRegistry(intraComm_) ; 533 registryOut->setPath(contextRegistryId) ; 534 535 } 536 CATCH_DUMP_ATTR 537 538 539 void CContext::initServer(MPI_Comm intraComm, int serviceType) 386 540 TRY 387 541 { 388 542 hasServer=true; 389 server = new CContextServer(this,intraComm,interComm); 390 391 /* for registry take the id of client context */ 392 /* for servers, supress the _server_ from id */ 393 string contextRegistryId=getId() ; 394 size_t pos=contextRegistryId.find("_server_") ; 395 if (pos!=std::string::npos) contextRegistryId=contextRegistryId.substr(0,pos) ; 396 543 intraComm_=intraComm ; 544 MPI_Comm_rank(intraComm_, &intraCommRank_) ; 545 MPI_Comm_size(intraComm_, &intraCommSize_) ; 546 547 serviceType_=serviceType ; 548 549 if (serviceType_==CServicesManager::GATHERER) 550 { 551 hasClient=true ; 552 hasServer=true ; 553 } 554 else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 555 { 556 hasClient=false ; 557 hasServer=true ; 558 } 559 560 CXios::getContextsManager()->getContextId(getId(), contextId_, intraComm) ; 561 397 562 registryIn=new CRegistry(intraComm); 398 registryIn->setPath(contextRegistryId) ; 399 if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ; 563 registryIn->setPath(contextId_) ; 564 565 int commRank ; 566 MPI_Comm_rank(intraComm_,&commRank) ; 567 if (commRank==0) registryIn->fromFile("xios_registry.bin") ; 568 400 569 registryIn->bcastRegistry() ; 401 570 registryOut=new CRegistry(intraComm) ; 402 registryOut->setPath(contextRegistryId) ; 403 404 MPI_Comm intraCommClient, interCommClient; 405 if (cxtClient) // Attached mode 406 { 407 intraCommClient = intraComm; 408 interCommClient = interComm; 409 } 410 else 411 { 412 MPI_Comm_dup(intraComm, &intraCommClient); 413 comms.push_back(intraCommClient); 414 MPI_Comm_dup(interComm, &interCommClient); 415 comms.push_back(interCommClient); 416 } 417 client = new CContextClient(this,intraCommClient,interCommClient, cxtClient); 418 } 419 CATCH_DUMP_ATTR 420 421 //! Try to send the buffers and receive possible answers 422 bool CContext::checkBuffersAndListen(bool enableEventsProcessing /*= true*/) 571 registryOut->setPath(contextId_) ; 572 573 } 574 CATCH_DUMP_ATTR 575 576 577 void CContext::createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) // for servers 423 578 TRY 424 579 { 425 bool clientReady, serverFinished; 426 427 // Only classical servers are non-blocking 428 if (CServer::serverLevel == 0) 429 { 430 client->checkBuffers(); 431 bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 432 if (hasTmpBufferedEvent) 433 hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 434 // Don't process events if there is a temporarily buffered event 435 return server->eventLoop(!hasTmpBufferedEvent || !enableEventsProcessing); 436 } 437 else if (CServer::serverLevel == 1) 438 { 439 if (!finalized) 440 client->checkBuffers(); 441 bool serverFinished = true; 442 if (!finalized) 443 serverFinished = server->eventLoop(enableEventsProcessing); 444 bool serverPrimFinished = true; 445 for (int i = 0; i < clientPrimServer.size(); ++i) 446 { 447 if (!finalized) 448 clientPrimServer[i]->checkBuffers(); 449 if (!finalized) 450 serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing); 451 } 452 return ( serverFinished && serverPrimFinished); 453 } 454 455 else if (CServer::serverLevel == 2) 456 { 457 client->checkBuffers(); 458 return server->eventLoop(enableEventsProcessing); 459 } 580 MPI_Comm intraCommClient ; 581 MPI_Comm_dup(intraComm_, &intraCommClient); 582 comms.push_back(intraCommClient); 583 // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context 584 server = new CContextServer(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ? 585 client = new CContextClient(this,intraCommClient,interCommClient); 586 client->setAssociatedServer(server) ; 587 server->setAssociatedClient(client) ; 588 460 589 } 461 CATCH_DUMP_ATTR 462 463 //! Terminate a context 590 CATCH_DUMP_ATTR 591 592 void CContext::createServerInterComm(void) 593 TRY 594 { 595 596 MPI_Comm interCommClient, interCommServer ; 597 598 if (serviceType_ == CServicesManager::CLIENT) 599 { 600 601 int commRank ; 602 MPI_Comm_rank(intraComm_,&commRank) ; 603 if (commRank==0) 604 { 605 if (attached_mode) CXios::getContextsManager()->createServerContext(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, getContextId()) ; 606 else if (CXios::usingServer2) CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId()) ; 607 else CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId()) ; 608 } 609 610 MPI_Comm interComm ; 611 612 if (attached_mode) 613 { 614 parentServerContext_->createIntercomm(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, getContextId(), intraComm_, 615 interCommClient, interCommServer) ; 616 int type ; 617 if (commRank==0) CXios::getServicesManager()->getServiceType(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, type) ; 618 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 619 setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble 620 } 621 else if (CXios::usingServer2) 622 { 623 // CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, interComm) ; 624 parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, 625 interCommClient, interCommServer) ; 626 int type ; 627 if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultGathererId, 0, type) ; 628 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 629 } 630 else 631 { 632 //CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, interComm) ; 633 parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, 634 interCommClient, interCommServer) ; 635 int type ; 636 if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ; 637 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 638 } 639 640 // intraComm client is not duplicated. In all the code we use client->intraComm for MPI 641 // in future better to replace it by intracommuncator associated to the context 642 643 MPI_Comm intraCommClient, intraCommServer ; 644 intraCommClient=intraComm_ ; 645 MPI_Comm_dup(intraComm_, &intraCommServer) ; 646 client = new CContextClient(this, intraCommClient, interCommClient); 647 server = new CContextServer(this, intraCommServer, interCommServer); 648 client->setAssociatedServer(server) ; 649 server->setAssociatedClient(client) ; 650 } 651 652 if (serviceType_ == CServicesManager::GATHERER) 653 { 654 int commRank ; 655 MPI_Comm_rank(intraComm_,&commRank) ; 656 657 int nbPartitions ; 658 if (commRank==0) 659 { 660 CXios::getServicesManager()->getServiceNbPartitions(CXios::defaultPoolId, CXios::defaultServerId, 0, nbPartitions) ; 661 for(int i=0 ; i<nbPartitions; i++) 662 CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId()) ; 663 } 664 MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ; 665 666 MPI_Comm interComm ; 667 for(int i=0 ; i<nbPartitions; i++) 668 { 669 parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 670 int type ; 671 if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ; 672 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 673 primServerId_.push_back(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultServerId, i, type, getContextId())) ; 674 675 // intraComm client is not duplicated. In all the code we use client->intraComm for MPI 676 // in future better to replace it by intracommuncator associated to the context 677 678 MPI_Comm intraCommClient, intraCommServer ; 679 680 intraCommClient=intraComm_ ; 681 MPI_Comm_dup(intraComm_, &intraCommServer) ; 682 683 CContextClient* client = new CContextClient(this, intraCommClient, interCommClient) ; 684 CContextServer* server = new CContextServer(this, intraCommServer, interCommServer) ; 685 client->setAssociatedServer(server) ; 686 server->setAssociatedClient(client) ; 687 clientPrimServer.push_back(client); 688 serverPrimServer.push_back(server); 689 690 691 } 692 } 693 } 694 CATCH_DUMP_ATTR 695 696 697 698 bool CContext::eventLoop(bool enableEventsProcessing) 699 { 700 bool finished=true; 701 702 if (client!=nullptr && !finalized) client->checkBuffers(); 703 704 for (int i = 0; i < clientPrimServer.size(); ++i) 705 { 706 if (!finalized) clientPrimServer[i]->checkBuffers(); 707 if (!finalized) finished &= serverPrimServer[i]->eventLoop(enableEventsProcessing); 708 } 709 710 for (auto couplerOut : couplerOutClient_) 711 if (!finalized) couplerOut.second->checkBuffers(); 712 713 for (auto couplerIn : couplerInClient_) 714 if (!finalized) couplerIn.second->checkBuffers(); 715 716 for (auto couplerOut : couplerOutServer_) 717 if (!finalized) couplerOut.second->eventLoop(enableEventsProcessing); 718 719 for (auto couplerIn : couplerInServer_) 720 if (!finalized) couplerIn.second->eventLoop(enableEventsProcessing); 721 722 if (server!=nullptr) if (!finalized) finished &= server->eventLoop(enableEventsProcessing); 723 724 return finalized && finished ; 725 } 726 727 void CContext::addCouplingChanel(const std::string& fullContextId, bool out) 728 { 729 int contextLeader ; 730 731 if (out) 732 { 733 if (couplerOutClient_.find(fullContextId)==couplerOutClient_.end()) 734 { 735 bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ; 736 737 MPI_Comm interComm, interCommClient, interCommServer ; 738 MPI_Comm intraCommClient, intraCommServer ; 739 740 if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 741 742 MPI_Comm_dup(intraComm_, &intraCommClient) ; 743 MPI_Comm_dup(intraComm_, &intraCommServer) ; 744 MPI_Comm_dup(interComm, &interCommClient) ; 745 MPI_Comm_dup(interComm, &interCommServer) ; 746 CContextClient* client = new CContextClient(this, intraCommClient, interCommClient); 747 CContextServer* server = new CContextServer(this, intraCommServer, interCommServer); 748 client->setAssociatedServer(server) ; 749 server->setAssociatedClient(client) ; 750 MPI_Comm_free(&interComm) ; 751 couplerOutClient_[fullContextId] = client ; 752 couplerOutServer_[fullContextId] = server ; 753 754 /* 755 // for now, we don't now which beffer size must be used for client coupler 756 // It will be evaluated later. Fix a constant size for now... 757 // set to 10Mb for development 758 map<int,size_t> bufferSize, maxEventSize ; 759 for(int i=0;i<client->getRemoteSize();i++) 760 { 761 bufferSize[i]=10000000 ; 762 maxEventSize[i]=10000000 ; 763 } 764 765 client->setBufferSize(bufferSize, maxEventSize); 766 */ 767 } 768 } 769 else if (couplerInClient_.find(fullContextId)==couplerInClient_.end()) 770 { 771 bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ; 772 773 MPI_Comm interComm, interCommClient, interCommServer ; 774 MPI_Comm intraCommClient, intraCommServer ; 775 776 if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 777 778 MPI_Comm_dup(intraComm_, &intraCommClient) ; 779 MPI_Comm_dup(intraComm_, &intraCommServer) ; 780 MPI_Comm_dup(interComm, &interCommServer) ; 781 MPI_Comm_dup(interComm, &interCommClient) ; 782 CContextServer* server = new CContextServer(this, intraCommServer, interCommServer); 783 CContextClient* client = new CContextClient(this, intraCommClient, interCommClient); 784 client->setAssociatedServer(server) ; 785 server->setAssociatedClient(client) ; 786 MPI_Comm_free(&interComm) ; 787 788 map<int,size_t> bufferSize, maxEventSize ; 789 for(int i=0;i<client->getRemoteSize();i++) 790 { 791 bufferSize[i]=10000000 ; 792 maxEventSize[i]=10000000 ; 793 } 794 795 client->setBufferSize(bufferSize, maxEventSize); 796 couplerInClient_[fullContextId] = client ; 797 couplerInServer_[fullContextId] = server ; 798 } 799 } 800 801 void CContext::globalEventLoop(void) 802 { 803 CXios::getDaemonsManager()->eventLoop() ; 804 setCurrent(getId()) ; 805 } 806 807 464 808 void CContext::finalize(void) 465 809 TRY 466 810 { 467 if (hasClient && !hasServer) // For now we only use server level 1 to read data 468 { 469 doPreTimestepOperationsForEnabledReadModeFiles(); 470 } 471 // Send registry upon calling the function the first time 472 if (countChildCtx_ == 0) 473 if (hasClient) sendRegistry() ; 474 475 // Client: 476 // (1) blocking send context finalize to its server 477 // (2) blocking receive context finalize from its server 478 // (3) some memory deallocations 479 if (CXios::isClient) 480 { 481 // Make sure that client (model) enters the loop only once 482 if (countChildCtx_ < 1) 483 { 484 ++countChildCtx_; 485 486 client->finalize(); 487 while (client->havePendingRequests()) 488 client->checkBuffers(); 489 490 while (!server->hasFinished()) 491 server->eventLoop(); 492 493 if (hasServer) // Mode attache 811 registryOut->hierarchicalGatherRegistry() ; 812 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 813 814 if (serviceType_==CServicesManager::CLIENT) 815 { 816 //ym doPreTimestepOperationsForEnabledReadModeFiles(); // For now we only use server level 1 to read data 817 818 triggerLateFields() ; 819 820 // inform couplerIn that I am finished 821 for(auto& couplerInClient : couplerInClient_) sendCouplerInContextFinalized(couplerInClient.second) ; 822 823 // wait until received message from couplerOut that they have finished 824 bool couplersInFinalized ; 825 do 826 { 827 couplersInFinalized=true ; 828 for(auto& couplerOutClient : couplerOutClient_) couplersInFinalized &= isCouplerInContextFinalized(couplerOutClient.second) ; 829 globalEventLoop() ; 830 } while (!couplersInFinalized) ; 831 832 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 833 client->finalize(); 834 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 835 while (client->havePendingRequests()) client->checkBuffers(); 836 info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 837 bool notifiedFinalized=false ; 838 do 839 { 840 notifiedFinalized=client->isNotifiedFinalized() ; 841 } while (!notifiedFinalized) ; 842 client->releaseBuffers(); 843 info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ; 844 } 845 else if (serviceType_==CServicesManager::GATHERER) 846 { 847 for (int i = 0; i < clientPrimServer.size(); ++i) 494 848 { 495 closeAllFile(); 496 registryOut->hierarchicalGatherRegistry() ; 497 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 849 clientPrimServer[i]->finalize(); 850 bool bufferReleased; 851 do 852 { 853 clientPrimServer[i]->checkBuffers(); 854 bufferReleased = !clientPrimServer[i]->havePendingRequests(); 855 } while (!bufferReleased); 856 857 bool notifiedFinalized=false ; 858 do 859 { 860 notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ; 861 } while (!notifiedFinalized) ; 862 clientPrimServer[i]->releaseBuffers(); 498 863 } 499 500 //! Deallocate client buffers 501 client->releaseBuffers(); 502 503 //! Free internally allocated communicators 504 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 505 MPI_Comm_free(&(*it)); 506 comms.clear(); 507 508 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 509 } 510 } 511 else if (CXios::isServer) 512 { 513 // First context finalize message received from a model 514 // Send context finalize to its child contexts (if any) 515 if (countChildCtx_ == 0) 516 for (int i = 0; i < clientPrimServer.size(); ++i) 517 clientPrimServer[i]->finalize(); 518 519 // (Last) context finalized message received 520 if (countChildCtx_ == clientPrimServer.size()) 521 { 522 // Blocking send of context finalize message to its client (e.g. primary server or model) 523 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize<<"<<endl ; 524 client->finalize(); 525 bool bufferReleased; 526 do 527 { 528 client->checkBuffers(); 529 bufferReleased = !client->havePendingRequests(); 530 } while (!bufferReleased); 531 finalized = true; 532 533 closeAllFile(); // Just move to here to make sure that server-level 1 can close files 534 if (hasServer && !hasClient) 535 { 536 registryOut->hierarchicalGatherRegistry() ; 537 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 538 } 539 540 //! Deallocate client buffers 541 client->releaseBuffers(); 542 for (int i = 0; i < clientPrimServer.size(); ++i) 543 clientPrimServer[i]->releaseBuffers(); 544 545 //! Free internally allocated communicators 546 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 547 MPI_Comm_free(&(*it)); 548 comms.clear(); 549 550 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 551 } 552 553 ++countChildCtx_; 554 } 864 closeAllFile(); 865 866 } 867 else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 868 { 869 closeAllFile(); 870 } 871 872 freeComms() ; 873 874 parentServerContext_->freeComm() ; 875 finalized = true; 876 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 555 877 } 556 878 CATCH_DUMP_ATTR … … 576 898 CATCH_DUMP_ATTR 577 899 578 void CContext::postProcessingGlobalAttributes() 579 TRY 580 { 581 if (allProcessed) return; 582 583 // After xml is parsed, there are some more works with post processing 584 postProcessing(); 585 586 // Check grid and calculate its distribution 587 checkGridEnabledFields(); 588 589 // Distribute files between secondary servers according to the data size 590 distributeFiles(); 591 592 setClientServerBuffer(client, (hasClient && !hasServer)); 593 for (int i = 0; i < clientPrimServer.size(); ++i) 594 setClientServerBuffer(clientPrimServer[i], true); 595 596 if (hasClient) 597 { 598 // Send all attributes of current context to server 599 this->sendAllAttributesToServer(); 600 601 // Send all attributes of current calendar 602 CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(); 603 604 // We have enough information to send to server 605 // First of all, send all enabled files 606 sendEnabledFiles(this->enabledWriteModeFiles); 607 // We only use server-level 1 (for now) to read data 608 if (!hasServer) 609 sendEnabledFiles(this->enabledReadModeFiles); 610 611 // Then, send all enabled fields 612 sendEnabledFieldsInFiles(this->enabledWriteModeFiles); 613 if (!hasServer) 614 sendEnabledFieldsInFiles(this->enabledReadModeFiles); 615 616 // Then, check whether we have domain_ref, axis_ref or scalar_ref attached to the enabled fields 617 // If any, so send them to server 618 sendRefDomainsAxisScalars(this->enabledWriteModeFiles); 619 if (!hasServer) 620 sendRefDomainsAxisScalars(this->enabledReadModeFiles); 621 622 // Check whether enabled fields have grid_ref, if any, send this info to server 623 sendRefGrid(this->enabledFiles); 624 // This code may be useful in the future when we want to seperate completely read and write 625 // sendRefGrid(this->enabledWriteModeFiles); 626 // if (!hasServer) 627 // sendRefGrid(this->enabledReadModeFiles); 628 629 // A grid of enabled fields composed of several components which must be checked then their 630 // checked attributes should be sent to server 631 sendGridComponentEnabledFieldsInFiles(this->enabledFiles); // This code can be seperated in two (one for reading, another for writing) 632 633 // We have a xml tree on the server side and now, it should be also processed 634 sendPostProcessing(); 635 636 // Finally, we send information of grid itself to server 637 sendGridEnabledFieldsInFiles(this->enabledWriteModeFiles); 638 if (!hasServer) 639 sendGridEnabledFieldsInFiles(this->enabledReadModeFiles); 640 } 641 allProcessed = true; 642 } 643 CATCH_DUMP_ATTR 644 645 void CContext::sendPostProcessingGlobalAttributes() 646 TRY 647 { 648 // Use correct context client to send message 649 // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; 650 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 651 for (int i = 0; i < nbSrvPools; ++i) 652 { 653 CContextClient* contextClientTmp = (0 != clientPrimServer.size()) ? clientPrimServer[i] : client; 654 CEventClient event(getType(),EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES); 655 656 if (contextClientTmp->isServerLeader()) 657 { 658 CMessage msg; 659 if (hasServer) 660 msg<<this->getIdServer(i); 661 else 662 msg<<this->getIdServer(); 663 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 664 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 665 event.push(*itRank,1,msg); 666 contextClientTmp->sendEvent(event); 667 } 668 else contextClientTmp->sendEvent(event); 669 } 670 } 671 CATCH_DUMP_ATTR 672 673 void CContext::recvPostProcessingGlobalAttributes(CEventServer& event) 674 TRY 675 { 676 CBufferIn* buffer=event.subEvents.begin()->buffer; 677 string id; 678 *buffer>>id; 679 get(id)->recvPostProcessingGlobalAttributes(*buffer); 680 } 681 CATCH 682 683 void CContext::recvPostProcessingGlobalAttributes(CBufferIn& buffer) 684 TRY 685 { 686 postProcessingGlobalAttributes(); 687 } 688 CATCH_DUMP_ATTR 689 900 690 901 /*! 691 902 \brief Close all the context defintion and do processing data … … 697 908 and the active fields (fields will be written onto active files) 698 909 */ 699 700 void CContext::closeDefinition(void) 701 TRY 702 { 703 CTimer::get("Context : close definition").resume() ; 704 postProcessingGlobalAttributes(); 705 706 if (hasClient) sendPostProcessingGlobalAttributes(); 707 708 // There are some processings that should be done after all of above. For example: check mask or index 709 this->buildFilterGraphOfEnabledFields(); 910 void CContext::closeDefinition(void) 911 TRY 912 { 913 CTimer::get("Context : close definition").resume() ; 914 915 // create intercommunicator with servers. 916 // not sure it is the good place to be called here 917 createServerInterComm() ; 918 919 920 // After xml is parsed, there are some more works with post processing 921 // postProcessing(); 922 710 923 711 if (hasClient && !hasServer) 712 { 713 buildFilterGraphOfFieldsWithReadAccess(); 714 postProcessFilterGraph(); 715 } 924 // Make sure the calendar was correctly created 925 if (serviceType_!=CServicesManager::CLIENT) CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar(); 926 if (!calendar) 927 ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"") 928 else if (calendar->getTimeStep() == NoneDu) 929 ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"") 930 // Calendar first update to set the current date equals to the start date 931 calendar->update(0); 932 933 // Résolution des héritages descendants (cà d des héritages de groupes) 934 // pour chacun des contextes. 935 solveDescInheritance(true); 936 937 // Solve inheritance for field to know if enabled or not. 938 for (auto field : CField::getAll()) field->solveRefInheritance(); 939 940 // Check if some axis, domains or grids are eligible to for compressed indexed output. 941 // Warning: This must be done after solving the inheritance and before the rest of post-processing 942 // --> later ???? checkAxisDomainsGridsEligibilityForCompressedOutput(); 943 944 // Check if some automatic time series should be generated 945 // Warning: This must be done after solving the inheritance and before the rest of post-processing 946 947 // The timeseries should only be prepared in client 948 prepareTimeseries(); 949 950 //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir. 951 findEnabledFiles(); 952 findEnabledWriteModeFiles(); 953 findEnabledReadModeFiles(); 954 findEnabledCouplerIn(); 955 findEnabledCouplerOut(); 956 createCouplerInterCommunicator() ; 957 958 // Find all enabled fields of each file 959 vector<CField*>&& fileOutField = findAllEnabledFieldsInFileOut(this->enabledWriteModeFiles); 960 vector<CField*>&& fileInField = findAllEnabledFieldsInFileIn(this->enabledReadModeFiles); 961 vector<CField*>&& couplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut); 962 vector<CField*>&& couplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn); 963 findFieldsWithReadAccess(); 964 vector<CField*>& fieldWithReadAccess = fieldsWithReadAccess_ ; 965 vector<CField*> fieldModelIn ; // fields potentially from model 966 967 // define if files are on clientSied or serverSide 968 if (serviceType_==CServicesManager::CLIENT) 969 { 970 for (auto& file : enabledWriteModeFiles) file->setClientSide() ; 971 for (auto& file : enabledReadModeFiles) file->setClientSide() ; 972 } 973 else 974 { 975 for (auto& file : enabledWriteModeFiles) file->setServerSide() ; 976 for (auto& file : enabledReadModeFiles) file->setServerSide() ; 977 } 978 716 979 717 checkGridEnabledFields(); 718 719 if (hasClient) this->sendProcessingGridOfEnabledFields(); 720 if (hasClient) this->sendCloseDefinition(); 721 722 // Nettoyage de l'arborescence 723 if (hasClient) CleanTree(); // Only on client side?? 724 725 if (hasClient) 726 { 727 sendCreateFileHeader(); 728 if (!hasServer) startPrefetchingOfEnabledReadModeFiles(); 729 } 730 CTimer::get("Context : close definition").suspend() ; 731 } 732 CATCH_DUMP_ATTR 733 734 void CContext::findAllEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles) 735 TRY 736 { 737 for (unsigned int i = 0; i < activeFiles.size(); i++) 738 (void)activeFiles[i]->getEnabledFields(); 739 } 740 CATCH_DUMP_ATTR 741 980 for (auto& field : couplerInField) 981 { 982 field->unsetGridCompleted() ; 983 } 984 // find all field potentially at workflow end 985 vector<CField*> endWorkflowFields ; 986 endWorkflowFields.reserve(fileOutField.size()+couplerOutField.size()+fieldWithReadAccess.size()) ; 987 endWorkflowFields.insert(endWorkflowFields.end(),fileOutField.begin(), fileOutField.end()) ; 988 endWorkflowFields.insert(endWorkflowFields.end(),couplerOutField.begin(), couplerOutField.end()) ; 989 endWorkflowFields.insert(endWorkflowFields.end(),fieldWithReadAccess.begin(), fieldWithReadAccess.end()) ; 990 991 bool workflowGraphIsCompleted ; 992 993 bool first=true ; 994 do 995 { 996 workflowGraphIsCompleted=true; 997 for(auto endWorkflowField : endWorkflowFields) 998 { 999 workflowGraphIsCompleted &= endWorkflowField->buildWorkflowGraph(garbageCollector) ; 1000 } 1001 1002 for(auto couplerIn : enabledCouplerIn) couplerIn->assignContext() ; 1003 for(auto field : couplerInField) field->makeGridAliasForCoupling(); 1004 for(auto field : couplerInField) this->sendCouplerInReady(field->getContextClient()) ; 1005 1006 1007 // assign context to coupler out and related fields 1008 for(auto couplerOut : enabledCouplerOut) couplerOut->assignContext() ; 1009 // for now supose that all coupling out endpoint are succesfull. The difficultie is client/server buffer evaluation 1010 for(auto field : couplerOutField) 1011 { 1012 // connect to couplerOut -> to do 1013 } 1014 if (first) setClientServerBuffer(couplerOutField, true) ; // set buffer context --> to check 1015 1016 bool couplersReady ; 1017 do 1018 { 1019 couplersReady=true ; 1020 for(auto field : couplerOutField) 1021 { 1022 bool ready = isCouplerInReady(field->getContextClient()) ; 1023 if (ready) field->sendFieldToCouplerOut() ; 1024 couplersReady &= ready ; 1025 } 1026 if (!couplersReady) this->eventLoop() ; 1027 } while (!couplersReady) ; 1028 1029 first=false ; 1030 this->eventLoop() ; 1031 } while (!workflowGraphIsCompleted) ; 1032 1033 for( auto field : couplerInField) couplerInFields_.push_back(field) ; 1034 1035 // get all field coming potentially from model 1036 for (auto field : CField::getAll() ) if (field->getModelIn()) fieldModelIn.push_back(field) ; 1037 1038 // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields 1039 if (serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles); 1040 else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 1041 1042 // client side, assign context for file reading 1043 if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ; 1044 1045 // server side, assign context where to send file data read 1046 if (serviceType_==CServicesManager::CServicesManager::GATHERER || serviceType_==CServicesManager::IO_SERVER) 1047 for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ; 1048 1049 // workflow endpoint => sent to IO/SERVER 1050 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) 1051 { 1052 for(auto field : fileOutField) 1053 { 1054 field->connectToFileServer(garbageCollector) ; // connect the field to server filter 1055 } 1056 setClientServerBuffer(fileOutField, true) ; // set buffer context --> to review 1057 for(auto field : fileOutField) field->sendFieldToFileServer() ; 1058 } 1059 1060 // workflow endpoint => write to file 1061 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 1062 { 1063 for(auto field : fileOutField) 1064 { 1065 field->connectToFileWriter(garbageCollector) ; // connect the field to server filter 1066 } 1067 } 1068 1069 // workflow endpoint => Send data from server to client 1070 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER) 1071 { 1072 for(auto field : fileInField) 1073 { 1074 field->connectToServerToClient(garbageCollector) ; 1075 } 1076 } 1077 1078 // workflow endpoint => sent to model on client side 1079 if (serviceType_==CServicesManager::CLIENT) 1080 { 1081 for(auto field : fieldWithReadAccess) field->connectToModelOutput(garbageCollector) ; 1082 } 1083 1084 1085 // workflow startpoint => data from model 1086 if (serviceType_==CServicesManager::CLIENT) 1087 { 1088 for(auto field : fieldModelIn) 1089 { 1090 field->connectToModelInput(garbageCollector) ; // connect the field to server filter 1091 // grid index will be computed on the fly 1092 } 1093 } 1094 1095 // workflow startpoint => data from client on server side 1096 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER || serviceType_==CServicesManager::OUT_SERVER) 1097 { 1098 for(auto field : fieldModelIn) 1099 { 1100 field->connectToClientInput(garbageCollector) ; // connect the field to server filter 1101 } 1102 } 1103 1104 1105 for(auto field : couplerInField) 1106 { 1107 field->connectToCouplerIn(garbageCollector) ; // connect the field to server filter 1108 } 1109 1110 1111 for(auto field : couplerOutField) 1112 { 1113 field->connectToCouplerOut(garbageCollector) ; // for now the same kind of filter that for file server 1114 } 1115 1116 // workflow startpoint => data from server on client side 1117 if (serviceType_==CServicesManager::CLIENT) 1118 { 1119 for(auto field : fileInField) 1120 { 1121 field->sendFieldToInputFileServer() ; 1122 field->connectToServerInput(garbageCollector) ; // connect the field to server filter 1123 fileInFields_.push_back(field) ; 1124 } 1125 } 1126 1127 // workflow startpoint => data read from file on server side 1128 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER) 1129 { 1130 for(auto field : fileInField) 1131 { 1132 field->connectToFileReader(garbageCollector) ; 1133 } 1134 } 1135 1136 // construct slave server list 1137 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) 1138 { 1139 for(auto field : fileOutField) slaveServers_.insert(field->getContextClient()) ; 1140 for(auto field : fileInField) slaveServers_.insert(field->getContextClient()) ; 1141 } 1142 1143 for(auto& slaveServer : slaveServers_) sendCloseDefinition(slaveServer) ; 1144 1145 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 1146 { 1147 createFileHeader(); 1148 } 1149 1150 if (serviceType_==CServicesManager::CLIENT) startPrefetchingOfEnabledReadModeFiles(); 1151 1152 // send signal to couplerIn context that definition phasis is done 1153 1154 for(auto& couplerInClient : couplerInClient_) sendCouplerInCloseDefinition(couplerInClient.second) ; 1155 1156 // wait until all couplerIn signal that closeDefition is done. 1157 bool ok; 1158 do 1159 { 1160 ok = true ; 1161 for(auto& couplerOutClient : couplerOutClient_) ok &= isCouplerInCloseDefinition(couplerOutClient.second) ; 1162 this->eventLoop() ; 1163 } while (!ok) ; 1164 1165 CTimer::get("Context : close definition").suspend() ; 1166 } 1167 CATCH_DUMP_ATTR 1168 1169 1170 vector<CField*> CContext::findAllEnabledFieldsInFileOut(const std::vector<CFile*>& activeFiles) 1171 TRY 1172 { 1173 vector<CField*> fields ; 1174 for(auto file : activeFiles) 1175 { 1176 const vector<CField*>&& fieldList=file->getEnabledFields() ; 1177 for(auto field : fieldList) field->setFileOut(file) ; 1178 fields.insert(fields.end(),fieldList.begin(),fieldList.end()); 1179 } 1180 return fields ; 1181 } 1182 CATCH_DUMP_ATTR 1183 1184 vector<CField*> CContext::findAllEnabledFieldsInFileIn(const std::vector<CFile*>& activeFiles) 1185 TRY 1186 { 1187 vector<CField*> fields ; 1188 for(auto file : activeFiles) 1189 { 1190 const vector<CField*>&& fieldList=file->getEnabledFields() ; 1191 for(auto field : fieldList) field->setFileIn(file) ; 1192 fields.insert(fields.end(),fieldList.begin(),fieldList.end()); 1193 } 1194 return fields ; 1195 } 1196 CATCH_DUMP_ATTR 1197 1198 vector<CField*> CContext::findAllEnabledFieldsCouplerOut(const std::vector<CCouplerOut*>& activeCouplerOut) 1199 TRY 1200 { 1201 vector<CField*> fields ; 1202 for (auto couplerOut :activeCouplerOut) 1203 { 1204 const vector<CField*>&& fieldList=couplerOut->getEnabledFields() ; 1205 for(auto field : fieldList) field->setCouplerOut(couplerOut) ; 1206 fields.insert(fields.end(),fieldList.begin(),fieldList.end()); 1207 } 1208 return fields ; 1209 } 1210 CATCH_DUMP_ATTR 1211 1212 vector<CField*> CContext::findAllEnabledFieldsCouplerIn(const std::vector<CCouplerIn*>& activeCouplerIn) 1213 TRY 1214 { 1215 vector<CField*> fields ; 1216 for (auto couplerIn :activeCouplerIn) 1217 { 1218 const vector<CField*>&& fieldList=couplerIn->getEnabledFields() ; 1219 for(auto field : fieldList) field->setCouplerIn(couplerIn) ; 1220 fields.insert(fields.end(),fieldList.begin(),fieldList.end()); 1221 } 1222 return fields ; 1223 } 1224 CATCH_DUMP_ATTR 1225 1226 /*! 1227 * Send context attribute and calendar to file server, it must be done once by context file server 1228 * \param[in] client : context client to send 1229 */ 1230 void CContext::sendContextToFileServer(CContextClient* client) 1231 { 1232 if (sendToFileServer_done_.count(client)!=0) return ; 1233 else sendToFileServer_done_.insert(client) ; 1234 1235 this->sendAllAttributesToServer(client); // Send all attributes of current context to server 1236 CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(client); // Send all attributes of current cale 1237 } 1238 1239 742 1240 void CContext::readAttributesOfEnabledFieldsInReadModeFiles() 743 1241 TRY … … 748 1246 CATCH_DUMP_ATTR 749 1247 750 void CContext::sendGridComponentEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)751 TRY752 {753 int size = activeFiles.size();754 for (int i = 0; i < size; ++i)755 {756 activeFiles[i]->sendGridComponentOfEnabledFields();757 }758 }759 CATCH_DUMP_ATTR760 761 /*!762 Send active (enabled) fields in file from a client to others763 \param [in] activeFiles files contains enabled fields to send764 */765 void CContext::sendGridEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)766 TRY767 {768 int size = activeFiles.size();769 for (int i = 0; i < size; ++i)770 {771 activeFiles[i]->sendGridOfEnabledFields();772 }773 }774 CATCH_DUMP_ATTR775 776 void CContext::checkGridEnabledFields()777 TRY778 {779 int size = enabledFiles.size();780 for (int i = 0; i < size; ++i)781 {782 enabledFiles[i]->checkGridOfEnabledFields();783 }784 }785 CATCH_DUMP_ATTR786 787 /*!788 Check grid of active (enabled) fields in file789 \param [in] activeFiles files contains enabled fields whose grid needs checking790 */791 void CContext::checkGridEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles)792 TRY793 {794 int size = activeFiles.size();795 for (int i = 0; i < size; ++i)796 {797 activeFiles[i]->checkGridOfEnabledFields();798 }799 }800 CATCH_DUMP_ATTR801 802 /*!803 Go up the hierachical tree via field_ref and do check of attributes of fields804 This can be done in a client then all computed information will be sent from this client to others805 \param [in] sendToServer Flag to indicate whether calculated information will be sent806 */807 void CContext::solveOnlyRefOfEnabledFields(bool sendToServer)808 TRY809 {810 int size = this->enabledFiles.size();811 for (int i = 0; i < size; ++i)812 {813 this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);814 }815 816 for (int i = 0; i < size; ++i)817 {818 this->enabledFiles[i]->generateNewTransformationGridDest();819 }820 }821 CATCH_DUMP_ATTR822 823 /*!824 Go up the hierachical tree via field_ref and do check of attributes of fields.825 The transformation can be done in this step.826 All computed information will be sent from this client to others.827 \param [in] sendToServer Flag to indicate whether calculated information will be sent828 */829 void CContext::solveAllRefOfEnabledFieldsAndTransform(bool sendToServer)830 TRY831 {832 int size = this->enabledFiles.size();833 for (int i = 0; i < size; ++i)834 {835 this->enabledFiles[i]->solveAllRefOfEnabledFieldsAndTransform(sendToServer);836 }837 }838 CATCH_DUMP_ATTR839 840 void CContext::buildFilterGraphOfEnabledFields()841 TRY842 {843 int size = this->enabledFiles.size();844 for (int i = 0; i < size; ++i)845 {846 this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector);847 }848 }849 CATCH_DUMP_ATTR850 1248 851 1249 void CContext::postProcessFilterGraph() … … 896 1294 TRY 897 1295 { 898 fieldsWithReadAccess .clear();1296 fieldsWithReadAccess_.clear(); 899 1297 const vector<CField*> allFields = CField::getAll(); 900 1298 for (size_t i = 0; i < allFields.size(); ++i) 901 1299 { 902 1300 CField* field = allFields[i]; 903 904 if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read)905 field ->read_access = true;906 else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))907 fieldsWithReadAccess.push_back(field);1301 if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled)) 1302 { 1303 fieldsWithReadAccess_.push_back(field); 1304 field->setModelOut() ; 1305 } 908 1306 } 909 1307 } 910 1308 CATCH_DUMP_ATTR 911 1309 912 void CContext::solveAllRefOfFieldsWithReadAccess() 913 TRY 914 { 915 for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i) 916 fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false); 917 } 918 CATCH_DUMP_ATTR 919 920 void CContext::buildFilterGraphOfFieldsWithReadAccess() 921 TRY 922 { 923 for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i) 924 fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true); 925 } 926 CATCH_DUMP_ATTR 927 1310 928 1311 void CContext::solveAllInheritance(bool apply) 929 1312 TRY … … 935 1318 // Résolution des héritages par référence au niveau des fichiers. 936 1319 const vector<CFile*> allFiles=CFile::getAll(); 1320 const vector<CCouplerIn*> allCouplerIn=CCouplerIn::getAll(); 1321 const vector<CCouplerOut*> allCouplerOut=CCouplerOut::getAll(); 937 1322 const vector<CGrid*> allGrids= CGrid::getAll(); 938 1323 939 if (hasClient && !hasServer) 940 //if (hasClient) 1324 if (serviceType_==CServicesManager::CLIENT) 941 1325 { 942 1326 for (unsigned int i = 0; i < allFiles.size(); i++) 943 1327 allFiles[i]->solveFieldRefInheritance(apply); 1328 1329 for (unsigned int i = 0; i < allCouplerIn.size(); i++) 1330 allCouplerIn[i]->solveFieldRefInheritance(apply); 1331 1332 for (unsigned int i = 0; i < allCouplerOut.size(); i++) 1333 allCouplerOut[i]->solveFieldRefInheritance(apply); 944 1334 } 945 1335 … … 947 1337 unsigned int i = 0; 948 1338 for (i = 0; i < vecSize; ++i) 949 allGrids[i]->solve DomainAxisRefInheritance(apply);1339 allGrids[i]->solveElementsRefInheritance(apply); 950 1340 951 1341 } … … 1004 1394 CATCH_DUMP_ATTR 1005 1395 1006 void CContext::distributeFiles(void) 1396 void CContext::findEnabledCouplerIn(void) 1397 TRY 1398 { 1399 const std::vector<CCouplerIn*> allCouplerIn = CCouplerIn::getAll(); 1400 bool enabled ; 1401 for (size_t i = 0; i < allCouplerIn.size(); i++) 1402 { 1403 if (allCouplerIn[i]->enabled.isEmpty()) enabled=true ; 1404 else enabled=allCouplerIn[i]->enabled ; 1405 if (enabled) enabledCouplerIn.push_back(allCouplerIn[i]) ; 1406 } 1407 } 1408 CATCH_DUMP_ATTR 1409 1410 void CContext::findEnabledCouplerOut(void) 1411 TRY 1412 { 1413 const std::vector<CCouplerOut*> allCouplerOut = CCouplerOut::getAll(); 1414 bool enabled ; 1415 for (size_t i = 0; i < allCouplerOut.size(); i++) 1416 { 1417 if (allCouplerOut[i]->enabled.isEmpty()) enabled=true ; 1418 else enabled=allCouplerOut[i]->enabled ; 1419 if (enabled) enabledCouplerOut.push_back(allCouplerOut[i]) ; 1420 } 1421 } 1422 CATCH_DUMP_ATTR 1423 1424 1425 1426 1427 void CContext::distributeFiles(const vector<CFile*>& files) 1007 1428 TRY 1008 1429 { … … 1010 1431 distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory); 1011 1432 1012 if (distFileMemory) distributeFileOverMemoryBandwith( ) ;1013 else distributeFileOverBandwith( ) ;1014 } 1015 CATCH_DUMP_ATTR 1016 1017 void CContext::distributeFileOverBandwith( void)1433 if (distFileMemory) distributeFileOverMemoryBandwith(files) ; 1434 else distributeFileOverBandwith(files) ; 1435 } 1436 CATCH_DUMP_ATTR 1437 1438 void CContext::distributeFileOverBandwith(const vector<CFile*>& files) 1018 1439 TRY 1019 1440 { 1020 1441 double eps=std::numeric_limits<double>::epsilon()*10 ; 1021 1442 1022 // If primary server 1023 if (hasServer && hasClient) 1024 { 1025 std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 1026 int nbPools = clientPrimServer.size(); 1027 1028 // (1) Find all enabled files in write mode 1029 // for (int i = 0; i < this->enabledFiles.size(); ++i) 1030 // { 1031 // if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write )) 1032 // enabledWriteModeFiles.push_back(enabledFiles[i]); 1033 // } 1034 1035 // (2) Estimate the data volume for each file 1036 int size = this->enabledWriteModeFiles.size(); 1037 std::vector<std::pair<double, CFile*> > dataSizeMap; 1038 double dataPerPool = 0; 1039 int nfield=0 ; 1040 ofs<<size<<endl ; 1041 for (size_t i = 0; i < size; ++i) 1443 std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 1444 int nbPools = clientPrimServer.size(); 1445 1446 // (1) Find all enabled files in write mode 1447 // for (int i = 0; i < this->enabledFiles.size(); ++i) 1448 // { 1449 // if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write )) 1450 // enabledWriteModeFiles.push_back(enabledFiles[i]); 1451 // } 1452 1453 // (2) Estimate the data volume for each file 1454 int size = files.size(); 1455 std::vector<std::pair<double, CFile*> > dataSizeMap; 1456 double dataPerPool = 0; 1457 int nfield=0 ; 1458 ofs<<size<<endl ; 1459 for (size_t i = 0; i < size; ++i) 1460 { 1461 CFile* file = files[i]; 1462 ofs<<file->getId()<<endl ; 1463 StdSize dataSize=0; 1464 std::vector<CField*> enabledFields = file->getEnabledFields(); 1465 size_t numEnabledFields = enabledFields.size(); 1466 ofs<<numEnabledFields<<endl ; 1467 for (size_t j = 0; j < numEnabledFields; ++j) 1042 1468 { 1043 CFile* file = this->enabledWriteModeFiles[i]; 1044 ofs<<file->getId()<<endl ; 1045 StdSize dataSize=0; 1046 std::vector<CField*> enabledFields = file->getEnabledFields(); 1047 size_t numEnabledFields = enabledFields.size(); 1048 ofs<<numEnabledFields<<endl ; 1049 for (size_t j = 0; j < numEnabledFields; ++j) 1469 dataSize += enabledFields[j]->getGlobalWrittenSize() ; 1470 ofs<<enabledFields[j]->getGrid()->getId()<<endl ; 1471 ofs<<enabledFields[j]->getGlobalWrittenSize()<<endl ; 1472 } 1473 double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; 1474 double dataSizeSec= dataSize/ outFreqSec; 1475 ofs<<dataSizeSec<<endl ; 1476 nfield++ ; 1477 // add epsilon*nField to dataSizeSec in order to preserve reproductive ordering when sorting 1478 dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file)); 1479 dataPerPool += dataSizeSec; 1480 } 1481 dataPerPool /= nbPools; 1482 std::sort(dataSizeMap.begin(), dataSizeMap.end()); 1483 1484 // (3) Assign contextClient to each enabled file 1485 1486 std::multimap<double,int> poolDataSize ; 1487 // multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11 1488 1489 int j; 1490 double dataSize ; 1491 for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ; 1492 1493 for (int i = dataSizeMap.size()-1; i >= 0; --i) 1494 { 1495 dataSize=(*poolDataSize.begin()).first ; 1496 j=(*poolDataSize.begin()).second ; 1497 dataSizeMap[i].second->setContextClient(clientPrimServer[j]); 1498 dataSize+=dataSizeMap[i].first; 1499 poolDataSize.erase(poolDataSize.begin()) ; 1500 poolDataSize.insert(std::pair<double,int>(dataSize,j)) ; 1501 } 1502 1503 for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) info(30)<<"Load Balancing for servers (perfect=1) : "<<it->second<<" : ratio "<<it->first*1./dataPerPool<<endl ; 1504 } 1505 CATCH_DUMP_ATTR 1506 1507 void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList) 1508 TRY 1509 { 1510 int nbPools = clientPrimServer.size(); 1511 double ratio=0.5 ; 1512 ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); 1513 1514 int nFiles = filesList.size(); 1515 vector<SDistFile> files(nFiles); 1516 vector<SDistGrid> grids; 1517 map<string,int> gridMap ; 1518 string gridId; 1519 int gridIndex=0 ; 1520 1521 for (size_t i = 0; i < nFiles; ++i) 1522 { 1523 StdSize dataSize=0; 1524 CFile* file = filesList[i]; 1525 std::vector<CField*> enabledFields = file->getEnabledFields(); 1526 size_t numEnabledFields = enabledFields.size(); 1527 1528 files[i].id_=file->getId() ; 1529 files[i].nbGrids_=numEnabledFields; 1530 files[i].assignedGrid_ = new int[files[i].nbGrids_] ; 1531 1532 for (size_t j = 0; j < numEnabledFields; ++j) 1533 { 1534 gridId=enabledFields[j]->getGrid()->getId() ; 1535 if (gridMap.find(gridId)==gridMap.end()) 1050 1536 { 1051 dataSize += enabledFields[j]->getGlobalWrittenSize() ; 1052 ofs<<enabledFields[j]->grid->getId()<<endl ; 1053 ofs<<enabledFields[j]->getGlobalWrittenSize()<<endl ; 1537 gridMap[gridId]=gridIndex ; 1538 SDistGrid newGrid; 1539 grids.push_back(newGrid) ; 1540 gridIndex++ ; 1054 1541 } 1055 double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; 1056 double dataSizeSec= dataSize/ outFreqSec; 1057 ofs<<dataSizeSec<<endl ; 1058 nfield++ ; 1059 // add epsilon*nField to dataSizeSec in order to preserve reproductive ordering when sorting 1060 dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file)); 1061 dataPerPool += dataSizeSec; 1542 files[i].assignedGrid_[j]=gridMap[gridId] ; 1543 grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ; 1544 dataSize += enabledFields[j]->getGlobalWrittenSize() ; // usefull 1062 1545 } 1063 dataPerPool /= nbPools; 1064 std::sort(dataSizeMap.begin(), dataSizeMap.end()); 1065 1066 // (3) Assign contextClient to each enabled file 1067 1068 std::multimap<double,int> poolDataSize ; 1069 // multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11 1070 1071 int j; 1072 double dataSize ; 1073 for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ; 1074 1075 for (int i = dataSizeMap.size()-1; i >= 0; --i) 1546 double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; 1547 files[i].bandwith_= dataSize/ outFreqSec ; 1548 } 1549 1550 double bandwith=0 ; 1551 double memory=0 ; 1552 1553 for(int i=0; i<nFiles; i++) bandwith+=files[i].bandwith_ ; 1554 for(int i=0; i<nFiles; i++) files[i].bandwith_ = files[i].bandwith_/bandwith * ratio ; 1555 1556 for(int i=0; i<grids.size(); i++) memory+=grids[i].size_ ; 1557 for(int i=0; i<grids.size(); i++) grids[i].size_ = grids[i].size_ / memory * (1.0-ratio) ; 1558 1559 distributeFileOverServer2(nbPools, grids.size(), &grids[0], nFiles, &files[0]) ; 1560 1561 vector<double> memorySize(nbPools,0.) ; 1562 vector< set<int> > serverGrids(nbPools) ; 1563 vector<double> bandwithSize(nbPools,0.) ; 1564 1565 for (size_t i = 0; i < nFiles; ++i) 1566 { 1567 bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ; 1568 for(int j=0 ; j<files[i].nbGrids_;j++) 1076 1569 { 1077 dataSize=(*poolDataSize.begin()).first ; 1078 j=(*poolDataSize.begin()).second ; 1079 dataSizeMap[i].second->setContextClient(clientPrimServer[j]); 1080 dataSize+=dataSizeMap[i].first; 1081 poolDataSize.erase(poolDataSize.begin()) ; 1082 poolDataSize.insert(std::pair<double,int>(dataSize,j)) ; 1570 if (serverGrids[files[i].assignedServer_].find(files[i].assignedGrid_[j]) == serverGrids[files[i].assignedServer_].end()) 1571 { 1572 memorySize[files[i].assignedServer_]+= grids[files[i].assignedGrid_[j]].size_ * memory / (1.0-ratio); 1573 serverGrids[files[i].assignedServer_].insert(files[i].assignedGrid_[j]) ; 1574 } 1083 1575 } 1084 1085 for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) info(30)<<"Load Balancing for servers (perfect=1) : "<<it->second<<" : ratio "<<it->first*1./dataPerPool<<endl ; 1086 1087 for (int i = 0; i < this->enabledReadModeFiles.size(); ++i) 1088 { 1089 enabledReadModeFiles[i]->setContextClient(client); 1090 } 1091 } 1092 else 1093 { 1094 for (int i = 0; i < this->enabledFiles.size(); ++i) 1095 enabledFiles[i]->setContextClient(client); 1096 } 1097 } 1098 CATCH_DUMP_ATTR 1099 1100 void CContext::distributeFileOverMemoryBandwith(void) 1101 TRY 1102 { 1103 // If primary server 1104 if (hasServer && hasClient) 1105 { 1106 int nbPools = clientPrimServer.size(); 1107 double ratio=0.5 ; 1108 ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); 1109 1110 int nFiles = this->enabledWriteModeFiles.size(); 1111 vector<SDistFile> files(nFiles); 1112 vector<SDistGrid> grids; 1113 map<string,int> gridMap ; 1114 string gridId; 1115 int gridIndex=0 ; 1116 1117 for (size_t i = 0; i < nFiles; ++i) 1118 { 1119 StdSize dataSize=0; 1120 CFile* file = this->enabledWriteModeFiles[i]; 1121 std::vector<CField*> enabledFields = file->getEnabledFields(); 1122 size_t numEnabledFields = enabledFields.size(); 1123 1124 files[i].id_=file->getId() ; 1125 files[i].nbGrids_=numEnabledFields; 1126 files[i].assignedGrid_ = new int[files[i].nbGrids_] ; 1127 1128 for (size_t j = 0; j < numEnabledFields; ++j) 1129 { 1130 gridId=enabledFields[j]->grid->getId() ; 1131 if (gridMap.find(gridId)==gridMap.end()) 1132 { 1133 gridMap[gridId]=gridIndex ; 1134 SDistGrid newGrid; 1135 grids.push_back(newGrid) ; 1136 gridIndex++ ; 1137 } 1138 files[i].assignedGrid_[j]=gridMap[gridId] ; 1139 grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ; 1140 dataSize += enabledFields[j]->getGlobalWrittenSize() ; // usefull 1141 } 1142 double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; 1143 files[i].bandwith_= dataSize/ outFreqSec ; 1144 } 1145 1146 double bandwith=0 ; 1147 double memory=0 ; 1148 1149 for(int i=0; i<nFiles; i++) bandwith+=files[i].bandwith_ ; 1150 for(int i=0; i<nFiles; i++) files[i].bandwith_ = files[i].bandwith_/bandwith * ratio ; 1151 1152 for(int i=0; i<grids.size(); i++) memory+=grids[i].size_ ; 1153 for(int i=0; i<grids.size(); i++) grids[i].size_ = grids[i].size_ / memory * (1.0-ratio) ; 1154 1155 distributeFileOverServer2(nbPools, grids.size(), &grids[0], nFiles, &files[0]) ; 1156 1157 vector<double> memorySize(nbPools,0.) ; 1158 vector< set<int> > serverGrids(nbPools) ; 1159 vector<double> bandwithSize(nbPools,0.) ; 1160 1161 for (size_t i = 0; i < nFiles; ++i) 1162 { 1163 bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ; 1164 for(int j=0 ; j<files[i].nbGrids_;j++) 1165 { 1166 if (serverGrids[files[i].assignedServer_].find(files[i].assignedGrid_[j]) == serverGrids[files[i].assignedServer_].end()) 1167 { 1168 memorySize[files[i].assignedServer_]+= grids[files[i].assignedGrid_[j]].size_ * memory / (1.0-ratio); 1169 serverGrids[files[i].assignedServer_].insert(files[i].assignedGrid_[j]) ; 1170 } 1171 } 1172 enabledWriteModeFiles[i]->setContextClient(clientPrimServer[files[i].assignedServer_]) ; 1173 delete [] files[i].assignedGrid_ ; 1174 } 1175 1176 for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<" assigned file bandwith "<<bandwithSize[i]*86400.*4./1024/1024.<<" Mb / days"<<endl ; 1177 for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<" assigned grid memory "<<memorySize[i]*100/1024./1024.<<" Mb"<<endl ; 1178 1179 1180 for (int i = 0; i < this->enabledReadModeFiles.size(); ++i) 1181 { 1182 enabledReadModeFiles[i]->setContextClient(client); 1183 } 1184 1185 } 1186 else 1187 { 1188 for (int i = 0; i < this->enabledFiles.size(); ++i) 1189 enabledFiles[i]->setContextClient(client); 1190 } 1191 } 1576 filesList[i]->setContextClient(clientPrimServer[files[i].assignedServer_]) ; 1577 delete [] files[i].assignedGrid_ ; 1578 } 1579 1580 for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<" assigned file bandwith "<<bandwithSize[i]*86400.*4./1024/1024.<<" Mb / days"<<endl ; 1581 for (int i = 0; i < nbPools; ++i) info(100)<<"Pool server level2 "<<i<<" assigned grid memory "<<memorySize[i]*100/1024./1024.<<" Mb"<<endl ; 1582 1583 } 1192 1584 CATCH_DUMP_ATTR 1193 1585 … … 1265 1657 return true; 1266 1658 break; 1267 case EVENT_ID_POST_PROCESS: 1268 recvPostProcessing(event); 1269 return true; 1270 case EVENT_ID_SEND_REGISTRY: 1659 case EVENT_ID_SEND_REGISTRY: 1271 1660 recvRegistry(event); 1272 1661 return true; 1273 1662 break; 1274 case EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES:1275 recv PostProcessingGlobalAttributes(event);1663 case EVENT_ID_COUPLER_IN_READY: 1664 recvCouplerInReady(event); 1276 1665 return true; 1277 1666 break; 1278 case EVENT_ID_PROCESS_GRID_ENABLED_FIELDS:1279 recv ProcessingGridOfEnabledFields(event);1667 case EVENT_ID_COUPLER_IN_CLOSE_DEFINITION: 1668 recvCouplerInCloseDefinition(event); 1280 1669 return true; 1281 1670 break; 1671 case EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED: 1672 recvCouplerInContextFinalized(event); 1673 return true; 1674 break; 1282 1675 default : 1283 1676 ERROR("bool CContext::dispatchEvent(CEventServer& event)", … … 1290 1683 1291 1684 //! Client side: Send a message to server to make it close 1685 // ym obsolete 1292 1686 void CContext::sendCloseDefinition(void) 1293 1687 TRY 1294 1688 { 1295 // Use correct context client to send message 1296 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1297 for (int i = 0; i < nbSrvPools; ++i) 1298 { 1299 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; 1689 int nbSrvPools ; 1690 if (serviceType_==CServicesManager::CLIENT) nbSrvPools = 1 ; 1691 else if (serviceType_==CServicesManager::GATHERER) nbSrvPools = this->clientPrimServer.size() ; 1692 else nbSrvPools = 0 ; 1693 CContextClient* contextClientTmp ; 1694 1695 for (int i = 0; i < nbSrvPools; ++i) 1696 { 1697 if (serviceType_==CServicesManager::CLIENT) contextClientTmp = client ; 1698 else if (serviceType_==CServicesManager::GATHERER ) contextClientTmp = clientPrimServer[i] ; 1300 1699 CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION); 1301 1700 if (contextClientTmp->isServerLeader()) 1302 1701 { 1303 1702 CMessage msg; 1304 if (hasServer)1305 msg<<this->getIdServer(i);1306 else1307 msg<<this->getIdServer();1308 1703 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 1309 1704 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) … … 1315 1710 } 1316 1711 CATCH_DUMP_ATTR 1712 1713 // ! Client side: Send a message to server to make it close 1714 void CContext::sendCloseDefinition(CContextClient* client) 1715 TRY 1716 { 1717 if (sendCloseDefinition_done_.count(client)!=0) return ; 1718 else sendCloseDefinition_done_.insert(client) ; 1719 1720 CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION); 1721 if (client->isServerLeader()) 1722 { 1723 CMessage msg; 1724 for(auto rank : client->getRanksServerLeader()) event.push(rank,1,msg); 1725 client->sendEvent(event); 1726 } 1727 else client->sendEvent(event); 1728 } 1729 CATCH_DUMP_ATTR 1317 1730 1318 1731 //! Server side: Receive a message of client announcing a context close … … 1321 1734 { 1322 1735 CBufferIn* buffer=event.subEvents.begin()->buffer; 1323 string id; 1324 *buffer>>id; 1325 get(id)->closeDefinition(); 1736 getCurrent()->closeDefinition(); 1326 1737 } 1327 1738 CATCH … … 1331 1742 TRY 1332 1743 { 1333 // Use correct context client to send message 1334 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1335 for (int i = 0; i < nbSrvPools; ++i) 1336 { 1337 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; 1338 CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR); 1339 1340 if (contextClientTmp->isServerLeader()) 1341 { 1342 CMessage msg; 1343 if (hasServer) 1344 msg<<this->getIdServer(i)<<step; 1345 else 1346 msg<<this->getIdServer()<<step; 1347 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 1348 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1349 event.push(*itRank,1,msg); 1350 contextClientTmp->sendEvent(event); 1351 } 1352 else contextClientTmp->sendEvent(event); 1744 CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR); 1745 for(auto client : slaveServers_) 1746 { 1747 if (client->isServerLeader()) 1748 { 1749 CMessage msg; 1750 msg<<step; 1751 for (auto& rank : client->getRanksServerLeader() ) event.push(rank,1,msg); 1752 client->sendEvent(event); 1753 } 1754 else client->sendEvent(event); 1353 1755 } 1354 1756 } … … 1360 1762 { 1361 1763 CBufferIn* buffer=event.subEvents.begin()->buffer; 1362 string id; 1363 *buffer>>id; 1364 get(id)->recvUpdateCalendar(*buffer); 1764 getCurrent()->recvUpdateCalendar(*buffer); 1365 1765 } 1366 1766 CATCH … … 1373 1773 buffer>>step; 1374 1774 updateCalendar(step); 1375 if ( hasClient && hasServer)1775 if (serviceType_==CServicesManager::GATHERER) 1376 1776 { 1377 1777 sendUpdateCalendar(step); … … 1384 1784 TRY 1385 1785 { 1386 // Use correct context client to send message 1387 // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; 1388 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1786 int nbSrvPools ; 1787 if (serviceType_==CServicesManager::CLIENT) nbSrvPools = 1 ; 1788 else if (serviceType_==CServicesManager::GATHERER) nbSrvPools = this->clientPrimServer.size() ; 1789 else nbSrvPools = 0 ; 1790 CContextClient* contextClientTmp ; 1791 1389 1792 for (int i = 0; i < nbSrvPools; ++i) 1390 1793 { 1391 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; 1794 if (serviceType_==CServicesManager::CLIENT) contextClientTmp = client ; 1795 else if (serviceType_==CServicesManager::GATHERER ) contextClientTmp = clientPrimServer[i] ; 1392 1796 CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER); 1393 1797 … … 1395 1799 { 1396 1800 CMessage msg; 1397 if (hasServer)1398 msg<<this->getIdServer(i);1399 else1400 msg<<this->getIdServer();1401 1801 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 1402 1802 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) … … 1414 1814 { 1415 1815 CBufferIn* buffer=event.subEvents.begin()->buffer; 1416 string id; 1417 *buffer>>id; 1418 get(id)->recvCreateFileHeader(*buffer); 1816 getCurrent()->recvCreateFileHeader(*buffer); 1419 1817 } 1420 1818 CATCH … … 1424 1822 TRY 1425 1823 { 1426 if ( !hasClient && hasServer)1824 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 1427 1825 createFileHeader(); 1428 1826 } 1429 1827 CATCH_DUMP_ATTR 1430 1828 1431 //! Client side: Send a message to do some post processing on server 1432 void CContext::sendProcessingGridOfEnabledFields() 1433 TRY 1434 { 1435 // Use correct context client to send message 1436 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1437 for (int i = 0; i < nbSrvPools; ++i) 1438 { 1439 CContextClient* contextClientTmp = (0 != clientPrimServer.size()) ? clientPrimServer[i] : client; 1440 CEventClient event(getType(),EVENT_ID_PROCESS_GRID_ENABLED_FIELDS); 1441 1442 if (contextClientTmp->isServerLeader()) 1443 { 1444 CMessage msg; 1445 if (hasServer) 1446 msg<<this->getIdServer(i); 1447 else 1448 msg<<this->getIdServer(); 1449 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 1450 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1451 event.push(*itRank,1,msg); 1452 contextClientTmp->sendEvent(event); 1453 } 1454 else contextClientTmp->sendEvent(event); 1455 } 1456 } 1457 CATCH_DUMP_ATTR 1458 1459 //! Server side: Receive a message to do some post processing 1460 void CContext::recvProcessingGridOfEnabledFields(CEventServer& event) 1461 TRY 1462 { 1463 CBufferIn* buffer=event.subEvents.begin()->buffer; 1464 string id; 1465 *buffer>>id; 1466 } 1467 CATCH 1468 1469 //! Client side: Send a message to do some post processing on server 1470 void CContext::sendPostProcessing() 1471 TRY 1472 { 1473 // Use correct context client to send message 1474 // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; 1475 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1476 for (int i = 0; i < nbSrvPools; ++i) 1477 { 1478 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; 1479 CEventClient event(getType(),EVENT_ID_POST_PROCESS); 1480 if (contextClientTmp->isServerLeader()) 1481 { 1482 CMessage msg; 1483 if (hasServer) 1484 msg<<this->getIdServer(i); 1485 else 1486 msg<<this->getIdServer(); 1487 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 1488 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1489 event.push(*itRank,1,msg); 1490 contextClientTmp->sendEvent(event); 1491 } 1492 else contextClientTmp->sendEvent(event); 1493 } 1494 } 1495 CATCH_DUMP_ATTR 1496 1497 //! Server side: Receive a message to do some post processing 1498 void CContext::recvPostProcessing(CEventServer& event) 1499 TRY 1500 { 1501 CBufferIn* buffer=event.subEvents.begin()->buffer; 1502 string id; 1503 *buffer>>id; 1504 get(id)->recvPostProcessing(*buffer); 1505 } 1506 CATCH 1507 1508 //! Server side: Receive a message to do some post processing 1509 void CContext::recvPostProcessing(CBufferIn& buffer) 1510 TRY 1511 { 1512 CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar(); 1513 postProcessing(); 1514 } 1515 CATCH_DUMP_ATTR 1516 1517 const StdString& CContext::getIdServer() 1518 TRY 1519 { 1520 if (hasClient) 1521 { 1522 idServer_ = this->getId(); 1523 idServer_ += "_server"; 1524 return idServer_; 1525 } 1526 if (hasServer) return (this->getId()); 1527 } 1528 CATCH_DUMP_ATTR 1529 1530 const StdString& CContext::getIdServer(const int i) 1531 TRY 1532 { 1533 idServer_ = this->getId(); 1534 idServer_ += "_server_"; 1535 idServer_ += std::to_string(static_cast<unsigned long long>(i)); 1536 return idServer_; 1537 } 1538 CATCH_DUMP_ATTR 1539 1540 /*! 1541 \brief Do some simple post processings after parsing xml file 1542 After the xml file (iodef.xml) is parsed, it is necessary to build all relations among 1543 created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields), 1544 which will be written out into netcdf files, are processed 1545 */ 1546 void CContext::postProcessing() 1547 TRY 1548 { 1549 if (isPostProcessed) return; 1550 1551 // Make sure the calendar was correctly created 1552 if (!calendar) 1553 ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"") 1554 else if (calendar->getTimeStep() == NoneDu) 1555 ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"") 1556 // Calendar first update to set the current date equals to the start date 1557 calendar->update(0); 1558 1559 // Find all inheritance in xml structure 1560 this->solveAllInheritance(); 1561 1562 // ShowTree(info(10)); 1563 1564 // Check if some axis, domains or grids are eligible to for compressed indexed output. 1565 // Warning: This must be done after solving the inheritance and before the rest of post-processing 1566 checkAxisDomainsGridsEligibilityForCompressedOutput(); 1567 1568 // Check if some automatic time series should be generated 1569 // Warning: This must be done after solving the inheritance and before the rest of post-processing 1570 1571 // The timeseries should only be prepared in client 1572 if (hasClient && !hasServer) prepareTimeseries(); 1573 1574 //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir. 1575 findEnabledFiles(); 1576 findEnabledWriteModeFiles(); 1577 findEnabledReadModeFiles(); 1578 1579 // For now, only read files with client and only one level server 1580 // if (hasClient && !hasServer) findEnabledReadModeFiles(); 1581 1582 // Find all enabled fields of each file 1583 findAllEnabledFieldsInFiles(this->enabledWriteModeFiles); 1584 findAllEnabledFieldsInFiles(this->enabledReadModeFiles); 1585 1586 // For now, only read files with client and only one level server 1587 // if (hasClient && !hasServer) 1588 // findAllEnabledFieldsInFiles(this->enabledReadModeFiles); 1589 1590 if (hasClient && !hasServer) 1591 { 1592 initReadFiles(); 1593 // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis) 1594 this->readAttributesOfEnabledFieldsInReadModeFiles(); 1595 } 1596 1597 // Only search and rebuild all reference objects of enable fields, don't transform 1598 this->solveOnlyRefOfEnabledFields(false); 1599 1600 // Search and rebuild all reference object of enabled fields, and transform 1601 this->solveAllRefOfEnabledFieldsAndTransform(false); 1602 1603 // Find all fields with read access from the public API 1604 if (hasClient && !hasServer) findFieldsWithReadAccess(); 1605 // and solve the all reference for them 1606 if (hasClient && !hasServer) solveAllRefOfFieldsWithReadAccess(); 1607 1608 isPostProcessed = true; 1609 } 1610 CATCH_DUMP_ATTR 1611 1612 /*! 1613 * Compute the required buffer size to send the attributes (mostly those grid related). 1614 * \param maxEventSize [in/out] the size of the bigger event for each connected server 1615 * \param [in] contextClient 1616 * \param [in] bufferForWriting True if buffers are used for sending data for writing 1617 This flag is only true for client and server-1 for communication with server-2 1618 */ 1619 std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize, 1620 CContextClient* contextClient, bool bufferForWriting /*= "false"*/) 1621 TRY 1622 { 1623 // As calendar attributes are sent even if there are no active files or fields, maps are initialized according the size of calendar attributes 1624 std::map<int, StdSize> attributesSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); 1625 maxEventSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); 1626 1627 std::vector<CFile*>& fileList = this->enabledFiles; 1628 size_t numEnabledFiles = fileList.size(); 1629 for (size_t i = 0; i < numEnabledFiles; ++i) 1630 { 1631 // CFile* file = this->enabledWriteModeFiles[i]; 1632 CFile* file = fileList[i]; 1633 std::vector<CField*> enabledFields = file->getEnabledFields(); 1634 size_t numEnabledFields = enabledFields.size(); 1635 for (size_t j = 0; j < numEnabledFields; ++j) 1829 void CContext::createCouplerInterCommunicator(void) 1830 TRY 1831 { 1832 int rank=this->getIntraCommRank() ; 1833 map<string,list<CCouplerOut*>> listCouplerOut ; 1834 map<string,list<CCouplerIn*>> listCouplerIn ; 1835 1836 for(auto couplerOut : enabledCouplerOut) listCouplerOut[couplerOut->getCouplingContextId()].push_back(couplerOut) ; 1837 for(auto couplerIn : enabledCouplerIn) listCouplerIn[couplerIn->getCouplingContextId()].push_back(couplerIn) ; 1838 1839 CCouplerManager* couplerManager = CXios::getCouplerManager() ; 1840 if (rank==0) 1841 { 1842 for(auto couplerOut : listCouplerOut) couplerManager->registerCoupling(this->getContextId(),couplerOut.first) ; 1843 for(auto couplerIn : listCouplerIn) couplerManager->registerCoupling(couplerIn.first,this->getContextId()) ; 1844 } 1845 1846 do 1847 { 1848 for(auto couplerOut : listCouplerOut) 1636 1849 { 1637 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting); 1638 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 1639 for (; it != itE; ++it) 1850 bool isNextCoupling ; 1851 if (rank==0) isNextCoupling = couplerManager->isNextCoupling(this->getContextId(),couplerOut.first) ; 1852 MPI_Bcast(&isNextCoupling,1,MPI_C_BOOL, 0, getIntraComm()) ; 1853 if (isNextCoupling) 1640 1854 { 1641 // If attributesSize[it->first] does not exist, it will be zero-initialized 1642 // so we can use it safely without checking for its existence 1643 if (attributesSize[it->first] < it->second) 1644 attributesSize[it->first] = it->second; 1645 1646 if (maxEventSize[it->first] < it->second) 1647 maxEventSize[it->first] = it->second; 1648 } 1855 addCouplingChanel(couplerOut.first, true) ; 1856 listCouplerOut.erase(couplerOut.first) ; 1857 break ; 1858 } 1649 1859 } 1650 } 1651 return attributesSize; 1652 } 1653 CATCH_DUMP_ATTR 1654 1655 /*! 1656 * Compute the required buffer size to send the fields data. 1657 * \param maxEventSize [in/out] the size of the bigger event for each connected server 1658 * \param [in] contextClient 1659 * \param [in] bufferForWriting True if buffers are used for sending data for writing 1660 This flag is only true for client and server-1 for communication with server-2 1661 */ 1662 std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize, 1663 CContextClient* contextClient, bool bufferForWriting /*= "false"*/) 1664 TRY 1665 { 1666 std::map<int, StdSize> dataSize; 1667 1668 // Find all reference domain and axis of all active fields 1669 std::vector<CFile*>& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles; 1670 size_t numEnabledFiles = fileList.size(); 1671 for (size_t i = 0; i < numEnabledFiles; ++i) 1672 { 1673 CFile* file = fileList[i]; 1674 if (file->getContextClient() == contextClient) 1675 { 1676 std::vector<CField*> enabledFields = file->getEnabledFields(); 1677 size_t numEnabledFields = enabledFields.size(); 1678 for (size_t j = 0; j < numEnabledFields; ++j) 1679 { 1680 // const std::vector<std::map<int, StdSize> > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient); 1681 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting); 1682 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 1683 for (; it != itE; ++it) 1684 { 1685 // If dataSize[it->first] does not exist, it will be zero-initialized 1686 // so we can use it safely without checking for its existance 1687 if (CXios::isOptPerformance) 1688 dataSize[it->first] += it->second; 1689 else if (dataSize[it->first] < it->second) 1690 dataSize[it->first] = it->second; 1691 1692 if (maxEventSize[it->first] < it->second) 1693 maxEventSize[it->first] = it->second; 1694 } 1695 } 1696 } 1697 } 1698 return dataSize; 1699 } 1700 CATCH_DUMP_ATTR 1701 1702 //! Client side: Send infomation of active files (files are enabled to write out) 1860 for(auto couplerIn : listCouplerIn) 1861 { 1862 bool isNextCoupling ; 1863 if (rank==0) isNextCoupling = couplerManager->isNextCoupling(couplerIn.first,this->getContextId()); 1864 MPI_Bcast(&isNextCoupling,1,MPI_C_BOOL, 0, getIntraComm()) ; 1865 if (isNextCoupling) 1866 { 1867 addCouplingChanel(couplerIn.first, false) ; 1868 listCouplerIn.erase(couplerIn.first) ; 1869 break ; 1870 } 1871 } 1872 1873 } while (!listCouplerOut.empty() || !listCouplerIn.empty()) ; 1874 1875 } 1876 CATCH_DUMP_ATTR 1877 1878 1879 //! Client side: Send infomation of active files (files are enabled to write out) 1703 1880 void CContext::sendEnabledFiles(const std::vector<CFile*>& activeFiles) 1704 1881 TRY … … 1734 1911 CATCH_DUMP_ATTR 1735 1912 1736 //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output 1737 void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput() 1738 TRY 1739 { 1740 if (!hasClient) return; 1741 1742 const vector<CAxis*> allAxis = CAxis::getAll(); 1743 for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++) 1744 (*it)->checkEligibilityForCompressedOutput(); 1745 1746 const vector<CDomain*> allDomains = CDomain::getAll(); 1747 for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++) 1748 (*it)->checkEligibilityForCompressedOutput(); 1749 1750 const vector<CGrid*> allGrids = CGrid::getAll(); 1751 for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++) 1752 (*it)->checkEligibilityForCompressedOutput(); 1753 } 1754 CATCH_DUMP_ATTR 1755 1913 1756 1914 //! Client side: Prepare the timeseries by adding the necessary files 1757 1915 void CContext::prepareTimeseries() 1758 1916 TRY 1759 1917 { 1760 if (!hasClient) return;1761 1762 1918 const std::vector<CFile*> allFiles = CFile::getAll(); 1763 1919 for (size_t i = 0; i < allFiles.size(); i++) … … 1856 2012 CATCH_DUMP_ATTR 1857 2013 1858 //! Client side: Send information of reference grid of active fields 1859 void CContext::sendRefGrid(const std::vector<CFile*>& activeFiles) 1860 TRY 1861 { 1862 std::set<StdString> gridIds; 1863 int sizeFile = activeFiles.size(); 1864 CFile* filePtr(NULL); 1865 1866 // Firstly, find all reference grids of all active fields 1867 for (int i = 0; i < sizeFile; ++i) 1868 { 1869 filePtr = activeFiles[i]; 1870 std::vector<CField*> enabledFields = filePtr->getEnabledFields(); 1871 int sizeField = enabledFields.size(); 1872 for (int numField = 0; numField < sizeField; ++numField) 1873 { 1874 if (0 != enabledFields[numField]->getRelGrid()) 1875 gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId()); 1876 } 1877 } 1878 1879 // Create all reference grids on server side 1880 StdString gridDefRoot("grid_definition"); 1881 CGridGroup* gridPtr = CGridGroup::get(gridDefRoot); 1882 std::set<StdString>::const_iterator it, itE = gridIds.end(); 1883 for (it = gridIds.begin(); it != itE; ++it) 1884 { 1885 gridPtr->sendCreateChild(*it); 1886 CGrid::get(*it)->sendAllAttributesToServer(); 1887 CGrid::get(*it)->sendAllDomains(); 1888 CGrid::get(*it)->sendAllAxis(); 1889 CGrid::get(*it)->sendAllScalars(); 1890 } 1891 } 1892 CATCH_DUMP_ATTR 1893 2014 1894 2015 //! Client side: Send information of reference domain, axis and scalar of active fields 1895 2016 void CContext::sendRefDomainsAxisScalars(const std::vector<CFile*>& activeFiles) 1896 2017 TRY 1897 2018 { 1898 std::set< StdString> domainIds, axisIds, scalarIds;2019 std::set<pair<StdString,CContextClient*>> domainIds, axisIds, scalarIds; 1899 2020 1900 2021 // Find all reference domain and axis of all active fields … … 1906 2027 for (int j = 0; j < numEnabledFields; ++j) 1907 2028 { 2029 CContextClient* contextClient=enabledFields[j]->getContextClient() ; 1908 2030 const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds(); 1909 if ("" != prDomAxisScalarId[0]) domainIds.insert( prDomAxisScalarId[0]);1910 if ("" != prDomAxisScalarId[1]) axisIds.insert( prDomAxisScalarId[1]);1911 if ("" != prDomAxisScalarId[2]) scalarIds.insert( prDomAxisScalarId[2]);2031 if ("" != prDomAxisScalarId[0]) domainIds.insert(make_pair(prDomAxisScalarId[0],contextClient)); 2032 if ("" != prDomAxisScalarId[1]) axisIds.insert(make_pair(prDomAxisScalarId[1],contextClient)); 2033 if ("" != prDomAxisScalarId[2]) scalarIds.insert(make_pair(prDomAxisScalarId[2],contextClient)); 1912 2034 } 1913 2035 } … … 1919 2041 StdString scalarDefRoot("scalar_definition"); 1920 2042 CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot); 1921 itE = scalarIds.end();1922 for ( itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)1923 { 1924 if (!itScalar-> empty())2043 2044 for (auto itScalar = scalarIds.begin(); itScalar != scalarIds.end(); ++itScalar) 2045 { 2046 if (!itScalar->first.empty()) 1925 2047 { 1926 scalarPtr->sendCreateChild( *itScalar);1927 CScalar::get( *itScalar)->sendAllAttributesToServer();2048 scalarPtr->sendCreateChild(itScalar->first,itScalar->second); 2049 CScalar::get(itScalar->first)->sendAllAttributesToServer(itScalar->second); 1928 2050 } 1929 2051 } … … 1931 2053 StdString axiDefRoot("axis_definition"); 1932 2054 CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot); 1933 itE = axisIds.end();1934 for ( itAxis = axisIds.begin(); itAxis != itE; ++itAxis)1935 { 1936 if (!itAxis-> empty())2055 2056 for (auto itAxis = axisIds.begin(); itAxis != axisIds.end(); ++itAxis) 2057 { 2058 if (!itAxis->first.empty()) 1937 2059 { 1938 axisPtr->sendCreateChild( *itAxis);1939 CAxis::get( *itAxis)->sendAllAttributesToServer();2060 axisPtr->sendCreateChild(itAxis->first, itAxis->second); 2061 CAxis::get(itAxis->first)->sendAllAttributesToServer(itAxis->second); 1940 2062 } 1941 2063 } … … 1944 2066 StdString domDefRoot("domain_definition"); 1945 2067 CDomainGroup* domPtr = CDomainGroup::get(domDefRoot); 1946 itE = domainIds.end();1947 for ( itDom = domainIds.begin(); itDom != itE; ++itDom)1948 { 1949 if (!itDom-> empty()) {1950 domPtr->sendCreateChild( *itDom);1951 CDomain::get( *itDom)->sendAllAttributesToServer();2068 2069 for (auto itDom = domainIds.begin(); itDom != domainIds.end(); ++itDom) 2070 { 2071 if (!itDom->first.empty()) { 2072 domPtr->sendCreateChild(itDom->first, itDom->second); 2073 CDomain::get(itDom->first)->sendAllAttributesToServer(itDom->second); 1952 2074 } 1953 2075 } 2076 } 2077 CATCH_DUMP_ATTR 2078 2079 void CContext::triggerLateFields(void) 2080 TRY 2081 { 2082 for(auto& field : fileInFields_) field->triggerLateField() ; 2083 for(auto& field : couplerInFields_) field->triggerLateField() ; 1954 2084 } 1955 2085 CATCH_DUMP_ATTR … … 1963 2093 if (prevStep < step) 1964 2094 { 1965 if ( hasClient && !hasServer) // For now we only use server level 1 to read data2095 if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data 1966 2096 { 1967 doPreTimestepOperationsForEnabledReadModeFiles();2097 triggerLateFields(); 1968 2098 } 1969 2099 … … 1975 2105 #endif 1976 2106 1977 if ( hasClient && !hasServer) // For now we only use server level 1 to read data2107 if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data 1978 2108 { 1979 2109 doPostTimestepOperationsForEnabledReadModeFiles(); … … 2007 2137 vector<CFile*>::const_iterator it; 2008 2138 2009 for (it=enabledFiles.begin(); it != enabledFiles.end(); it++)2010 //for (it=enabledWriteModeFiles.begin(); it != enabledWriteModeFiles.end(); it++)2139 //for (it=enabledFiles.begin(); it != enabledFiles.end(); it++) 2140 for (it=enabledWriteModeFiles.begin(); it != enabledWriteModeFiles.end(); it++) 2011 2141 { 2012 2142 (*it)->initWrite(); … … 2064 2194 { 2065 2195 CBufferIn* buffer=event.subEvents.begin()->buffer; 2066 string id; 2067 *buffer>>id; 2068 get(id)->recvRegistry(*buffer); 2196 getCurrent()->recvRegistry(*buffer); 2069 2197 } 2070 2198 CATCH … … 2087 2215 registryOut->hierarchicalGatherRegistry() ; 2088 2216 2089 // Use correct context client to send message 2090 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 2217 int nbSrvPools ; 2218 if (serviceType_==CServicesManager::CLIENT) nbSrvPools = 1 ; 2219 else if (serviceType_==CServicesManager::GATHERER) nbSrvPools = this->clientPrimServer.size() ; 2220 else nbSrvPools = 0 ; 2221 CContextClient* contextClientTmp ; 2222 2091 2223 for (int i = 0; i < nbSrvPools; ++i) 2092 2224 { 2093 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; 2225 if (serviceType_==CServicesManager::CLIENT) contextClientTmp = client ; 2226 else if (serviceType_==CServicesManager::GATHERER ) contextClientTmp = clientPrimServer[i] ; 2227 2094 2228 CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY); 2095 if (contextClientTmp->isServerLeader()) 2096 { 2097 CMessage msg ; 2098 if (hasServer) 2099 msg<<this->getIdServer(i); 2100 else 2101 msg<<this->getIdServer(); 2102 if (contextClientTmp->clientRank==0) msg<<*registryOut ; 2103 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 2104 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 2229 if (contextClientTmp->isServerLeader()) 2230 { 2231 CMessage msg ; 2232 if (contextClientTmp->clientRank==0) msg<<*registryOut ; 2233 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 2234 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 2105 2235 event.push(*itRank,1,msg); 2106 2107 2108 2236 contextClientTmp->sendEvent(event); 2237 } 2238 else contextClientTmp->sendEvent(event); 2109 2239 } 2110 2240 } 2111 2241 CATCH_DUMP_ATTR 2242 2243 2244 void CContext::sendFinalizeClient(CContextClient* contextClient, const string& contextClientId) 2245 TRY 2246 { 2247 CEventClient event(getType(),EVENT_ID_CONTEXT_FINALIZE_CLIENT); 2248 if (contextClient->isServerLeader()) 2249 { 2250 CMessage msg; 2251 msg<<contextClientId ; 2252 const std::list<int>& ranks = contextClient->getRanksServerLeader(); 2253 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 2254 event.push(*itRank,1,msg); 2255 contextClient->sendEvent(event); 2256 } 2257 else contextClient->sendEvent(event); 2258 } 2259 CATCH_DUMP_ATTR 2260 2261 2262 void CContext::recvFinalizeClient(CEventServer& event) 2263 TRY 2264 { 2265 CBufferIn* buffer=event.subEvents.begin()->buffer; 2266 string id; 2267 *buffer>>id; 2268 get(id)->recvFinalizeClient(*buffer); 2269 } 2270 CATCH 2271 2272 void CContext::recvFinalizeClient(CBufferIn& buffer) 2273 TRY 2274 { 2275 countChildContextFinalized_++ ; 2276 } 2277 CATCH_DUMP_ATTR 2278 2279 2280 2281 2282 //! Client side: Send a message announcing that context can receive grid definition from coupling 2283 void CContext::sendCouplerInReady(CContextClient* client) 2284 TRY 2285 { 2286 if (sendCouplerInReady_done_.count(client)!=0) return ; 2287 else sendCouplerInReady_done_.insert(client) ; 2288 2289 CEventClient event(getType(),EVENT_ID_COUPLER_IN_READY); 2290 2291 if (client->isServerLeader()) 2292 { 2293 CMessage msg; 2294 msg<<this->getId(); 2295 for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg); 2296 client->sendEvent(event); 2297 } 2298 else client->sendEvent(event); 2299 } 2300 CATCH_DUMP_ATTR 2301 2302 //! Server side: Receive a message announcing that context can send grid definition for context coupling 2303 void CContext::recvCouplerInReady(CEventServer& event) 2304 TRY 2305 { 2306 CBufferIn* buffer=event.subEvents.begin()->buffer; 2307 getCurrent()->recvCouplerInReady(*buffer); 2308 } 2309 CATCH 2310 2311 //! Server side: Receive a message announcing that context can send grid definition for context coupling 2312 void CContext::recvCouplerInReady(CBufferIn& buffer) 2313 TRY 2314 { 2315 string contextId ; 2316 buffer>>contextId; 2317 couplerInReady_.insert(getCouplerOutClient(contextId)) ; 2318 } 2319 CATCH_DUMP_ATTR 2320 2321 2322 2323 2324 2325 //! Client side: Send a message announcing that a coupling context have done it closeDefinition, so data can be sent now. 2326 void CContext::sendCouplerInCloseDefinition(CContextClient* client) 2327 TRY 2328 { 2329 if (sendCouplerInCloseDefinition_done_.count(client)!=0) return ; 2330 else sendCouplerInCloseDefinition_done_.insert(client) ; 2331 2332 CEventClient event(getType(),EVENT_ID_COUPLER_IN_CLOSE_DEFINITION); 2333 2334 if (client->isServerLeader()) 2335 { 2336 CMessage msg; 2337 msg<<this->getId(); 2338 for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg); 2339 client->sendEvent(event); 2340 } 2341 else client->sendEvent(event); 2342 } 2343 CATCH_DUMP_ATTR 2344 2345 //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now. 2346 void CContext::recvCouplerInCloseDefinition(CEventServer& event) 2347 TRY 2348 { 2349 CBufferIn* buffer=event.subEvents.begin()->buffer; 2350 getCurrent()->recvCouplerInCloseDefinition(*buffer); 2351 } 2352 CATCH 2353 2354 //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now. 2355 void CContext::recvCouplerInCloseDefinition(CBufferIn& buffer) 2356 TRY 2357 { 2358 string contextId ; 2359 buffer>>contextId; 2360 couplerInCloseDefinition_.insert(getCouplerOutClient(contextId)) ; 2361 } 2362 CATCH_DUMP_ATTR 2363 2364 2365 2366 2367 //! Client side: Send a message announcing that a coupling context have done it contextFinalize, so it can also close it own context. 2368 void CContext::sendCouplerInContextFinalized(CContextClient* client) 2369 TRY 2370 { 2371 if (sendCouplerInContextFinalized_done_.count(client)!=0) return ; 2372 else sendCouplerInContextFinalized_done_.insert(client) ; 2373 2374 CEventClient event(getType(),EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED); 2375 2376 if (client->isServerLeader()) 2377 { 2378 CMessage msg; 2379 msg<<this->getId(); 2380 for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg); 2381 client->sendEvent(event); 2382 } 2383 else client->sendEvent(event); 2384 } 2385 CATCH_DUMP_ATTR 2386 2387 //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context. 2388 void CContext::recvCouplerInContextFinalized(CEventServer& event) 2389 TRY 2390 { 2391 CBufferIn* buffer=event.subEvents.begin()->buffer; 2392 getCurrent()->recvCouplerInContextFinalized(*buffer); 2393 } 2394 CATCH 2395 2396 //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context. 2397 void CContext::recvCouplerInContextFinalized(CBufferIn& buffer) 2398 TRY 2399 { 2400 string contextId ; 2401 buffer>>contextId; 2402 couplerInContextFinalized_.insert(getCouplerOutClient(contextId)) ; 2403 } 2404 CATCH_DUMP_ATTR 2405 2406 2407 2112 2408 2113 2409 /*!
Note: See TracChangeset
for help on using the changeset viewer.