Changeset 2404 for XIOS3/trunk/src/manager/pool_ressource.cpp
- Timestamp:
- 09/19/22 10:38:09 (21 months ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk/src/manager/pool_ressource.cpp
r2274 r2404 27 27 28 28 winNotify_->lockWindow(commRank,0) ; 29 winNotify_->updateToWindow(commRank, this, &CPoolRessource:: createServiceDumpOut) ;29 winNotify_->updateToWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ; 30 30 winNotify_->unlockWindow(commRank,0) ; 31 31 MPI_Barrier(poolComm_) ; … … 60 60 } 61 61 62 62 void CPoolRessource::createServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId) 63 { 64 // for now suppose nbPartitions=1 65 66 auto it=occupancy_.begin() ; 67 int commSize ; 68 MPI_Comm_size(poolComm_, &commSize) ; 69 70 info(40)<<"CPoolRessource::createService : notify createServiceOnto to all pool members ; serviceId : "<<serviceId 71 <<" onto service Id :"<< serviceId<<endl ; 72 for(int rank=0; rank<commSize; rank++) createServiceOntoNotify(rank, serviceId, type, onServiceId) ; 73 } 74 75 /* 63 76 void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions, 64 77 bool in) … … 70 83 winNotify_->unlockWindow(rank,0) ; 71 84 } 72 73 85 */ 86 87 void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in) 88 { 89 notifyType_=NOTIFY_CREATE_SERVICE ; 90 notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ; 91 sendNotification(rank) ; 92 } 93 94 95 void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId) 96 { 97 notifyType_=NOTIFY_CREATE_SERVICE_ONTO ; 98 notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ; 99 sendNotification(rank) ; 100 } 101 102 103 void CPoolRessource::sendNotification(int rank) 104 { 105 winNotify_->lockWindowExclusive(rank) ; 106 winNotify_->pushToLockedWindow(rank, this, &CPoolRessource::notificationsDumpOut) ; 107 winNotify_->unlockWindow(rank) ; 108 } 109 110 void CPoolRessource::checkNotifications(void) 111 { 112 int commRank ; 113 MPI_Comm_rank(poolComm_, &commRank) ; 114 winNotify_->lockWindowExclusive(commRank) ; 115 winNotify_->popFromLockedWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ; 116 winNotify_->unlockWindow(commRank) ; 117 if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ; 118 else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ; 119 } 120 121 122 void CPoolRessource::notificationsDumpOut(CBufferOut& buffer) 123 { 124 125 buffer.realloc(maxBufferSize_) ; 126 127 if (notifyType_==NOTIFY_CREATE_SERVICE) 128 { 129 auto& arg=notifyCreateService_ ; 130 buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg); 131 } 132 else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 133 { 134 auto& arg=notifyCreateServiceOnto_ ; 135 buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg) ; 136 } 137 } 138 139 void CPoolRessource::notificationsDumpIn(CBufferIn& buffer) 140 { 141 if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ; 142 else 143 { 144 buffer>>notifyType_; 145 if (notifyType_==NOTIFY_CREATE_SERVICE) 146 { 147 auto& arg=notifyCreateService_ ; 148 buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ; 149 } 150 else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 151 { 152 auto& arg=notifyCreateServiceOnto_ ; 153 buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ; 154 } 155 } 156 } 157 158 void CPoolRessource::createService(void) 159 { 160 auto& arg = notifyCreateService_ ; 161 createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ; 162 } 163 164 void CPoolRessource::createServiceOnto(void) 165 { 166 auto& arg = notifyCreateServiceOnto_ ; 167 createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ; 168 } 169 170 /* 74 171 void CPoolRessource::createServiceDumpOut(CBufferOut& buffer) 75 172 { … … 82 179 } 83 180 84 181 */ 182 183 /* 85 184 void CPoolRessource::createServiceDumpIn(CBufferIn& buffer) 86 185 { … … 100 199 } 101 200 } 201 */ 102 202 103 203 bool CPoolRessource::eventLoop(bool serviceOnly) … … 110 210 if (time-lastEventLoop_ > eventLoopLatency_) 111 211 { 112 checkCreateServiceNotification() ; 212 //checkCreateServiceNotification() ; 213 checkNotifications() ; 113 214 lastEventLoop_=time ; 114 215 } … … 128 229 else return false ; 129 230 } 130 231 /* 131 232 void CPoolRessource::checkCreateServiceNotification(void) 132 233 { … … 146 247 147 248 } 249 */ 148 250 149 251 void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) … … 186 288 MPI_Comm_free(&serviceComm) ; 187 289 } 290 291 void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId) 292 { 293 294 info(40)<<"CPoolRessource::createNewServiceOnto : receive createServiceOnto notification ; serviceId : " 295 <<serviceId<<" ontoServiceId : "<<onServiceId<<endl ; 296 for(auto& service : services_) 297 { 298 if (std::get<0>(service.first)==onServiceId) 299 { 300 const MPI_Comm& serviceComm = service.second->getCommunicator() ; 301 MPI_Comm newServiceComm ; 302 MPI_Comm_dup(serviceComm, &newServiceComm) ; 303 int nbPartitions = service.second->getNbPartitions() ; 304 int partitionId = service.second->getPartitionId() ; 305 shared_ptr<CEventScheduler> eventScheduler = service.second->getEventScheduler() ; 306 info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "<<onServiceId<<endl ; 307 services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, Id_, serviceId, partitionId, type, 308 nbPartitions, eventScheduler) ; 309 } 310 } 311 312 } 188 313 189 314 void CPoolRessource::createService(MPI_Comm serviceComm, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached
Note: See TracChangeset
for help on using the changeset viewer.