Changeset 2246 for XIOS/dev/dev_ym/XIOS_COUPLING/src/manager
- Timestamp:
- 10/11/21 14:41:56 (3 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_COUPLING/src/manager
- Files:
-
- 16 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/contexts_manager.cpp
r1765 r2246 7 7 #include "servers_ressource.hpp" 8 8 #include "server.hpp" 9 #include "timer.hpp" 9 10 #include <functional> 10 11 … … 48 49 int serviceLeader ; 49 50 auto servicesManager = CXios::getServicesManager() ; 50 51 51 52 bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ; 52 53 54 info(40)<<"CContextsManager::createServerContext : waiting for service leader ; serviceId : "<<serviceId<<endl ; 53 55 if (wait) 54 56 { … … 64 66 notifyType_=NOTIFY_CREATE_CONTEXT ; 65 67 notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ; 68 info(40)<<"CContextsManager::createServerContext : notification create_context to service leader "<<serviceLeader<<", serviceId : "<<serviceId<<", contextId "<<contextId<<endl ; 66 69 sendNotification(serviceLeader) ; 67 70 return true ; … … 80 83 81 84 int type ; 85 info(40)<<"CContextsManager::createServerContextIntercomm : waiting for context leader ; contextId : "<<contextId<<endl ; 82 86 ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ; 83 87 if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ; … … 96 100 notifyType_=NOTIFY_CREATE_INTERCOMM ; 97 101 notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ; 102 info(40)<<"CContextsManager::createServerContextIntercomm : notification create_intercomm to context leader : "<<contextLeader<<", contextId :"<<contextId<<endl ; 98 103 sendNotification(contextLeader) ; 99 104 return true ; … … 149 154 void CContextsManager::eventLoop(void) 150 155 { 151 checkNotifications() ; 156 CTimer::get("CContextsManager::eventLoop").resume(); 157 double time=MPI_Wtime() ; 158 if (time-lastEventLoop_ > eventLoopLatency_) 159 { 160 checkNotifications() ; 161 lastEventLoop_=time ; 162 } 163 CTimer::get("CContextsManager::eventLoop").suspend(); 152 164 } 153 165 … … 166 178 void CContextsManager::createServerContext(void) 167 179 { 180 info(40)<<"CContextsManager::createServerContext : receive create server context notification"<<endl ; 168 181 auto arg=notifyCreateContext_ ; 169 182 CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg)) … … 174 187 void CContextsManager::createServerContextIntercomm(void) 175 188 { 189 info(40)<<"CContextsManager::createServerContext : receive create intercomm context notification"<<endl ; 176 190 auto arg=notifyCreateIntercomm_ ; 177 191 CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg)) … … 194 208 void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo) 195 209 { 196 winContexts_->lockWindow(managerGlobalLeader_,0) ; 197 winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 210 winContexts_->lockWindowExclusive(managerGlobalLeader_) ; 211 winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 212 winContexts_->flushWindow(managerGlobalLeader_) ; 198 213 contexts_[fullContextId] = contextInfo ; 199 winContexts_->updateTo Window(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ;200 winContexts_->unlockWindow(managerGlobalLeader_ ,0) ;214 winContexts_->updateToLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ; 215 winContexts_->unlockWindow(managerGlobalLeader_) ; 201 216 } 202 217 … … 210 225 { 211 226 212 winContexts_->lockWindow (managerGlobalLeader_,0) ;213 winContexts_->updateFrom Window(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;214 winContexts_->unlockWindow(managerGlobalLeader_ ,0) ;227 winContexts_->lockWindowShared(managerGlobalLeader_) ; 228 winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 229 winContexts_->unlockWindow(managerGlobalLeader_) ; 215 230 216 231 auto it=contexts_.find(fullContextId) ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/contexts_manager.hpp
r1765 r2246 82 82 83 83 int managerGlobalLeader_ ; 84 84 85 const double eventLoopLatency_=1e-2; 86 double lastEventLoop_=0. ; 87 85 88 friend class CWindowManager ; 86 89 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/daemons_manager.cpp
r2209 r2246 41 41 bool CDaemonsManager::eventLoop(void) 42 42 { 43 43 44 CXios::getRessourcesManager()->eventLoop() ; 44 45 CXios::getServicesManager()->eventLoop() ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/pool_ressource.cpp
r2208 r2246 6 6 #include "type.hpp" 7 7 #include "cxios.hpp" 8 #include "timer.hpp" 8 9 9 10 namespace xios … … 16 17 MPI_Comm_rank(poolComm, &commRank) ; 17 18 MPI_Comm_size(poolComm, &commSize) ; 18 19 19 info(40)<<"CPoolRessource::CPoolRessource : creating new pool : "<<Id<<endl ; 20 20 if (commRank==localLeader_) 21 21 { … … 51 51 occupancy_.erase(occupancy_.begin(),it) ; 52 52 occupancy_.insert(procs_update.begin(),procs_update.end()) ; 53 53 54 info(40)<<"CPoolRessource::createService : notify createService to all pool members ; serviceId : "<<serviceId<<endl ; 54 55 for(int rank=0; rank<commSize; rank++) 55 56 { … … 102 103 bool CPoolRessource::eventLoop(bool serviceOnly) 103 104 { 104 checkCreateServiceNotification() ; 105 CTimer::get("CPoolRessource::eventLoop").resume(); 106 107 double time=MPI_Wtime() ; 108 if (time-lastEventLoop_ > eventLoopLatency_) 109 { 110 checkCreateServiceNotification() ; 111 lastEventLoop_=time ; 112 } 113 105 114 for (auto it=services_.begin(); it!=services_.end() ; ++it) 106 115 { … … 112 121 } 113 122 } 114 123 CTimer::get("CPoolRessource::eventLoop").suspend(); 115 124 if (services_.empty() && finalizeSignal_) return true ; 116 125 else return false ; … … 137 146 void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) 138 147 { 148 149 info(40)<<"CPoolRessource::createNewService : receive createService notification ; serviceId : "<<serviceId<<endl ; 139 150 MPI_Comm serviceComm, newServiceComm ; 140 151 int commRank ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/pool_ressource.hpp
r1764 r2246 44 44 std::string Id_ ; 45 45 bool finalizeSignal_ ; 46 46 47 const double eventLoopLatency_=1e-2; 48 double lastEventLoop_=0. ; 47 49 }; 48 50 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/ressources_manager.cpp
r1764 r2246 2 2 #include "server.hpp" 3 3 #include "servers_ressource.hpp" 4 #include "timer.hpp" 4 5 5 6 … … 43 44 void CRessourcesManager::createPool(const string& poolId, int size) 44 45 { 46 info(40)<<"CRessourcesManager::createPool : calling createPool : "<<poolId<<" of size"<<size<<endl ; 47 info(40)<<"send notification to leader : "<<serverLeader_<<endl ; 45 48 winRessources_->lockWindow(managerGlobalLeader_,0) ; 46 49 winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; … … 49 52 notifyType_=NOTIFY_CREATE_POOL ; 50 53 notifyCreatePool_=make_tuple(poolId, size) ; 54 info(40)<<"CRessourcesManager::createPool : send notification creating pool to server leader "<<serverLeader_<<endl ; 51 55 sendNotification(serverLeader_) ; 52 56 } … … 61 65 { 62 66 notifyType_=NOTIFY_FINALIZE ; 67 info(40)<<"CRessourcesManager::finalize : send notification finalize to server leader "<<serverLeader_<<endl ; 63 68 sendNotification(serverLeader_) ; 64 69 } … … 107 112 void CRessourcesManager::eventLoop(void) 108 113 { 109 checkNotifications() ; 114 CTimer::get("CRessourcesManager::eventLoop").resume(); 115 double time=MPI_Wtime() ; 116 if (time-lastEventLoop_ > eventLoopLatency_) 117 { 118 checkNotifications() ; 119 lastEventLoop_=time ; 120 } 121 122 CTimer::get("CRessourcesManager::eventLoop").suspend(); 110 123 } 111 124 … … 114 127 int commRank ; 115 128 MPI_Comm_rank(xiosComm_, &commRank) ; 129 CTimer::get("CRessourcesManager::checkNotifications lock").resume(); 116 130 winNotify_->lockWindow(commRank,0) ; 131 CTimer::get("CRessourcesManager::checkNotifications lock").suspend(); 132 CTimer::get("CRessourcesManager::checkNotifications pop").resume(); 117 133 winNotify_->popFromWindow(commRank, this, &CRessourcesManager::notificationsDumpIn) ; 134 CTimer::get("CRessourcesManager::checkNotifications pop").suspend(); 135 CTimer::get("CRessourcesManager::checkNotifications unlock").resume(); 118 136 winNotify_->unlockWindow(commRank,0) ; 137 CTimer::get("CRessourcesManager::checkNotifications unlock").suspend(); 119 138 if (notifyType_==NOTIFY_CREATE_POOL) createPool() ; 120 139 else if (notifyType_==NOTIFY_FINALIZE) finalizeSignal() ; … … 123 142 void CRessourcesManager::createPool(void) 124 143 { 144 125 145 auto& arg=notifyCreatePool_ ; 126 146 string poolId=get<0>(arg) ; 127 147 int size=get<1>(arg) ; 148 info(40)<<"CRessourcesManager::createPool : receive create pool notification : "<< poolId<<" of size "<<size<<endl ; 128 149 CServer::getServersRessource()->createPool(poolId,size) ; 129 150 } … … 131 152 void CRessourcesManager::finalizeSignal(void) 132 153 { 154 info(40)<<"CRessourcesManager::createPool : receive finalize notification"<<endl ; 133 155 CServer::getServersRessource()->finalize() ; 134 156 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/ressources_manager.hpp
r1764 r2246 72 72 int freeRessourcesSize_ ; 73 73 74 const double eventLoopLatency_=1e-2; 75 double lastEventLoop_=0. ; 76 74 77 friend class CWindowManager ; 75 78 } ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.cpp
r2230 r2246 6 6 #include "register_context_info.hpp" 7 7 #include "services.hpp" 8 #include "timer.hpp" 8 9 9 10 … … 18 19 hasNotification_(false) 19 20 { 21 info(40)<<"CCServerContext::CServerContext : new context creation ; contextId : "<<contextId<<endl ; 20 22 int localRank, globalRank, commSize ; 21 23 … … 58 60 const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait) 59 61 { 62 info(40)<<"CServerContext::createIntercomm : context intercomm creation ; contextId : "<<contextId<<endl ; 60 63 int intraCommRank ; 61 64 MPI_Comm_rank(intraComm, &intraCommRank) ; … … 145 148 int commSize ; 146 149 MPI_Comm_size(contextComm_,&commSize) ; 150 info(40)<<"CServerContext::createIntercomm : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ; 151 147 152 for(int rank=0; rank<commSize; rank++) 148 153 { … … 191 196 if (!hasNotification_) 192 197 { 193 int commRank ; 194 MPI_Comm_rank(contextComm_, &commRank) ; 195 winNotify_->lockWindow(commRank,0) ; 196 winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ; 197 winNotify_->unlockWindow(commRank,0) ; 198 double time=MPI_Wtime() ; 199 if (time-lastEventLoop_ > eventLoopLatency_) 200 { 201 int commRank ; 202 MPI_Comm_rank(contextComm_, &commRank) ; 203 winNotify_->lockWindow(commRank,0) ; 204 winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ; 205 winNotify_->unlockWindow(commRank,0) ; 198 206 199 if (notifyInType_!= NOTIFY_NOTHING) 200 { 201 hasNotification_=true ; 202 auto eventScheduler=parentService_->getEventScheduler() ; 203 std::hash<string> hashString ; 204 size_t hashId = hashString(name_) ; 205 size_t currentTimeLine=0 ; 206 eventScheduler->registerEvent(currentTimeLine,hashId); 207 if (notifyInType_!= NOTIFY_NOTHING) 208 { 209 hasNotification_=true ; 210 auto eventScheduler=parentService_->getEventScheduler() ; 211 std::hash<string> hashString ; 212 size_t hashId = hashString(name_) ; 213 size_t currentTimeLine=0 ; 214 eventScheduler->registerEvent(currentTimeLine,hashId); 215 } 216 lastEventLoop_=time ; 207 217 } 208 218 } … … 225 235 bool CServerContext::eventLoop(bool serviceOnly) 226 236 { 237 CTimer::get("CServerContext::eventLoop").resume(); 227 238 bool finished=false ; 228 if (winNotify_!=nullptr) checkNotifications() ; 239 240 // double time=MPI_Wtime() ; 241 // if (time-lastEventLoop_ > eventLoopLatency_) 242 // { 243 if (winNotify_!=nullptr) checkNotifications() ; 244 // lastEventLoop_=time ; 245 // } 246 247 229 248 if (!serviceOnly && context_!=nullptr) 230 249 { … … 235 254 } 236 255 } 237 256 CTimer::get("CServerContext::eventLoop").suspend(); 238 257 if (context_==nullptr && finalizeSignal_) finished=true ; 239 258 return finished ; … … 242 261 void CServerContext::createIntercomm(void) 243 262 { 263 info(40)<<"CServerContext::createIntercomm : received createIntercomm notification"<<endl ; 264 244 265 MPI_Comm interCommServer, interCommClient ; 245 266 auto& arg=notifyInCreateIntercomm_ ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.hpp
r2130 r2246 61 61 bool isAttachedMode_ ; 62 62 63 const double eventLoopLatency_=1e-2; 64 double lastEventLoop_=0. ; 65 63 66 friend class CWindowManager ; 64 67 } ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/servers_ressource.cpp
r2220 r2246 5 5 #include "cxios.hpp" 6 6 #include "mpi.hpp" 7 #include "timer.hpp" 7 8 #include <vector> 8 9 #include <string> … … 115 116 bool CServersRessource::eventLoop(bool serviceOnly) 116 117 { 117 checkNotifications() ; 118 CTimer::get("CServersRessource::eventLoop").resume(); 119 double time=MPI_Wtime() ; 120 if (time-lastEventLoop_ > eventLoopLatency_) 121 { 122 checkNotifications() ; 123 lastEventLoop_=time ; 124 } 125 118 126 if (poolRessource_!=nullptr) 119 127 { … … 124 132 } 125 133 } 126 134 CTimer::get("CServersRessource::eventLoop").suspend(); 127 135 if (poolRessource_==nullptr && finalizeSignal_) return true ; 128 136 else return false ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/servers_ressource.hpp
r1764 r2246 50 50 bool finalizeSignal_ ; 51 51 52 const double eventLoopLatency_=1e-2; 53 double lastEventLoop_=0. ; 54 52 55 friend class CWindowManager ; 53 56 } ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.cpp
r2230 r2246 5 5 #include "server_context.hpp" 6 6 #include "event_scheduler.hpp" 7 #include "timer.hpp" 7 8 8 9 namespace xios … … 14 15 15 16 { 17 info(40)<<"CService::CService : new service created ; serviceId : "<<serviceId<<endl ; 18 16 19 int localRank, globalRank, commSize ; 17 20 … … 44 47 int commSize ; 45 48 MPI_Comm_size(serviceComm_, &commSize) ; 46 49 info(40)<<"CService::createContext : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ; 50 47 51 for(int rank=0; rank<commSize; rank++) 48 52 { … … 51 55 sendNotification(rank) ; 52 56 } 57 info(40)<<"CService::createContext : notify CreateContext to all services members : DONE "<<endl ; 53 58 } 54 59 /* … … 99 104 { 100 105 //checkCreateContextNotification() ; 101 checkNotifications() ; 106 CTimer::get("CService::eventLoop").resume(); 107 108 // double time=MPI_Wtime() ; 109 // if (time-lastEventLoop_ > eventLoopLatency_) 110 // { 111 checkNotifications() ; 112 // lastEventLoop_=time ; 113 // } 114 102 115 103 116 eventScheduler_->checkEvent() ; … … 111 124 } ; 112 125 } 113 126 CTimer::get("CService::eventLoop").suspend(); 114 127 if (contexts_.empty() && finalizeSignal_) return true ; 115 128 else return false ; … … 144 157 if (notifyInType_==NOTIFY_CREATE_CONTEXT) 145 158 { 146 info(10)<<"NotifyDumpOut"<<endl ;147 159 auto& arg=notifyInCreateContext_ ; 148 160 buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg); … … 158 170 if (!hasNotification_) 159 171 { 160 int commRank ; 161 MPI_Comm_rank(serviceComm_, &commRank) ; 162 winNotify_->lockWindow(commRank,0) ; 163 winNotify_->popFromWindow(commRank, this, &CService::notificationsDumpIn) ; 164 winNotify_->unlockWindow(commRank,0) ; 172 double time=MPI_Wtime() ; 173 if (time-lastEventLoop_ > eventLoopLatency_) 174 { 175 int commRank ; 176 MPI_Comm_rank(serviceComm_, &commRank) ; 177 winNotify_->lockWindow(commRank,0) ; 178 winNotify_->popFromWindow(commRank, this, &CService::notificationsDumpIn) ; 179 winNotify_->unlockWindow(commRank,0) ; 165 180 166 if (notifyInType_!= NOTIFY_NOTHING) 167 { 168 hasNotification_=true ; 169 std::hash<string> hashString ; 170 size_t hashId = hashString(name_) ; 171 size_t currentTimeLine=0 ; 172 eventScheduler_->registerEvent(currentTimeLine,hashId); 181 if (notifyInType_!= NOTIFY_NOTHING) 182 { 183 hasNotification_=true ; 184 std::hash<string> hashString ; 185 size_t hashId = hashString(name_) ; 186 size_t currentTimeLine=0 ; 187 info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler"<<endl ; 188 eventScheduler_->registerEvent(currentTimeLine,hashId); 189 } 190 lastEventLoop_=time ; 173 191 } 174 192 } … … 179 197 size_t hashId = hashString(name_) ; 180 198 size_t currentTimeLine=0 ; 199 info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ; 181 200 if (eventScheduler_->queryEvent(currentTimeLine,hashId)) 182 201 { 183 202 eventScheduler_->popEvent() ; 203 info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ; 184 204 if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ; 185 205 hasNotification_=false ; … … 190 210 191 211 192 212 //ym not use any more 193 213 void CService::checkCreateContextNotification(void) 194 214 { … … 210 230 void CService::createContext(void) 211 231 { 232 info(40)<<"CService::createContext(void) : receive createContext notification"<<endl ; 212 233 auto& arg=notifyInCreateContext_ ; 213 234 string poolId = get<0>(arg) ; … … 218 239 } 219 240 220 //to remove 241 //to remove, not used anymore 221 242 void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId) 222 243 { -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.hpp
r2130 r2246 71 71 int nbPartitions_ ; 72 72 73 const double eventLoopLatency_=1e-2; 74 double lastEventLoop_=0. ; 75 73 76 }; 74 77 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services_manager.cpp
r1764 r2246 7 7 #include "server.hpp" 8 8 #include "servers_ressource.hpp" 9 #include "timer.hpp" 9 10 10 11 namespace xios … … 55 56 int poolSize ; 56 57 58 info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ; 57 59 bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ; 58 60 if (wait) … … 67 69 if (ok) 68 70 { 71 info(40)<<"CServicesManager : create service notification to leader "<<leader<<", serviceId : "<<serviceId<<", size : "<<size<<endl ; 69 72 createServicesNotify(leader, serviceId, type, size, nbPartitions) ; 70 73 return true ; … … 94 97 { 95 98 auto info = notifications_.front() ; 99 xios::info(40)<<"CServicesManager : receive create service notification : "<<get<0>(info)<<endl ; 96 100 CServer::getServersRessource()->getPoolRessource()->createService(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ; 97 101 notifications_.pop_front() ; … … 104 108 void CServicesManager::eventLoop(void) 105 109 { 106 checkCreateServicesNotification() ; 110 CTimer::get("CServicesManager::eventLoop").resume(); 111 double time=MPI_Wtime() ; 112 if (time-lastEventLoop_ > eventLoopLatency_) 113 { 114 checkCreateServicesNotification() ; 115 lastEventLoop_=time ; 116 } 117 CTimer::get("CServicesManager::eventLoop").suspend(); 107 118 } 108 119 … … 176 187 { 177 188 189 info(40)<<"CServicesManager : registering service, poolId : "<<poolId<<", serviceId : "<<serviceId<<endl ; ; 190 191 winServices_->lockWindowExclusive(managerGlobalLeader_) ; 192 winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 193 winServices_->flushWindow(managerGlobalLeader_) ; 194 services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ; 195 winServices_->updateToLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ; 196 winServices_->unlockWindow(managerGlobalLeader_) ; 197 198 /* 178 199 winServices_->lockWindow(managerGlobalLeader_,0) ; 179 200 winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 180 201 services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ; 181 202 winServices_->updateToWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ; 182 winServices_->unlockWindow(managerGlobalLeader_,0) ; 203 winServices_->unlockWindow(managerGlobalLeader_,0) ;*/ 183 204 } 184 205 … … 186 207 int& size, int& nbPartitions, int& leader) 187 208 { 209 210 winServices_->lockWindowShared(managerGlobalLeader_) ; 211 winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 212 winServices_->unlockWindow(managerGlobalLeader_) ; 213 /* 188 214 winServices_->lockWindow(managerGlobalLeader_,0) ; 189 215 winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 190 winServices_->unlockWindow(managerGlobalLeader_,0) ; 216 winServices_->unlockWindow(managerGlobalLeader_,0) ;*/ 191 217 192 218 auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services_manager.hpp
r1764 r2246 59 59 60 60 int managerGlobalLeader_ ; 61 62 const double eventLoopLatency_=1e-2; 63 double lastEventLoop_=0. ; 61 64 62 65 friend class CWindowManager ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/window_manager.hpp
r1764 r2246 25 25 MPI_Win window_ ; 26 26 void * winBuffer_ ; 27 map<int,double> lastTimeLock_ ; 28 const double latency_=0e-2 ; 27 29 28 30 public : … … 46 48 { 47 49 int lock=state ; 48 50 double time ; 51 auto it=lastTimeLock_.find(rank) ; 52 if (it == lastTimeLock_.end()) 53 { 54 lastTimeLock_[rank] = 0. ; 55 it=lastTimeLock_.find(rank) ; 56 } 57 double& lastTime = it->second ; 58 49 59 do 50 60 { 61 time=MPI_Wtime() ; 62 while(time-lastTime < latency_) time=MPI_Wtime() ; 51 63 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 52 64 MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ; 53 65 MPI_Win_unlock(rank, window_) ; 66 lastTime=MPI_Wtime() ; 54 67 } while (lock!=state) ; 55 56 57 } 58 68 69 70 } 71 72 void lockWindowExclusive(int rank, int state ) 73 { 74 int lock=state ; 75 double time ; 76 auto it=lastTimeLock_.find(rank) ; 77 if (it == lastTimeLock_.end()) 78 { 79 lastTimeLock_[rank] = 0. ; 80 it=lastTimeLock_.find(rank) ; 81 } 82 double& lastTime = it->second ; 83 84 do 85 { 86 time=MPI_Wtime() ; 87 while(time-lastTime < latency_) time=MPI_Wtime() ; 88 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 89 MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ; 90 MPI_Win_unlock(rank, window_) ; 91 lastTime=MPI_Wtime() ; 92 } while (lock!=state) ; 93 } 94 95 void lockWindowExclusive(int rank) 96 { 97 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 98 } 99 100 void lockWindowShared(int rank) 101 { 102 MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ; 103 } 104 105 void unlockWindow(int rank) 106 { 107 MPI_Win_unlock(rank, window_) ; 108 } 109 110 void flushWindow(int rank) 111 { 112 MPI_Win_flush(rank, window_) ; 113 } 59 114 60 115 void unlockWindow(int rank, int state ) … … 77 132 MPI_Win_unlock(rank, window_) ; 78 133 } 79 134 135 template< class T > 136 void updateToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) ) 137 { 138 CBufferOut buffer ; 139 (object->*dumpOut)(buffer) ; 140 size_t size=buffer.count() ; 141 // MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 142 MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ; 143 MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ; 144 // MPI_Win_unlock(rank, window_) ; 145 } 146 80 147 template< typename T > 81 148 void updateFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) … … 90 157 (object->*dumpIn)(buffer) ; 91 158 } 159 160 template< typename T > 161 void updateFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 162 { 163 size_t size ; 164 // MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ; 165 MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ; 166 MPI_Win_flush(rank,window_) ; 167 CBufferIn buffer(size) ; 168 MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ; 169 // MPI_Win_unlock(rank, window_) ; 170 MPI_Win_flush(rank, window_) ; 171 (object->*dumpIn)(buffer) ; 172 } 173 92 174 93 175 template< class T >
Note: See TracChangeset
for help on using the changeset viewer.