Changeset 1764 for XIOS/dev/dev_ym/XIOS_SERVICES/src/node
- Timestamp:
- 11/05/19 16:02:34 (5 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_SERVICES/src/node
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.cpp
r1761 r1764 36 36 , isPostProcessed(false), finalized(false) 37 37 , idServer_(), client(nullptr), server(nullptr) 38 , allProcessed(false), countChildC tx_(0), isProcessingEvent_(false)38 , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false) 39 39 40 40 { /* Ne rien faire de plus */ } … … 45 45 , isPostProcessed(false), finalized(false) 46 46 , idServer_(), client(nullptr), server(nullptr) 47 , allProcessed(false), countChildC tx_(0), isProcessingEvent_(false)47 , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false) 48 48 { /* Ne rien faire de plus */ } 49 49 … … 475 475 MPI_Comm_dup(intraComm_, &intraCommClient); 476 476 comms.push_back(intraCommClient); 477 477 // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context 478 478 server = new CContextServer(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ? 479 479 client = new CContextClient(this,intraCommClient,interCommClient); … … 687 687 CATCH_DUMP_ATTR 688 688 689 689 690 void CContext::globalEventLoop(void) 691 { 692 CXios::getDaemonsManager()->eventLoop() ; 693 setCurrent(getId()) ; 694 } 690 695 691 696 … … 693 698 TRY 694 699 { 695 if (hasClient && !hasServer) // For now we only use server level 1 to read data 700 registryOut->hierarchicalGatherRegistry() ; 701 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 702 703 if (hasClient && !hasServer) 696 704 { 697 doPreTimestepOperationsForEnabledReadModeFiles(); 698 } 699 // Send registry upon calling the function the first time 700 if (countChildCtx_ == 0) if (hasClient) sendRegistry() ; 701 702 // Client: 703 // (1) blocking send context finalize to its server 704 // (2) blocking receive context finalize from its server 705 // (3) some memory deallocations 706 if (CXios::isClient) 707 { 708 // Make sure that client (model) enters the loop only once 709 if (countChildCtx_ < 1) 710 { 711 ++countChildCtx_; 712 713 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 714 client->finalize(); 715 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 716 while (client->havePendingRequests()) client->checkBuffers(); 717 718 info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 719 while (!server->hasFinished()) 720 server->eventLoop(); 721 info(100)<<"DEBUG: context "<<getId()<<" server has finished"<<endl ; 722 705 doPreTimestepOperationsForEnabledReadModeFiles(); // For now we only use server level 1 to read data 706 707 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 708 client->finalize(); 709 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 710 while (client->havePendingRequests()) client->checkBuffers(); 711 info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 723 712 bool notifiedFinalized=false ; 724 713 do … … 727 716 } while (!notifiedFinalized) ; 728 717 client->releaseBuffers(); 729 730 if (hasServer) // Mode attache731 {732 closeAllFile();733 registryOut->hierarchicalGatherRegistry() ;734 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;735 }736 737 //! Deallocate client buffers738 // client->releaseBuffers();739 718 info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ; 740 //! Free internally allocated communicators 741 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 742 MPI_Comm_free(&(*it)); 743 comms.clear(); 744 745 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 746 } 747 } 748 else if (CXios::isServer) 749 { 750 // First context finalize message received from a model 751 // Send context finalize to its child contexts (if any) 752 if (countChildCtx_ == 0) 719 } 720 else if (hasClient && hasServer) 721 { 753 722 for (int i = 0; i < clientPrimServer.size(); ++i) 754 723 { … … 764 733 do 765 734 { 766 // clientPrimServer[i]->checkBuffers();767 735 notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ; 768 736 } while (!notifiedFinalized) ; 769 737 clientPrimServer[i]->releaseBuffers(); 770 738 } 771 772 773 // (Last) context finalized message received 774 if (countChildCtx_ == clientPrimServer.size()) 775 { 776 // Blocking send of context finalize message to its client (e.g. primary server or model) 777 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 778 client->finalize(); 779 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 780 bool bufferReleased; 781 do 782 { 783 client->checkBuffers(); 784 bufferReleased = !client->havePendingRequests(); 785 } while (!bufferReleased); 786 787 bool notifiedFinalized=false ; 788 do 789 { 790 // client->checkBuffers(); 791 notifiedFinalized=client->isNotifiedFinalized() ; 792 } while (!notifiedFinalized) ; 793 client->releaseBuffers(); 794 795 finalized = true; 796 info(100)<<"DEBUG: context "<<getId()<<" bufferRelease OK"<<endl ; 797 798 closeAllFile(); // Just move to here to make sure that server-level 1 can close files 739 closeAllFile(); 740 741 } 742 else if (!hasClient && hasServer) 743 { 744 closeAllFile(); 745 } 746 747 freeComms() ; 799 748 800 /* ym 801 if (hasServer && !hasClient) 802 { 803 registryOut->hierarchicalGatherRegistry() ; 804 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 805 } 806 */ 807 808 //! Deallocate client buffers 809 // client->releaseBuffers(); 810 info(100)<<"DEBUG: context "<<getId()<<" client release"<<endl ; 811 812 /* 813 for (int i = 0; i < clientPrimServer.size(); ++i) 814 clientPrimServer[i]->releaseBuffers(); 815 */ 816 //! Free internally allocated communicators 817 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 818 MPI_Comm_free(&(*it)); 819 comms.clear(); 820 821 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 822 } 823 824 ++countChildCtx_; 825 } 749 parentServerContext_->freeComm() ; 750 finalized = true; 751 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 826 752 } 827 753 CATCH_DUMP_ATTR … … 833 759 TRY 834 760 { 761 int countChildCtx_ ; // ym temporary 762 835 763 if (hasClient && !hasServer) // For now we only use server level 1 to read data 836 764 { … … 2555 2483 CATCH_DUMP_ATTR 2556 2484 2485 2486 void CContext::sendFinalizeClient(CContextClient* contextClient, const string& contextClientId) 2487 TRY 2488 { 2489 CEventClient event(getType(),EVENT_ID_CONTEXT_FINALIZE_CLIENT); 2490 if (contextClient->isServerLeader()) 2491 { 2492 CMessage msg; 2493 msg<<contextClientId ; 2494 const std::list<int>& ranks = contextClient->getRanksServerLeader(); 2495 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 2496 event.push(*itRank,1,msg); 2497 contextClient->sendEvent(event); 2498 } 2499 else contextClient->sendEvent(event); 2500 } 2501 CATCH_DUMP_ATTR 2502 2503 2504 void CContext::recvFinalizeClient(CEventServer& event) 2505 TRY 2506 { 2507 CBufferIn* buffer=event.subEvents.begin()->buffer; 2508 string id; 2509 *buffer>>id; 2510 get(id)->recvFinalizeClient(*buffer); 2511 } 2512 CATCH 2513 2514 void CContext::recvFinalizeClient(CBufferIn& buffer) 2515 TRY 2516 { 2517 countChildContextFinalized_++ ; 2518 } 2519 CATCH_DUMP_ATTR 2520 2521 2522 2523 2557 2524 /*! 2558 2525 * \fn bool CContext::isFinalized(void) -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.hpp
r1761 r1764 53 53 EVENT_ID_POST_PROCESS, EVENT_ID_SEND_REGISTRY, 54 54 EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES, 55 EVENT_ID_PROCESS_GRID_ENABLED_FIELDS 55 EVENT_ID_PROCESS_GRID_ENABLED_FIELDS, 56 EVENT_ID_CONTEXT_FINALIZE_CLIENT, 56 57 }; 57 58 … … 106 107 bool checkBuffersAndListen(bool enableEventsProcessing=true); 107 108 bool eventLoop(bool enableEventsProcessing=true); 109 void globalEventLoop(void); 108 110 109 111 // Finalize a context 110 112 void finalize(void); 113 111 114 void finalize_old(void); 112 115 bool isFinalized(void); … … 170 173 //!< after be gathered to the root process of the context, merged registry is sent to the root process of the servers 171 174 void sendRegistry(void) ; 175 void sendFinalizeClient(CContextClient* contextClient, const string& contextClientId); 176 172 177 173 178 const StdString& getIdServer(); … … 191 196 static void recvRegistry(CEventServer& event) ; 192 197 void recvRegistry(CBufferIn& buffer) ; //!< registry is received by the servers 193 198 static void recvFinalizeClient(CEventServer& event) ; 199 void recvFinalizeClient(CBufferIn& buffer); 200 194 201 void freeComms(void); //!< Free internally allcoated communicators 195 202 void releaseClientBuffers(void); //! Deallocate buffers allocated by clientContexts … … 281 288 bool allProcessed; 282 289 bool finalized; 283 int countChildC tx_; //!< Counter of child contexts (for now it is the number of secondary server pools)290 int countChildContextFinalized_; //!< Counter of child contexts (for now it is the number of secondary server pools) 284 291 StdString idServer_; 285 292 CGarbageCollector garbageCollector; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/field.cpp
r1761 r1764 635 635 636 636 //ym context->checkBuffersAndListen(); 637 context->eventLoop(); 637 //ym context->eventLoop(); 638 context->globalEventLoop(); 638 639 639 640 timer.suspend();
Note: See TracChangeset
for help on using the changeset viewer.