source: XIOS3/trunk/src/transport/legacy_context_client.cpp @ 2520

Last change on this file since 2520 was 2520, checked in by jderouillat, 13 months ago

Replace MPI probing on intercommunicator by probing on intracommunicator

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 17.0 KB
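
As a rough illustration of the change above (a sketch only, not code from this file; the names probeOnMergedComm, interCommMerged and tag are made up here), probing is done on the intracommunicator produced by MPI_Intercomm_merge instead of on the original intercommunicator:

#include <mpi.h>
#include <vector>

// Sketch: probe for an incoming client/server message on the merged intracommunicator
// rather than on the original intercommunicator.
void probeOnMergedComm(MPI_Comm interCommMerged, int tag)
{
  int flag = 0;
  MPI_Status status;
  MPI_Iprobe(MPI_ANY_SOURCE, tag, interCommMerged, &flag, &status);   // non-blocking probe
  if (flag)
  {
    int count = 0;
    MPI_Get_count(&status, MPI_CHAR, &count);                         // size of the pending message
    std::vector<char> buffer(count);
    MPI_Recv(buffer.data(), count, MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG,
             interCommMerged, MPI_STATUS_IGNORE);
    // ... hand the received bytes over to the transport layer ...
  }
}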
#include "xios_spl.hpp"
#include "legacy_context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"
#include "services.hpp"
#include "ressources_manager.hpp"
#include <boost/functional/hash.hpp>
#include <random>
#include <chrono>

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ intracommunicator of the client group
    \param [in] interComm_ intercommunicator with the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
    */
    CLegacyContextClient::CLegacyContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
                         : CContextClient(parent, intraComm_, interComm_, cxtSer),
                           mapBufferSize_(), maxBufferedEvents(4)
    {
      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
      if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode

      if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;

      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows

      timeLine = 1;
    }

    CContextClient::ETransport getType(void) {return CContextClient::legacy ;}

    /*!
    In attached mode, the current context must be reset to the client context.
    \param [in] event Event sent to the server
    */
    void CLegacyContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

//      ostringstream str ;
//      for(auto& rank : ranks) str<<rank<<" ; " ;
//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;

      if (CXios::checkEventSync)
      {
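        // Coherence check: every client of the context must be sending the same event.
        // Summing the timeline / typeId / classId over intraComm and dividing by clientSize
        // must give back the local values; otherwise at least one client has diverged.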
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        long long timeLine_in( timeLine );
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
               << "Events are not coherent between clients for timeline = "<<timeLine);
        }

        vector<int> servers(serverSize,0) ;
        auto ranks=event.getRanks() ;
        for(auto& rank : ranks) servers[rank]=1 ;
        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
        ostringstream osstr ;
        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
        if (!osstr.str().empty())
        {
          ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
                 <<"Servers are : "<<osstr.str()) ;
        }

      }

      if (!event.isEmpty())
      {
        list<int> sizes = event.getSizes();

        // Get a send buffer for each target server; getBuffers() blocks, while servicing
        // the event loop, until enough room is available in every buffer.
        list<CBufferOut*> buffList;
        getBuffers(timeLine, ranks, sizes, buffList) ;

        event.send(timeLine, sizes, buffList);

        //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ;

        unlockBuffers(ranks) ;
        checkBuffers(ranks);
      }

      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
      {
        while (checkBuffers(ranks)) callGlobalEventLoop() ;

        CXios::getDaemonsManager()->scheduleContext(hashId_) ;
        while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ;
      }

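      // Non-blocking barrier over intraComm: all clients of the context must reach this point
      // before the timeline is advanced; the global event loop keeps being serviced while
      // waiting so that other requests can still make progress.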
      MPI_Request req ;
      MPI_Status status ;
      MPI_Ibarrier(intraComm,&req) ;
      int flag ;
      MPI_Test(&req,&flag,&status) ;
      while(!flag)
      {
        callGlobalEventLoop() ;
        MPI_Test(&req,&flag,&status) ;
      }

      timeLine++;
    }

    /*!
     * Get buffers for each connection to the servers. This function blocks until there is
     * enough room in every requested buffer, servicing the global event loop while waiting.
     *
     * \param [in] timeLine time line of the event which will be sent to servers
     * \param [in] serverList list of ranks of the connected servers
     * \param [in] sizeList size of the message corresponding to each connection
     * \param [out] retBuffers list of buffers that can be used to store the event
    */
    void CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers)
    {
      list<int>::const_iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::const_iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      bool areBuffersFree;

      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
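          // First event for this server: create the buffer, serializing the creation across
          // clients with a token from the resources manager before attaching the one-sided windows.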
          CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ;
          size_t token = tokenManager->getToken() ;
          while (!tokenManager->lockToken(token)) callGlobalEventLoop() ;
          newBuffer(*itServer);
          it = buffers.find(*itServer);
          checkAttachWindows(it->second,it->first) ;
          tokenManager->unlockToken(token) ;
        }
        bufferList.push_back(it->second);
      }

      double lastTimeBuffersNotFree=0. ;
      double time ;
      bool doUnlockBuffers ;
      CTimer::get("Blocking time").resume();
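      // Wait until every required buffer has enough room: the buffers are re-tested at most
      // every latency_ seconds; while they are not free, they are unlocked so the servers can
      // drain them and the global event loop is serviced to keep communications progressing.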
      do
      {
        areBuffersFree = true;
        doUnlockBuffers=false ;
        time=MPI_Wtime() ;
        if (time-lastTimeBuffersNotFree > latency_)
        {
          for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          {
            areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
          }
          if (!areBuffersFree)
          {
            lastTimeBuffersNotFree = time ;
            doUnlockBuffers=true ;
          }
        }
        else areBuffersFree = false ;

        if (!areBuffersFree)
        {
          if (doUnlockBuffers) for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();
          checkBuffers();

          callGlobalEventLoop() ;
        }

      } while (!areBuffersFree);
      CTimer::get("Blocking time").suspend();

      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
        retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize));
   }

   void CLegacyContextClient::eventLoop(void)
   {
      if (!locked_) checkBuffers() ;
   }

   void CLegacyContextClient::callGlobalEventLoop(void)
   {
     locked_=true ;
     context_->globalEventLoop() ;
     locked_=false ;
   }
   /*!
   Create a new buffer for the connection to the server with the given rank.
   \param [in] rank rank of the connected server
   */
   void CLegacyContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
        maxEventSizes[rank] = CXios::minBufferSize;
      }

      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interCommMerged_, clientSize+rank, mapBufferSize_[rank], maxEventSizes[rank]);
      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ;
      else buffer->fixBuffer() ;
      // Notify the server
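      // The control message holds the context hash, the buffer size and the addresses of the
      // two RMA windows, so that the server can target this buffer with one-sided operations.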
      CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint));
      MPI_Aint sendBuff[4] ;
      sendBuff[0]=hashId_;
      sendBuff[1]=mapBufferSize_[rank];
      sendBuff[2]=buffers[rank]->getWinAddress(0);
      sendBuff[3]=buffers[rank]->getWinAddress(1);
      info(100)<<"CLegacyContextClient::newBuffer : rank "<<rank<<" winAddress[0] "<<buffers[rank]->getWinAddress(0)<<" winAddress[1] "<<buffers[rank]->getWinAddress(1)<<endl;
      bufOut->put(sendBuff, 4);
      buffer->checkBuffer(true);
/*
       // create windows dynamically for one-sided
      if (!isAttachedModeEnabled())
      {
        CTimer::get("create Windows").resume() ;
        MPI_Comm interComm ;
        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
        MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
        CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
        MPI_Comm_free(&interComm) ;
        windows_[rank].resize(2) ;

        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;

        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;

        CTimer::get("create Windows").suspend() ;
      }
      else
      {
        winComm_[rank] = MPI_COMM_NULL ;
        windows_[rank].resize(2) ;
        windows_[rank][0] = MPI_WIN_NULL ;
        windows_[rank][1] = MPI_WIN_NULL ;
      }
      buffer->attachWindows(windows_[rank]) ;
      if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ;
  */
   }

   void CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank)
   {
      if (!buffer->isAttachedWindows())
      {
        // create windows dynamically for one-sided
        if (!isAttachedModeEnabled())
        {
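          // Build a communicator linking this single client (commSelf_) with the target server
          // process (rank clientSize+rank in the merged intracommunicator), then create on it
          // the two dynamic RMA windows used for one-sided transfers.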
          CTimer::get("create Windows").resume() ;
          MPI_Comm interComm ;
          MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
          MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
          CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
          MPI_Comm_free(&interComm) ;
          windows_[rank].resize(2) ;

          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;

          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;

          CTimer::get("create Windows").suspend() ;
          buffer->attachWindows(windows_[rank]) ;
          MPI_Barrier(winComm_[rank]) ;
        }
        else
        {
          winComm_[rank] = MPI_COMM_NULL ;
          windows_[rank].resize(2) ;
          windows_[rank][0] = MPI_WIN_NULL ;
          windows_[rank][1] = MPI_WIN_NULL ;
          buffer->attachWindows(windows_[rank]) ;
        }

      }
    }

   /*!
   Check the state of all buffers. A buffer is pending while it still holds data that has not yet been delivered.
   \return state of the buffers: pending (true), ready (false)
   */
   bool CLegacyContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
        pending |= itBuff->second->checkBuffer(!pureOneSided);
      return pending;
   }

   //! Release all buffers
   void CLegacyContextClient::releaseBuffers()
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      {
         delete itBuff->second;
      }
      buffers.clear();

// TODO: determine when the windows can safely be released

      //if (!isAttachedModeEnabled())
      //{
      //  for(auto& it : winComm_)
      //  {
      //    int rank = it.first ;
      //    MPI_Win_free(&windows_[rank][0]);
      //    MPI_Win_free(&windows_[rank][1]);
      //    MPI_Comm_free(&winComm_[rank]) ;
      //  }
      //}
   }

  /*!
   Lock the buffers for one-sided communications
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CLegacyContextClient::lockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer();
   }

  /*!
   Unlock the buffers for one-sided communications
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CLegacyContextClient::unlockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer();
   }

   /*!
   Check the state of the buffers corresponding to a set of connections.
   \param [in] ranks list of ranks of the servers to which the client is connected
   \return state of the buffers: pending (true), ready (false)
   */
   bool CLegacyContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided);
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: this function is collective.
    *
    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
   */
   void CLegacyContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
   {
     setFixedBuffer() ;
     for(auto& it : mapSize)
     {
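      // Apply a small safety factor to the requested size and clamp the result between
      // CXios::minBufferSize and CXios::maxBufferSize.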
      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) ;
      mapBufferSize_[it.first]=size ;
      if (buffers.count(it.first)>0) buffers[it.first]->fixBufferSize(size);
     }
   }

   /*!
   * Finalize the context client and report buffer usage. This function is non-blocking.
   */
  void CLegacyContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    std::list<int>::iterator ItServerLeader;

    bool stop = false;

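    // Count, for every server, how many clients are connected to it; the global count is sent
    // along with the finalize event so that each server knows how many client notifications
    // to expect before it can close the context.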
    int* nbServerConnectionLocal  = new int[serverSize] ;
    int* nbServerConnectionGlobal  = new int[serverSize] ;
    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;

    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    CMessage msg;

    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
    sendEvent(event);

    delete[] nbServerConnectionLocal ;
    delete[] nbServerConnectionGlobal ;

    CTimer::get("Blocking time").resume();
    checkBuffers();
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;
  }

  /*!
   Check whether any buffer still has a pending MPI request.
  */
  bool CLegacyContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }

  bool CLegacyContextClient::havePendingRequests(list<int>& ranks)
  {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->hasPendingRequest();
      return pending;
  }

  bool CLegacyContextClient::isNotifiedFinalized(void)
  {
    if (isAttachedModeEnabled()) return true ;

    bool finalized = true;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      finalized &= itBuff->second->isNotifiedFinalized();
    return finalized;
  }

}