#include "services.hpp" #include "services_manager.hpp" #include "mpi.hpp" #include "cxios.hpp" #include "server_context.hpp" #include "event_scheduler.hpp" #include "timer.hpp" namespace xios { CService::CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId, int type, int nbPartitions) : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId), partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false) { info(40)<<"CService::CService : new service created ; serviceId : "<lockWindow(localRank,0) ; winNotify_->updateToWindow(localRank, this, &CService::createContextDumpOut) ; winNotify_->unlockWindow(localRank,0) ; MPI_Barrier(serviceComm_) ; if (localRank==localLeader_) { globalLeader_=globalRank ; MPI_Comm_rank(serviceComm_,&commSize) ; CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ; } eventScheduler_ = new CEventScheduler(serviceComm_) ; ostringstream oss; oss<lockWindow(rank,0) ; winNotify_->updateFromWindow(rank, this, &CService::createContextDumpIn) ; notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ; winNotify_->updateToWindow(rank, this, &CService::createContextDumpOut) ; winNotify_->unlockWindow(rank,0) ; } void CService::createContextDumpOut(CBufferOut& buffer) { buffer.realloc(maxBufferSize_) ; buffer << (int) (notifications_.size()); for(auto it=notifications_.begin();it!=notifications_.end(); ++it) buffer << std::get<0>(*it) << std::get<1>(*it) << std::get<2>(*it) << std::get<3>(*it) ; } void CService::createContextDumpIn(CBufferIn& buffer) { std::string poolId ; std::string serviceId ; int partitionId ; std::string contextId ; notifications_.clear() ; int nbNotifications ; buffer>>nbNotifications ; for(int i=0;i>poolId>>serviceId>>partitionId>>contextId ; notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ; } } bool CService::eventLoop(bool serviceOnly) { //checkCreateContextNotification() ; CTimer::get("CService::eventLoop").resume(); int flag ; MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE); // double time=MPI_Wtime() ; // if (time-lastEventLoop_ > eventLoopLatency_) // { checkNotifications() ; // lastEventLoop_=time ; // } eventScheduler_->checkEvent() ; for(auto it=contexts_.begin();it!=contexts_.end();++it) { if (it->second->eventLoop(serviceOnly)) { delete it->second ; contexts_.erase(it) ; // destroy server_context -> to do later break ; } ; } CTimer::get("CService::eventLoop").suspend(); if (contexts_.empty() && finalizeSignal_) return true ; else return false ; } void CService::sendNotification(int rank) { winNotify_->lockWindowExclusive(rank) ; winNotify_->pushToLockedWindow(rank, this, &CService::notificationsDumpOut) ; winNotify_->unlockWindow(rank) ; } void CService::notificationsDumpOut(CBufferOut& buffer) { buffer.realloc(maxBufferSize_) ; if (notifyOutType_==NOTIFY_CREATE_CONTEXT) { auto& arg=notifyOutCreateContext_ ; buffer << notifyOutType_ << std::get<0>(arg)<(arg) << std::get<2>(arg)<(arg) ; } } void CService::notificationsDumpIn(CBufferIn& buffer) { if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ; else { buffer>>notifyInType_; if (notifyInType_==NOTIFY_CREATE_CONTEXT) { auto& arg=notifyInCreateContext_ ; buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg); } } } void CService::checkNotifications(void) { if (!hasNotification_) { double time=MPI_Wtime() ; if (time-lastEventLoop_ > eventLoopLatency_) { int commRank ; MPI_Comm_rank(serviceComm_, &commRank) ; winNotify_->lockWindowExclusive(commRank) ; winNotify_->popFromLockedWindow(commRank, this, &CService::notificationsDumpIn) ; winNotify_->unlockWindow(commRank) ; if (notifyInType_!= NOTIFY_NOTHING) { hasNotification_=true ; std::hash hashString ; size_t hashId = hashString(name_) ; size_t currentTimeLine=0 ; info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler"<registerEvent(currentTimeLine,hashId); } lastEventLoop_=time ; } } if (hasNotification_) { std::hash hashString ; size_t hashId = hashString(name_) ; size_t currentTimeLine=0 ; info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<queryEvent(currentTimeLine,hashId)) { eventScheduler_->popEvent() ; info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<lockWindow(commRank,0) ; winNotify_->updateFromWindow(commRank, this, &CService::createContextDumpIn) ; if (!notifications_.empty()) { auto info = notifications_.front() ; createNewContext(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ; notifications_.pop_front() ; winNotify_->updateToWindow(commRank, this, &CService::createContextDumpOut) ; } winNotify_->unlockWindow(commRank,0) ; } void CService::createContext(void) { info(40)<<"CService::createContext(void) : receive createContext notification"<(arg) ; string serviceId = get<1>(arg) ; int partitionId = get<2>(arg) ; string contextId = get<3>(arg) ; contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; } //to remove, not used anymore void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId) { contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; } void CService::finalizeSignal(void) { finalizeSignal_=true ; for(auto it=contexts_.begin();it!=contexts_.end();++it) it->second->finalizeSignal() ; } CEventScheduler* CService::getEventScheduler(void) { return eventScheduler_ ; } }