source: XIOS3/branches/xios-3.0-beta/src/transport/legacy_context_client.cpp @ 2527

Last change on this file since 2527 was 2527, checked in by jderouillat, 12 months ago

Fix intracommunicator probing for attached mode

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 15.3 KB
#include "xios_spl.hpp"
#include "legacy_context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"
#include "services.hpp"
#include <boost/functional/hash.hpp>
#include <random>
#include <chrono>

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ communicator of the client group
    \param [in] interComm_ communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
    */
    CLegacyContextClient::CLegacyContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
                         : CContextClient(parent, intraComm_, interComm_, cxtSer),
                           mapBufferSize_(),  maxBufferedEvents(4)
    {
      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for testing)
      if (isAttachedModeEnabled()) pureOneSided=false ; // no one-sided communication in attached mode

      if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;
      else interCommMerged_ = interComm_; // interComm_ is already an intracommunicator in attached mode

      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // communicator on self, used for the one-sided windows

      timeLine = 1;
    }


    /*!
    In attached mode, the current context must be reset to the client context.
    \param [in] event Event sent to the server
    */
    void CLegacyContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

//      ostringstream str ;
//      for(auto& rank : ranks) str<<rank<<" ; " ;
//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;

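      // Optional sanity check (enabled with checkEventSync): verify that all clients submit a coherent
      // event (same timeline, type and class) and that every target server is reached by at least one client.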
      if (CXios::checkEventSync)
      {
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        long long timeLine_in( timeLine );
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
               << "Events are not coherent between clients for timeline = "<<timeLine);
        }

        vector<int> servers(serverSize,0) ;
        auto ranks=event.getRanks() ;
        for(auto& rank : ranks) servers[rank]=1 ;
        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
        ostringstream osstr ;
        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
        if (!osstr.str().empty())
        {
          ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
                 <<"Servers are : "<<osstr.str()) ;
        }
      }

      if (!event.isEmpty())
      {
        list<int> sizes = event.getSizes();

        // We force the getBuffers call to be non-blocking on classical servers
        list<CBufferOut*> buffList;
        getBuffers(timeLine, ranks, sizes, buffList) ;

        event.send(timeLine, sizes, buffList);

        //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ;

        unlockBuffers(ranks) ;
        checkBuffers(ranks);
      }

      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
      {
        while (checkBuffers(ranks)) callGlobalEventLoop() ;

        CXios::getDaemonsManager()->scheduleContext(hashId_) ;
        while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ;
      }

      timeLine++;
    }


    /*!
     * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers,
     * unless it is explicitly requested to be non-blocking.
     *
     * \param [in] timeLine time line of the event which will be sent to the servers
     * \param [in] serverList list of ranks of the connected servers
     * \param [in] sizeList size of the message corresponding to each connection
     * \param [out] retBuffers list of buffers that can be used to store an event
     * \param [in] nonBlocking whether this function should be non-blocking
     * \return whether the already allocated buffers could be used
    */
    bool CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
                                          bool nonBlocking /*= false*/)
    {
      list<int>::const_iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::const_iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      bool areBuffersFree;

      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
          newBuffer(*itServer);
          it = buffers.find(*itServer);
        }
        bufferList.push_back(it->second);
      }

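      // Wait until every requested buffer has enough free room. Availability is re-tested at most once per
      // latency_ seconds; in between, the buffers are unlocked and the global event loop is serviced so that
      // incoming messages can still be processed. In non-blocking mode a single pass is performed.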
      double lastTimeBuffersNotFree=0. ;
      double time ;
      bool doUnlockBuffers ;
      CTimer::get("Blocking time").resume();
      do
      {
        areBuffersFree = true;
        doUnlockBuffers=false ;
        time=MPI_Wtime() ;
        if (time-lastTimeBuffersNotFree > latency_)
        {
          for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          {
            areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
          }
          if (!areBuffersFree)
          {
            lastTimeBuffersNotFree = time ;
            doUnlockBuffers=true ;
          }
        }
        else areBuffersFree = false ;

        if (!areBuffersFree)
        {
          if (doUnlockBuffers) for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();
          checkBuffers();

          callGlobalEventLoop() ;
        }

      } while (!areBuffersFree && !nonBlocking);
      CTimer::get("Blocking time").suspend();

      if (areBuffersFree)
      {
        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize));
      }
      return areBuffersFree;
   }

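   /*!
    * Check the buffers, unless a global event loop is already running (locked_ set), in which case
    * the re-entrant check is skipped.
    */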
   void CLegacyContextClient::eventLoop(void)
   {
      if (!locked_) checkBuffers() ;
   }

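   /*!
    * Run the global event loop of the context, setting locked_ so that eventLoop() does not
    * re-enter the buffer checks while the global loop is active.
    */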
   void CLegacyContextClient::callGlobalEventLoop(void)
   {
     locked_=true ;
     context_->globalEventLoop() ;
     locked_=false ;
   }

   /*!
   Create a new buffer for the connection to the server with the given rank.
   \param [in] rank rank of the connected server
   */
   void CLegacyContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
        maxEventSizes[rank] = CXios::minBufferSize;
      }

      int considerServers = 1;
      if (isAttachedModeEnabled()) considerServers = 0;
      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interCommMerged_, considerServers*clientSize+rank, mapBufferSize_[rank], maxEventSizes[rank]);
      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ;
      else buffer->fixBuffer() ;
      // Notify the server: send the context hash, the buffer size and the addresses of the two buffer windows
      CBufferOut* bufOut = buffer->getBuffer(0, 4*sizeof(MPI_Aint));
      MPI_Aint sendBuff[4] ;
      sendBuff[0]=hashId_;
      sendBuff[1]=mapBufferSize_[rank];
      sendBuff[2]=buffers[rank]->getWinAddress(0);
      sendBuff[3]=buffers[rank]->getWinAddress(1);
      info(100)<<"CLegacyContextClient::newBuffer : rank "<<rank<<" winAddress[0] "<<buffers[rank]->getWinAddress(0)<<" winAddress[1] "<<buffers[rank]->getWinAddress(1)<<endl;
      bufOut->put(sendBuff, 4);
      buffer->checkBuffer(true);

      // create the windows dynamically for one-sided communication
      if (!isAttachedModeEnabled())
      {
        CTimer::get("create Windows").resume() ;
        MPI_Comm interComm ;
        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
        MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
        CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
        MPI_Comm_free(&interComm) ;
        windows_[rank].resize(2) ;

        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;

        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;

        CTimer::get("create Windows").suspend() ;
      }
      else
      {
        winComm_[rank] = MPI_COMM_NULL ;
        windows_[rank].resize(2) ;
        windows_[rank][0] = MPI_WIN_NULL ;
        windows_[rank][1] = MPI_WIN_NULL ;
      }
      buffer->attachWindows(windows_[rank]) ;
      if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ;

   }

   /*!
   Check the state of the buffers.
   \return true if at least one buffer is still pending, false if all buffers are ready
   */
   bool CLegacyContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
        pending |= itBuff->second->checkBuffer(!pureOneSided);
      return pending;
   }

   //! Release all buffers
   void CLegacyContextClient::releaseBuffers()
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      {
         delete itBuff->second;
      }
      buffers.clear();

// it is not yet clear when the windows should be released

      //if (!isAttachedModeEnabled())
      //{
      //  for(auto& it : winComm_)
      //  {
      //    int rank = it.first ;
      //    MPI_Win_free(&windows_[rank][0]);
      //    MPI_Win_free(&windows_[rank][1]);
      //    MPI_Comm_free(&winComm_[rank]) ;
      //  }
      //}
   }


  /*!
   Lock the buffers for one-sided communication
   \param [in] ranks list of ranks of the servers to which the client connects
   */
   void CLegacyContextClient::lockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer();
   }

  /*!
   Unlock the buffers for one-sided communication
   \param [in] ranks list of ranks of the servers to which the client connects
   */
   void CLegacyContextClient::unlockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer();
   }

   /*!
   Check the state of the buffers corresponding to the given connections.
   \param [in] ranks list of ranks of the servers to which the client connects
   \return true if at least one buffer is still pending, false if all are ready
   */
   bool CLegacyContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided);
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: this function is collective.
    *
    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
   */
   void CLegacyContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
   {
     setFixedBuffer() ;
     for(auto& it : mapSize)
     {
      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) ;
      mapBufferSize_[it.first]=size ;
      if (buffers.count(it.first)>0) buffers[it.first]->fixBufferSize(size);
     }
   }

   /*!
   * Finalize the context client and report the memory used by the buffers. This function is non-blocking.
   */
  void CLegacyContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    std::list<int>::iterator ItServerLeader;

    bool stop = false;

    int* nbServerConnectionLocal  = new int[serverSize] ;
    int* nbServerConnectionGlobal  = new int[serverSize] ;
    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;

    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    CMessage msg;

    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
    sendEvent(event);

    delete[] nbServerConnectionLocal ;
    delete[] nbServerConnectionGlobal ;

    CTimer::get("Blocking time").resume();
    checkBuffers();
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;
  }


  /*!
  Check whether any buffer still has a pending MPI request.
  \return true if at least one buffer has a pending request
  */
  bool CLegacyContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }

  bool CLegacyContextClient::havePendingRequests(list<int>& ranks)
  {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->hasPendingRequest();
      return pending;
  }

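  /*!
  Check whether every server buffer has been notified of the context finalization
  (always considered true in attached mode).
  */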
  bool CLegacyContextClient::isNotifiedFinalized(void)
  {
    if (isAttachedModeEnabled()) return true ;

    bool finalized = true;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      finalized &= itBuff->second->isNotifiedFinalized();
    return finalized;
  }

}