source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/pool_ressource.cpp @ 2274

Last change on this file since 2274 was 2274, checked in by ymipsl, 3 years ago

Tracking memory leak : release memory statically alocated

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 7.2 KB
Line 
1#include "pool_ressource.hpp"
2#include "services.hpp"
3#include "buffer_in.hpp"
4#include "buffer_out.hpp"
5#include "message.hpp"
6#include "type.hpp"
7#include "cxios.hpp"
8#include "timer.hpp"
9
10namespace xios
11{
12  CPoolRessource::CPoolRessource(MPI_Comm poolComm, const std::string& Id) : Id_(Id), finalizeSignal_(false)
13  {
14    int commRank, commSize ;
15    MPI_Comm_dup(poolComm, &poolComm_) ;
16    winNotify_ = new CWindowManager(poolComm_, maxBufferSize_) ;
17    MPI_Comm_rank(poolComm, &commRank) ;
18    MPI_Comm_size(poolComm, &commSize) ;
19    info(40)<<"CPoolRessource::CPoolRessource  : creating new pool : "<<Id<<endl ;
20    if (commRank==localLeader_)
21    {
22      for(int i=0; i<commSize;i++) occupancy_.insert(std::pair<char,int>(0,i)) ; 
23      int globalLeaderRank ;
24      MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ;
25      CXios::getRessourcesManager()->registerPool(Id, commSize, globalLeaderRank) ;
26    }
27   
28    winNotify_->lockWindow(commRank,0) ;
29    winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ; 
30    winNotify_->unlockWindow(commRank,0) ;       
31    MPI_Barrier(poolComm_) ;
32  }
33
34  void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)
35  {
36    // for now suppose nbPartitions=1
37   
38    auto it=occupancy_.begin() ;
39    int commSize ;
40    MPI_Comm_size(poolComm_, &commSize) ;
41    vector<bool> procs_in(commSize,false) ;
42    vector<pair<int,int>> procs_update ;
43
44    for(int i=0; i<size; i++) 
45    {
46      procs_in[it->second]=true ;
47      procs_update.push_back(std::pair<int,int>(it->first+1,it->second)) ;
48      ++it ;
49    }
50   
51    occupancy_.erase(occupancy_.begin(),it) ;
52    occupancy_.insert(procs_update.begin(),procs_update.end()) ;
53   
54    info(40)<<"CPoolRessource::createService  : notify createService to all pool members ; serviceId : "<<serviceId<<endl ;
55    for(int rank=0; rank<commSize; rank++)
56    {
57      if (procs_in[rank]) createServiceNotify(rank, serviceId, type, size, nbPartitions, true) ;
58      else createServiceNotify(rank, serviceId, type, size, nbPartitions, false) ;
59    }
60  }
61
62 
63  void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions, 
64                                           bool in)
65  {
66    winNotify_->lockWindow(rank,0) ;
67    winNotify_->updateFromWindow(rank, this, &CPoolRessource::createServiceDumpIn) ;
68    notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
69    winNotify_->updateToWindow(rank, this, &CPoolRessource::createServiceDumpOut) ; 
70    winNotify_->unlockWindow(rank,0) ;   
71  }
72
73
74  void CPoolRessource::createServiceDumpOut(CBufferOut& buffer)
75  {
76    buffer.realloc(maxBufferSize_) ;
77   
78    buffer << (int) (notifications_.size());
79   
80    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
81      buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it)<< std::get<3>(*it) << std::get<4>(*it)  ;
82  }
83
84
85  void CPoolRessource::createServiceDumpIn(CBufferIn& buffer)
86  {
87    std::string serviceId ;
88    int type ;
89    int size; 
90    int nbPartitions; 
91    bool in ;
92
93    notifications_.clear() ;
94    int nbNotifications ;
95    buffer>>nbNotifications ;
96    for(int i=0;i<nbNotifications;i++) 
97    {
98      buffer>>serviceId>>type>>size>>nbPartitions>>in ;
99      notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
100    }
101  }
102
103  bool CPoolRessource::eventLoop(bool serviceOnly)
104  {
105    CTimer::get("CPoolRessource::eventLoop").resume();
106   
107    double time=MPI_Wtime() ;
108    int flag ;
109    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
110    if (time-lastEventLoop_ > eventLoopLatency_) 
111    {
112      checkCreateServiceNotification() ;
113      lastEventLoop_=time ;
114    }
115   
116    for (auto it=services_.begin(); it!=services_.end() ; ++it) 
117    {
118      if (it->second->eventLoop(serviceOnly))
119      {
120        delete it->second ;
121        services_.erase(it) ;
122        // don't forget to free service later
123        break ;
124      }
125    }
126    CTimer::get("CPoolRessource::eventLoop").suspend();
127    if (services_.empty() && finalizeSignal_) return true ;
128    else return false ;
129  }
130
131  void CPoolRessource::checkCreateServiceNotification(void)
132  {
133    int commRank ;
134    MPI_Comm_rank(poolComm_, &commRank) ;
135    winNotify_->lockWindow(commRank,0) ;
136    winNotify_->updateFromWindow(commRank, this, &CPoolRessource::createServiceDumpIn) ;
137   
138    if (!notifications_.empty())
139    {
140      auto info = notifications_.front() ;
141      createNewService(get<0>(info), get<1>(info), get<2>(info), get<3>(info), get<4>(info)) ;
142      notifications_.pop_front() ;
143      winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ;     
144    }
145    winNotify_->unlockWindow(commRank,0) ;
146
147  }
148
149  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in)
150  {
151     
152     info(40)<<"CPoolRessource::createNewService  : receive createService notification ; serviceId : "<<serviceId<<endl ;
153     MPI_Comm serviceComm, newServiceComm ;
154     int commRank ;
155     MPI_Comm_rank(poolComm_,&commRank) ;
156     MPI_Comm_split(poolComm_, in, commRank, &serviceComm) ;
157     if (in)
158     {
159       int serviceCommSize ;
160       int serviceCommRank ;
161       MPI_Comm_size(serviceComm,&serviceCommSize) ;
162       MPI_Comm_rank(serviceComm,&serviceCommRank) ;
163
164       info(10)<<"Service  "<<serviceId<<" created "<<"  service size : "<<serviceCommSize<< "   service rank : "<<serviceCommRank
165                            <<" on rank pool "<<commRank<<endl ;
166       
167       int partitionId ; 
168       if ( serviceCommRank >= (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) )
169       {
170         int rank =  serviceCommRank - (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) ;
171         partitionId = serviceCommSize%nbPartitions +  rank / (serviceCommSize/nbPartitions) ;
172       }
173       else  partitionId = serviceCommRank / (serviceCommSize/nbPartitions + 1) ;
174
175       MPI_Comm_split(serviceComm, partitionId, commRank, &newServiceComm) ;
176       
177       MPI_Comm_size(newServiceComm,&serviceCommSize) ;
178       MPI_Comm_rank(newServiceComm,&serviceCommRank) ;
179       info(10)<<"Service  "<<serviceId<<" created "<<"  partition : " <<partitionId<<" service size : "<<serviceCommSize
180               << " service rank : "<<serviceCommRank <<" on rank pool "<<commRank<<endl ;
181     
182       services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, Id_, serviceId, partitionId, type, nbPartitions) ;
183       
184       MPI_Comm_free(&newServiceComm) ;
185     }
186     MPI_Comm_free(&serviceComm) ;
187  }
188
189  void CPoolRessource::createService(MPI_Comm serviceComm, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached
190  {
191    services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, Id_, serviceId, partitionId, type, nbPartitions) ;
192  }
193
194
195  void CPoolRessource::finalizeSignal(void)
196  {
197    finalizeSignal_=true ;
198    for (auto it=services_.begin(); it!=services_.end() ; ++it) it->second->finalizeSignal() ;
199  } 
200 
201  CPoolRessource::~CPoolRessource()
202  {
203    delete winNotify_ ;
204    for(auto& service : services_) delete service.second ;
205  }
206}
Note: See TracBrowser for help on using the repository browser.