source: XIOS3/trunk/src/manager/services.cpp @ 2404

Last change on this file since 2404 was 2404, checked in by ymipsl, 21 months ago

Add the possibility to launch a service on same ressource than an other.
YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 8.7 KB
Line 
1#include "services.hpp"
2#include "services_manager.hpp"
3#include "mpi.hpp"
4#include "cxios.hpp"
5#include "server_context.hpp"
6#include "event_scheduler.hpp"
7#include "timer.hpp"
8
9namespace xios
10{
11  CService::CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId, 
12                     int type, int nbPartitions, shared_ptr<CEventScheduler> eventScheduler) 
13                         : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId),
14                           partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false)
15
16
17  {
18    info(40)<<"CService::CService  : new service created ; serviceId : "<<serviceId<<endl ;
19   
20    int localRank, globalRank, commSize ;
21
22    MPI_Comm_dup(serviceComm, &serviceComm_) ;
23    MPI_Comm globalComm_=CXios::getXiosComm() ;
24 
25    MPI_Comm_rank(globalComm_,&globalRank) ;
26    MPI_Comm_rank(serviceComm_,&localRank) ;
27   
28    winNotify_ = new CWindowManager(serviceComm_, maxBufferSize_) ;
29    winNotify_->lockWindow(localRank,0) ;
30    winNotify_->updateToWindow(localRank, this, &CService::createContextDumpOut) ;
31    winNotify_->unlockWindow(localRank,0) ;
32    MPI_Barrier(serviceComm_) ;
33    if (localRank==localLeader_) 
34    {
35      globalLeader_=globalRank ;
36      MPI_Comm_rank(serviceComm_,&commSize) ;
37      CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ;
38    }
39    if (eventScheduler) eventScheduler_ = eventScheduler ;
40    eventScheduler_ = make_shared<CEventScheduler>(serviceComm_) ;
41
42    ostringstream oss;
43    oss<<partitionId;
44    name_= poolId+"__"+serviceId+"_"+oss.str();
45  }
46
47  CService::~CService()
48  {
49    delete winNotify_ ;
50    for(auto& it : contexts_) delete it.second ;
51  }
52
53
54  void CService::createContext( const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
55  {
56    int commSize ;
57    MPI_Comm_size(serviceComm_, &commSize) ;
58    info(40)<<"CService::createContext  : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ;
59
60    for(int rank=0; rank<commSize; rank++) 
61    {
62      notifyOutType_=NOTIFY_CREATE_CONTEXT ;
63      notifyOutCreateContext_ = make_tuple(poolId, serviceId, partitionId, contextId) ;
64      sendNotification(rank) ;
65    }
66    info(40)<<"CService::createContext  : notify CreateContext to all services members : DONE "<<endl ;
67  }
68/*
69  void CService::createContext(const std::string& contextId)
70  {
71    contexts_[contextId] = new CServerContext(this, serviceComm_, poolId_, serviceId_, partitionId_, contextId) ;
72  }
73*/
74  void CService::createContextNotify(int rank, const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
75  {
76    winNotify_->lockWindow(rank,0) ;
77    winNotify_->updateFromWindow(rank, this, &CService::createContextDumpIn) ;
78    notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
79    winNotify_->updateToWindow(rank, this, &CService::createContextDumpOut) ; 
80    winNotify_->unlockWindow(rank,0) ;   
81  }
82
83
84  void CService::createContextDumpOut(CBufferOut& buffer)
85  {
86    buffer.realloc(maxBufferSize_) ;
87   
88    buffer << (int) (notifications_.size());
89   
90    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
91      buffer << std::get<0>(*it) << std::get<1>(*it) << std::get<2>(*it) << std::get<3>(*it)  ;
92  }
93
94
95  void CService::createContextDumpIn(CBufferIn& buffer)
96  {
97    std::string poolId ;
98    std::string serviceId ;
99    int partitionId ;
100    std::string contextId ;
101   
102    notifications_.clear() ;
103    int nbNotifications ;
104    buffer>>nbNotifications ;
105    for(int i=0;i<nbNotifications;i++) 
106    {
107      buffer>>poolId>>serviceId>>partitionId>>contextId ;
108      notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
109    }
110  }
111
112  bool CService::eventLoop(bool serviceOnly)
113  {
114    //checkCreateContextNotification() ;
115    CTimer::get("CService::eventLoop").resume();
116    int flag ;
117    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
118   
119//    double time=MPI_Wtime() ;
120//    if (time-lastEventLoop_ > eventLoopLatency_)
121//    {
122      checkNotifications() ;
123//      lastEventLoop_=time ;
124//    }
125
126
127    eventScheduler_->checkEvent() ;
128    for(auto it=contexts_.begin();it!=contexts_.end();++it) 
129    {
130      if (it->second->eventLoop(serviceOnly))
131      {
132        delete it->second ; 
133        contexts_.erase(it) ;
134        // destroy server_context -> to do later
135        break ;
136      } ;
137    }
138    CTimer::get("CService::eventLoop").suspend();
139    if (contexts_.empty() && finalizeSignal_) return true ;
140    else return false ;
141  }
142
143  void CService::sendNotification(int rank)
144  {
145    winNotify_->lockWindowExclusive(rank) ;
146    winNotify_->pushToLockedWindow(rank, this, &CService::notificationsDumpOut) ;
147    winNotify_->unlockWindow(rank) ;
148  }
149
150 
151  void CService::notificationsDumpOut(CBufferOut& buffer)
152  {
153   
154    buffer.realloc(maxBufferSize_) ;
155   
156    if (notifyOutType_==NOTIFY_CREATE_CONTEXT)
157    {
158      auto& arg=notifyOutCreateContext_ ;
159      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) << std::get<2>(arg)<<std::get<3>(arg) ;
160    }
161  }
162
163  void CService::notificationsDumpIn(CBufferIn& buffer)
164  {
165    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
166    else
167    {
168      buffer>>notifyInType_;
169      if (notifyInType_==NOTIFY_CREATE_CONTEXT)
170      {
171        auto& arg=notifyInCreateContext_ ;
172        buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg);
173      }
174    }
175  }
176
177
178
179
180  void CService::checkNotifications(void)
181  {
182    if (!hasNotification_)
183    {
184      double time=MPI_Wtime() ;
185      if (time-lastEventLoop_ > eventLoopLatency_) 
186      {
187        int commRank ;
188        MPI_Comm_rank(serviceComm_, &commRank) ;
189        winNotify_->lockWindowExclusive(commRank) ;
190        winNotify_->popFromLockedWindow(commRank, this, &CService::notificationsDumpIn) ;
191        winNotify_->unlockWindow(commRank) ;
192     
193        if (notifyInType_!= NOTIFY_NOTHING)
194        {
195          hasNotification_=true ;
196          std::hash<string> hashString ;
197          size_t hashId = hashString(name_) ;
198          size_t currentTimeLine=0 ;
199          info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler"<<endl ;
200          eventScheduler_->registerEvent(currentTimeLine,hashId); 
201        }
202        lastEventLoop_=time ;
203      }
204    }
205   
206    if (hasNotification_)
207    {
208      std::hash<string> hashString ;
209      size_t hashId = hashString(name_) ;
210      size_t currentTimeLine=0 ;
211      info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ;
212      if (eventScheduler_->queryEvent(currentTimeLine,hashId))
213      {
214        eventScheduler_->popEvent() ;
215        info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ;
216        if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ;
217        hasNotification_=false ;
218      }
219    }
220  }
221
222
223
224//ym not use any more
225  void CService::checkCreateContextNotification(void)
226  {
227    int commRank ;
228    MPI_Comm_rank(serviceComm_, &commRank) ;
229    winNotify_->lockWindow(commRank,0) ;
230    winNotify_->updateFromWindow(commRank, this, &CService::createContextDumpIn) ;
231   
232    if (!notifications_.empty())
233    {
234      auto info = notifications_.front() ;
235      createNewContext(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ;
236      notifications_.pop_front() ;
237      winNotify_->updateToWindow(commRank, this, &CService::createContextDumpOut) ;     
238    }
239    winNotify_->unlockWindow(commRank,0) ;
240  }
241
242  void CService::createContext(void)
243   {
244     info(40)<<"CService::createContext(void)  : receive createContext notification"<<endl ;
245     auto& arg=notifyInCreateContext_ ;
246     string poolId = get<0>(arg) ;
247     string serviceId = get<1>(arg) ;
248     int partitionId = get<2>(arg) ;
249     string contextId = get<3>(arg) ;
250     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ;
251   }
252
253   //to remove, not used anymore
254   void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
255   {
256     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 
257   }
258
259  void CService::finalizeSignal(void)
260  {
261    finalizeSignal_=true ;
262    for(auto it=contexts_.begin();it!=contexts_.end();++it) it->second->finalizeSignal() ;
263  }
264
265  shared_ptr<CEventScheduler> CService::getEventScheduler(void)
266  {
267    return eventScheduler_ ;
268  }
269}
Note: See TracBrowser for help on using the repository browser.