source: XIOS3/trunk/src/manager/pool_ressource.cpp @ 2494

Last change on this file since 2494 was 2494, checked in by jderouillat, 14 months ago

Backporting 2489 and 2490 (+ wait in getServiceInfo) from XIOS_ATTACHED + initialization of PoolRessource::notifyType_

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.5 KB
Line 
1#include "pool_ressource.hpp"
2#include "services.hpp"
3#include "buffer_in.hpp"
4#include "buffer_out.hpp"
5#include "message.hpp"
6#include "type.hpp"
7#include "cxios.hpp"
8#include "timer.hpp"
9
10namespace xios
11{
12  CPoolRessource::CPoolRessource(MPI_Comm poolComm, const std::string& Id, bool isServer) : Id_(Id), finalizeSignal_(false)
13  {
14    int commRank, commSize ;
15    MPI_Comm_dup(poolComm, &poolComm_) ;
16    winNotify_ = new CWindowManager(poolComm_, maxBufferSize_) ;
17    MPI_Comm_rank(poolComm, &commRank) ;
18    MPI_Comm_size(poolComm, &commSize) ;
19    info(40)<<"CPoolRessource::CPoolRessource  : creating new pool : "<<Id<<endl ;
20    if (commRank==localLeader_)
21    {
22      for(int i=0; i<commSize;i++) occupancy_.insert(std::pair<char,int>(0,i)) ; 
23      int globalLeaderRank ;
24      MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ;
25      if (isServer) CXios::getRessourcesManager()->registerPoolServer(Id, commSize, globalLeaderRank) ;
26    }
27   
28    notifyType_=NOTIFY_NOTHING;
29    winNotify_->lockWindow(commRank,0) ;
30    winNotify_->updateToWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ;
31    winNotify_->unlockWindow(commRank,0) ;       
32    MPI_Barrier(poolComm_) ;
33  }
34
35  void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)
36  {
37    // for now suppose nbPartitions=1
38   
39    auto it=occupancy_.begin() ;
40    int commSize ;
41    MPI_Comm_size(poolComm_, &commSize) ;
42    vector<bool> procs_in(commSize,false) ;
43    vector<pair<int,int>> procs_update ;
44
45    for(int i=0; i<size; i++) 
46    {
47      procs_in[it->second]=true ;
48      procs_update.push_back(std::pair<int,int>(it->first+1,it->second)) ;
49      ++it ;
50    }
51   
52    occupancy_.erase(occupancy_.begin(),it) ;
53    occupancy_.insert(procs_update.begin(),procs_update.end()) ;
54   
55    info(40)<<"CPoolRessource::createService  : notify createService to all pool members ; serviceId : "<<serviceId<<endl ;
56    for(int rank=0; rank<commSize; rank++)
57    {
58      if (procs_in[rank]) createServiceNotify(rank, serviceId, type, size, nbPartitions, true) ;
59      else createServiceNotify(rank, serviceId, type, size, nbPartitions, false) ;
60    }
61  }
62
63  void CPoolRessource::createServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
64  {
65    // for now suppose nbPartitions=1
66   
67    auto it=occupancy_.begin() ;
68    int commSize ;
69    MPI_Comm_size(poolComm_, &commSize) ;
70   
71    info(40)<<"CPoolRessource::createService  : notify createServiceOnto to all pool members ; serviceId : "<<serviceId
72            <<"  onto service Id  :"<< serviceId<<endl ;
73    for(int rank=0; rank<commSize; rank++) createServiceOntoNotify(rank, serviceId, type, onServiceId) ;
74  }
75
76/* 
77  void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions,
78                                           bool in)
79  {
80    winNotify_->lockWindow(rank,0) ;
81    winNotify_->updateFromWindow(rank, this, &CPoolRessource::createServiceDumpIn) ;
82    notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
83    winNotify_->updateToWindow(rank, this, &CPoolRessource::createServiceDumpOut) ; 
84    winNotify_->unlockWindow(rank,0) ;   
85  }
86*/
87 
88  void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in)
89  {
90    notifyType_=NOTIFY_CREATE_SERVICE ;
91    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ;
92    sendNotification(rank) ;
93  }
94
95
96  void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId)
97  {
98    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ;
99    notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ;
100    sendNotification(rank) ;
101  }
102
103
104  void CPoolRessource::sendNotification(int rank)
105  {
106    winNotify_->lockWindowExclusive(rank) ;
107    winNotify_->pushToLockedWindow(rank, this, &CPoolRessource::notificationsDumpOut) ;
108    winNotify_->unlockWindow(rank) ;
109  }
110
111  void CPoolRessource::checkNotifications(void)
112  {
113    int commRank ;
114    MPI_Comm_rank(poolComm_, &commRank) ;
115    winNotify_->lockWindowExclusive(commRank) ;
116    winNotify_->popFromLockedWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ;
117    winNotify_->unlockWindow(commRank) ;
118    if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ;
119    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ;
120  }
121
122
123  void CPoolRessource::notificationsDumpOut(CBufferOut& buffer)
124  {
125
126    buffer.realloc(maxBufferSize_) ;
127   
128    if (notifyType_==NOTIFY_CREATE_SERVICE)
129    {
130      auto& arg=notifyCreateService_ ;
131      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg);
132    }
133    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
134    {
135      auto& arg=notifyCreateServiceOnto_ ;
136      buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg)  ;
137    }
138  }
139
140  void CPoolRessource::notificationsDumpIn(CBufferIn& buffer)
141  {
142    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
143    else
144    {
145      buffer>>notifyType_;
146      if (notifyType_==NOTIFY_CREATE_SERVICE)
147      {
148        auto& arg=notifyCreateService_ ;
149        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ;
150      }
151      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
152      {
153        auto& arg=notifyCreateServiceOnto_ ;
154        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ;
155      }
156    }
157  } 
158
159  void CPoolRessource::createService(void)
160  {
161    auto& arg = notifyCreateService_ ;
162    createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ;
163  }
164 
165  void CPoolRessource::createServiceOnto(void)
166  {
167    auto& arg = notifyCreateServiceOnto_ ;
168    createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ;
169  }
170
171/*
172  void CPoolRessource::createServiceDumpOut(CBufferOut& buffer)
173  {
174    buffer.realloc(maxBufferSize_) ;
175   
176    buffer << (int) (notifications_.size());
177   
178    for(auto it=notifications_.begin();it!=notifications_.end(); ++it)
179      buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it)<< std::get<3>(*it) << std::get<4>(*it)  ;
180  }
181
182*/
183
184/*
185  void CPoolRessource::createServiceDumpIn(CBufferIn& buffer)
186  {
187    std::string serviceId ;
188    int type ;
189    int size;
190    int nbPartitions;
191    bool in ;
192
193    notifications_.clear() ;
194    int nbNotifications ;
195    buffer>>nbNotifications ;
196    for(int i=0;i<nbNotifications;i++)
197    {
198      buffer>>serviceId>>type>>size>>nbPartitions>>in ;
199      notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
200    }
201  }
202*/
203
204  bool CPoolRessource::eventLoop(bool serviceOnly)
205  {
206    CTimer::get("CPoolRessource::eventLoop").resume();
207   
208    double time=MPI_Wtime() ;
209    int flag ;
210    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
211    if (time-lastEventLoop_ > eventLoopLatency_) 
212    {
213      //checkCreateServiceNotification() ;
214      checkNotifications() ;
215      lastEventLoop_=time ;
216    }
217   
218    for (auto it=services_.begin(); it!=services_.end() ; ++it) 
219    {
220      if (it->second->eventLoop(serviceOnly))
221      {
222        delete it->second ;
223        services_.erase(it) ;
224        // don't forget to free service later
225        break ;
226      }
227    }
228    CTimer::get("CPoolRessource::eventLoop").suspend();
229    if (services_.empty() && finalizeSignal_) return true ;
230    else return false ;
231  }
232/*
233  void CPoolRessource::checkCreateServiceNotification(void)
234  {
235    int commRank ;
236    MPI_Comm_rank(poolComm_, &commRank) ;
237    winNotify_->lockWindow(commRank,0) ;
238    winNotify_->updateFromWindow(commRank, this, &CPoolRessource::createServiceDumpIn) ;
239   
240    if (!notifications_.empty())
241    {
242      auto info = notifications_.front() ;
243      createNewService(get<0>(info), get<1>(info), get<2>(info), get<3>(info), get<4>(info)) ;
244      notifications_.pop_front() ;
245      winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ;     
246    }
247    winNotify_->unlockWindow(commRank,0) ;
248
249  }
250*/
251
252  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in)
253  {
254     
255     info(40)<<"CPoolRessource::createNewService  : receive createService notification ; serviceId : "<<serviceId<<endl ;
256     MPI_Comm serviceComm, newServiceComm ;
257     int commRank ;
258     MPI_Comm_rank(poolComm_,&commRank) ;
259     MPI_Comm_split(poolComm_, in, commRank, &serviceComm) ;
260     if (in)
261     {
262       int serviceCommSize ;
263       int serviceCommRank ;
264       MPI_Comm_size(serviceComm,&serviceCommSize) ;
265       MPI_Comm_rank(serviceComm,&serviceCommRank) ;
266
267       info(10)<<"Service  "<<serviceId<<" created "<<"  service size : "<<serviceCommSize<< "   service rank : "<<serviceCommRank
268                            <<" on rank pool "<<commRank<<endl ;
269       
270       int partitionId ; 
271       if ( serviceCommRank >= (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) )
272       {
273         int rank =  serviceCommRank - (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) ;
274         partitionId = serviceCommSize%nbPartitions +  rank / (serviceCommSize/nbPartitions) ;
275       }
276       else  partitionId = serviceCommRank / (serviceCommSize/nbPartitions + 1) ;
277
278       MPI_Comm_split(serviceComm, partitionId, commRank, &newServiceComm) ;
279       
280       MPI_Comm_size(newServiceComm,&serviceCommSize) ;
281       MPI_Comm_rank(newServiceComm,&serviceCommRank) ;
282       info(10)<<"Service  "<<serviceId<<" created "<<"  partition : " <<partitionId<<" service size : "<<serviceCommSize
283               << " service rank : "<<serviceCommRank <<" on rank pool "<<commRank<<endl ;
284     
285       services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, Id_, serviceId, partitionId, type, nbPartitions) ;
286       
287       MPI_Comm_free(&newServiceComm) ;
288     }
289     MPI_Comm_free(&serviceComm) ;
290  }
291 
292  void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
293  {
294     
295    info(40)<<"CPoolRessource::createNewServiceOnto  : receive createServiceOnto notification ; serviceId : "
296            <<serviceId<<"  ontoServiceId : "<<onServiceId<<endl ;
297    for(auto& service : services_) 
298    {
299      if (std::get<0>(service.first)==onServiceId)
300      {
301        const MPI_Comm& serviceComm = service.second->getCommunicator() ;
302        MPI_Comm newServiceComm ;
303        MPI_Comm_dup(serviceComm, &newServiceComm) ;
304        int nbPartitions = service.second->getNbPartitions() ;
305        int partitionId = service.second->getPartitionId() ;
306        shared_ptr<CEventScheduler>  eventScheduler = service.second->getEventScheduler() ;
307        info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "<<onServiceId<<endl  ;
308        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, Id_, serviceId, partitionId, type,
309                                                                         nbPartitions, eventScheduler) ;       
310      }
311    }
312   
313  }
314
315  void CPoolRessource::createService(MPI_Comm serviceComm, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached
316  {
317    services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, Id_, serviceId, partitionId, type, nbPartitions) ;
318  }
319
320
321  void CPoolRessource::finalizeSignal(void)
322  {
323    finalizeSignal_=true ;
324    for (auto it=services_.begin(); it!=services_.end() ; ++it) it->second->finalizeSignal() ;
325  } 
326 
327  CPoolRessource::~CPoolRessource()
328  {
329    delete winNotify_ ;
330    for(auto& service : services_) delete service.second ;
331  }
332}
Note: See TracBrowser for help on using the repository browser.