source: XIOS3/trunk/src/transport/legacy_context_client.cpp @ 2634

Last change on this file since 2634 was 2629, checked in by jderouillat, 2 months ago

Delete boost dependencies, the few features used are replaced by functions stored in extern/boost_extraction

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 16.5 KB
1#include "xios_spl.hpp"
2#include "legacy_context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "buffer_client.hpp"
7#include "type.hpp"
8#include "event_client.hpp"
9#include "context.hpp"
10#include "mpi.hpp"
11#include "timer.hpp"
12#include "cxios.hpp"
13#include "server.hpp"
14#include "services.hpp"
15#include "ressources_manager.hpp"
16#include <random>
17#include <chrono>
18
19namespace xios
20{
21    extern CLogType logTimers ;
22 
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ communicator of the client group
    \param [in] interComm_ communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode --> obsolete).
    */
    CLegacyContextClient::CLegacyContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
                         : CContextClient(parent, intraComm_, interComm_, cxtSer),
                           mapBufferSize_(),  maxBufferedEvents(4)
    {
      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for testing)
      xios::MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;
      CXios::getMpiGarbageCollector().registerCommunicator(interCommMerged_) ;
      xios::MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // self communicator, used for the one-sided windows
      CXios::getMpiGarbageCollector().registerCommunicator(commSelf_) ;
      eventScheduler_ = parent->getEventScheduler() ;
      timeLine = 1;
    }
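
    /* Illustrative sketch (not part of the original source): the constructor above turns the
       client/server inter-communicator into a single merged intra-communicator and gives each
       client rank a private "self" communicator that is later used to build the one-sided
       windows. A minimal standalone routine using the same two MPI calls could look like the
       following; interComm, intraComm and clientRank are placeholders for whatever the caller
       provides.

         #include <mpi.h>

         void setupCommunicators(MPI_Comm intraComm, MPI_Comm interComm)
         {
           int clientRank;
           MPI_Comm_rank(intraComm, &clientRank);

           // Merge the inter-communicator; clients keep the low ranks (high = 0).
           MPI_Comm interCommMerged;
           MPI_Intercomm_merge(interComm, 0, &interCommMerged);

           // Split intraComm with a distinct color per rank: every rank ends up alone
           // in its own communicator, usable later for per-server window communicators.
           MPI_Comm commSelf;
           MPI_Comm_split(intraComm, clientRank, clientRank, &commSelf);

           MPI_Comm_free(&commSelf);
           MPI_Comm_free(&interCommMerged);
         }
    */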

    CContextClient::ETransport getType(void) {return CContextClient::legacy ;}

    /*!
    \param [in] event Event sent to the server
    */
    void CLegacyContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

//      ostringstream str ;
//      for(auto& rank : ranks) str<<rank<<" ; " ;
//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;

      if (CXios::checkEventSync)
      {
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        long long timeLine_in( timeLine );
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
               << "Events are not coherent between clients for timeline = "<<timeLine);
        }

        vector<int> servers(serverSize,0) ;
        auto ranks=event.getRanks() ;
        for(auto& rank : ranks) servers[rank]=1 ;
        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
        ostringstream osstr ;
        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
        if (!osstr.str().empty())
        {
          ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
                 <<"Servers are : "<<osstr.str()) ;
        }

      }

      if (!event.isEmpty())
      {
        list<int> sizes = event.getSizes();

        // Get the buffers into which the event will be written (blocks until enough room is available)
        list<CBufferOut*> buffList;
        getBuffers(timeLine, ranks, sizes, buffList) ;

        event.send(timeLine, sizes, buffList);

        //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ;

        unlockBuffers(ranks) ;
        checkBuffers(ranks);

      }

      synchronize() ;
      timeLine++;
    }
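
    /* Illustrative sketch (not part of the original source): the checkEventSync block above detects
       clients that disagree on the current event by summing the local identifiers over intraComm and
       comparing sum/clientSize with the local value; if every client holds the same value, the
       division gives it back exactly. A minimal standalone version of that pattern, with hypothetical
       names:

         #include <mpi.h>
         #include <cstdio>
         #include <cstdlib>

         void checkCoherence(MPI_Comm comm, long long localTimeLine)
         {
           int commSize;
           MPI_Comm_size(comm, &commSize);

           long long sum = 0;
           MPI_Allreduce(&localTimeLine, &sum, 1, MPI_LONG_LONG_INT, MPI_SUM, comm);

           // If every rank contributed the same timeline, sum / commSize equals the local value.
           if (sum / commSize != localTimeLine)
           {
             std::fprintf(stderr, "timeline mismatch: local %lld, average %lld\n",
                          localTimeLine, sum / commSize);
             MPI_Abort(comm, EXIT_FAILURE);
           }
         }
    */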

    /*!
     * Get a buffer for each connection to the servers. This function blocks until there is enough room in every buffer.
     *
     * \param [in] timeLine time line of the event which will be sent to the servers
     * \param [in] serverList list of ranks of the connected servers
     * \param [in] sizeList size of the message corresponding to each connection
     * \param [out] retBuffers list of buffers that can be used to store the event
    */
    void CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers)
    {
      list<int>::const_iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::const_iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      bool areBuffersFree;
/*
      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
          CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ;
          size_t token = tokenManager->getToken() ;
          while (!tokenManager->checkToken(token)) callGlobalEventLoop() ;
          newBuffer(*itServer);
          it = buffers.find(*itServer);
          checkAttachWindows(it->second,it->first) ;
          tokenManager->updateToken(token) ;
        }
        bufferList.push_back(it->second);
      }
*/
      map<int,MPI_Request> attachList ;

      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
          newBuffer(*itServer);
          it = buffers.find(*itServer);
          checkAttachWindows(it->second, it->first, attachList) ;
        }
        bufferList.push_back(it->second);
      }

      while(!attachList.empty())
      {
        auto it = attachList.begin() ;
        while(it!=attachList.end())
        {
          if (checkAttachWindows(buffers[it->first], it->first, attachList)) it=attachList.erase(it) ;
          else ++it ;
        }

        yield() ;
      }


      double lastTimeBuffersNotFree=0. ;
      double time ;
      bool doUnlockBuffers ;
      CTimer::get("Blocking time").resume();
      do
      {
        areBuffersFree = true;
        doUnlockBuffers=false ;
        time=MPI_Wtime() ;
        if (time-lastTimeBuffersNotFree > latency_)
        {
          for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          {
            areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
          }
          if (!areBuffersFree)
          {
            lastTimeBuffersNotFree = time ;
            doUnlockBuffers=true ;
          }
        }
        else areBuffersFree = false ;

        if (!areBuffersFree)
        {
          if (doUnlockBuffers) for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();
          checkBuffers();

          yield() ;
        }

      } while (!areBuffersFree);
      CTimer::get("Blocking time").suspend();

      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
        retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize));
   }
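
   /* Illustrative sketch (not part of the original source): the loop above retries until every
      buffer can hold the requested size, but it only re-tests (and unlocks) the buffers once more
      than latency_ seconds have elapsed since the last failed attempt, yielding to the global event
      loop in between. The control flow, reduced to a generic "resource is free" predicate (isFree,
      yieldToEventLoop and latency are placeholders):

        #include <mpi.h>
        #include <functional>

        void waitUntilFree(const std::function<bool()>& isFree,
                           const std::function<void()>& yieldToEventLoop,
                           double latency)
        {
          double lastFailure = 0.;
          bool free = false;
          while (!free)
          {
            double now = MPI_Wtime();
            if (now - lastFailure > latency)
            {
              free = isFree();
              if (!free) lastFailure = now;   // throttle the next test
            }
            if (!free) yieldToEventLoop();    // let other contexts progress
          }
        }
   */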


   bool CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank, map<int, MPI_Request>& attachList)
   {
      int dummy;
      bool ret=true;

      if (!buffer->isAttachedWindows())
      {
           // create windows dynamically for one-sided
          /*
          if (info.isActive(logTimers)) CTimer::get("create Windows").resume() ;
          MPI_Comm interComm ;
          int tag = 0 ;
          xios::MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, tag, &interComm) ;
          xios::MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
          xios::MPI_Comm_free(&interComm) ;

          buffer->attachWindows(winComm_[rank]) ;
          CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
          MPI_Barrier(winComm_[rank]) ;
        */
        if (attachList.count(rank)==0)
        {
          MPI_Irecv(&dummy,0,MPI_INT,clientSize+rank, 21, interCommMerged_, &attachList[rank]) ;
          ret = false ;
        }
        else
        {
          MPI_Status status ;
          int flag ;
          MPI_Test(&attachList[rank],&flag, &status) ;
          if (flag)
          {
            if (info.isActive(logTimers)) CTimer::get("create Windows").resume() ;
            MPI_Comm interComm ;
            int tag = 0 ;
            xios::MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, tag, &interComm) ;
            xios::MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
            xios::MPI_Comm_free(&interComm) ;

            buffer->attachWindows(winComm_[rank]) ;
            CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
            MPI_Barrier(winComm_[rank]) ;
            ret = true ;
          }
          else ret=false ;
        }
      }
      return ret ;
   }
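
   /* Illustrative sketch (not part of the original source): attaching a window is a two-step
      handshake. On the first call a zero-size MPI_Irecv is posted on the merged communicator and
      the function reports "not attached yet"; on later calls MPI_Test checks whether the server's
      matching notification has arrived, and only then is the per-server communicator built with
      MPI_Intercomm_create / MPI_Intercomm_merge. A reduced version of that state machine, with
      hypothetical names:

        #include <mpi.h>
        #include <map>

        // Returns true once the notification from 'peer' has arrived on 'comm'.
        bool notificationReceived(MPI_Comm comm, int peer, std::map<int, MPI_Request>& pending)
        {
          static int dummy;
          if (pending.count(peer) == 0)
          {
            // First call: post the zero-size receive matching the server's send (tag 21 here).
            MPI_Irecv(&dummy, 0, MPI_INT, peer, 21, comm, &pending[peer]);
            return false;
          }
          int flag = 0;
          MPI_Status status;
          MPI_Test(&pending[peer], &flag, &status);
          return flag != 0;
        }
   */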


   void CLegacyContextClient::eventLoop(void)
   {
      if (!locked_) checkBuffers() ;
   }

   void CLegacyContextClient::callGlobalEventLoop(void)
   {
     locked_=true ;
     context_->yield() ;
     locked_=false ;
   }

   void CLegacyContextClient::yield(void)
   {
     locked_=true ;
     context_->yield() ;
     locked_=false ;
   }

   void CLegacyContextClient::synchronize(void)
   {
     if (context_->getServiceType()!=CServicesManager::CLIENT)
     {
       locked_=true ;
       context_->synchronize() ;
       locked_=false ;
     }
   }
   /*!
   Create a new buffer for the connection to the server with the given rank
   \param [in] rank rank of the connected server
   */
   void CLegacyContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
        maxEventSizes[rank] = CXios::minBufferSize;
      }
      bool hasWindows = true ;
      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interCommMerged_, clientSize+rank, mapBufferSize_[rank], hasWindows);
      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ;
      else buffer->fixBuffer() ;
      // Notify the server

      CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint));
      MPI_Aint sendBuff[4] ;
      sendBuff[0]=hashId_;
      sendBuff[1]=mapBufferSize_[rank];
      sendBuff[2]=buffers[rank]->getWinBufferAddress(0);
      sendBuff[3]=buffers[rank]->getWinBufferAddress(1);
      info(100)<<"CLegacyContextClient::newBuffer : rank "<<rank<<" winAddress[0] "<<buffers[rank]->getWinBufferAddress(0)<<" winAddress[1] "<<buffers[rank]->getWinBufferAddress(1)<<endl;
      bufOut->put(sendBuff,4);
      buffer->checkBuffer(true);

   }
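
   /* Illustrative sketch (not part of the original source): the notification written above is a
      fixed-size control message of four MPI_Aint values (context hash, buffer size, and the two
      window base addresses) that the server uses to target its one-sided accesses. Packing such a
      message into a raw byte buffer could look like this; hashId, bufferSize and the addresses are
      placeholders:

        #include <mpi.h>
        #include <cstring>
        #include <vector>

        std::vector<char> packControlMessage(MPI_Aint hashId, MPI_Aint bufferSize,
                                             MPI_Aint winAddress0, MPI_Aint winAddress1)
        {
          MPI_Aint fields[4] = { hashId, bufferSize, winAddress0, winAddress1 };
          std::vector<char> msg(sizeof(fields));
          std::memcpy(msg.data(), fields, sizeof(fields));   // 4*sizeof(MPI_Aint) bytes
          return msg;
        }
   */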


   /*!
   Verify the state of the buffers. A buffer is in the pending state if there is no message on it.
   \return state of the buffers: pending (true), ready (false)
   */
   bool CLegacyContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
        pending |= itBuff->second->checkBuffer(!pureOneSided);
      return pending;
   }

   //! Release all buffers
   void CLegacyContextClient::releaseBuffers()
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      {
         delete itBuff->second;
      }
      buffers.clear();

      for(auto& it : winComm_)
      {
        int rank = it.first ;
      }
   }


  /*!
   Lock the buffers for one-sided communications
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CLegacyContextClient::lockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer();
   }

  /*!
   Unlock the buffers for one-sided communications
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CLegacyContextClient::unlockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer();
   }

   /*!
   Verify the state of the buffers corresponding to a set of connections
   \param [in] ranks list of ranks of the servers to which the client is connected
   \return state of the buffers: pending (true), ready (false)
   */
   bool CLegacyContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided);
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: this function is collective.
    *
    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
   */
   void CLegacyContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
   {
     setFixedBuffer() ;
     for(auto& it : mapSize)
     {
      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) ;
      mapBufferSize_[it.first]=size ;
      if (buffers.count(it.first)>0) buffers[it.first]->fixBufferSize(size);
     }
   }
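
   /* Illustrative sketch (not part of the original source): each requested size is scaled by
      bufferSizeFactor (plus a 1% margin) and clamped to [minBufferSize, maxBufferSize]. With
      hypothetical limits and a factor of 1.0, a 10 MB request becomes 10.1 MB:

        #include <algorithm>
        #include <cstddef>

        std::size_t clampBufferSize(std::size_t requested, double factor,
                                    std::size_t minSize, std::size_t maxSize)
        {
          double scaled = requested * factor * 1.01;           // add a small safety margin
          double clamped = std::max(minSize * 1.0, std::min(scaled, maxSize * 1.0));
          return static_cast<std::size_t>(clamped);
        }
   */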

   /*!
   * Finalize the context client and report buffer memory usage. The function is non-blocking.
   */
  void CLegacyContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    std::list<int>::iterator ItServerLeader;

    bool stop = false;

    int* nbServerConnectionLocal  = new int[serverSize] ;
    int* nbServerConnectionGlobal  = new int[serverSize] ;
    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;

    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    CMessage msg;

    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
    sendEvent(event);

    delete[] nbServerConnectionLocal ;
    delete[] nbServerConnectionGlobal ;


    CTimer::get("Blocking time").resume();
    checkBuffers();
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

  }


  /*!
   * \return true if at least one buffer still has a pending MPI request
   */
  bool CLegacyContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }

  bool CLegacyContextClient::havePendingRequests(list<int>& ranks)
  {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->hasPendingRequest();
      return pending;
  }

  bool CLegacyContextClient::isNotifiedFinalized(void)
  {
    bool finalized = true;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      finalized &= itBuff->second->isNotifiedFinalized();
    return finalized;
  }

}