source: XIOS3/trunk/src/manager/pool_ressource.cpp @ 2517

Last change on this file since 2517 was 2517, checked in by ymipsl, 13 months ago

New way to manage locks in window manager. Windows is locked with MPI_Win_lock_all at creation (shared mode), and lock is manage by software way in the class (using MPI_swap_and_compare and MPI_Fetch_op). We get in this case a better control of lock, with controled latency between each attemp of lock.

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.3 KB
Line 
1#include "pool_ressource.hpp"
2#include "services.hpp"
3#include "buffer_in.hpp"
4#include "buffer_out.hpp"
5#include "message.hpp"
6#include "type.hpp"
7#include "cxios.hpp"
8#include "timer.hpp"
9
10namespace xios
11{
12  CPoolRessource::CPoolRessource(MPI_Comm poolComm, const std::string& Id, bool isServer) : Id_(Id), finalizeSignal_(false)
13  {
14    int commRank, commSize ;
15    MPI_Comm_dup(poolComm, &poolComm_) ;
16    winNotify_ = new CWindowManager(poolComm_, maxBufferSize_) ;
17    MPI_Comm_rank(poolComm, &commRank) ;
18    MPI_Comm_size(poolComm, &commSize) ;
19    info(40)<<"CPoolRessource::CPoolRessource  : creating new pool : "<<Id<<endl ;
20    if (commRank==localLeader_)
21    {
22      for(int i=0; i<commSize;i++) occupancy_.insert(std::pair<char,int>(0,i)) ; 
23      int globalLeaderRank ;
24      MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ;
25      if (isServer) CXios::getRessourcesManager()->registerPoolServer(Id, commSize, globalLeaderRank) ;
26    }
27   
28    notifyType_=NOTIFY_NOTHING;
29    winNotify_->updateToExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ;
30    MPI_Barrier(poolComm_) ;
31  }
32
33  void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)
34  {
35    // for now suppose nbPartitions=1
36   
37    auto it=occupancy_.begin() ;
38    int commSize ;
39    MPI_Comm_size(poolComm_, &commSize) ;
40    vector<bool> procs_in(commSize,false) ;
41    vector<pair<int,int>> procs_update ;
42
43    for(int i=0; i<size; i++) 
44    {
45      procs_in[it->second]=true ;
46      procs_update.push_back(std::pair<int,int>(it->first+1,it->second)) ;
47      ++it ;
48    }
49   
50    occupancy_.erase(occupancy_.begin(),it) ;
51    occupancy_.insert(procs_update.begin(),procs_update.end()) ;
52   
53    info(40)<<"CPoolRessource::createService  : notify createService to all pool members ; serviceId : "<<serviceId<<endl ;
54    for(int rank=0; rank<commSize; rank++)
55    {
56      if (procs_in[rank]) createServiceNotify(rank, serviceId, type, size, nbPartitions, true) ;
57      else createServiceNotify(rank, serviceId, type, size, nbPartitions, false) ;
58    }
59  }
60
61  void CPoolRessource::createServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
62  {
63    // for now suppose nbPartitions=1
64   
65    auto it=occupancy_.begin() ;
66    int commSize ;
67    MPI_Comm_size(poolComm_, &commSize) ;
68   
69    info(40)<<"CPoolRessource::createService  : notify createServiceOnto to all pool members ; serviceId : "<<serviceId
70            <<"  onto service Id  :"<< serviceId<<endl ;
71    for(int rank=0; rank<commSize; rank++) createServiceOntoNotify(rank, serviceId, type, onServiceId) ;
72  }
73
74/* 
75  void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions,
76                                           bool in)
77  {
78    winNotify_->lockWindow(rank,0) ;
79    winNotify_->updateFromWindow(rank, this, &CPoolRessource::createServiceDumpIn) ;
80    notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
81    winNotify_->updateToWindow(rank, this, &CPoolRessource::createServiceDumpOut) ; 
82    winNotify_->unlockWindow(rank,0) ;   
83  }
84*/
85 
86  void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in)
87  {
88    notifyType_=NOTIFY_CREATE_SERVICE ;
89    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ;
90    sendNotification(rank) ;
91  }
92
93
94  void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId)
95  {
96    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ;
97    notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ;
98    sendNotification(rank) ;
99  }
100
101
102  void CPoolRessource::sendNotification(int rank)
103  {
104    winNotify_->pushToExclusiveWindow(rank, this, &CPoolRessource::notificationsDumpOut) ;
105  }
106
107  void CPoolRessource::checkNotifications(void)
108  {
109    int commRank ;
110    MPI_Comm_rank(poolComm_, &commRank) ;
111    winNotify_->popFromExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ;
112    if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ;
113    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ;
114  }
115
116
117  void CPoolRessource::notificationsDumpOut(CBufferOut& buffer)
118  {
119
120    buffer.realloc(maxBufferSize_) ;
121   
122    if (notifyType_==NOTIFY_CREATE_SERVICE)
123    {
124      auto& arg=notifyCreateService_ ;
125      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg);
126    }
127    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
128    {
129      auto& arg=notifyCreateServiceOnto_ ;
130      buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg)  ;
131    }
132  }
133
134  void CPoolRessource::notificationsDumpIn(CBufferIn& buffer)
135  {
136    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
137    else
138    {
139      buffer>>notifyType_;
140      if (notifyType_==NOTIFY_CREATE_SERVICE)
141      {
142        auto& arg=notifyCreateService_ ;
143        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ;
144      }
145      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
146      {
147        auto& arg=notifyCreateServiceOnto_ ;
148        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ;
149      }
150    }
151  } 
152
153  void CPoolRessource::createService(void)
154  {
155    auto& arg = notifyCreateService_ ;
156    createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ;
157  }
158 
159  void CPoolRessource::createServiceOnto(void)
160  {
161    auto& arg = notifyCreateServiceOnto_ ;
162    createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ;
163  }
164
165/*
166  void CPoolRessource::createServiceDumpOut(CBufferOut& buffer)
167  {
168    buffer.realloc(maxBufferSize_) ;
169   
170    buffer << (int) (notifications_.size());
171   
172    for(auto it=notifications_.begin();it!=notifications_.end(); ++it)
173      buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it)<< std::get<3>(*it) << std::get<4>(*it)  ;
174  }
175
176*/
177
178/*
179  void CPoolRessource::createServiceDumpIn(CBufferIn& buffer)
180  {
181    std::string serviceId ;
182    int type ;
183    int size;
184    int nbPartitions;
185    bool in ;
186
187    notifications_.clear() ;
188    int nbNotifications ;
189    buffer>>nbNotifications ;
190    for(int i=0;i<nbNotifications;i++)
191    {
192      buffer>>serviceId>>type>>size>>nbPartitions>>in ;
193      notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
194    }
195  }
196*/
197
198  bool CPoolRessource::eventLoop(bool serviceOnly)
199  {
200    CTimer::get("CPoolRessource::eventLoop").resume();
201   
202    double time=MPI_Wtime() ;
203    int flag ;
204    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
205    if (time-lastEventLoop_ > eventLoopLatency_) 
206    {
207      //checkCreateServiceNotification() ;
208      checkNotifications() ;
209      lastEventLoop_=time ;
210    }
211   
212    for (auto it=services_.begin(); it!=services_.end() ; ++it) 
213    {
214      if (it->second->eventLoop(serviceOnly))
215      {
216        delete it->second ;
217        services_.erase(it) ;
218        // don't forget to free service later
219        break ;
220      }
221    }
222    CTimer::get("CPoolRessource::eventLoop").suspend();
223    if (services_.empty() && finalizeSignal_) return true ;
224    else return false ;
225  }
226/*
227  void CPoolRessource::checkCreateServiceNotification(void)
228  {
229    int commRank ;
230    MPI_Comm_rank(poolComm_, &commRank) ;
231    winNotify_->lockWindow(commRank,0) ;
232    winNotify_->updateFromWindow(commRank, this, &CPoolRessource::createServiceDumpIn) ;
233   
234    if (!notifications_.empty())
235    {
236      auto info = notifications_.front() ;
237      createNewService(get<0>(info), get<1>(info), get<2>(info), get<3>(info), get<4>(info)) ;
238      notifications_.pop_front() ;
239      winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ;     
240    }
241    winNotify_->unlockWindow(commRank,0) ;
242
243  }
244*/
245
246  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in)
247  {
248     
249     info(40)<<"CPoolRessource::createNewService  : receive createService notification ; serviceId : "<<serviceId<<endl ;
250     MPI_Comm serviceComm, newServiceComm ;
251     int commRank ;
252     MPI_Comm_rank(poolComm_,&commRank) ;
253     MPI_Comm_split(poolComm_, in, commRank, &serviceComm) ;
254     if (in)
255     {
256       int serviceCommSize ;
257       int serviceCommRank ;
258       MPI_Comm_size(serviceComm,&serviceCommSize) ;
259       MPI_Comm_rank(serviceComm,&serviceCommRank) ;
260
261       info(10)<<"Service  "<<serviceId<<" created "<<"  service size : "<<serviceCommSize<< "   service rank : "<<serviceCommRank
262                            <<" on rank pool "<<commRank<<endl ;
263       
264       int partitionId ; 
265       if ( serviceCommRank >= (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) )
266       {
267         int rank =  serviceCommRank - (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) ;
268         partitionId = serviceCommSize%nbPartitions +  rank / (serviceCommSize/nbPartitions) ;
269       }
270       else  partitionId = serviceCommRank / (serviceCommSize/nbPartitions + 1) ;
271
272       MPI_Comm_split(serviceComm, partitionId, commRank, &newServiceComm) ;
273       
274       MPI_Comm_size(newServiceComm,&serviceCommSize) ;
275       MPI_Comm_rank(newServiceComm,&serviceCommRank) ;
276       info(10)<<"Service  "<<serviceId<<" created "<<"  partition : " <<partitionId<<" service size : "<<serviceCommSize
277               << " service rank : "<<serviceCommRank <<" on rank pool "<<commRank<<endl ;
278     
279       services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, Id_, serviceId, partitionId, type, nbPartitions) ;
280       
281       MPI_Comm_free(&newServiceComm) ;
282     }
283     MPI_Comm_free(&serviceComm) ;
284  }
285 
286  void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
287  {
288     
289    info(40)<<"CPoolRessource::createNewServiceOnto  : receive createServiceOnto notification ; serviceId : "
290            <<serviceId<<"  ontoServiceId : "<<onServiceId<<endl ;
291    for(auto& service : services_) 
292    {
293      if (std::get<0>(service.first)==onServiceId)
294      {
295        const MPI_Comm& serviceComm = service.second->getCommunicator() ;
296        MPI_Comm newServiceComm ;
297        MPI_Comm_dup(serviceComm, &newServiceComm) ;
298        int nbPartitions = service.second->getNbPartitions() ;
299        int partitionId = service.second->getPartitionId() ;
300        shared_ptr<CEventScheduler>  eventScheduler = service.second->getEventScheduler() ;
301        info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "<<onServiceId<<endl  ;
302        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, Id_, serviceId, partitionId, type,
303                                                                         nbPartitions, eventScheduler) ;       
304      }
305    }
306   
307  }
308
309  void CPoolRessource::createService(MPI_Comm serviceComm, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached
310  {
311    services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, Id_, serviceId, partitionId, type, nbPartitions) ;
312  }
313
314
315  void CPoolRessource::finalizeSignal(void)
316  {
317    finalizeSignal_=true ;
318    for (auto it=services_.begin(); it!=services_.end() ; ++it) it->second->finalizeSignal() ;
319  } 
320 
321  CPoolRessource::~CPoolRessource()
322  {
323    delete winNotify_ ;
324    for(auto& service : services_) delete service.second ;
325  }
326}
Note: See TracBrowser for help on using the repository browser.