Changeset 2407
- Timestamp:
- 09/21/22 15:53:51 (21 months ago)
- Location:
- XIOS3/trunk/src
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk/src/client.cpp
r2335 r2407 434 434 435 435 getPoolRessource()->createService(contextComm, id, 0, CServicesManager::CLIENT, 1) ; 436 getPoolRessource()->createService(contextComm, id+"_"+CXios::defaultServerId, 0, CServicesManager::IO_SERVER, 1) ; 436 getPoolRessource()->createService(contextComm, id+"_"+CXios::defaultWriterId, 0, CServicesManager::WRITER, 1) ; 437 getPoolRessource()->createService(contextComm, id+"_"+CXios::defaultReaderId, 0, CServicesManager::READER, 1) ; 437 438 438 439 if (commRank==0) while (!CXios::getServicesManager()->hasService(getPoolRessource()->getId(), id, 0)) { CXios::getDaemonsManager()->eventLoop();} -
XIOS3/trunk/src/cxios.cpp
r2335 r2407 26 26 const string CXios::defaultPoolId="default_pool_id" ; 27 27 const string CXios::defaultServerId="default_server_id" ; 28 const string CXios::defaultWriterId="default_writer_id" ; 29 const string CXios::defaultReaderId="default_reader_id" ; 28 30 const string CXios::defaultGathererId="default_gatherer_id" ; 29 31 const string CXios::defaultServicesId="default_services_id" ; -
XIOS3/trunk/src/cxios.hpp
r2335 r2407 71 71 static const string defaultServerId ; 72 72 static const string defaultGathererId ; 73 static const string defaultWriterId ; 74 static const string defaultReaderId ; 73 75 static const string defaultServicesId ; 74 76 -
XIOS3/trunk/src/node/context.cpp
r2406 r2407 515 515 hasServer=true ; 516 516 } 517 else if (serviceType_==CServicesManager:: IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)517 else if (serviceType_==CServicesManager::WRITER || serviceType_==CServicesManager::READER) 518 518 { 519 519 hasClient=false ; … … 549 549 client->setAssociatedServer(server) ; 550 550 server->setAssociatedClient(client) ; 551 552 writerServerIn_.push_back(server) ; 553 writerClientIn_.push_back(client) ; 551 552 if (serviceType_ == CServicesManager::GATHERER || serviceType_ == CServicesManager::WRITER) 553 { 554 writerServerIn_.push_back(server) ; 555 writerClientIn_.push_back(client) ; 556 } 557 else if (serviceType_ == CServicesManager::READER) 558 { 559 readerServerIn_.push_back(server) ; 560 readerClientIn_.push_back(client) ; 561 } 554 562 } 555 563 CATCH_DUMP_ATTR … … 592 600 593 601 clientServers.push_back({fullServerId,{client,server}}) ; 602 clientsId_[client] = fullServerId ; 603 serversId_[server] = fullServerId ; 594 604 } 595 605 } … … 603 613 if (serviceType_ == CServicesManager::CLIENT) 604 614 { 605 if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::default ServerId, clientServers) ;615 if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultWriterId, clientServers) ; 606 616 else if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ; 607 else createServerInterComm(CXios::defaultPoolId, CXios::default ServerId, clientServers) ;617 else createServerInterComm(CXios::defaultPoolId, CXios::defaultWriterId, clientServers) ; 608 618 609 619 writerClientOut_.push_back(clientServers[0].second.first) ; 610 620 writerServerOut_.push_back(clientServers[0].second.second) ; 621 622 clientServers.clear() ; 623 624 if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultReaderId, clientServers) ; 625 else createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ; 626 readerClientOut_.push_back(clientServers[0].second.first) ; 627 readerServerOut_.push_back(clientServers[0].second.second) ; 628 629 611 630 } 612 631 else if (serviceType_ == CServicesManager::GATHERER) 613 632 { 614 createServerInterComm(CXios::defaultPoolId, CXios::default ServerId, clientServers) ;633 createServerInterComm(CXios::defaultPoolId, CXios::defaultWriterId, clientServers) ; 615 634 for(auto& clientServer : clientServers) 616 635 { 617 primServerId_.push_back(clientServer.first) ;618 clientPrimServer.push_back(clientServer.second.first);619 serverPrimServer.push_back(clientServer.second.second);620 636 writerClientOut_.push_back(clientServer.second.first) ; 621 637 writerServerOut_.push_back(clientServer.second.second) ; … … 672 688 for(auto client : writerClientIn_) client->eventLoop(); 673 689 for(auto server : writerServerIn_) finished &= server->eventLoop(enableEventsProcessing); 690 for(auto client : readerClientOut_) client->eventLoop(); 691 for(auto server : readerServerOut_) finished &= server->eventLoop(enableEventsProcessing); 692 for(auto client : readerClientIn_) client->eventLoop(); 693 for(auto server : readerServerIn_) finished &= server->eventLoop(enableEventsProcessing); 674 694 for(auto couplerOut : couplerOutClient_) couplerOut.second->eventLoop(); 675 695 for(auto couplerIn : couplerInClient_) couplerIn.second->eventLoop(); … … 756 776 } while (!couplersInFinalized) ; 757 777 758 759 auto& client=writerClientOut_[0] ; 760 auto& server=writerServerOut_[0] ; 761 762 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 763 client->finalize(); 764 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 765 while (client->havePendingRequests()) client->eventLoop(); 766 info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 767 bool notifiedFinalized=false ; 768 do 778 CContextClient* client ; 779 CContextServer* server ; 780 781 if (writerClientOut_.size()!=0) 769 782 { 770 notifiedFinalized = client->isNotifiedFinalized() ; 771 } while (!notifiedFinalized) ; 772 773 server->releaseBuffers(); 774 client->releaseBuffers(); 775 info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ; 783 client=writerClientOut_[0] ; 784 server=writerServerOut_[0] ; 785 786 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to writer"<<endl ; 787 client->finalize(); 788 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to writer"<<endl ; 789 while (client->havePendingRequests()) client->eventLoop(); 790 info(100)<<"DEBUG: context "<<getId()<<" no pending request on writer ok"<<endl ; 791 bool notifiedFinalized=false ; 792 do 793 { 794 notifiedFinalized = client->isNotifiedFinalized() ; 795 } while (!notifiedFinalized) ; 796 797 server->releaseBuffers(); 798 client->releaseBuffers(); 799 info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 800 } 801 802 if (readerClientOut_.size()!=0) 803 { 804 client=readerClientOut_[0] ; 805 server=readerServerOut_[0] ; 806 807 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to reader"<<endl ; 808 client->finalize(); 809 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to reader"<<endl ; 810 while (client->havePendingRequests()) client->eventLoop(); 811 info(100)<<"DEBUG: context "<<getId()<<" no pending request on reader ok"<<endl ; 812 bool notifiedFinalized=false ; 813 do 814 { 815 notifiedFinalized = client->isNotifiedFinalized() ; 816 } while (!notifiedFinalized) ; 817 818 server->releaseBuffers(); 819 client->releaseBuffers(); 820 info(100)<<"DEBUG: context "<<getId()<<" release client reader ok"<<endl ; 821 } 776 822 } 777 823 else if (serviceType_==CServicesManager::GATHERER) … … 797 843 //ym writerClientIn & writerServerIn not released here ==> to check !! 798 844 } 799 else if (serviceType_==CServicesManager:: IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)845 else if (serviceType_==CServicesManager::WRITER) 800 846 { 801 847 closeAllFile(); 802 848 writerClientIn_[0]->releaseBuffers(); 803 849 writerServerIn_[0]->releaseBuffers(); 850 } 851 else if (serviceType_==CServicesManager::READER) 852 { 853 closeAllFile(); 854 readerClientIn_[0]->releaseBuffers(); 855 readerServerIn_[0]->releaseBuffers(); 804 856 } 805 857 … … 969 1021 970 1022 // client side, assign context for file reading 971 if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient( writerClientOut_[0]) ;1023 if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 972 1024 973 1025 // server side, assign context where to send file data read 974 if (serviceType_==CServicesManager::CServicesManager::GATHERER || serviceType_==CServicesManager::IO_SERVER) 975 for(auto file : this->enabledReadModeFiles) file->setContextClient(writerClientIn_[0]) ; 1026 if (serviceType_==CServicesManager::READER) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientIn_[0]) ; 976 1027 977 1028 … … 1000 1051 1001 1052 // workflow endpoint => write to file 1002 if (serviceType_==CServicesManager:: IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)1053 if (serviceType_==CServicesManager::WRITER) 1003 1054 { 1004 1055 for(auto field : fileOutField) … … 1009 1060 1010 1061 // workflow endpoint => Send data from server to client 1011 if (serviceType_==CServicesManager:: IO_SERVER || serviceType_==CServicesManager::GATHERER)1062 if (serviceType_==CServicesManager::READER || serviceType_==CServicesManager::GATHERER) 1012 1063 { 1013 1064 for(auto field : fileInField) … … 1035 1086 1036 1087 // workflow startpoint => data from client on server side 1037 if (serviceType_==CServicesManager:: IO_SERVER || serviceType_==CServicesManager::GATHERER || serviceType_==CServicesManager::OUT_SERVER)1088 if (serviceType_==CServicesManager::WRITER || serviceType_==CServicesManager::GATHERER) 1038 1089 { 1039 1090 for(auto field : fieldModelIn) … … 1056 1107 1057 1108 // workflow startpoint => data read from file on server side 1058 if (serviceType_==CServicesManager:: IO_SERVER || serviceType_==CServicesManager::GATHERER)1109 if (serviceType_==CServicesManager::READER) 1059 1110 { 1060 1111 for(auto field : fileInField) … … 1065 1116 1066 1117 // construct slave server list 1118 map<string, CContextClient*> slaves ; // need an ordered list ; 1067 1119 if (serviceType_==CServicesManager::CLIENT) 1068 1120 { 1069 for(auto field : fileOutField) slave Servers_.insert(field->getContextClient()) ;1070 for(auto field : fileInField) slave Servers_.insert(field->getContextClient()) ;1121 for(auto field : fileOutField) slaves[clientsId_[field->getContextClient()]] = field->getContextClient() ; 1122 for(auto field : fileInField) slaves[clientsId_[field->getContextClient()]] = field->getContextClient() ; 1071 1123 } 1072 1124 else if (serviceType_==CServicesManager::GATHERER) 1073 for(auto field : fileOutField) slaveServers_.insert(field->getContextClient()) ; 1125 for(auto field : fileOutField) slaves[clientsId_[field->getContextClient()]] = field->getContextClient() ; 1126 for(auto& slave : slaves) slaveServers_.push_back(slave.second) ; 1127 1074 1128 1075 1129 for(auto& slaveServer : slaveServers_) sendCloseDefinition(slaveServer) ; 1076 1130 1077 if (serviceType_==CServicesManager:: IO_SERVER || serviceType_==CServicesManager::OUT_SERVER)1131 if (serviceType_==CServicesManager::WRITER) 1078 1132 { 1079 1133 createFileHeader(); -
XIOS3/trunk/src/node/context.hpp
r2406 r2407 308 308 bool hasServer; 309 309 310 private:311 std::vector<CContextServer*> serverPrimServer;312 std::vector<CContextClient*> clientPrimServer;313 314 // list of slave servers (IO server or others)315 set<CContextClient*> slaveServers_ ;316 310 317 311 private: … … 333 327 std::map<std::string, CContextClient*> clients_ ; 334 328 std::map<std::string, CContextClient*> servers_ ; 335 336 private: 337 // the map containing context client associated to it string id for coupling out ; 338 std::map<std::string, CContextClient*> couplerOutClient_ ; 339 // the map containing context server associated to it string id for coupling out ; 340 std::map<std::string, CContextServer*> couplerOutServer_ ; 341 // the map containing context client associated to it string id for coupling in ; 342 std::map<std::string, CContextClient*> couplerInClient_ ; 343 // the map containing context server associated to it string id for coupling in ; 344 std::map<std::string, CContextServer*> couplerInServer_ ; 329 std::map<CContextClient*, std::string> clientsId_ ; 330 std::map<CContextServer*, std::string> serversId_ ; 331 332 // list of slave servers (IO server or others) 333 std::vector<CContextClient*> slaveServers_ ; 334 335 // the map containing context client associated to it string id for coupling out ; 336 std::map<std::string, CContextClient*> couplerOutClient_ ; 337 // the map containing context server associated to it string id for coupling out ; 338 std::map<std::string, CContextServer*> couplerOutServer_ ; 339 // the map containing context client associated to it string id for coupling in ; 340 std::map<std::string, CContextClient*> couplerInClient_ ; 341 // the map containing context server associated to it string id for coupling in ; 342 std::map<std::string, CContextServer*> couplerInServer_ ; 345 343 public: 346 344 CContextClient* getCouplerInClient(const string& contextId) { return couplerInClient_[contextId] ;} … … 348 346 CContextClient* getCouplerOutClient(const string& contextId) { return couplerOutClient_[contextId] ;} 349 347 CContextServer* getCouplerOutServer(const string& contextId) { return couplerOutServer_[contextId] ;} 350 351 352 std::vector<std::string> primServerId_;353 348 354 349 CRegistry* registryIn=nullptr ; //!< input registry which is read from file -
XIOS3/trunk/src/server.cpp
r2404 r2407 189 189 { 190 190 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 191 servicesManager->createServices(CXios::defaultPoolId, CXios::default ServerId, CServicesManager::IO_SERVER,nbRessources,1) ;192 servicesManager->createServicesOnto(CXios::defaultPoolId, "default_reader", CServicesManager::READER, CXios::defaultServerId) ;191 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ; 192 servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ; 193 193 } 194 194 else … … 201 201 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 202 202 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 203 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ; 203 servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ; 204 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ; 204 205 205 206 206 207 } 207 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;208 // servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ; 208 209 } 209 210 CTimer::get("XIOS initialize").suspend() ; -
XIOS3/trunk/src/transport/one_sided_context_server.cpp
r2399 r2407 201 201 CEventServer event(this) ; 202 202 for(auto& buffer : it->second.buffers) buffer->fillEventServer(currentTimeLine, event) ; 203 MPI_Barrier(intraComm) ;203 // MPI_Barrier(intraComm) ; 204 204 CTimer::get("Process events").resume(); 205 205 info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event.classId<<" of type "<<event.type<<endl ;
Note: See TracChangeset
for help on using the changeset viewer.