source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.cpp @ 2260

Last change on this file since 2260 was 2260, checked in by ymipsl, 3 years ago

Improvment of one sided protocol

  • removed latency
  • solve dead-lock

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 8.4 KB
Line 
1#include "services.hpp"
2#include "services_manager.hpp"
3#include "mpi.hpp"
4#include "cxios.hpp"
5#include "server_context.hpp"
6#include "event_scheduler.hpp"
7#include "timer.hpp"
8
9namespace xios
10{
11  CService::CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId, 
12                     int type, int nbPartitions) : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId),
13                                                   partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false)
14
15
16  {
17    info(40)<<"CService::CService  : new service created ; serviceId : "<<serviceId<<endl ;
18   
19    int localRank, globalRank, commSize ;
20
21    MPI_Comm_dup(serviceComm, &serviceComm_) ;
22    MPI_Comm globalComm_=CXios::getXiosComm() ;
23 
24    MPI_Comm_rank(globalComm_,&globalRank) ;
25    MPI_Comm_rank(serviceComm_,&localRank) ;
26   
27    winNotify_ = new CWindowManager(serviceComm_, maxBufferSize_) ;
28    winNotify_->lockWindow(localRank,0) ;
29    winNotify_->updateToWindow(localRank, this, &CService::createContextDumpOut) ;
30    winNotify_->unlockWindow(localRank,0) ;
31    MPI_Barrier(serviceComm_) ;
32    if (localRank==localLeader_) 
33    {
34      globalLeader_=globalRank ;
35      MPI_Comm_rank(serviceComm_,&commSize) ;
36      CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ;
37    }
38    eventScheduler_ = new CEventScheduler(serviceComm_) ;
39
40    ostringstream oss;
41    oss<<partitionId;
42    name_= poolId+"::"+serviceId+"_"+oss.str();
43  }
44
45  void CService::createContext( const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
46  {
47    int commSize ;
48    MPI_Comm_size(serviceComm_, &commSize) ;
49    info(40)<<"CService::createContext  : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ;
50
51    for(int rank=0; rank<commSize; rank++) 
52    {
53      notifyOutType_=NOTIFY_CREATE_CONTEXT ;
54      notifyOutCreateContext_ = make_tuple(poolId, serviceId, partitionId, contextId) ;
55      sendNotification(rank) ;
56    }
57    info(40)<<"CService::createContext  : notify CreateContext to all services members : DONE "<<endl ;
58  }
59/*
60  void CService::createContext(const std::string& contextId)
61  {
62    contexts_[contextId] = new CServerContext(this, serviceComm_, poolId_, serviceId_, partitionId_, contextId) ;
63  }
64*/
65  void CService::createContextNotify(int rank, const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
66  {
67    winNotify_->lockWindow(rank,0) ;
68    winNotify_->updateFromWindow(rank, this, &CService::createContextDumpIn) ;
69    notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
70    winNotify_->updateToWindow(rank, this, &CService::createContextDumpOut) ; 
71    winNotify_->unlockWindow(rank,0) ;   
72  }
73
74
75  void CService::createContextDumpOut(CBufferOut& buffer)
76  {
77    buffer.realloc(maxBufferSize_) ;
78   
79    buffer << (int) (notifications_.size());
80   
81    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
82      buffer << std::get<0>(*it) << std::get<1>(*it) << std::get<2>(*it) << std::get<3>(*it)  ;
83  }
84
85
86  void CService::createContextDumpIn(CBufferIn& buffer)
87  {
88    std::string poolId ;
89    std::string serviceId ;
90    int partitionId ;
91    std::string contextId ;
92   
93    notifications_.clear() ;
94    int nbNotifications ;
95    buffer>>nbNotifications ;
96    for(int i=0;i<nbNotifications;i++) 
97    {
98      buffer>>poolId>>serviceId>>partitionId>>contextId ;
99      notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
100    }
101  }
102
103  bool CService::eventLoop(bool serviceOnly)
104  {
105    //checkCreateContextNotification() ;
106    CTimer::get("CService::eventLoop").resume();
107    int flag ;
108    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
109   
110//    double time=MPI_Wtime() ;
111//    if (time-lastEventLoop_ > eventLoopLatency_)
112//    {
113      checkNotifications() ;
114//      lastEventLoop_=time ;
115//    }
116
117
118    eventScheduler_->checkEvent() ;
119    for(auto it=contexts_.begin();it!=contexts_.end();++it) 
120    {
121      if (it->second->eventLoop(serviceOnly))
122      {
123        contexts_.erase(it) ;
124        // destroy server_context -> to do later
125        break ;
126      } ;
127    }
128    CTimer::get("CService::eventLoop").suspend();
129    if (contexts_.empty() && finalizeSignal_) return true ;
130    else return false ;
131  }
132
133  void CService::sendNotification(int rank)
134  {
135    winNotify_->lockWindowExclusive(rank) ;
136    winNotify_->pushToLockedWindow(rank, this, &CService::notificationsDumpOut) ;
137    winNotify_->unlockWindow(rank) ;
138  }
139
140 
141  void CService::notificationsDumpOut(CBufferOut& buffer)
142  {
143   
144    buffer.realloc(maxBufferSize_) ;
145   
146    if (notifyOutType_==NOTIFY_CREATE_CONTEXT)
147    {
148      auto& arg=notifyOutCreateContext_ ;
149      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) << std::get<2>(arg)<<std::get<3>(arg) ;
150    }
151  }
152
153  void CService::notificationsDumpIn(CBufferIn& buffer)
154  {
155    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
156    else
157    {
158      buffer>>notifyInType_;
159      if (notifyInType_==NOTIFY_CREATE_CONTEXT)
160      {
161        auto& arg=notifyInCreateContext_ ;
162        buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg);
163      }
164    }
165  }
166
167
168
169
170  void CService::checkNotifications(void)
171  {
172    if (!hasNotification_)
173    {
174      double time=MPI_Wtime() ;
175      if (time-lastEventLoop_ > eventLoopLatency_) 
176      {
177        int commRank ;
178        MPI_Comm_rank(serviceComm_, &commRank) ;
179        winNotify_->lockWindowExclusive(commRank) ;
180        winNotify_->popFromLockedWindow(commRank, this, &CService::notificationsDumpIn) ;
181        winNotify_->unlockWindow(commRank) ;
182     
183        if (notifyInType_!= NOTIFY_NOTHING)
184        {
185          hasNotification_=true ;
186          std::hash<string> hashString ;
187          size_t hashId = hashString(name_) ;
188          size_t currentTimeLine=0 ;
189          info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler"<<endl ;
190          eventScheduler_->registerEvent(currentTimeLine,hashId); 
191        }
192        lastEventLoop_=time ;
193      }
194    }
195   
196    if (hasNotification_)
197    {
198      std::hash<string> hashString ;
199      size_t hashId = hashString(name_) ;
200      size_t currentTimeLine=0 ;
201      info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ;
202      if (eventScheduler_->queryEvent(currentTimeLine,hashId))
203      {
204        eventScheduler_->popEvent() ;
205        info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ;
206        if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ;
207        hasNotification_=false ;
208      }
209    }
210  }
211
212
213
214//ym not use any more
215  void CService::checkCreateContextNotification(void)
216  {
217    int commRank ;
218    MPI_Comm_rank(serviceComm_, &commRank) ;
219    winNotify_->lockWindow(commRank,0) ;
220    winNotify_->updateFromWindow(commRank, this, &CService::createContextDumpIn) ;
221   
222    if (!notifications_.empty())
223    {
224      auto info = notifications_.front() ;
225      createNewContext(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ;
226      notifications_.pop_front() ;
227      winNotify_->updateToWindow(commRank, this, &CService::createContextDumpOut) ;     
228    }
229    winNotify_->unlockWindow(commRank,0) ;
230  }
231
232  void CService::createContext(void)
233   {
234     info(40)<<"CService::createContext(void)  : receive createContext notification"<<endl ;
235     auto& arg=notifyInCreateContext_ ;
236     string poolId = get<0>(arg) ;
237     string& serviceId = get<1>(arg) ;
238     int partitionId = get<2>(arg) ;
239     string contextId = get<3>(arg) ;
240     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 
241   }
242
243   //to remove, not used anymore
244   void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
245   {
246     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 
247   }
248
249  void CService::finalizeSignal(void)
250  {
251    finalizeSignal_=true ;
252    for(auto it=contexts_.begin();it!=contexts_.end();++it) it->second->finalizeSignal() ;
253  }
254
255  CEventScheduler* CService::getEventScheduler(void)
256  {
257    return eventScheduler_ ;
258  }
259}
Note: See TracBrowser for help on using the repository browser.