Changeset 1765 for XIOS/dev/dev_ym/XIOS_SERVICES/src/node
- Timestamp:
- 11/06/19 11:03:38 (5 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_SERVICES/src/node
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.cpp
r1764 r1765 269 269 270 270 271 //! Initialize client side : old interface to be removed 272 void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/) 273 TRY 274 { 275 276 hasClient = true; 277 MPI_Comm intraCommServer, interCommServer; 278 279 280 if (CServer::serverLevel != 1) 281 // initClient is called by client 282 { 283 client = new CContextClient(this, intraComm, interComm, cxtServer); 284 if (cxtServer) // Attached mode 285 { 286 intraCommServer = intraComm; 287 interCommServer = interComm; 288 } 289 else 290 { 291 MPI_Comm_dup(intraComm, &intraCommServer); 292 comms.push_back(intraCommServer); 293 MPI_Comm_dup(interComm, &interCommServer); 294 comms.push_back(interCommServer); 295 } 296 /* for registry take the id of client context */ 297 /* for servers, supress the _server_ from id */ 298 string contextRegistryId=getId() ; 299 size_t pos=contextRegistryId.find("_server_") ; 300 if (pos!=std::string::npos) contextRegistryId=contextRegistryId.substr(0,pos) ; 301 302 registryIn=new CRegistry(intraComm); 303 registryIn->setPath(contextRegistryId) ; 304 if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ; 305 registryIn->bcastRegistry() ; 306 registryOut=new CRegistry(intraComm) ; 307 308 registryOut->setPath(contextRegistryId) ; 309 310 server = new CContextServer(this, intraCommServer, interCommServer); 311 } 312 else 313 // initClient is called by primary server 314 { 315 clientPrimServer.push_back(new CContextClient(this, intraComm, interComm)); 316 MPI_Comm_dup(intraComm, &intraCommServer); 317 comms.push_back(intraCommServer); 318 MPI_Comm_dup(interComm, &interCommServer); 319 comms.push_back(interCommServer); 320 serverPrimServer.push_back(new CContextServer(this, intraCommServer, interCommServer)); 321 } 322 } 323 CATCH_DUMP_ATTR 324 325 /*! 271 /*! 326 272 Sets client buffers. 327 273 \param [in] contextClient … … 535 481 // intraComm client is not duplicated. In all the code we use client->intraComm for MPI 536 482 // in future better to replace it by intracommuncator associated to the context 537 538 /* MPI_Comm intraCommClient, intraCommServer ; 539 MPI_Comm interCommClient, interCommServer ; 540 541 intraCommClient=intraComm_ ; 542 MPI_Comm_dup(intraComm_, &intraCommServer) ; 543 544 interCommClient=interComm ; 545 MPI_Comm_dup(interComm, &interCommServer) ; */ 546 483 547 484 MPI_Comm intraCommClient, intraCommServer ; 548 485 intraCommClient=intraComm_ ; … … 570 507 for(int i=0 ; i<nbPartitions; i++) 571 508 { 572 // CXios::getContextsManager()->createServerContextIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interComm) ;573 509 parentServerContext_->createIntercomm(CXios::defaultPoolId, CXios::defaultServerId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 574 510 int type ; … … 581 517 582 518 MPI_Comm intraCommClient, intraCommServer ; 583 // MPI_Comm interCommClient, interCommServer ;584 519 585 520 intraCommClient=intraComm_ ; 586 521 MPI_Comm_dup(intraComm_, &intraCommServer) ; 587 522 588 // interCommClient=interComm ;589 // MPI_Comm_dup(interComm, &interCommServer) ;590 523 591 524 clientPrimServer.push_back(new CContextClient(this, intraCommClient, interCommClient)); … … 653 586 } 654 587 655 //! Try to send the buffers and receive possible answers 656 bool CContext::checkBuffersAndListen(bool enableEventsProcessing /*= true*/) 657 TRY 658 { 659 bool clientReady, serverFinished; 660 661 // Only classical servers are non-blocking 662 if (CServer::serverLevel == 0) 663 { 664 client->checkBuffers(); 665 return server->eventLoop(true); 666 } 667 else if (CServer::serverLevel == 1) 668 { 669 if (!finalized) client->checkBuffers(); 670 bool serverFinished = true; 671 if (!finalized) serverFinished = server->eventLoop(enableEventsProcessing); 672 bool serverPrimFinished = true; 673 for (int i = 0; i < clientPrimServer.size(); ++i) 674 { 675 if (!finalized) clientPrimServer[i]->checkBuffers(); 676 if (!finalized) serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing); 677 } 678 return ( serverFinished && serverPrimFinished); 679 } 680 681 else if (CServer::serverLevel == 2) 682 { 683 client->checkBuffers(); 684 return server->eventLoop(enableEventsProcessing); 685 } 686 } 687 CATCH_DUMP_ATTR 688 588 689 589 690 590 void CContext::globalEventLoop(void) … … 750 650 finalized = true; 751 651 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 752 }753 CATCH_DUMP_ATTR754 755 756 757 //! Terminate a context758 void CContext::finalize_old(void)759 TRY760 {761 int countChildCtx_ ; // ym temporary762 763 if (hasClient && !hasServer) // For now we only use server level 1 to read data764 {765 doPreTimestepOperationsForEnabledReadModeFiles();766 }767 // Send registry upon calling the function the first time768 if (countChildCtx_ == 0) if (hasClient) sendRegistry() ;769 770 // Client:771 // (1) blocking send context finalize to its server772 // (2) blocking receive context finalize from its server773 // (3) some memory deallocations774 if (CXios::isClient)775 {776 // Make sure that client (model) enters the loop only once777 if (countChildCtx_ < 1)778 {779 ++countChildCtx_;780 781 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ;782 client->finalize();783 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ;784 while (client->havePendingRequests()) client->checkBuffers();785 786 info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ;787 while (!server->hasFinished())788 server->eventLoop();789 info(100)<<"DEBUG: context "<<getId()<<" server has finished"<<endl ;790 791 bool notifiedFinalized=false ;792 do793 {794 notifiedFinalized=client->isNotifiedFinalized() ;795 } while (!notifiedFinalized) ;796 client->releaseBuffers();797 798 if (hasServer) // Mode attache799 {800 closeAllFile();801 registryOut->hierarchicalGatherRegistry() ;802 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;803 }804 805 //! Deallocate client buffers806 // client->releaseBuffers();807 info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ;808 //! Free internally allocated communicators809 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)810 MPI_Comm_free(&(*it));811 comms.clear();812 813 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl;814 }815 }816 else if (CXios::isServer)817 {818 // First context finalize message received from a model819 // Send context finalize to its child contexts (if any)820 if (countChildCtx_ == 0)821 for (int i = 0; i < clientPrimServer.size(); ++i)822 {823 clientPrimServer[i]->finalize();824 bool bufferReleased;825 do826 {827 clientPrimServer[i]->checkBuffers();828 bufferReleased = !clientPrimServer[i]->havePendingRequests();829 } while (!bufferReleased);830 831 bool notifiedFinalized=false ;832 do833 {834 // clientPrimServer[i]->checkBuffers();835 notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ;836 } while (!notifiedFinalized) ;837 clientPrimServer[i]->releaseBuffers();838 }839 840 841 // (Last) context finalized message received842 if (countChildCtx_ == clientPrimServer.size())843 {844 // Blocking send of context finalize message to its client (e.g. primary server or model)845 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ;846 client->finalize();847 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ;848 bool bufferReleased;849 do850 {851 client->checkBuffers();852 bufferReleased = !client->havePendingRequests();853 } while (!bufferReleased);854 855 bool notifiedFinalized=false ;856 do857 {858 // client->checkBuffers();859 notifiedFinalized=client->isNotifiedFinalized() ;860 } while (!notifiedFinalized) ;861 client->releaseBuffers();862 863 finalized = true;864 info(100)<<"DEBUG: context "<<getId()<<" bufferRelease OK"<<endl ;865 866 closeAllFile(); // Just move to here to make sure that server-level 1 can close files867 868 /* ym869 if (hasServer && !hasClient)870 {871 registryOut->hierarchicalGatherRegistry() ;872 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;873 }874 */875 876 //! Deallocate client buffers877 // client->releaseBuffers();878 info(100)<<"DEBUG: context "<<getId()<<" client release"<<endl ;879 880 /*881 for (int i = 0; i < clientPrimServer.size(); ++i)882 clientPrimServer[i]->releaseBuffers();883 */884 //! Free internally allocated communicators885 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)886 MPI_Comm_free(&(*it));887 comms.clear();888 889 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl;890 }891 892 ++countChildCtx_;893 }894 652 } 895 653 CATCH_DUMP_ATTR -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.hpp
r1764 r1765 91 91 public : 92 92 // Initialize server or client 93 void initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer = 0);94 93 void init(CServerContext* parentServerContext, MPI_Comm intraComm, int serviceType); 95 94 void initClient(MPI_Comm intraComm, int serviceType); … … 105 104 106 105 // Put sever or client into loop state 107 bool checkBuffersAndListen(bool enableEventsProcessing=true);108 106 bool eventLoop(bool enableEventsProcessing=true); 109 107 void globalEventLoop(void); … … 112 110 void finalize(void); 113 111 114 void finalize_old(void);115 112 bool isFinalized(void); 116 113
Note: See TracChangeset
for help on using the changeset viewer.