#include "pool_ressource.hpp" #include "services.hpp" #include "buffer_in.hpp" #include "buffer_out.hpp" #include "message.hpp" #include "type.hpp" #include "cxios.hpp" #include "timer.hpp" #include "event_scheduler.hpp" #include "thread_manager.hpp" namespace xios { extern CLogType logTimers ; CPoolRessource::CPoolRessource(MPI_Comm poolComm, shared_ptr eventScheduler, const std::string& Id, bool isServer) : Id_(Id), finalizeSignal_(false) { int commRank, commSize ; xios::MPI_Comm_dup(poolComm, &poolComm_) ; CXios::getMpiGarbageCollector().registerCommunicator(poolComm_) ; winNotify_ = new CWindowManager(poolComm_, maxBufferSize_,"CPoolRessource::winNotify_") ; MPI_Comm_rank(poolComm, &commRank) ; MPI_Comm_size(poolComm, &commSize) ; info(40)<<"CPoolRessource::CPoolRessource : creating new pool : "<(0,i)) ; int globalLeaderRank ; MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ; if (isServer) CXios::getRessourcesManager()->registerPoolServer(Id, commSize, globalLeaderRank) ; } notifyType_=NOTIFY_NOTHING; winNotify_->updateToExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ; MPI_Barrier(poolComm_) ; if (eventScheduler) eventScheduler_=eventScheduler ; else eventScheduler_= make_shared(poolComm) ; freeRessourceEventScheduler_ = eventScheduler_ ; std::hash hashString ; hashId_ = hashString("CPoolRessource::"+Id) ; if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CPoolRessource::threadEventLoop, this) ; } void CPoolRessource::synchronize(void) { bool out=false ; size_t timeLine=0 ; eventScheduler_->registerEvent(timeLine, hashId_) ; while (!out) { CThreadManager::yield() ; out = eventScheduler_->queryEvent(timeLine,hashId_) ; if (out) eventScheduler_->popEvent() ; } } void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions) { // for now suppose nbPartitions=1 auto it=occupancy_.begin() ; // ym obsolete, service cannot overlap, only created on separate ressource or matching excatly existing service // occupancy management must not be used anymore => simplification // for now raise a message error when no ressources are availables int commSize ; MPI_Comm_size(poolComm_, &commSize) ; vector procs_in(commSize,false) ; vector> procs_update ; for(int i=0; ifirst != 0) ERROR("void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)", << "No enough free ressources on pool id="<lockWindow(rank,0) ; winNotify_->updateFromWindow(rank, this, &CPoolRessource::createServiceDumpIn) ; notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ; winNotify_->updateToWindow(rank, this, &CPoolRessource::createServiceDumpOut) ; winNotify_->unlockWindow(rank,0) ; } */ void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in) { notifyType_=NOTIFY_CREATE_SERVICE ; notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ; sendNotification(rank) ; } void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId) { notifyType_=NOTIFY_CREATE_SERVICE_ONTO ; notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ; sendNotification(rank) ; } void CPoolRessource::sendNotification(int rank) { winNotify_->pushToExclusiveWindow(rank, this, &CPoolRessource::notificationsDumpOut) ; } void CPoolRessource::checkNotifications(void) { int commRank ; MPI_Comm_rank(poolComm_, &commRank) ; winNotify_->popFromExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ; if (notifyType_==NOTIFY_CREATE_SERVICE) { if (CThreadManager::isUsingThreads()) synchronize() ; createService() ; } else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) { if (CThreadManager::isUsingThreads()) synchronize() ; createServiceOnto() ; } } void CPoolRessource::notificationsDumpOut(CBufferOut& buffer) { buffer.realloc(maxBufferSize_) ; if (notifyType_==NOTIFY_CREATE_SERVICE) { auto& arg=notifyCreateService_ ; buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg); } else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) { auto& arg=notifyCreateServiceOnto_ ; buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg) ; } } void CPoolRessource::notificationsDumpIn(CBufferIn& buffer) { if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ; else { buffer>>notifyType_; if (notifyType_==NOTIFY_CREATE_SERVICE) { auto& arg=notifyCreateService_ ; buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ; } else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) { auto& arg=notifyCreateServiceOnto_ ; buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ; } } } void CPoolRessource::createService(void) { auto& arg = notifyCreateService_ ; createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ; } void CPoolRessource::createServiceOnto(void) { auto& arg = notifyCreateServiceOnto_ ; createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ; } /* void CPoolRessource::createServiceDumpOut(CBufferOut& buffer) { buffer.realloc(maxBufferSize_) ; buffer << (int) (notifications_.size()); for(auto it=notifications_.begin();it!=notifications_.end(); ++it) buffer << std::get<0>(*it) << static_cast(std::get<1>(*it))<< std::get<2>(*it)<< std::get<3>(*it) << std::get<4>(*it) ; } */ /* void CPoolRessource::createServiceDumpIn(CBufferIn& buffer) { std::string serviceId ; int type ; int size; int nbPartitions; bool in ; notifications_.clear() ; int nbNotifications ; buffer>>nbNotifications ; for(int i=0;i>serviceId>>type>>size>>nbPartitions>>in ; notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ; } } */ bool CPoolRessource::eventLoop(bool serviceOnly) { if (info.isActive(logTimers)) CTimer::get("CPoolRessource::eventLoop").resume(); double time=MPI_Wtime() ; int flag ; MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE); if (time-lastEventLoop_ > eventLoopLatency_) { //checkCreateServiceNotification() ; checkNotifications() ; lastEventLoop_=time ; } for (auto it=services_.begin(); it!=services_.end() ; ++it) { if (it->second->eventLoop(serviceOnly)) { delete it->second ; services_.erase(it) ; // don't forget to free service later break ; } } if (info.isActive(logTimers)) CTimer::get("CPoolRessource::eventLoop").suspend(); if (services_.empty() && finalizeSignal_) finished_=true ; return finished_ ; } void CPoolRessource::threadEventLoop(void) { if (info.isActive(logTimers)) CTimer::get("CPoolRessource::eventLoop").resume(); info(100)<<"Launch Thread for CPoolRessource::threadEventLoop, pool id = "< eventLoopLatency_) { //checkCreateServiceNotification() ; checkNotifications() ; lastEventLoop_=time ; } for(auto it=services_.begin();it!=services_.end();++it) { if (it->second->isFinished()) { delete it->second ; services_.erase(it) ; // destroy server_context -> to do later break ; } ; } if (services_.empty() && finalizeSignal_) finished_=true ; if (!finished_) CThreadManager::yield() ; } while (!finished_) ; CThreadManager::threadFinalize() ; if (info.isActive(logTimers)) CTimer::get("CPoolRessource::eventLoop").suspend(); info(100)<<"Close thread for CPoolRessource::threadEventLoop, pool id = "<lockWindow(commRank,0) ; winNotify_->updateFromWindow(commRank, this, &CPoolRessource::createServiceDumpIn) ; if (!notifications_.empty()) { auto info = notifications_.front() ; createNewService(get<0>(info), get<1>(info), get<2>(info), get<3>(info), get<4>(info)) ; notifications_.pop_front() ; winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ; } winNotify_->unlockWindow(commRank,0) ; } */ void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) { info(40)<<"CPoolRessource::createNewService : receive createService notification ; serviceId : "<= (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) ) { int rank = serviceCommRank - (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) ; partitionId = serviceCommSize%nbPartitions + rank / (serviceCommSize/nbPartitions) ; } else partitionId = serviceCommRank / (serviceCommSize/nbPartitions + 1) ; xios::MPI_Comm_split(serviceComm, partitionId, commRank, &newServiceComm) ; MPI_Comm_size(newServiceComm,&serviceCommSize) ; MPI_Comm_rank(newServiceComm,&serviceCommRank) ; info(10)<<"Service "< parentScheduler, childScheduler ; freeRessourceEventScheduler_->splitScheduler(newServiceComm, parentScheduler, childScheduler) ; if (isFirstSplit_) eventScheduler_ = parentScheduler ; isFirstSplit_=false ; services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, childScheduler, Id_, serviceId, partitionId, type, nbPartitions) ; xios::MPI_Comm_free(&newServiceComm) ; } else { shared_ptr parentScheduler, childScheduler ; freeRessourceEventScheduler_->splitScheduler(serviceComm, parentScheduler, childScheduler) ; if (isFirstSplit_) eventScheduler_ = parentScheduler ; freeRessourceEventScheduler_ = childScheduler ; isFirstSplit_=false ; } xios::MPI_Comm_free(&serviceComm) ; } xios::MPI_Comm_free(&freeComm) ; } void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId) { info(40)<<"CPoolRessource::createNewServiceOnto : receive createServiceOnto notification ; serviceId : " <(service.first)==onServiceId) { const MPI_Comm& serviceComm = service.second->getCommunicator() ; MPI_Comm newServiceComm ; xios::MPI_Comm_dup(serviceComm, &newServiceComm) ; CXios::getMpiGarbageCollector().registerCommunicator(newServiceComm) ; int nbPartitions = service.second->getNbPartitions() ; int partitionId = service.second->getPartitionId() ; shared_ptr eventScheduler = service.second->getEventScheduler() ; info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "< eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients { services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, eventScheduler, Id_, serviceId, partitionId, type, nbPartitions) ; } void CPoolRessource::finalizeSignal(void) { finalizeSignal_=true ; for (auto it=services_.begin(); it!=services_.end() ; ++it) it->second->finalizeSignal() ; } CPoolRessource::~CPoolRessource() { delete winNotify_ ; for(auto& service : services_) delete service.second ; } }