source: XIOS3/dev/XIOS_ATTACHED/src/transport/legacy_context_client.cpp @ 2482

Last change on this file since 2482 was 2482, checked in by ymipsl, 15 months ago

First guess at suppression of attached mode, replaced by online reader and writer filters

YM

#include "xios_spl.hpp"
#include "legacy_context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"
#include "services.hpp"
#include "ressources_manager.hpp"
#include <boost/functional/hash.hpp>
#include <random>
#include <chrono>

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ communicator of the client group
    \param [in] interComm_ communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
    */
    CLegacyContextClient::CLegacyContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
                         : CContextClient(parent, intraComm_, interComm_, cxtSer),
                           mapBufferSize_(), maxBufferedEvents(4)
    {
      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for test)
      if (isAttachedModeEnabled()) pureOneSided=false ; // no one-sided in attached mode

      if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;

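      // MPI_Comm_split with color == key == clientRank yields a single-rank
      // communicator (commSelf_); it is later used as the local side when the
      // per-server window communicators are created in checkAttachWindows().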
      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows

      timeLine = 1;
    }

    CContextClient::ETransport getType(void) {return CContextClient::legacy ;}

    /*!
    In attached mode, the current context must be reset to the client context.
    \param [in] event Event sent to the server
    */
    void CLegacyContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

//      ostringstream str ;
//      for(auto& rank : ranks) str<<rank<<" ; " ;
//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;

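      // Optional sanity check (CXios::checkEventSync): all clients compare the type id,
      // class id and timeline of the event they are about to send, and check that every
      // target server rank is addressed by at least one client; any mismatch means the
      // clients have diverged in their event sequence and is reported as an error.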
      if (CXios::checkEventSync)
      {
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        long long timeLine_in( timeLine );
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
               << "Events are not coherent between clients for timeline = "<<timeLine);
        }

        vector<int> servers(serverSize,0) ;
        auto ranks=event.getRanks() ;
        for(auto& rank : ranks) servers[rank]=1 ;
        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
        ostringstream osstr ;
        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
        if (!osstr.str().empty())
        {
          ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
                 <<"Servers are : "<<osstr.str()) ;
        }

      }

      if (!event.isEmpty())
      {
        list<int> sizes = event.getSizes();

        // Get buffers with enough room for this event on each target server (blocking)
        list<CBufferOut*> buffList;
        getBuffers(timeLine, ranks, sizes, buffList) ;

        event.send(timeLine, sizes, buffList);

        //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ;

        unlockBuffers(ranks) ;
        checkBuffers(ranks);

      }

      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
      {
        while (checkBuffers(ranks)) callGlobalEventLoop() ;

        CXios::getDaemonsManager()->scheduleContext(hashId_) ;
        while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ;
      }

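      // Non-blocking barrier: every client of the context must have posted the event
      // before any of them proceeds, and the global event loop keeps being serviced
      // while waiting so that incoming requests are still handled.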
      MPI_Request req ;
      MPI_Status status ;
      MPI_Ibarrier(intraComm,&req) ;
      int flag ;
      MPI_Test(&req,&flag,&status) ;
      while(!flag)
      {
        callGlobalEventLoop() ;
        MPI_Test(&req,&flag,&status) ;
      }

      timeLine++;
    }


    /*!
     * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers.
     *
     * \param [in] timeLine time line of the event which will be sent to the servers
     * \param [in] serverList list of ranks of the connected servers
     * \param [in] sizeList size of the message for each connection
     * \param [out] retBuffers list of buffers that can be used to store the event
    */
    void CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers)
    {
      list<int>::const_iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::const_iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      bool areBuffersFree;

      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
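          // Creation of a buffer for a new server connection is protected by a token
          // from the resources manager; this appears to serialize buffer and window
          // creation while the global event loop is kept running.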
          CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ;
          size_t token = tokenManager->getToken() ;
          while (!tokenManager->lockToken(token)) callGlobalEventLoop() ;
          newBuffer(*itServer);
          it = buffers.find(*itServer);
          checkAttachWindows(it->second,it->first) ;
          tokenManager->unlockToken(token) ;
        }
        bufferList.push_back(it->second);
      }

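      // Wait until every target buffer has room for its part of the event. After an
      // unsuccessful check the buffers are not re-examined for latency_ seconds;
      // meanwhile they are unlocked and the global event loop is serviced.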
      double lastTimeBuffersNotFree=0. ;
      double time ;
      bool doUnlockBuffers ;
      CTimer::get("Blocking time").resume();
      do
      {
        areBuffersFree = true;
        doUnlockBuffers=false ;
        time=MPI_Wtime() ;
        if (time-lastTimeBuffersNotFree > latency_)
        {
          for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          {
            areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
          }
          if (!areBuffersFree)
          {
            lastTimeBuffersNotFree = time ;
            doUnlockBuffers=true ;
          }
        }
        else areBuffersFree = false ;

        if (!areBuffersFree)
        {
          if (doUnlockBuffers) for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();
          checkBuffers();

          callGlobalEventLoop() ;
        }

      } while (!areBuffersFree);
      CTimer::get("Blocking time").suspend();

      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
        retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize));
   }

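   // eventLoop() is the lightweight entry point called from the generic event loop;
   // callGlobalEventLoop() hands control back to the context's global event loop and
   // uses the locked_ flag to avoid re-entering checkBuffers() while that loop runs.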
   void CLegacyContextClient::eventLoop(void)
   {
      if (!locked_) checkBuffers() ;
   }

   void CLegacyContextClient::callGlobalEventLoop(void)
   {
     locked_=true ;
     context_->globalEventLoop() ;
     locked_=false ;
   }

   /*!
   Make a new buffer for the connection to a server with a specific rank
   \param [in] rank rank of the connected server
   */
   void CLegacyContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
        maxEventSizes[rank] = CXios::minBufferSize;
      }

      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank]);
      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ;
      else buffer->fixBuffer() ;
      // Notify the server
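      // The first message written to the buffer carries the context hash, the buffer
      // size and the addresses of the two RMA windows, presumably so that the server
      // side can later target the buffer with one-sided MPI operations.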
      CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint));
      MPI_Aint sendBuff[4] ;
      sendBuff[0]=hashId_;
      sendBuff[1]=mapBufferSize_[rank];
      sendBuff[2]=buffers[rank]->getWinAddress(0);
      sendBuff[3]=buffers[rank]->getWinAddress(1);
      info(100)<<"CLegacyContextClient::newBuffer : rank "<<rank<<" winAddress[0] "<<buffers[rank]->getWinAddress(0)<<" winAddress[1] "<<buffers[rank]->getWinAddress(1)<<endl;
      bufOut->put(sendBuff, 4);
      buffer->checkBuffer(true);
/*
       // create windows dynamically for one-sided
      if (!isAttachedModeEnabled())
      {
        CTimer::get("create Windows").resume() ;
        MPI_Comm interComm ;
        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
        MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
        CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
        MPI_Comm_free(&interComm) ;
        windows_[rank].resize(2) ;

        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;

        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;

        CTimer::get("create Windows").suspend() ;
      }
      else
      {
        winComm_[rank] = MPI_COMM_NULL ;
        windows_[rank].resize(2) ;
        windows_[rank][0] = MPI_WIN_NULL ;
        windows_[rank][1] = MPI_WIN_NULL ;
      }
      buffer->attachWindows(windows_[rank]) ;
      if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ;
  */
   }

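   /*!
    * Attach the RMA windows of a buffer if this has not been done yet. When attached
    * mode is disabled, a dedicated communicator towards the target server rank is built
    * and two dynamic MPI windows are created on it; in attached mode the windows are
    * left null.
    * \param [in] buffer buffer whose windows must be attached
    * \param [in] rank rank of the connected server
    */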
   void CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank)
   {
      if (!buffer->isAttachedWindows())
      {
           // create windows dynamically for one-sided
        if (!isAttachedModeEnabled())
        {
          CTimer::get("create Windows").resume() ;
          MPI_Comm interComm ;
          MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
          MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
          CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
          MPI_Comm_free(&interComm) ;
          windows_[rank].resize(2) ;

          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;

          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;

          CTimer::get("create Windows").suspend() ;
          buffer->attachWindows(windows_[rank]) ;
          MPI_Barrier(winComm_[rank]) ;
        }
        else
        {
          winComm_[rank] = MPI_COMM_NULL ;
          windows_[rank].resize(2) ;
          windows_[rank][0] = MPI_WIN_NULL ;
          windows_[rank][1] = MPI_WIN_NULL ;
          buffer->attachWindows(windows_[rank]) ;
        }

      }
   }


   /*!
   Verify the state of the buffers. A buffer is pending if it still holds a message that has not yet been delivered.
   \return state of buffers, pending(true), ready(false)
   */
   bool CLegacyContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
        pending |= itBuff->second->checkBuffer(!pureOneSided);
      return pending;
   }

   //! Release all buffers
   void CLegacyContextClient::releaseBuffers()
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      {
         delete itBuff->second;
      }
      buffers.clear();

// don't know when to release the windows

      //if (!isAttachedModeEnabled())
      //{
      //  for(auto& it : winComm_)
      //  {
      //    int rank = it.first ;
      //    MPI_Win_free(&windows_[rank][0]);
      //    MPI_Win_free(&windows_[rank][1]);
      //    MPI_Comm_free(&winComm_[rank]) ;
      //  }
      //}
   }


  /*!
   Lock the buffers for one-sided communications
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CLegacyContextClient::lockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer();
   }

  /*!
   Unlock the buffers for one-sided communications
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CLegacyContextClient::unlockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer();
   }

   /*!
   Verify the state of the buffers corresponding to a set of connections
   \param [in] ranks list of ranks of the servers to which the client is connected
   \return state of buffers, pending(true), ready(false)
   */
   bool CLegacyContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided);
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: this function is collective.
    *
    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
   */
   void CLegacyContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
   {
     setFixedBuffer() ;
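     // Each requested size is inflated by CXios::bufferSizeFactor (plus a 1% margin)
     // and clamped between CXios::minBufferSize and CXios::maxBufferSize.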
     for(auto& it : mapSize)
     {
      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) ;
      mapBufferSize_[it.first]=size ;
      if (buffers.count(it.first)>0) buffers[it.first]->fixBufferSize(size);
     }
   }

   /*!
   * Finalize the context client and produce some reports. This function is non-blocking.
   */
  void CLegacyContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    std::list<int>::iterator ItServerLeader;

    bool stop = false;

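    // Count, across all clients, how many clients are connected to each server rank;
    // the count is sent along with the finalize event, presumably so that each server
    // knows how many finalize notifications to expect before shutting down.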
    int* nbServerConnectionLocal  = new int[serverSize] ;
    int* nbServerConnectionGlobal  = new int[serverSize] ;
    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;

    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    CMessage msg;

    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
    sendEvent(event);

    delete[] nbServerConnectionLocal ;
    delete[] nbServerConnectionGlobal ;

    CTimer::get("Blocking time").resume();
    checkBuffers();
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

  }


  /*!
   * Check whether any buffer still has a pending MPI request.
   * \return true if at least one buffer has a pending request, false otherwise
   */
  bool CLegacyContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }

  bool CLegacyContextClient::havePendingRequests(list<int>& ranks)
  {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->hasPendingRequest();
      return pending;
  }

  bool CLegacyContextClient::isNotifiedFinalized(void)
  {
    if (isAttachedModeEnabled()) return true ;

    bool finalized = true;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      finalized &= itBuff->second->isNotifiedFinalized();
    return finalized;
  }

}