source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/servers_ressource.cpp @ 2328

Last change on this file since 2328 was 2274, checked in by ymipsl, 3 years ago

Tracking memory leak : release memory statically alocated

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 5.1 KB
Line 
1#include "servers_ressource.hpp"
2#include "window_manager.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "cxios.hpp"
6#include "mpi.hpp"
7#include "timer.hpp"
8#include <vector>
9#include <string>
10
11
12
13
14namespace xios
15{
16  using namespace std ;
17
18  CServersRessource::CServersRessource(MPI_Comm serverComm) : poolRessource_(nullptr), finalizeSignal_(false)
19  {
20
21    MPI_Comm_dup(serverComm, &serverComm_) ;
22    MPI_Comm xiosComm=CXios::getXiosComm() ;
23 
24    int localRank, globalRank ;
25    MPI_Comm_rank(xiosComm,&globalRank) ;
26    MPI_Comm_rank(serverComm_,&localRank) ;
27   
28    winNotify_ = new CWindowManager(serverComm_, maxBufferSize_) ;
29    MPI_Barrier(serverComm_) ;
30    if (localRank==localLeader_) 
31    {
32      int commSize ;
33      MPI_Comm_size(serverComm_,&commSize) ;
34      CXios::getRessourcesManager()->registerServerLeader(globalRank) ;
35      CXios::getRessourcesManager()->registerRessourcesSize(commSize) ;
36      freeRessourcesRank_.resize(commSize) ;
37      for(int i=0;i<commSize;i++) freeRessourcesRank_[i]=i ;
38    }
39
40    MPI_Comm_dup(serverComm_, &freeRessourcesComm_) ; 
41
42  }
43
44  void CServersRessource::createPool(const string& poolId, const int size)
45  {
46    int commSize ;
47    MPI_Comm_size(serverComm_,&commSize) ;
48    vector<int> newFreeRessourcesRank(freeRessourcesRank_.size()-size) ;
49
50    bool isPartOf ;
51
52    for(int i=0; i<freeRessourcesRank_.size();i++) 
53    {
54       if (i<size) isPartOf=true ;
55       else 
56       {
57         isPartOf=false ;
58         newFreeRessourcesRank[i]=freeRessourcesRank_[i] ;
59       }
60       
61       notifyOutType_=NOTIFY_CREATE_POOL ;
62       notifyOutCreatePool_ = make_tuple(poolId, isPartOf) ;
63       sendNotification(freeRessourcesRank_[i]) ;
64    }
65    freeRessourcesRank_ = std::move(newFreeRessourcesRank) ;
66  }
67
68  void CServersRessource::finalize(void)
69  {
70    int commSize ;
71    MPI_Comm_size(serverComm_,&commSize) ;
72
73    for(int rank=0; rank<commSize;rank++)
74    { 
75      notifyOutType_=NOTIFY_FINALIZE ;
76      sendNotification(rank) ;
77    }
78  }
79
80  void CServersRessource::sendNotification(int rank)
81  {
82    winNotify_->lockWindow(rank,0) ;
83    winNotify_->pushToWindow(rank, this, &CServersRessource::notificationsDumpOut) ;
84    winNotify_->unlockWindow(rank,0) ;
85  }
86
87
88  void CServersRessource::notificationsDumpOut(CBufferOut& buffer)
89  {
90   
91    buffer.realloc(maxBufferSize_) ;
92   
93    if (notifyOutType_==NOTIFY_CREATE_POOL)
94    {
95      auto& arg=notifyOutCreatePool_ ;
96      buffer << notifyOutType_ << std::get<0>(arg) << std::get<1>(arg) ;
97    }
98    else if (notifyOutType_==NOTIFY_FINALIZE) buffer << notifyOutType_ ;
99  }
100
101  void CServersRessource::notificationsDumpIn(CBufferIn& buffer)
102  {
103    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
104    else
105    {
106      buffer>>notifyInType_;
107      if (notifyInType_==NOTIFY_CREATE_POOL)
108      {
109        auto& arg=notifyInCreatePool_ ;
110        buffer >> std::get<0>(arg) >> std::get<1>(arg)  ;
111      }
112      else if (notifyInType_==NOTIFY_FINALIZE) { /*nothing to do*/}
113    }
114  }
115
116  bool CServersRessource::eventLoop(bool serviceOnly)
117  {
118    CTimer::get("CServersRessource::eventLoop").resume();
119    double time=MPI_Wtime() ;
120    int flag ;
121    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
122
123    if (time-lastEventLoop_ > eventLoopLatency_) 
124    {
125      checkNotifications() ;
126      lastEventLoop_=time ;
127    }
128
129    if (poolRessource_!=nullptr) 
130    {
131      if (poolRessource_->eventLoop(serviceOnly))
132      {
133        delete poolRessource_ ;
134        poolRessource_=nullptr ;
135        // don't forget to free pool ressource later
136      } 
137    }
138    CTimer::get("CServersRessource::eventLoop").suspend();
139    if (poolRessource_==nullptr && finalizeSignal_) return true ;
140    else return false ;
141  }
142
143  void CServersRessource::checkNotifications(void)
144  {
145    int commRank ;
146    MPI_Comm_rank(serverComm_, &commRank) ;
147    winNotify_->lockWindow(commRank,0) ;
148    winNotify_->popFromWindow(commRank, this, &CServersRessource::notificationsDumpIn) ;
149    winNotify_->unlockWindow(commRank,0) ;
150    if (notifyInType_==NOTIFY_CREATE_POOL) createPool() ;
151    else if (notifyInType_==NOTIFY_FINALIZE) finalizeSignal() ;
152  }
153
154  void CServersRessource::createPool(void)
155  {
156    auto& arg=notifyInCreatePool_ ;
157    string poolId=get<0>(arg) ;
158    bool isPartOf=get<1>(arg) ;
159   
160    int commRank ;
161    MPI_Comm poolComm ;
162    MPI_Comm_rank(freeRessourcesComm_,&commRank) ;
163    MPI_Comm_split(freeRessourcesComm_, isPartOf, commRank, &poolComm) ;
164   
165    if (isPartOf)
166    { 
167      poolRessource_ = new CPoolRessource(poolComm, poolId) ;
168      MPI_Comm_free(&poolComm) ;
169    }
170    else 
171    {
172      MPI_Comm_free(&freeRessourcesComm_) ;
173      freeRessourcesComm_=poolComm ;
174    }
175
176  }
177 
178  void CServersRessource::finalizeSignal(void)
179  {
180    finalizeSignal_=true ;
181    if (poolRessource_!=nullptr) poolRessource_->finalizeSignal() ;
182  }
183
184  bool CServersRessource::isServerLeader(void)
185  {
186    int commRank ;
187    MPI_Comm_rank(serverComm_,&commRank) ;
188    if (commRank==localLeader_) return true ;
189    else return false ;
190  }
191
192  CServersRessource::~CServersRessource()
193  {
194    delete winNotify_ ;
195  }
196}
Note: See TracBrowser for help on using the repository browser.