source: XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/services_manager.cpp @ 1761

Last change on this file since 1761 was 1761, checked in by ymipsl, 5 years ago

implementing first guess for service functionnalities.

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 7.6 KB
Line 
1#include "daemons_manager.hpp"
2#include "services_manager.hpp"
3#include "ressources_manager.hpp"
4#include "cxios.hpp"
5#include "pool_ressource.hpp"
6#include "type.hpp"
7#include "server.hpp"
8#include "servers_ressource.hpp"
9
10namespace xios
11{
12
13 
14
15  CServicesManager::CServicesManager(bool isXiosServer)
16  {
17   
18    int commRank ; 
19    xiosComm_ = CXios::getXiosComm() ;
20    MPI_Comm_rank(xiosComm_, &commRank) ;
21   
22   
23    // The global manager leader will be the process of rank 0
24    // By "xiosComm" communicator construction
25    // - if servers exits it will be the root process of the servers communicator
26    // - otherwise the root process of the first model
27   
28    managerGlobalLeader_ = 0 ;
29
30    MPI_Comm_rank(xiosComm_, &commRank) ;
31    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
32    winNotify_->lockWindow(commRank,0) ;
33    winNotify_->updateToWindow(commRank, this, &CServicesManager::notificationsDumpOut) ;
34    winNotify_->unlockWindow(commRank,0) ;
35
36    winServices_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
37    winServices_->lockWindow(commRank,0) ;
38    winServices_->updateToWindow(commRank, this, &CServicesManager::servicesDumpOut) ;
39    winServices_->unlockWindow(commRank,0) ;
40
41    MPI_Barrier(xiosComm_)  ;   
42  }
43
44  bool CServicesManager::createServices(const std::string& poolId, const std::string& serviceId, 
45                                        int type, int size, int nbPartitions, bool wait) 
46  {
47
48    int leader ;
49    int poolSize ;
50   
51    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;
52    if (wait)
53    {
54      while (!ok) 
55      {
56        CXios::getDaemonsManager()->eventLoop() ;
57        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;
58      }
59    }
60
61    if (ok) 
62    {
63      createServicesNotify(leader, serviceId, type, size, nbPartitions) ;
64      return true ;
65    }
66    else return false ;
67  }
68
69
70  void CServicesManager::createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions)
71  {
72    winNotify_->lockWindow(rank,0) ;
73    winNotify_->updateFromWindow(rank, this, &CServicesManager::notificationsDumpIn) ;
74    notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions)) ;
75    winNotify_->updateToWindow(rank, this, &CServicesManager::notificationsDumpOut) ;
76    winNotify_->unlockWindow(rank,0) ;
77  }
78
79 
80  void CServicesManager::checkCreateServicesNotification(void)
81  {
82    int commRank ;
83    MPI_Comm_rank(xiosComm_,&commRank) ;
84    winNotify_->lockWindow(commRank,0) ;
85    winNotify_->updateFromWindow(commRank, this, &CServicesManager::notificationsDumpIn) ;
86   
87    if (!notifications_.empty())
88    {
89      auto info = notifications_.front() ;
90      CServer::getServersRessource()->getPoolRessource()->createService(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ;
91      notifications_.pop_front() ;
92      winNotify_->updateToWindow(commRank, this, &CServicesManager::notificationsDumpOut) ;     
93    }
94    winNotify_->unlockWindow(commRank,0) ;
95
96  }
97
98  void CServicesManager::eventLoop(void)
99  {
100    checkCreateServicesNotification() ;
101  }
102
103 
104  void CServicesManager::notificationsDumpOut(CBufferOut& buffer)
105  {
106   
107    buffer.realloc(maxBufferSize_) ;
108   
109    buffer<<(int)notifications_.size();
110   
111    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
112      buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it) << std::get<3>(*it)  ;
113  }
114
115  void CServicesManager::notificationsDumpIn(CBufferIn& buffer)
116  {
117    std::string id ;
118    int type ;
119    int size; 
120    int nbPartitions ;
121
122    notifications_.clear() ;
123    int nbNotifications ;
124    buffer>>nbNotifications ;
125    for(int i=0;i<nbNotifications;i++) 
126    {
127      buffer>>id>>type>>size>>nbPartitions ;
128      notifications_.push_back(std::make_tuple(id,type,size,nbPartitions)) ;
129    }
130  }
131
132 
133  void CServicesManager::servicesDumpOut(CBufferOut& buffer)
134  {
135   
136    buffer.realloc(maxBufferSize_) ;
137   
138    buffer<<(int)services_.size();
139   
140    for(auto it=services_.begin();it!=services_.end(); ++it)
141    { 
142      auto key = it->first ;
143      auto val = it->second ; 
144      buffer << std::get<0>(key) << std::get<1>(key) << std::get<2>(key) 
145             <<  static_cast<int>(std::get<0>(val)) << std::get<1>(val) << std::get<2>(val) << std::get<3>(val) ;
146    }
147  }
148
149  void CServicesManager::servicesDumpIn(CBufferIn& buffer)
150  {
151    std::string poolId, serviceId ;
152    int partitionId ;
153    int type ;
154    int size; 
155    int nbPartitions ;
156    int leader ;
157
158    services_.clear() ;
159    int nbServices ;
160    buffer>>nbServices ;
161    for(int i=0;i<nbServices;i++) 
162    {
163      buffer>>poolId>>serviceId>>partitionId>>type>>size>>nbPartitions>>leader ;
164      services_[std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
165    }
166  }
167
168  void CServicesManager::registerService(const std::string& poolId, const std::string& serviceId, const int& partitionId, int type, 
169                                         int size, int nbPartitions, int leader)
170  {
171   
172    winServices_->lockWindow(managerGlobalLeader_,0) ;
173    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
174    services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
175    winServices_->updateToWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ;
176    winServices_->unlockWindow(managerGlobalLeader_,0) ;
177  }
178
179  bool CServicesManager::getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, 
180                                        int& size, int& nbPartitions, int& leader)
181  {
182    winServices_->lockWindow(managerGlobalLeader_,0) ;
183    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
184    winServices_->unlockWindow(managerGlobalLeader_,0) ;
185
186    auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ;
187    if ( it == services_.end()) return false ;
188    else
189    {
190      type= std::get<0>(it->second); 
191      size= std::get<1>(it->second); 
192      nbPartitions = std::get<2>(it->second); 
193      leader = std::get<3>(it->second); 
194      return true ;
195    }
196  }
197
198  bool CServicesManager::getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader)
199  {
200    int type;
201    int size ;
202    int nbPartitions;
203    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
204  }
205
206  bool CServicesManager::getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type)
207  {
208    int size ;
209    int nbPartitions;
210    int leader;
211    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
212  }
213
214  bool CServicesManager::getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartitions)
215  {
216    int size ;
217    int type;
218    int leader;
219    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
220  }
221
222  bool CServicesManager::hasService(const std::string& poolId, const std::string& serviceId, const int& partitionId)
223  {
224    winServices_->lockWindow(managerGlobalLeader_,0) ;
225    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
226    winServices_->unlockWindow(managerGlobalLeader_,0) ;
227    auto it=services_.find(std::tuple<std::string, std::string, int>(poolId, serviceId, partitionId)) ;
228    if ( it == services_.end()) return false ;
229    else return true ;
230  }
231
232}
Note: See TracBrowser for help on using the repository browser.