source: XIOS3/trunk/src/manager/servers_ressource.cpp @ 2580

Last change on this file since 2580 was 2580, checked in by ymipsl, 9 months ago

Tracking unfree MPI windows and communicators.

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 7.4 KB
Line 
1#include "servers_ressource.hpp"
2#include "window_manager.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "event_scheduler.hpp"
6#include "cxios.hpp"
7#include "mpi.hpp"
8#include "timer.hpp"
9#include <vector>
10#include <string>
11#include "thread_manager.hpp"
12
13
14
15
16
17namespace xios
18{
19  using namespace std ;
20
21  CServersRessource::CServersRessource(MPI_Comm serverComm) : poolRessource_(nullptr), finalizeSignal_(false)
22  {
23
24    MPI_Comm_dup(serverComm, &serverComm_) ;
25    CXios::getMpiGarbageCollector().registerCommunicator(serverComm_) ; 
26    MPI_Comm xiosComm=CXios::getXiosComm() ;
27 
28    int localRank, globalRank ;
29    MPI_Comm_rank(xiosComm,&globalRank) ;
30    MPI_Comm_rank(serverComm_,&localRank) ;
31   
32    winNotify_ = new CWindowManager(serverComm_, maxBufferSize_,"CServersRessource::winNotify_") ;
33    MPI_Barrier(serverComm_) ;
34    if (localRank==localLeader_) 
35    {
36      int commSize ;
37      MPI_Comm_size(serverComm_,&commSize) ;
38      CXios::getRessourcesManager()->registerServerLeader(globalRank) ;
39      CXios::getRessourcesManager()->registerRessourcesSize(commSize) ;
40      freeRessourcesRank_.resize(commSize) ;
41      for(int i=0;i<commSize;i++) freeRessourcesRank_[i]=i ;
42    }
43
44    MPI_Comm_dup(serverComm_, &freeRessourcesComm_) ; 
45    CXios::getMpiGarbageCollector().registerCommunicator(freeRessourcesComm_) ;
46    eventScheduler_ = make_shared<CEventScheduler>(freeRessourcesComm_) ;
47    freeRessourceEventScheduler_ = eventScheduler_ ;
48    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServersRessource::threadEventLoop, this) ;
49  }
50
51  void CServersRessource::createPool(const string& poolId, const int size)
52  {
53    int commSize ;
54    MPI_Comm_size(serverComm_,&commSize) ;
55    vector<int> newFreeRessourcesRank(freeRessourcesRank_.size()-size) ;
56
57    bool isPartOf ;
58
59    for(int i=0, j=0; i<freeRessourcesRank_.size();i++) 
60    {
61       if (i<size) isPartOf=true ;
62       else 
63       {
64         isPartOf=false ;
65         newFreeRessourcesRank[j]=freeRessourcesRank_[i] ;
66         j++ ;
67       }
68       
69       notifyOutType_=NOTIFY_CREATE_POOL ;
70       notifyOutCreatePool_ = make_tuple(poolId, isPartOf) ;
71       sendNotification(freeRessourcesRank_[i]) ;
72    }
73    freeRessourcesRank_ = std::move(newFreeRessourcesRank) ;
74  }
75
76  void CServersRessource::finalize(void)
77  {
78    int commSize ;
79    MPI_Comm_size(serverComm_,&commSize) ;
80
81    for(int rank=0; rank<commSize;rank++)
82    { 
83      notifyOutType_=NOTIFY_FINALIZE ;
84      sendNotification(rank) ;
85    }
86  }
87
88  void CServersRessource::sendNotification(int rank)
89  {
90    winNotify_->pushToExclusiveWindow(rank, this, &CServersRessource::notificationsDumpOut) ;
91  }
92
93
94  void CServersRessource::notificationsDumpOut(CBufferOut& buffer)
95  {
96   
97    buffer.realloc(maxBufferSize_) ;
98   
99    if (notifyOutType_==NOTIFY_CREATE_POOL)
100    {
101      auto& arg=notifyOutCreatePool_ ;
102      buffer << notifyOutType_ << std::get<0>(arg) << std::get<1>(arg) ;
103    }
104    else if (notifyOutType_==NOTIFY_FINALIZE) buffer << notifyOutType_ ;
105  }
106
107  void CServersRessource::notificationsDumpIn(CBufferIn& buffer)
108  {
109    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
110    else
111    {
112      buffer>>notifyInType_;
113      if (notifyInType_==NOTIFY_CREATE_POOL)
114      {
115        auto& arg=notifyInCreatePool_ ;
116        buffer >> std::get<0>(arg) >> std::get<1>(arg)  ;
117      }
118      else if (notifyInType_==NOTIFY_FINALIZE) { /*nothing to do*/}
119    }
120  }
121
122  bool CServersRessource::eventLoop(bool serviceOnly)
123  {
124    CTimer::get("CServersRessource::eventLoop").resume();
125    double time=MPI_Wtime() ;
126    int flag ;
127    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
128
129    if (time-lastEventLoop_ > eventLoopLatency_) 
130    {
131      checkNotifications() ;
132      lastEventLoop_=time ;
133    }
134
135    if (poolRessource_!=nullptr) 
136    {
137      poolRessource_->eventLoop(serviceOnly) ;
138      if (poolRessource_->isFinished())
139      {
140        delete poolRessource_ ;
141        poolRessource_=nullptr ;
142        // don't forget to free pool ressource later
143      } 
144    }
145    CTimer::get("CServersRessource::eventLoop").suspend();
146    if (poolRessource_==nullptr && finalizeSignal_) finished_=true ;
147    return finished_ ;
148  }
149
150  void CServersRessource::threadEventLoop(void)
151  {
152    CTimer::get("CServersRessource::eventLoop").resume();
153    info(100)<<"Launch Thread for  CServersRessource::threadEventLoop"<<endl ;
154    CThreadManager::threadInitialize() ; 
155
156    do
157    {
158      double time=MPI_Wtime() ;
159      int flag ;
160      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
161
162      if (time-lastEventLoop_ > eventLoopLatency_) 
163      {
164        checkNotifications() ;
165        lastEventLoop_=time ;
166      }
167
168      if (poolRessource_!=nullptr) 
169      {
170        if (poolRessource_->isFinished())
171        {
172          delete poolRessource_ ;
173          poolRessource_=nullptr ;
174          // don't forget to free pool ressource later
175        } 
176      }
177      CTimer::get("CServersRessource::eventLoop").suspend();
178      if (poolRessource_==nullptr && finalizeSignal_) finished_=true ;
179      if (!finished_) CThreadManager::yield() ;
180   
181    } while (!finished_) ;
182
183    CThreadManager::threadFinalize() ;
184    info(100)<<"Close thread for CServersRessource::threadEventLoop"<<endl ; ;
185  }
186
187
188  void CServersRessource::checkNotifications(void)
189  {
190    int commRank ;
191    MPI_Comm_rank(serverComm_, &commRank) ;
192    winNotify_->popFromExclusiveWindow(commRank, this, &CServersRessource::notificationsDumpIn) ;
193    if (notifyInType_==NOTIFY_CREATE_POOL) 
194    {
195      if (CThreadManager::isUsingThreads()) synchronize() ;
196      createPool() ;
197    }
198    else if (notifyInType_==NOTIFY_FINALIZE) finalizeSignal() ;
199  }
200
201  void CServersRessource::synchronize(void)
202  {
203    bool out=false ; 
204    size_t timeLine=0 ;
205    std::hash<string> hashString ;
206    int commSize ;
207    MPI_Comm_size(freeRessourcesComm_,&commSize) ;
208    size_t hashId = hashString("CServersRessource::"+to_string(commSize)) ;
209    freeRessourceEventScheduler_->registerEvent(timeLine, hashId) ;
210    while (!out)
211    {
212      CThreadManager::yield() ;
213      out = eventScheduler_->queryEvent(timeLine,hashId) ;
214      if (out) eventScheduler_->popEvent() ;
215    }
216  }
217
218  void CServersRessource::createPool(void)
219  {
220    auto& arg=notifyInCreatePool_ ;
221    string poolId=get<0>(arg) ;
222    bool isPartOf=get<1>(arg) ;
223   
224    int commRank ;
225    MPI_Comm poolComm ;
226    MPI_Comm_rank(freeRessourcesComm_,&commRank) ;
227    MPI_Comm_split(freeRessourcesComm_, isPartOf, commRank, &poolComm) ;
228   
229    shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
230    freeRessourceEventScheduler_->splitScheduler(poolComm, parentScheduler, childScheduler) ;
231   
232    if (isFirstSplit_) eventScheduler_ = parentScheduler ; 
233    isFirstSplit_ = false ;
234
235    if (isPartOf)
236    { 
237      poolRessource_ = new CPoolRessource(poolComm, childScheduler, poolId, true) ;
238      MPI_Comm_free(&poolComm) ;
239    }
240    else 
241    {
242      freeRessourceEventScheduler_ = childScheduler ;
243      MPI_Comm_free(&freeRessourcesComm_) ;
244      freeRessourcesComm_=poolComm ;
245    }
246
247  }
248 
249  void CServersRessource::finalizeSignal(void)
250  {
251    finalizeSignal_=true ;
252    if (poolRessource_!=nullptr) poolRessource_->finalizeSignal() ;
253  }
254
255  bool CServersRessource::isServerLeader(void)
256  {
257    int commRank ;
258    MPI_Comm_rank(serverComm_,&commRank) ;
259    if (commRank==localLeader_) return true ;
260    else return false ;
261  }
262
263  CServersRessource::~CServersRessource()
264  {
265    delete winNotify_ ;
266  }
267}
Note: See TracBrowser for help on using the repository browser.