Changeset 2404
- Timestamp:
- 09/19/22 10:38:09 (21 months ago)
- Location:
- XIOS3/trunk/src
- Files:
-
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk/src/manager/pool_ressource.cpp
r2274 r2404 27 27 28 28 winNotify_->lockWindow(commRank,0) ; 29 winNotify_->updateToWindow(commRank, this, &CPoolRessource:: createServiceDumpOut) ;29 winNotify_->updateToWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ; 30 30 winNotify_->unlockWindow(commRank,0) ; 31 31 MPI_Barrier(poolComm_) ; … … 60 60 } 61 61 62 62 void CPoolRessource::createServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId) 63 { 64 // for now suppose nbPartitions=1 65 66 auto it=occupancy_.begin() ; 67 int commSize ; 68 MPI_Comm_size(poolComm_, &commSize) ; 69 70 info(40)<<"CPoolRessource::createService : notify createServiceOnto to all pool members ; serviceId : "<<serviceId 71 <<" onto service Id :"<< serviceId<<endl ; 72 for(int rank=0; rank<commSize; rank++) createServiceOntoNotify(rank, serviceId, type, onServiceId) ; 73 } 74 75 /* 63 76 void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions, 64 77 bool in) … … 70 83 winNotify_->unlockWindow(rank,0) ; 71 84 } 72 73 85 */ 86 87 void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in) 88 { 89 notifyType_=NOTIFY_CREATE_SERVICE ; 90 notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ; 91 sendNotification(rank) ; 92 } 93 94 95 void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId) 96 { 97 notifyType_=NOTIFY_CREATE_SERVICE_ONTO ; 98 notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ; 99 sendNotification(rank) ; 100 } 101 102 103 void CPoolRessource::sendNotification(int rank) 104 { 105 winNotify_->lockWindowExclusive(rank) ; 106 winNotify_->pushToLockedWindow(rank, this, &CPoolRessource::notificationsDumpOut) ; 107 winNotify_->unlockWindow(rank) ; 108 } 109 110 void CPoolRessource::checkNotifications(void) 111 { 112 int commRank ; 113 MPI_Comm_rank(poolComm_, &commRank) ; 114 winNotify_->lockWindowExclusive(commRank) ; 115 winNotify_->popFromLockedWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ; 116 winNotify_->unlockWindow(commRank) ; 117 if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ; 118 else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ; 119 } 120 121 122 void CPoolRessource::notificationsDumpOut(CBufferOut& buffer) 123 { 124 125 buffer.realloc(maxBufferSize_) ; 126 127 if (notifyType_==NOTIFY_CREATE_SERVICE) 128 { 129 auto& arg=notifyCreateService_ ; 130 buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg); 131 } 132 else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 133 { 134 auto& arg=notifyCreateServiceOnto_ ; 135 buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg) ; 136 } 137 } 138 139 void CPoolRessource::notificationsDumpIn(CBufferIn& buffer) 140 { 141 if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ; 142 else 143 { 144 buffer>>notifyType_; 145 if (notifyType_==NOTIFY_CREATE_SERVICE) 146 { 147 auto& arg=notifyCreateService_ ; 148 buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ; 149 } 150 else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 151 { 152 auto& arg=notifyCreateServiceOnto_ ; 153 buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ; 154 } 155 } 156 } 157 158 void CPoolRessource::createService(void) 159 { 160 auto& arg = notifyCreateService_ ; 161 createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ; 162 } 163 164 void CPoolRessource::createServiceOnto(void) 165 { 166 auto& arg = notifyCreateServiceOnto_ ; 167 createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ; 168 } 169 170 /* 74 171 void CPoolRessource::createServiceDumpOut(CBufferOut& buffer) 75 172 { … … 82 179 } 83 180 84 181 */ 182 183 /* 85 184 void CPoolRessource::createServiceDumpIn(CBufferIn& buffer) 86 185 { … … 100 199 } 101 200 } 201 */ 102 202 103 203 bool CPoolRessource::eventLoop(bool serviceOnly) … … 110 210 if (time-lastEventLoop_ > eventLoopLatency_) 111 211 { 112 checkCreateServiceNotification() ; 212 //checkCreateServiceNotification() ; 213 checkNotifications() ; 113 214 lastEventLoop_=time ; 114 215 } … … 128 229 else return false ; 129 230 } 130 231 /* 131 232 void CPoolRessource::checkCreateServiceNotification(void) 132 233 { … … 146 247 147 248 } 249 */ 148 250 149 251 void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) … … 186 288 MPI_Comm_free(&serviceComm) ; 187 289 } 290 291 void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId) 292 { 293 294 info(40)<<"CPoolRessource::createNewServiceOnto : receive createServiceOnto notification ; serviceId : " 295 <<serviceId<<" ontoServiceId : "<<onServiceId<<endl ; 296 for(auto& service : services_) 297 { 298 if (std::get<0>(service.first)==onServiceId) 299 { 300 const MPI_Comm& serviceComm = service.second->getCommunicator() ; 301 MPI_Comm newServiceComm ; 302 MPI_Comm_dup(serviceComm, &newServiceComm) ; 303 int nbPartitions = service.second->getNbPartitions() ; 304 int partitionId = service.second->getPartitionId() ; 305 shared_ptr<CEventScheduler> eventScheduler = service.second->getEventScheduler() ; 306 info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "<<onServiceId<<endl ; 307 services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, Id_, serviceId, partitionId, type, 308 nbPartitions, eventScheduler) ; 309 } 310 } 311 312 } 188 313 189 314 void CPoolRessource::createService(MPI_Comm serviceComm, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached -
XIOS3/trunk/src/manager/pool_ressource.hpp
r2274 r2404 21 21 CWindowManager* winNotify_ ; 22 22 23 public: 23 private: 24 const int NOTIFY_NOTHING=0 ; 25 const int NOTIFY_CREATE_SERVICE=1 ; 26 const int NOTIFY_CREATE_SERVICE_ONTO=2 ; 27 28 public: 24 29 CPoolRessource(MPI_Comm poolComm, const std::string& Id) ; 25 30 ~CPoolRessource() ; … … 27 32 void createService(const std::string& serviceId, int type, int size, int nbPartition) ; 28 33 void createService(MPI_Comm serviceComm, const std::string& serviceId, int partitionId, int type, int nbPartitions) ; 29 void createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions, bool in) ; 30 void createServiceDumpOut(CBufferOut& buffer) ; 31 void createServiceDumpIn(CBufferIn& buffer) ; 32 void checkCreateServiceNotification(void) ; 33 void createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) ; 34 bool eventLoop(bool serviceOnly=false) ; 34 void createServiceOnto(const std::string& serviceId, int type, const std::string& OnServiceId) ; 35 bool eventLoop(bool serviceOnly=false) ; 35 36 CService* getService(const std::string serviceId, int partitionId) { return services_[make_tuple(serviceId,partitionId)]; } 36 37 void finalizeSignal(void) ; 37 38 string getId(void) { return Id_; } 39 40 private: 41 void createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in) ; 42 void createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId) ; 43 void sendNotification(int rank) ; 44 void checkNotifications(void) ; 45 void notificationsDumpOut(CBufferOut& buffer) ; 46 void notificationsDumpIn(CBufferIn& buffer) ; 47 void createService(void) ; 48 void createServiceOnto(void) ; 38 49 39 private: 50 // void createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions, bool in) ; 51 // void createServiceDumpOut(CBufferOut& buffer) ; 52 // void createServiceDumpIn(CBufferIn& buffer) ; 53 // void checkCreateServiceNotification(void) ; 54 void createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) ; 55 void createNewServiceOnto(const std::string& serviceId, int type, const string& onServiceId) ; 56 57 private: 40 58 MPI_Comm poolComm_ ; 41 59 42 60 std::multimap<int,int> occupancy_ ; 43 std::list<std::tuple<std::string, int, int, int, bool> > notifications_; 61 62 // std::list<std::tuple<std::string, int, int, int, bool> > notifications_; 63 64 int notifyType_ ; 65 tuple<std::string, int, int, int, bool> notifyCreateService_ ; 66 tuple<std::string, int, std::string> notifyCreateServiceOnto_ ; 67 68 44 69 std::map< std::tuple<std::string, int>, CService*> services_ ; 45 70 std::string Id_ ; -
XIOS3/trunk/src/manager/services.cpp
r2287 r2404 10 10 { 11 11 CService::CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId, 12 int type, int nbPartitions) : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId), 13 partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false) 12 int type, int nbPartitions, shared_ptr<CEventScheduler> eventScheduler) 13 : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId), 14 partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false) 14 15 15 16 … … 36 37 CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ; 37 38 } 38 eventScheduler_ = new CEventScheduler(serviceComm_) ; 39 if (eventScheduler) eventScheduler_ = eventScheduler ; 40 eventScheduler_ = make_shared<CEventScheduler>(serviceComm_) ; 39 41 40 42 ostringstream oss; … … 45 47 CService::~CService() 46 48 { 47 delete eventScheduler_ ;48 49 delete winNotify_ ; 49 50 for(auto& it : contexts_) delete it.second ; … … 262 263 } 263 264 264 CEventScheduler*CService::getEventScheduler(void)265 shared_ptr<CEventScheduler> CService::getEventScheduler(void) 265 266 { 266 267 return eventScheduler_ ; -
XIOS3/trunk/src/manager/services.hpp
r2274 r2404 20 20 21 21 CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId, 22 int type, int nbPartitions ) ;22 int type, int nbPartitions, shared_ptr<CEventScheduler> = nullptr) ; 23 23 ~CService() ; 24 24 … … 32 32 CServerContext* getServerContext(const std::string& contextId) { return contexts_[contextId]; } 33 33 void finalizeSignal(void) ; 34 CEventScheduler*getEventScheduler(void) ;34 shared_ptr<CEventScheduler> getEventScheduler(void) ; 35 35 36 36 std::string getPoolId(void) {return poolId_;} … … 39 39 int getType(void) {return type_;} 40 40 int getNbPartitions(void) {return nbPartitions_;} 41 const MPI_Comm& getCommunicator(void) { return serviceComm_ ;} 41 42 42 43 private: … … 65 66 std::map<std::string, CServerContext*> contexts_ ; 66 67 bool finalizeSignal_ ; 67 CEventScheduler*eventScheduler_ ;68 shared_ptr<CEventScheduler> eventScheduler_ ; 68 69 69 70 std::string poolId_ ; -
XIOS3/trunk/src/manager/services_manager.cpp
r2403 r2404 76 76 } 77 77 78 bool CServicesManager::createServicesOnto(const std::string& poolId, const std::string& serviceId, const std::string& OnServiceId, bool wait)78 bool CServicesManager::createServicesOnto(const std::string& poolId, const std::string& serviceId, int type, const std::string& OnServiceId, bool wait) 79 79 { 80 80 … … 96 96 { 97 97 info(40)<<"CServicesManager : create service on other, notification to leader "<<leader<<", serviceId : "<<serviceId<<", service onto : "<<OnServiceId<<endl ; 98 createServicesOntoNotify(leader, serviceId, OnServiceId) ;98 createServicesOntoNotify(leader, serviceId, type, OnServiceId) ; 99 99 return true ; 100 100 } … … 110 110 111 111 112 void CServicesManager::createServicesOntoNotify(int rank, const string& serviceId, const string& OnServiceId)112 void CServicesManager::createServicesOntoNotify(int rank, const string& serviceId, int type, const string& OnServiceId) 113 113 { 114 114 notifyType_=NOTIFY_CREATE_SERVICE_ONTO ; 115 notifyCreateServiceOnto_=make_tuple(serviceId, OnServiceId) ;115 notifyCreateServiceOnto_=make_tuple(serviceId, type, OnServiceId) ; 116 116 sendNotification(rank) ; 117 117 } … … 160 160 void CServicesManager::createServiceOnto(void) 161 161 { 162 auto& arg=notifyCreateService _ ;163 //CServer::getServersRessource()->getPoolRessource()->createService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;162 auto& arg=notifyCreateServiceOnto_ ; 163 CServer::getServersRessource()->getPoolRessource()->createServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ; 164 164 } 165 165 … … 177 177 { 178 178 auto& arg=notifyCreateServiceOnto_ ; 179 buffer << notifyType_<< get<0>(arg) << get<1>(arg) ;179 buffer << notifyType_<< get<0>(arg) << get<1>(arg) << get<2>(arg) ; 180 180 } 181 181 } … … 195 195 { 196 196 auto& arg=notifyCreateServiceOnto_ ; 197 buffer >> get<0>(arg) >> get<1>(arg) ;197 buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ; 198 198 } 199 199 } -
XIOS3/trunk/src/manager/services_manager.hpp
r2403 r2404 38 38 39 39 bool createServices(const std::string& poolId, const std::string& serviceId, int type, int size, int nbPartition, bool wait=true) ; 40 bool createServicesOnto(const std::string& poolId, const std::string& serviceId, const std::string& onServiceId, bool wait=true) ;40 bool createServicesOnto(const std::string& poolId, const std::string& serviceId, int type, const std::string& onServiceId, bool wait=true) ; 41 41 42 42 void eventLoop(void) ; … … 56 56 void createServiceOnto(void) ; 57 57 void createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions) ; 58 void createServicesOntoNotify(int rank, const string& serviceId, const string& OnServiceId) ;58 void createServicesOntoNotify(int rank, const string& serviceId, int type, const string& OnServiceId) ; 59 59 void sendNotification(int rank) ; 60 60 void checkNotifications(void) ; … … 73 73 int notifyType_ ; 74 74 tuple<std::string, int, int, int> notifyCreateService_ ; 75 tuple<std::string, std::string> notifyCreateServiceOnto_ ;75 tuple<std::string, int, std::string> notifyCreateServiceOnto_ ; 76 76 77 77 std::map<tuple<std::string, std::string, int>, std::tuple<int, int, int, int> > services_ ; -
XIOS3/trunk/src/node/context.hpp
r2326 r2404 342 342 343 343 private: 344 CEventScheduler*eventScheduler_ ; //! The local event scheduler for context344 shared_ptr<CEventScheduler> eventScheduler_ ; //! The local event scheduler for context 345 345 size_t hashId_ ; //! the local hashId for scheduler 346 346 size_t timeLine_=0 ; -
XIOS3/trunk/src/server.cpp
r2399 r2404 190 190 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 191 191 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ; 192 servicesManager->createServicesOnto(CXios::defaultPoolId, "default_reader", CServicesManager::READER, CXios::defaultServerId) ; 192 193 } 193 194 else -
XIOS3/trunk/src/transport/context_server.hpp
r2343 r2404 48 48 49 49 size_t hashId ; 50 CEventScheduler*eventScheduler_=nullptr ;50 shared_ptr<CEventScheduler> eventScheduler_=nullptr ; 51 51 } ; 52 52
Note: See TracChangeset
for help on using the changeset viewer.