source: XIOS3/trunk/src/manager/services.cpp @ 2547

Last change on this file since 2547 was 2547, checked in by ymipsl, 10 months ago

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.8 KB
Line 
1#include "services.hpp"
2#include "services_manager.hpp"
3#include "mpi.hpp"
4#include "cxios.hpp"
5#include "server_context.hpp"
6#include "event_scheduler.hpp"
7#include "thread_manager.hpp"
8#include "timer.hpp"
9
10namespace xios
11{
12  CService::CService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& poolId, const std::string& serviceId, const int& partitionId, 
13                     int type, int nbPartitions) 
14                         : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId),
15                           partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false)
16
17
18  {
19    info(40)<<"CService::CService  : new service created ; serviceId : "<<serviceId<<endl ;
20   
21    int localRank, globalRank, commSize ;
22
23    MPI_Comm_dup(serviceComm, &serviceComm_) ;
24    MPI_Comm globalComm_=CXios::getXiosComm() ;
25 
26    MPI_Comm_rank(globalComm_,&globalRank) ;
27    MPI_Comm_rank(serviceComm_,&localRank) ;
28   
29    winNotify_ = new CWindowManager(serviceComm_, maxBufferSize_) ;
30    winNotify_->updateToExclusiveWindow(localRank, this, &CService::createContextDumpOut) ;
31    MPI_Barrier(serviceComm_) ;
32    if (localRank==localLeader_) 
33    {
34      globalLeader_=globalRank ;
35      MPI_Comm_rank(serviceComm_,&commSize) ;
36      CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ;
37    }
38    if (eventScheduler) eventScheduler_ = eventScheduler ;
39    else eventScheduler_ = make_shared<CEventScheduler>(serviceComm_) ;
40
41    ostringstream oss;
42    oss<<partitionId;
43    name_= poolId+"__"+serviceId+"_"+oss.str();
44   
45    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CService::threadEventLoop, this) ;
46  }
47
48  CService::~CService()
49  {
50    delete winNotify_ ;
51    for(auto& it : contexts_) delete it.second ;
52  }
53
54
55  void CService::createContext( const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
56  {
57    int commSize ;
58    MPI_Comm_size(serviceComm_, &commSize) ;
59    info(40)<<"CService::createContext  : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ;
60
61    for(int rank=0; rank<commSize; rank++) 
62    {
63      notifyOutType_=NOTIFY_CREATE_CONTEXT ;
64      notifyOutCreateContext_ = make_tuple(poolId, serviceId, partitionId, contextId) ;
65      sendNotification(rank) ;
66    }
67    info(40)<<"CService::createContext  : notify CreateContext to all services members : DONE "<<endl ;
68  }
69/*
70  void CService::createContext(const std::string& contextId)
71  {
72    contexts_[contextId] = new CServerContext(this, serviceComm_, poolId_, serviceId_, partitionId_, contextId) ;
73  }
74*/
75  void CService::createContextNotify(int rank, const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
76  {
77    winNotify_->lockWindowExclusive(rank) ;
78    winNotify_->updateFromLockedWindow(rank, this, &CService::createContextDumpIn) ;
79    notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
80    winNotify_->updateToLockedWindow(rank, this, &CService::createContextDumpOut) ; 
81    winNotify_->unlockWindowExclusive(rank) ;   
82  }
83
84
85  void CService::createContextDumpOut(CBufferOut& buffer)
86  {
87    buffer.realloc(maxBufferSize_) ;
88   
89    buffer << (int) (notifications_.size());
90   
91    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
92      buffer << std::get<0>(*it) << std::get<1>(*it) << std::get<2>(*it) << std::get<3>(*it)  ;
93  }
94
95
96  void CService::createContextDumpIn(CBufferIn& buffer)
97  {
98    std::string poolId ;
99    std::string serviceId ;
100    int partitionId ;
101    std::string contextId ;
102   
103    notifications_.clear() ;
104    int nbNotifications ;
105    buffer>>nbNotifications ;
106    for(int i=0;i<nbNotifications;i++) 
107    {
108      buffer>>poolId>>serviceId>>partitionId>>contextId ;
109      notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
110    }
111  }
112
113  bool CService::eventLoop(bool serviceOnly)
114  {
115    //checkCreateContextNotification() ;
116    CTimer::get("CService::eventLoop").resume();
117    int flag ;
118    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
119   
120//    double time=MPI_Wtime() ;
121//    if (time-lastEventLoop_ > eventLoopLatency_)
122//    {
123      checkNotifications() ;
124//      lastEventLoop_=time ;
125//    }
126
127
128    eventScheduler_->checkEvent() ;
129   
130    for(auto it=contexts_.begin();it!=contexts_.end();++it) 
131    {
132      if (it->second->eventLoop(serviceOnly))
133      {
134        delete it->second ; 
135        contexts_.erase(it) ;
136        // destroy server_context -> to do later
137        break ;
138      } ;
139    }
140 
141    CTimer::get("CService::eventLoop").suspend();
142    if (contexts_.empty() && finalizeSignal_) return true ;
143    else return false ;
144  }
145
146  void CService::threadEventLoop(void)
147  {
148    info(100)<<"Launch Thread for  CService::threadEventLoop, service id = "<<name_<<endl ;
149    CThreadManager::threadInitialize() ; 
150   
151    do
152    {
153      CTimer::get("CService::eventLoop").resume();
154      int flag ;
155      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
156   
157//    double time=MPI_Wtime() ;
158//    if (time-lastEventLoop_ > eventLoopLatency_)
159//    {
160      checkNotifications() ;
161//      lastEventLoop_=time ;
162//    }
163
164
165      eventScheduler_->checkEvent() ;
166   
167      for(auto it=contexts_.begin();it!=contexts_.end();++it) 
168      {
169        if (it->second->isFinished())
170        {
171          delete it->second ; 
172          contexts_.erase(it) ;
173          // destroy server_context -> to do later
174          break ;
175        } ;
176      }
177
178      CTimer::get("CService::eventLoop").suspend();
179      if (contexts_.empty() && finalizeSignal_) finished_=true ;
180      if (!finished_) CThreadManager::yield() ;
181    } while (!finished_) ;
182   
183    CThreadManager::threadFinalize() ;
184    info(100)<<"Close thread for  CService::threadEventLoop, service id = "<<name_<<endl ;
185  }
186
187
188  void CService::sendNotification(int rank)
189  {
190    winNotify_->pushToExclusiveWindow(rank, this, &CService::notificationsDumpOut) ;
191  }
192
193 
194  void CService::notificationsDumpOut(CBufferOut& buffer)
195  {
196   
197    buffer.realloc(maxBufferSize_) ;
198   
199    if (notifyOutType_==NOTIFY_CREATE_CONTEXT)
200    {
201      auto& arg=notifyOutCreateContext_ ;
202      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) << std::get<2>(arg)<<std::get<3>(arg) ;
203    }
204  }
205
206  void CService::notificationsDumpIn(CBufferIn& buffer)
207  {
208    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
209    else
210    {
211      buffer>>notifyInType_;
212      if (notifyInType_==NOTIFY_CREATE_CONTEXT)
213      {
214        auto& arg=notifyInCreateContext_ ;
215        buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg);
216      }
217    }
218  }
219
220
221
222
223  void CService::checkNotifications(void)
224  {
225    if (!hasNotification_)
226    {
227      double time=MPI_Wtime() ;
228      if (time-lastEventLoop_ > eventLoopLatency_) 
229      {
230        int commRank ;
231        MPI_Comm_rank(serviceComm_, &commRank) ;
232        winNotify_->popFromExclusiveWindow(commRank, this, &CService::notificationsDumpIn) ;
233       
234        if (notifyInType_!= NOTIFY_NOTHING)
235        {
236          hasNotification_=true ;
237          std::hash<string> hashString ;
238          size_t hashId = hashString(name_) ;
239          size_t currentTimeLine=0 ;
240          info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : timeLine : "<<currentTimeLine<<"  hashId : "<<hashId<<endl ;
241          eventScheduler_->registerEvent(currentTimeLine,hashId); 
242        }
243        lastEventLoop_=time ;
244      }
245    }
246   
247    if (hasNotification_)
248    {
249      std::hash<string> hashString ;
250      size_t hashId = hashString(name_) ;
251      size_t currentTimeLine=0 ;
252//      info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ;
253      if (eventScheduler_->queryEvent(currentTimeLine,hashId))
254      {
255        eventScheduler_->popEvent() ;
256        info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ;
257        if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ;
258        hasNotification_=false ;
259      }
260    }
261  }
262
263
264
265//ym not use any more
266  void CService::checkCreateContextNotification(void)
267  {
268    int commRank ;
269    MPI_Comm_rank(serviceComm_, &commRank) ;
270    winNotify_->lockWindowExclusive(commRank) ;
271    winNotify_->updateFromLockedWindow(commRank, this, &CService::createContextDumpIn) ;
272   
273    if (!notifications_.empty())
274    {
275      auto info = notifications_.front() ;
276      createNewContext(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ;
277      notifications_.pop_front() ;
278      winNotify_->updateToLockedWindow(commRank, this, &CService::createContextDumpOut) ;     
279    }
280    winNotify_->unlockWindowExclusive(commRank) ;
281  }
282
283  void CService::createContext(void)
284   {
285     info(40)<<"CService::createContext(void)  : receive createContext notification"<<endl ;
286     auto& arg=notifyInCreateContext_ ;
287     string poolId = get<0>(arg) ;
288     string serviceId = get<1>(arg) ;
289     int partitionId = get<2>(arg) ;
290     string contextId = get<3>(arg) ;
291     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ;
292   }
293
294   //to remove, not used anymore
295   void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
296   {
297     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 
298   }
299
300  void CService::finalizeSignal(void)
301  {
302    finalizeSignal_=true ;
303    for(auto it=contexts_.begin();it!=contexts_.end();++it) it->second->finalizeSignal() ;
304  }
305
306  shared_ptr<CEventScheduler> CService::getEventScheduler(void)
307  {
308    return eventScheduler_ ;
309  }
310}
Note: See TracBrowser for help on using the repository browser.