source: XIOS3/trunk/src/transport/context_client.cpp.old

Last change on this file was 2629, checked in by jderouillat, 2 months ago

Delete boost dependencies, the few features used are replaced by functions stored in extern/boost_extraction

#include "xios_spl.hpp"
#include "context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"
#include "services.hpp"
#include <random>
#include <chrono>

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ communicator of the client group
    \param [in] interComm_ communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode).
    */
    CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4), associatedServer_(nullptr)
    {
      context_ = parent;
      intraComm = intraComm_;
      interComm = interComm_;
      MPI_Comm_rank(intraComm, &clientRank);
      MPI_Comm_size(intraComm, &clientSize);

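      // Attached vs. server mode is deduced from the nature of interComm: a true
      // inter-communicator means the servers run in separate processes (isAttached_ = false),
      // whereas an intra-communicator means client and server share the same processes.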
      int flag;
      MPI_Comm_test_inter(interComm, &flag);
      if (flag) isAttached_=false ;
      else  isAttached_=true ;

      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for test)
      if (isAttachedModeEnabled()) pureOneSided=false ; // no one-sided in attached mode

      if (flag) MPI_Comm_remote_size(interComm, &serverSize);
      else  MPI_Comm_size(interComm, &serverSize);

      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);

      if (flag) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;

      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows

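      // Every rank draws a random 64-bit id below, but the broadcast from rank 0 ensures that
      // only rank 0's value is kept, so all processes of the context share the same hashId_.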
      auto time=chrono::system_clock::now().time_since_epoch().count() ;
      std::default_random_engine rd(time); // not reproducible from one run to another
      std::uniform_int_distribution<size_t> dist;
      hashId_=dist(rd) ;
      MPI_Bcast(&hashId_,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all processes of the context

      timeLine = 1;
    }

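    /*!
     * Distribute the server ranks among the clients and decide which clients act as leaders.
     * When there are fewer clients than servers, every client is the leader of a contiguous block
     * of server ranks. When there are at least as many clients as servers, the clients are split
     * into contiguous blocks, one block per server, and only the first client of each block is the
     * leader of that server.
     *
     * Illustrative example (not from the original source): with clientSize = 5 and serverSize = 2,
     * clients 0-2 are mapped to server 0 and clients 3-4 to server 1; clients 0 and 3 become
     * leaders, while clients 1, 2 and 4 end up in rankRecvNotLeader.
     */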
    void CContextClient::computeLeader(int clientRank, int clientSize, int serverSize,
                                       std::list<int>& rankRecvLeader,
                                       std::list<int>& rankRecvNotLeader)
    {
      if ((0 == clientSize) || (0 == serverSize)) return;

      if (clientSize < serverSize)
      {
        int serverByClient = serverSize / clientSize;
        int remain = serverSize % clientSize;
        int rankStart = serverByClient * clientRank;

        if (clientRank < remain)
        {
          serverByClient++;
          rankStart += clientRank;
        }
        else
          rankStart += remain;

        for (int i = 0; i < serverByClient; i++)
          rankRecvLeader.push_back(rankStart + i);

        rankRecvNotLeader.resize(0);
      }
      else
      {
        int clientByServer = clientSize / serverSize;
        int remain = clientSize % serverSize;

        if (clientRank < (clientByServer + 1) * remain)
        {
          if (clientRank % (clientByServer + 1) == 0)
            rankRecvLeader.push_back(clientRank / (clientByServer + 1));
          else
            rankRecvNotLeader.push_back(clientRank / (clientByServer + 1));
        }
        else
        {
          int rank = clientRank - (clientByServer + 1) * remain;
          if (rank % clientByServer == 0)
            rankRecvLeader.push_back(remain + rank / clientByServer);
          else
            rankRecvNotLeader.push_back(remain + rank / clientByServer);
        }
      }
    }

    /*!
    Send an event to the connected servers. In attached mode, the current context must be reset
    to the client context.
    \param [in] event Event to be sent to the servers
    */
    void CContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

//      ostringstream str ;
//      for(auto& rank : ranks) str<<rank<<" ; " ;
//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;

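      // When CXios::checkEventSync is set, the block below verifies that all clients of the
      // context are sending the same event: the timeline, type id and class id are summed over
      // intraComm and each sum must equal clientSize times the local value. A second reduction
      // then checks that every server rank is targeted by at least one client.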
      if (CXios::checkEventSync)
      {
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        long long timeLine_in( timeLine );
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void CContextClient::sendEvent(CEventClient& event)",
               << "Events are not coherent between clients for timeline = "<<timeLine);
        }

        vector<int> servers(serverSize,0) ;
        auto ranks=event.getRanks() ;
        for(auto& rank : ranks) servers[rank]=1 ;
        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
        ostringstream osstr ;
        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
        if (!osstr.str().empty())
        {
          ERROR("void CContextClient::sendEvent(CEventClient& event)",
                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
                 <<"Servers are : "<<osstr.str()) ;
        }
      }

      if (!event.isEmpty())
      {
        list<int> sizes = event.getSizes();

         // We force the getBuffers call to be non-blocking on classical servers
        list<CBufferOut*> buffList;
        getBuffers(timeLine, ranks, sizes, buffList) ;

        event.send(timeLine, sizes, buffList);

        //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ;

        unlockBuffers(ranks) ;
        checkBuffers(ranks);

      }

      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
      {
        while (checkBuffers(ranks)) callGlobalEventLoop() ;

        CXios::getDaemonsManager()->scheduleContext(hashId_) ;
        while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ;
      }

      timeLine++;
    }

    /*!
     * Get buffers for each connection to the servers. This function blocks until there is enough room
     * in the buffers, unless it is explicitly requested to be non-blocking.
     *
     * \param [in] timeLine time line of the event which will be sent to the servers
     * \param [in] serverList list of ranks of the connected servers
     * \param [in] sizeList size of the message corresponding to each connection
     * \param [out] retBuffers list of buffers that can be used to store an event
     * \param [in] nonBlocking whether this function should be non-blocking
     * \return whether the already allocated buffers could be used
    */
    bool CContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
                                    bool nonBlocking /*= false*/)
    {
      list<int>::const_iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::const_iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      bool areBuffersFree;

      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
          newBuffer(*itServer);
          it = buffers.find(*itServer);
        }
        bufferList.push_back(it->second);
      }

      double lastTimeBuffersNotFree=0. ;
      double time ;
      bool doUnlockBuffers ;
      CTimer::get("Blocking time").resume();
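      // Polling loop: the buffers are only re-tested once more than latency_ seconds have elapsed
      // since they were last found busy; in between, the global event loop is serviced so that
      // incoming messages can still be processed. The loop exits as soon as every buffer has room,
      // or after a single pass when nonBlocking is requested.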
      do
      {
        areBuffersFree = true;
        doUnlockBuffers=false ;
        time=MPI_Wtime() ;
        if (time-lastTimeBuffersNotFree > latency_)
        {
          for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          {
            areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
          }
          if (!areBuffersFree)
          {
            lastTimeBuffersNotFree = time ;
            doUnlockBuffers=true ;
          }
        }
        else areBuffersFree = false ;

        if (!areBuffersFree)
        {
          if (doUnlockBuffers) for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();
          checkBuffers();

          callGlobalEventLoop() ;
        }

      } while (!areBuffersFree && !nonBlocking);
      CTimer::get("Blocking time").suspend();

      if (areBuffersFree)
      {
        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize));
      }
      return areBuffersFree;
   }

   void CContextClient::eventLoop(void)
   {
      if (!locked_) checkBuffers() ;
   }

   void CContextClient::callGlobalEventLoop(void)
   {
     locked_=true ;
     context_->globalEventLoop() ;
     locked_=false ;
   }
   /*!
   Create a new buffer for the connection to the server with the given rank
   \param [in] rank rank of the connected server
   */
   void CContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
        maxEventSizes[rank] = CXios::minBufferSize;
      }

      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank]);
      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ;
      else buffer->fixBuffer() ;
      // Notify the server: send the context hash, the buffer size and the two window addresses
      CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint));
      MPI_Aint sendBuff[4] ;
      sendBuff[0]=hashId_;
      sendBuff[1]=mapBufferSize_[rank];
      sendBuff[2]=buffers[rank]->getWinAddress(0);
      sendBuff[3]=buffers[rank]->getWinAddress(1);
      info(100)<<"CContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinAddress(1)<<endl;
      bufOut->put(sendBuff, 4);
      buffer->checkBuffer(true);

      // create windows dynamically for one-sided communication
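      // In server (non-attached) mode, a dedicated communicator is built for each server rank by
      // creating an inter-communicator between this client alone (commSelf_) and the remote process,
      // merging it, and then opening two dynamic RMA windows on it for one-sided transfers. In
      // attached mode no window is needed, so null handles are stored instead.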
      if (!isAttachedModeEnabled())
      {
        CTimer::get("create Windows").resume() ;
        MPI_Comm interComm ;
        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
        MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
        CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
        MPI_Comm_free(&interComm) ;
        windows_[rank].resize(2) ;

        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;

        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;

        CTimer::get("create Windows").suspend() ;
      }
      else
      {
        winComm_[rank] = MPI_COMM_NULL ;
        windows_[rank].resize(2) ;
        windows_[rank][0] = MPI_WIN_NULL ;
        windows_[rank][1] = MPI_WIN_NULL ;
      }
      buffer->attachWindows(windows_[rank]) ;
      if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ;

   }

   /*!
   Verify the state of the buffers. A buffer is pending as long as it still holds messages
   that have not been delivered to the server.
   \return state of the buffers, pending (true), ready (false)
   */
   bool CContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
        pending |= itBuff->second->checkBuffer(!pureOneSided);
      return pending;
   }

   //! Release all buffers
   void CContextClient::releaseBuffers()
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      {
         delete itBuff->second;
      }
      buffers.clear();

// FIXME: it is not yet clear when the windows can safely be released

      //if (!isAttachedModeEnabled())
      //{ 
      //  for(auto& it : winComm_)
      //  {
      //    int rank = it.first ;
      //    MPI_Win_free(&windows_[rank][0]);
      //    MPI_Win_free(&windows_[rank][1]);
      //    MPI_Comm_free(&winComm_[rank]) ;
      //  }
      //}
   }

  /*!
   Lock the buffers for one-sided communications
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CContextClient::lockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer();
   }

  /*!
   Unlock the buffers for one-sided communications
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CContextClient::unlockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer();
   }

   /*!
   Verify the state of the buffers corresponding to a set of connections
   \param [in] ranks list of ranks of the servers to which the client is connected
   \return state of the buffers, pending (true), ready (false)
   */
   bool CContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided);
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: this function is collective.
    *
    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
   */
   void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
   {
     setFixedBuffer() ;
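     // Note (illustrative, not from the original source): each requested size is scaled by
     // CXios::bufferSizeFactor plus a 1% margin and clamped to the interval
     // [CXios::minBufferSize, CXios::maxBufferSize]; e.g. a 10 MB estimate with a factor of 2
     // would give a fixed buffer of about 20.2 MB, unless that exceeds maxBufferSize.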
     for(auto& it : mapSize)
     {
      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) ;
      mapBufferSize_[it.first]=size ;
      if (buffers.count(it.first)>0) buffers[it.first]->fixBufferSize(size);
     }
   }

  /*!
  Get the non-leading servers in the group of connected servers
  \return ranks of the non-leading servers
  */
  const std::list<int>& CContextClient::getRanksServerNotLeader(void) const
  {
    return ranksServerNotLeader;
  }

  /*!
  Check whether the client is connected to at least one non-leading server
  \return connected (true), not connected (false)
  */
  bool CContextClient::isServerNotLeader(void) const
  {
    return !ranksServerNotLeader.empty();
  }

  /*!
  Get the leading servers in the group of connected servers
  \return ranks of the leading servers
  */
  const std::list<int>& CContextClient::getRanksServerLeader(void) const
  {
    return ranksServerLeader;
  }

  /*!
  Check whether the client is connected to at least one leading server
  \return connected (true), not connected (false)
  */
  bool CContextClient::isServerLeader(void) const
  {
    return !ranksServerLeader.empty();
  }

   /*!
   * Finalize the context client and report buffer usage. This function is non-blocking.
   */
  void CContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    std::list<int>::iterator ItServerLeader;

    bool stop = false;

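    // Count, for each server rank, how many clients hold a connection to it; the summed value is
    // attached to the CONTEXT_FINALIZE event pushed to every server this client is connected to.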
    int* nbServerConnectionLocal  = new int[serverSize] ;
    int* nbServerConnectionGlobal  = new int[serverSize] ;
    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;

    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    CMessage msg;

    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
    sendEvent(event);

    delete[] nbServerConnectionLocal ;
    delete[] nbServerConnectionGlobal ;

    CTimer::get("Blocking time").resume();
    checkBuffers();
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

  }

  /*!
  Check whether any buffer still has a pending MPI request
  \return pending (true), none (false)
  */
  bool CContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }

  /*!
  Check whether any buffer associated with the given server ranks still has a pending MPI request
  \return pending (true), none (false)
  */
  bool CContextClient::havePendingRequests(list<int>& ranks)
  {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->hasPendingRequest();
      return pending;
  }

  /*!
  Check whether the servers have notified that the context finalization has been taken into
  account (always true in attached mode)
  \return finalized (true), not yet finalized (false)
  */
  bool CContextClient::isNotifiedFinalized(void)
  {
    if (isAttachedModeEnabled()) return true ;

    bool finalized = true;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      finalized &= itBuff->second->isNotifiedFinalized();
    return finalized;
  }

}