source: XIOS3/trunk/src/manager/pool_ressource.cpp @ 2589

Last change on this file since 2589 was 2589, checked in by jderouillat, 9 months ago

Specify the usage of the xios namespace to overload the MPI funtions

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 15.3 KB
Line 
1#include "pool_ressource.hpp"
2#include "services.hpp"
3#include "buffer_in.hpp"
4#include "buffer_out.hpp"
5#include "message.hpp"
6#include "type.hpp"
7#include "cxios.hpp"
8#include "timer.hpp"
9#include "event_scheduler.hpp"
10#include "thread_manager.hpp"
11
12namespace xios
13{
14  CPoolRessource::CPoolRessource(MPI_Comm poolComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& Id, bool isServer) : Id_(Id), finalizeSignal_(false)
15  {
16    int commRank, commSize ;
17    xios::MPI_Comm_dup(poolComm, &poolComm_) ;
18    CXios::getMpiGarbageCollector().registerCommunicator(poolComm_) ;
19    winNotify_ = new CWindowManager(poolComm_, maxBufferSize_,"CPoolRessource::winNotify_") ;
20    MPI_Comm_rank(poolComm, &commRank) ;
21    MPI_Comm_size(poolComm, &commSize) ;
22    info(40)<<"CPoolRessource::CPoolRessource  : creating new pool : "<<Id<<endl ;
23    if (commRank==localLeader_)
24    {
25      for(int i=0; i<commSize;i++) occupancy_.insert(std::pair<char,int>(0,i)) ; 
26      int globalLeaderRank ;
27      MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ;
28      if (isServer) CXios::getRessourcesManager()->registerPoolServer(Id, commSize, globalLeaderRank) ;
29    }
30   
31    notifyType_=NOTIFY_NOTHING;
32    winNotify_->updateToExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ;
33    MPI_Barrier(poolComm_) ;
34    if (eventScheduler) eventScheduler_=eventScheduler ;
35    else eventScheduler_= make_shared<CEventScheduler>(poolComm) ;
36    freeRessourceEventScheduler_ = eventScheduler_ ;
37    std::hash<string> hashString ;
38    hashId_ = hashString("CPoolRessource::"+Id) ;
39    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CPoolRessource::threadEventLoop, this) ;
40  }
41
42  void CPoolRessource::synchronize(void)
43  {
44    bool out=false ; 
45    size_t timeLine=0 ;
46         
47    eventScheduler_->registerEvent(timeLine, hashId_) ;
48    while (!out)
49    {
50      CThreadManager::yield() ;
51      out = eventScheduler_->queryEvent(timeLine,hashId_) ;
52      if (out) eventScheduler_->popEvent() ;
53    }
54  }
55
56  void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)
57  {
58    // for now suppose nbPartitions=1
59   
60    auto it=occupancy_.begin() ;
61
62    // ym obsolete, service cannot overlap, only created on separate ressource or matching excatly existing service
63    // occupancy management must not be used anymore => simplification
64    // for now raise a message error when no ressources are availables
65   
66    int commSize ;
67    MPI_Comm_size(poolComm_, &commSize) ;
68    vector<bool> procs_in(commSize,false) ;
69    vector<pair<int,int>> procs_update ;
70
71    for(int i=0; i<size; i++) 
72    {
73      if (it->first != 0) ERROR("void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)",
74                                 << "No enough free ressources on pool id="<<getId()<<" to launch service id="<<serviceId);
75      procs_in[it->second]=true ;
76      procs_update.push_back(std::pair<int,int>(it->first+1,it->second)) ;
77      ++it ;
78    }
79
80
81    occupancy_.erase(occupancy_.begin(),it) ;
82    occupancy_.insert(procs_update.begin(),procs_update.end()) ;
83   
84    info(40)<<"CPoolRessource::createService  : notify createService to all pool members ; serviceId : "<<serviceId<<endl ;
85    for(int rank=0; rank<commSize; rank++)
86    {
87      if (procs_in[rank]) createServiceNotify(rank, serviceId, type, size, nbPartitions, true) ;
88      else createServiceNotify(rank, serviceId, type, size, nbPartitions, false) ;
89    }
90  }
91
92  void CPoolRessource::createServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
93  {
94    // for now suppose nbPartitions=1
95   
96    auto it=occupancy_.begin() ;
97    int commSize ;
98    MPI_Comm_size(poolComm_, &commSize) ;
99   
100    info(40)<<"CPoolRessource::createService  : notify createServiceOnto to all pool members ; serviceId : "<<serviceId
101            <<"  onto service Id  :"<< serviceId<<endl ;
102    for(int rank=0; rank<commSize; rank++) createServiceOntoNotify(rank, serviceId, type, onServiceId) ;
103  }
104
105/* 
106  void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions,
107                                           bool in)
108  {
109    winNotify_->lockWindow(rank,0) ;
110    winNotify_->updateFromWindow(rank, this, &CPoolRessource::createServiceDumpIn) ;
111    notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
112    winNotify_->updateToWindow(rank, this, &CPoolRessource::createServiceDumpOut) ; 
113    winNotify_->unlockWindow(rank,0) ;   
114  }
115*/
116 
117  void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in)
118  {
119    notifyType_=NOTIFY_CREATE_SERVICE ;
120    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ;
121    sendNotification(rank) ;
122  }
123
124
125  void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId)
126  {
127    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ;
128    notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ;
129    sendNotification(rank) ;
130  }
131
132
133  void CPoolRessource::sendNotification(int rank)
134  {
135    winNotify_->pushToExclusiveWindow(rank, this, &CPoolRessource::notificationsDumpOut) ;
136  }
137
138  void CPoolRessource::checkNotifications(void)
139  {
140    int commRank ;
141    MPI_Comm_rank(poolComm_, &commRank) ;
142    winNotify_->popFromExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ;
143    if (notifyType_==NOTIFY_CREATE_SERVICE) 
144    {
145      if (CThreadManager::isUsingThreads()) synchronize() ;
146      createService() ;
147    }
148    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 
149    {
150      if (CThreadManager::isUsingThreads()) synchronize() ;
151      createServiceOnto() ;
152    }
153  }
154
155
156  void CPoolRessource::notificationsDumpOut(CBufferOut& buffer)
157  {
158
159    buffer.realloc(maxBufferSize_) ;
160   
161    if (notifyType_==NOTIFY_CREATE_SERVICE)
162    {
163      auto& arg=notifyCreateService_ ;
164      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg);
165    }
166    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
167    {
168      auto& arg=notifyCreateServiceOnto_ ;
169      buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg)  ;
170    }
171  }
172
173  void CPoolRessource::notificationsDumpIn(CBufferIn& buffer)
174  {
175    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
176    else
177    {
178      buffer>>notifyType_;
179      if (notifyType_==NOTIFY_CREATE_SERVICE)
180      {
181        auto& arg=notifyCreateService_ ;
182        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ;
183      }
184      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
185      {
186        auto& arg=notifyCreateServiceOnto_ ;
187        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ;
188      }
189    }
190  } 
191
192  void CPoolRessource::createService(void)
193  {
194    auto& arg = notifyCreateService_ ;
195    createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ;
196  }
197 
198  void CPoolRessource::createServiceOnto(void)
199  {
200    auto& arg = notifyCreateServiceOnto_ ;
201    createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ;
202  }
203
204/*
205  void CPoolRessource::createServiceDumpOut(CBufferOut& buffer)
206  {
207    buffer.realloc(maxBufferSize_) ;
208   
209    buffer << (int) (notifications_.size());
210   
211    for(auto it=notifications_.begin();it!=notifications_.end(); ++it)
212      buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it)<< std::get<3>(*it) << std::get<4>(*it)  ;
213  }
214
215*/
216
217/*
218  void CPoolRessource::createServiceDumpIn(CBufferIn& buffer)
219  {
220    std::string serviceId ;
221    int type ;
222    int size;
223    int nbPartitions;
224    bool in ;
225
226    notifications_.clear() ;
227    int nbNotifications ;
228    buffer>>nbNotifications ;
229    for(int i=0;i<nbNotifications;i++)
230    {
231      buffer>>serviceId>>type>>size>>nbPartitions>>in ;
232      notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
233    }
234  }
235*/
236
237  bool CPoolRessource::eventLoop(bool serviceOnly)
238  {
239    CTimer::get("CPoolRessource::eventLoop").resume();
240   
241    double time=MPI_Wtime() ;
242    int flag ;
243    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
244    if (time-lastEventLoop_ > eventLoopLatency_) 
245    {
246      //checkCreateServiceNotification() ;
247      checkNotifications() ;
248      lastEventLoop_=time ;
249    }
250   
251    for (auto it=services_.begin(); it!=services_.end() ; ++it) 
252    {
253      if (it->second->eventLoop(serviceOnly))
254      {
255        delete it->second ;
256        services_.erase(it) ;
257        // don't forget to free service later
258        break ;
259      }
260    }
261    CTimer::get("CPoolRessource::eventLoop").suspend();
262    if (services_.empty() && finalizeSignal_) finished_=true ;
263    return finished_ ;
264  }
265
266  void CPoolRessource::threadEventLoop(void)
267  {
268    CTimer::get("CPoolRessource::eventLoop").resume();
269    info(100)<<"Launch Thread for  CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ;
270    CThreadManager::threadInitialize() ; 
271   
272    do
273    {
274
275      double time=MPI_Wtime() ;
276      int flag ;
277      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
278      if (time-lastEventLoop_ > eventLoopLatency_) 
279      {
280        //checkCreateServiceNotification() ;
281        checkNotifications() ;
282        lastEventLoop_=time ;
283      }
284   
285      for(auto it=services_.begin();it!=services_.end();++it) 
286      {
287        if (it->second->isFinished())
288        {
289          delete it->second ; 
290          services_.erase(it) ;
291          // destroy server_context -> to do later
292          break ;
293        } ;
294      }
295
296      CTimer::get("CPoolRessource::eventLoop").suspend();
297      if (services_.empty() && finalizeSignal_) finished_=true ;
298     
299      if (!finished_) CThreadManager::yield() ;
300   
301    } while (!finished_) ;
302
303    CThreadManager::threadFinalize() ;
304    info(100)<<"Close thread for  CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ;
305  }
306
307/*
308  void CPoolRessource::checkCreateServiceNotification(void)
309  {
310    int commRank ;
311    MPI_Comm_rank(poolComm_, &commRank) ;
312    winNotify_->lockWindow(commRank,0) ;
313    winNotify_->updateFromWindow(commRank, this, &CPoolRessource::createServiceDumpIn) ;
314   
315    if (!notifications_.empty())
316    {
317      auto info = notifications_.front() ;
318      createNewService(get<0>(info), get<1>(info), get<2>(info), get<3>(info), get<4>(info)) ;
319      notifications_.pop_front() ;
320      winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ;     
321    }
322    winNotify_->unlockWindow(commRank,0) ;
323
324  }
325*/
326
327  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in)
328  {
329     
330    info(40)<<"CPoolRessource::createNewService  : receive createService notification ; serviceId : "<<serviceId<<endl ;
331    MPI_Comm serviceComm, newServiceComm, freeComm ;
332    int commRank ;
333     
334    int color;
335    if (!services_.empty()) color = 0 ;
336    else color=1 ; 
337    MPI_Comm_rank(poolComm_,&commRank) ;
338    xios::MPI_Comm_split(poolComm_, color, commRank, &freeComm) ;  // workaround
339   
340    if (services_.empty()) 
341    {
342      MPI_Comm_rank(freeComm,&commRank) ;
343      xios::MPI_Comm_split(freeComm, in, commRank, &serviceComm) ;
344
345      // temporary for event scheduler, we must using hierarchical split of free ressources communicator.
346      // we hope for now that spliting using occupancy make this match
347
348      if (in)
349      {
350        int serviceCommSize ;
351        int serviceCommRank ;
352        MPI_Comm_size(serviceComm,&serviceCommSize) ;
353        MPI_Comm_rank(serviceComm,&serviceCommRank) ;
354
355        info(10)<<"Service  "<<serviceId<<" created "<<"  service size : "<<serviceCommSize<< "   service rank : "<<serviceCommRank
356                              <<" on rank pool "<<commRank<<endl ;
357       
358        int partitionId ; 
359        if ( serviceCommRank >= (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) )
360        {
361          int rank =  serviceCommRank - (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) ;
362          partitionId = serviceCommSize%nbPartitions +  rank / (serviceCommSize/nbPartitions) ;
363        }
364        else  partitionId = serviceCommRank / (serviceCommSize/nbPartitions + 1) ;
365
366        xios::MPI_Comm_split(serviceComm, partitionId, commRank, &newServiceComm) ;
367
368        MPI_Comm_size(newServiceComm,&serviceCommSize) ;
369        MPI_Comm_rank(newServiceComm,&serviceCommRank) ;
370        info(10)<<"Service  "<<serviceId<<" created "<<"  partition : " <<partitionId<<" service size : "<<serviceCommSize
371                << " service rank : "<<serviceCommRank <<" on rank pool "<<commRank<<endl ;
372     
373        shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
374        freeRessourceEventScheduler_->splitScheduler(newServiceComm, parentScheduler, childScheduler) ;
375        if (isFirstSplit_) eventScheduler_ = parentScheduler ;
376        isFirstSplit_=false ;
377
378        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, childScheduler, Id_, serviceId, partitionId, type, nbPartitions) ;
379       
380        xios::MPI_Comm_free(&newServiceComm) ;
381      }
382      else
383      {
384        shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
385        freeRessourceEventScheduler_->splitScheduler(serviceComm, parentScheduler, childScheduler) ;
386        if (isFirstSplit_) eventScheduler_ = parentScheduler ;
387        freeRessourceEventScheduler_ = childScheduler ;
388        isFirstSplit_=false ;
389      }
390      xios::MPI_Comm_free(&serviceComm) ;
391    }
392    xios::MPI_Comm_free(&freeComm) ;
393  }
394 
395  void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
396  {
397     
398    info(40)<<"CPoolRessource::createNewServiceOnto  : receive createServiceOnto notification ; serviceId : "
399            <<serviceId<<"  ontoServiceId : "<<onServiceId<<endl ;
400    for(auto& service : services_) 
401    {
402      if (std::get<0>(service.first)==onServiceId)
403      {
404        const MPI_Comm& serviceComm = service.second->getCommunicator() ;
405        MPI_Comm newServiceComm ;
406        xios::MPI_Comm_dup(serviceComm, &newServiceComm) ;
407        CXios::getMpiGarbageCollector().registerCommunicator(newServiceComm) ;
408        int nbPartitions = service.second->getNbPartitions() ;
409        int partitionId = service.second->getPartitionId() ;
410        shared_ptr<CEventScheduler>  eventScheduler = service.second->getEventScheduler() ;
411        info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "<<onServiceId<<endl  ;
412        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, eventScheduler, Id_, serviceId, partitionId, type,
413                                                                         nbPartitions) ;       
414      }
415    }
416   
417  }
418
419  void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients
420  {
421    services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, eventScheduler, Id_, serviceId, partitionId, type, nbPartitions) ;
422  }
423
424
425  void CPoolRessource::finalizeSignal(void)
426  {
427    finalizeSignal_=true ;
428    for (auto it=services_.begin(); it!=services_.end() ; ++it) it->second->finalizeSignal() ;
429  } 
430 
431  CPoolRessource::~CPoolRessource()
432  {
433    delete winNotify_ ;
434    for(auto& service : services_) delete service.second ;
435  }
436}
Note: See TracBrowser for help on using the repository browser.