Changeset 1761 for XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.cpp
- Timestamp:
- 10/18/19 15:40:35 (5 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.cpp
r1757 r1761 20 20 #include "server.hpp" 21 21 #include "distribute_file_server2.hpp" 22 #include "services_manager.hpp" 23 #include "contexts_manager.hpp" 24 #include "cxios.hpp" 25 #include "client.hpp" 22 26 23 27 namespace xios { … … 31 35 , calendar(), hasClient(false), hasServer(false) 32 36 , isPostProcessed(false), finalized(false) 33 , idServer_(), client( 0), server(0)34 , allProcessed(false), countChildCtx_(0) 37 , idServer_(), client(nullptr), server(nullptr) 38 , allProcessed(false), countChildCtx_(0), isProcessingEvent_(false) 35 39 36 40 { /* Ne rien faire de plus */ } … … 40 44 , calendar(), hasClient(false), hasServer(false) 41 45 , isPostProcessed(false), finalized(false) 42 , idServer_(), client( 0), server(0)43 , allProcessed(false), countChildCtx_(0) 46 , idServer_(), client(nullptr), server(nullptr) 47 , allProcessed(false), countChildCtx_(0), isProcessingEvent_(false) 44 48 { /* Ne rien faire de plus */ } 45 49 … … 264 268 ///--------------------------------------------------------------- 265 269 266 //! Initialize client side 270 271 //! Initialize client side : old interface to be removed 267 272 void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/) 268 273 TRY … … 270 275 271 276 hasClient = true; 272 MPI_Comm intraCommServer, interCommServer; 277 MPI_Comm intraCommServer, interCommServer; 273 278 274 279 … … 383 388 CATCH_DUMP_ATTR 384 389 390 391 void CContext::init(CServerContext* parentServerContext, MPI_Comm intraComm, int serviceType) 392 TRY 393 { 394 parentServerContext_ = parentServerContext ; 395 if (serviceType==CServicesManager::CLIENT) 396 initClient(intraComm, serviceType) ; 397 else 398 initServer(intraComm, serviceType) ; 399 } 400 CATCH_DUMP_ATTR 401 402 403 404 //! Initialize client side 405 void CContext::initClient(MPI_Comm intraComm, int serviceType) 406 TRY 407 { 408 intraComm_=intraComm ; 409 serviceType_ = CServicesManager::CLIENT ; 410 if (serviceType_==CServicesManager::CLIENT) 411 { 412 hasClient=true ; 413 hasServer=false ; 414 } 415 contextId_ = getId() ; 416 417 attached_mode=true ; 418 if (!CXios::isUsingServer()) attached_mode=false ; 419 420 421 string contextRegistryId=getId() ; 422 registryIn=new CRegistry(intraComm); 423 registryIn->setPath(contextRegistryId) ; 424 425 int commRank ; 426 MPI_Comm_rank(intraComm_,&commRank) ; 427 if (commRank==0) registryIn->fromFile("xios_registry.bin") ; 428 registryIn->bcastRegistry() ; 429 registryOut=new CRegistry(intraComm_) ; 430 registryOut->setPath(contextRegistryId) ; 431 432 } 433 CATCH_DUMP_ATTR 434 435 436 void CContext::initServer(MPI_Comm intraComm, int serviceType) 437 TRY 438 { 439 hasServer=true; 440 intraComm_=intraComm ; 441 serviceType_=serviceType ; 442 443 if (serviceType_==CServicesManager::GATHERER) 444 { 445 hasClient=true ; 446 hasServer=true ; 447 } 448 else if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 449 { 450 hasClient=false ; 451 hasServer=true ; 452 } 453 454 CXios::getContextsManager()->getContextId(getId(), contextId_, intraComm) ; 455 456 registryIn=new CRegistry(intraComm); 457 registryIn->setPath(contextId_) ; 458 459 int commRank ; 460 MPI_Comm_rank(intraComm_,&commRank) ; 461 if (commRank==0) registryIn->fromFile("xios_registry.bin") ; 462 463 registryIn->bcastRegistry() ; 464 registryOut=new CRegistry(intraComm) ; 465 registryOut->setPath(contextId_) ; 466 467 } 468 CATCH_DUMP_ATTR 469 470 471 void CContext::createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) // for servers 472 TRY 473 { 474 MPI_Comm intraCommClient ; 475 MPI_Comm_dup(intraComm_, &intraCommClient); 476 comms.push_back(intraCommClient); 477 478 server = new CContextServer(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ? 479 client = new CContextClient(this,intraCommClient,interCommClient); 480 481 } 482 CATCH_DUMP_ATTR 483 484 void CContext::createServerInterComm(void) 485 TRY 486 { 487 488 MPI_Comm interCommClient, interCommServer ; 489 490 if (serviceType_ == CServicesManager::CLIENT) 491 { 492 493 int commRank ; 494 MPI_Comm_rank(intraComm_,&commRank) ; 495 if (commRank==0) 496 { 497 if (attached_mode) CXios::getContextsManager()->createServerContext(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, getContextId()) ; 498 else if (CXios::usingServer2) CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId()) ; 499 else CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId()) ; 500 } 501 502 MPI_Comm interComm ; 503 504 if (attached_mode) 505 { 506 parentServerContext_->createIntercomm(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, getContextId(), intraComm_, 507 interCommClient, interCommServer) ; 508 int type ; 509 if (commRank==0) CXios::getServicesManager()->getServiceType(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, type) ; 510 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 511 setIdServer(CXios::getContextsManager()->getServerContextName(CClient::getPoolRessource()->getId(), CXios::defaultServerId, 0, type, getContextId())) ; 512 setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble 513 } 514 else if (CXios::usingServer2) 515 { 516 // CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, interComm) ; 517 parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultGathererId, 0, getContextId(), intraComm_, 518 interCommClient, interCommServer) ; 519 int type ; 520 if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultGathererId, 0, type) ; 521 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 522 setIdServer(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultGathererId, 0, type, getContextId())) ; 523 } 524 else 525 { 526 //CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, interComm) ; 527 parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, 0, getContextId(), intraComm_, 528 interCommClient, interCommServer) ; 529 int type ; 530 if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ; 531 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 532 setIdServer(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultServerId, 0, type, getContextId())) ; 533 } 534 535 // intraComm client is not duplicated. In all the code we use client->intraComm for MPI 536 // in future better to replace it by intracommuncator associated to the context 537 538 /* MPI_Comm intraCommClient, intraCommServer ; 539 MPI_Comm interCommClient, interCommServer ; 540 541 intraCommClient=intraComm_ ; 542 MPI_Comm_dup(intraComm_, &intraCommServer) ; 543 544 interCommClient=interComm ; 545 MPI_Comm_dup(interComm, &interCommServer) ; */ 546 547 MPI_Comm intraCommClient, intraCommServer ; 548 intraCommClient=intraComm_ ; 549 MPI_Comm_dup(intraComm_, &intraCommServer) ; 550 client = new CContextClient(this, intraCommClient, interCommClient); 551 server = new CContextServer(this, intraCommServer, interCommServer); 552 553 } 554 555 if (serviceType_ == CServicesManager::GATHERER) 556 { 557 int commRank ; 558 MPI_Comm_rank(intraComm_,&commRank) ; 559 560 int nbPartitions ; 561 if (commRank==0) 562 { 563 CXios::getServicesManager()->getServiceNbPartitions(CXios::defaultPoolId, CXios::defaultServerId, 0, nbPartitions) ; 564 for(int i=0 ; i<nbPartitions; i++) 565 CXios::getContextsManager()->createServerContext(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId()) ; 566 } 567 MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ; 568 569 MPI_Comm interComm ; 570 for(int i=0 ; i<nbPartitions; i++) 571 { 572 // CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interComm) ; 573 parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 574 int type ; 575 if (commRank==0) CXios::getServicesManager()->getServiceType(CXios::defaultPoolId, CXios::defaultServerId, 0, type) ; 576 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 577 primServerId_.push_back(CXios::getContextsManager()->getServerContextName(CXios::defaultPoolId, CXios::defaultServerId, i, type, getContextId())) ; 578 579 // intraComm client is not duplicated. In all the code we use client->intraComm for MPI 580 // in future better to replace it by intracommuncator associated to the context 581 582 MPI_Comm intraCommClient, intraCommServer ; 583 // MPI_Comm interCommClient, interCommServer ; 584 585 intraCommClient=intraComm_ ; 586 MPI_Comm_dup(intraComm_, &intraCommServer) ; 587 588 // interCommClient=interComm ; 589 // MPI_Comm_dup(interComm, &interCommServer) ; 590 591 clientPrimServer.push_back(new CContextClient(this, intraCommClient, interCommClient)); 592 serverPrimServer.push_back(new CContextServer(this, intraCommServer, interCommServer)); 593 594 } 595 } 596 } 597 CATCH_DUMP_ATTR 598 599 385 600 void CContext::initServer(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtClient /*= 0*/) 386 601 TRY … … 418 633 } 419 634 CATCH_DUMP_ATTR 635 636 637 638 bool CContext::eventLoop(bool enableEventsProcessing) 639 { 640 bool finished=true; 641 642 if (client!=nullptr && !finalized) client->checkBuffers(); 643 644 for (int i = 0; i < clientPrimServer.size(); ++i) 645 { 646 if (!finalized) clientPrimServer[i]->checkBuffers(); 647 if (!finalized) finished &= serverPrimServer[i]->eventLoop(enableEventsProcessing); 648 } 649 650 if (server!=nullptr) if (!finalized) finished &= server->eventLoop(enableEventsProcessing); 651 652 return finalized && finished ; 653 } 420 654 421 655 //! Try to send the buffers and receive possible answers … … 451 685 } 452 686 } 453 CATCH_DUMP_ATTR 454 455 //! Terminate a context 687 CATCH_DUMP_ATTR 688 689 690 691 456 692 void CContext::finalize(void) 457 693 TRY … … 462 698 } 463 699 // Send registry upon calling the function the first time 464 if (countChildCtx_ == 0) 465 if (hasClient) sendRegistry() ; 700 if (countChildCtx_ == 0) if (hasClient) sendRegistry() ; 466 701 467 702 // Client: … … 562 797 563 798 closeAllFile(); // Just move to here to make sure that server-level 1 can close files 799 800 /* ym 564 801 if (hasServer && !hasClient) 565 802 { … … 567 804 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 568 805 } 806 */ 569 807 570 808 //! Deallocate client buffers … … 589 827 CATCH_DUMP_ATTR 590 828 829 830 831 //! Terminate a context 832 void CContext::finalize_old(void) 833 TRY 834 { 835 if (hasClient && !hasServer) // For now we only use server level 1 to read data 836 { 837 doPreTimestepOperationsForEnabledReadModeFiles(); 838 } 839 // Send registry upon calling the function the first time 840 if (countChildCtx_ == 0) if (hasClient) sendRegistry() ; 841 842 // Client: 843 // (1) blocking send context finalize to its server 844 // (2) blocking receive context finalize from its server 845 // (3) some memory deallocations 846 if (CXios::isClient) 847 { 848 // Make sure that client (model) enters the loop only once 849 if (countChildCtx_ < 1) 850 { 851 ++countChildCtx_; 852 853 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 854 client->finalize(); 855 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 856 while (client->havePendingRequests()) client->checkBuffers(); 857 858 info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 859 while (!server->hasFinished()) 860 server->eventLoop(); 861 info(100)<<"DEBUG: context "<<getId()<<" server has finished"<<endl ; 862 863 bool notifiedFinalized=false ; 864 do 865 { 866 notifiedFinalized=client->isNotifiedFinalized() ; 867 } while (!notifiedFinalized) ; 868 client->releaseBuffers(); 869 870 if (hasServer) // Mode attache 871 { 872 closeAllFile(); 873 registryOut->hierarchicalGatherRegistry() ; 874 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 875 } 876 877 //! Deallocate client buffers 878 // client->releaseBuffers(); 879 info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ; 880 //! Free internally allocated communicators 881 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 882 MPI_Comm_free(&(*it)); 883 comms.clear(); 884 885 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 886 } 887 } 888 else if (CXios::isServer) 889 { 890 // First context finalize message received from a model 891 // Send context finalize to its child contexts (if any) 892 if (countChildCtx_ == 0) 893 for (int i = 0; i < clientPrimServer.size(); ++i) 894 { 895 clientPrimServer[i]->finalize(); 896 bool bufferReleased; 897 do 898 { 899 clientPrimServer[i]->checkBuffers(); 900 bufferReleased = !clientPrimServer[i]->havePendingRequests(); 901 } while (!bufferReleased); 902 903 bool notifiedFinalized=false ; 904 do 905 { 906 // clientPrimServer[i]->checkBuffers(); 907 notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ; 908 } while (!notifiedFinalized) ; 909 clientPrimServer[i]->releaseBuffers(); 910 } 911 912 913 // (Last) context finalized message received 914 if (countChildCtx_ == clientPrimServer.size()) 915 { 916 // Blocking send of context finalize message to its client (e.g. primary server or model) 917 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 918 client->finalize(); 919 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 920 bool bufferReleased; 921 do 922 { 923 client->checkBuffers(); 924 bufferReleased = !client->havePendingRequests(); 925 } while (!bufferReleased); 926 927 bool notifiedFinalized=false ; 928 do 929 { 930 // client->checkBuffers(); 931 notifiedFinalized=client->isNotifiedFinalized() ; 932 } while (!notifiedFinalized) ; 933 client->releaseBuffers(); 934 935 finalized = true; 936 info(100)<<"DEBUG: context "<<getId()<<" bufferRelease OK"<<endl ; 937 938 closeAllFile(); // Just move to here to make sure that server-level 1 can close files 939 940 /* ym 941 if (hasServer && !hasClient) 942 { 943 registryOut->hierarchicalGatherRegistry() ; 944 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 945 } 946 */ 947 948 //! Deallocate client buffers 949 // client->releaseBuffers(); 950 info(100)<<"DEBUG: context "<<getId()<<" client release"<<endl ; 951 952 /* 953 for (int i = 0; i < clientPrimServer.size(); ++i) 954 clientPrimServer[i]->releaseBuffers(); 955 */ 956 //! Free internally allocated communicators 957 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 958 MPI_Comm_free(&(*it)); 959 comms.clear(); 960 961 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 962 } 963 964 ++countChildCtx_; 965 } 966 } 967 CATCH_DUMP_ATTR 968 591 969 //! Free internally allocated communicators 592 970 void CContext::freeComms(void) … … 614 992 if (allProcessed) return; 615 993 994 // create intercommunicator with servers. 995 // not sure it is the good place to be called here 996 createServerInterComm() ; 997 998 616 999 // After xml is parsed, there are some more works with post processing 617 1000 postProcessing(); … … 734 1117 TRY 735 1118 { 736 CTimer::get("Context : close definition").resume() ; 1119 CTimer::get("Context : close definition").resume() ; 1120 1121 // 737 1122 postProcessingGlobalAttributes(); 738 1123 … … 1548 1933 CATCH_DUMP_ATTR 1549 1934 1935 void CContext::setIdServer(const StdString& idServer) 1936 TRY 1937 { 1938 idServer_=idServer ; 1939 } 1940 CATCH_DUMP_ATTR 1941 1942 1943 const StdString& CContext::getIdServer() 1944 TRY 1945 { 1946 return idServer_; 1947 } 1948 CATCH_DUMP_ATTR 1949 1950 const StdString& CContext::getIdServer(const int i) 1951 TRY 1952 { 1953 // return idServer_ + std::to_string(static_cast<unsigned long long>(i)); 1954 return primServerId_[i] ; 1955 } 1956 CATCH_DUMP_ATTR 1957 1958 /* 1550 1959 const StdString& CContext::getIdServer() 1551 1960 TRY … … 1570 1979 } 1571 1980 CATCH_DUMP_ATTR 1981 */ 1982 1572 1983 1573 1984 /*!
Note: See TracChangeset
for help on using the changeset viewer.