source: XIOS3/trunk/src/transport/one_sided_context_client.cpp @ 2592

Last change on this file since 2592 was 2592, checked in by jderouillat, 9 months ago

Free communicators in legacy and one_sided transport layer

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.6 KB
Line 
1#include "xios_spl.hpp"
2#include "one_sided_context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "type.hpp"
7#include "event_client.hpp"
8#include "context.hpp"
9#include "mpi.hpp"
10#include "timer.hpp"
11#include "cxios.hpp"
12#include "server.hpp"
13#include "services.hpp"
14#include <boost/functional/hash.hpp>
15#include <random>
16#include <chrono>
17
18namespace xios
19{
    /*!
    Construct the one-sided transport client for a context and set up the
    communicators required by the RMA windows.
    \param [in] parent Pointer to context on client side
    \param [in] intraComm_ communicator of group client
    \param [in] interComm_ communicator of group server
    \param [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode --> obsolete ).
    */
    COneSidedContextClient::COneSidedContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : CContextClient(parent, intraComm_, interComm_, cxtSer),
       mapBufferSize_(), maxBufferedEvents(4)
    {
     
      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)

      // Merge the client/server inter-communicator into one intra-communicator;
      // registered with the garbage collector so it is freed at finalization.
      xios::MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;
      CXios::getMpiGarbageCollector().registerCommunicator(interCommMerged_) ;
     
      // Single-rank communicator (one per client process), used for the windows.
      xios::MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows
      CXios::getMpiGarbageCollector().registerCommunicator(commSelf_) ;
      eventScheduler_ = parent->getEventScheduler() ; 
      timeLine = 1;
    }
41
42
43    /*!
44    In case of attached mode, the current context must be reset to context for client
45    \param [in] event Event sent to server
46    */
47    void COneSidedContextClient::sendEvent(CEventClient& event)
48    {
49      list<int> ranks = event.getRanks();
50 
51//      ostringstream str ;
52//      for(auto& rank : ranks) str<<rank<<" ; " ;
53//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;
54
55      if (CXios::checkEventSync)
56      {
57        int typeId, classId, typeId_in, classId_in;
58        long long timeLine_out;
59        long long timeLine_in( timeLine );
60        typeId_in=event.getTypeId() ;
61        classId_in=event.getClassId() ;
62//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
63        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ; 
64        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
65        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
66        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
67        {
68           ERROR("void COneSidedContextClient::sendEvent(CEventClient& event)",
69               << "Event are not coherent between client for timeline = "<<timeLine);
70        }
71       
72        vector<int> servers(serverSize,0) ;
73        auto ranks=event.getRanks() ;
74        for(auto& rank : ranks) servers[rank]=1 ;
75        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
76        ostringstream osstr ;
77        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
78        if (!osstr.str().empty())
79        {
80          ERROR("void COneSidedContextClient::sendEvent(CEventClient& event)",
81                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
82                 <<"Servers are : "<<osstr.str()) ;
83        }
84
85
86      }
87     
88      event.setFirst() ;
89      while(!event.isEmpty())
90      {
91        int rank=event.getRank() ;
92        auto itBuffer=buffers.find(rank) ;
93        if (itBuffer==buffers.end()) 
94        { 
95          newBuffer(rank) ;
96          itBuffer=buffers.find(rank) ;
97        }
98        itBuffer->second->eventLoop() ;
99        double time=CTimer::getTime() ;
100        bool succed = itBuffer->second->writeEvent(timeLine, event)  ;
101        if (succed) 
102        {
103          time=CTimer::getTime()-time ;
104          if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").minus(time) ;
105        }
106
107        if (succed) event.remove() ;
108        else event.next() ;
109        if (event.isFirst())
110        {
111          if (CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").resume() ;
112          yield() ;
113        } 
114      }
115      if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").suspend() ;
116
117
118      synchronize() ;
119     
120      timeLine++;
121    }
122
123
124   void COneSidedContextClient::eventLoop(void)
125   {
126      if (!locked_) checkBuffers() ;
127   }
128
   //! Run the context global event loop; locked_ stops eventLoop() from
   //! re-entering checkBuffers() for the duration of the call.
   void COneSidedContextClient::callGlobalEventLoop(void)
   {
     locked_=true ;
     context_->globalEventLoop() ;
     locked_=false ;
   }
135
   //! Yield control to the context scheduler (used while waiting for buffer
   //! space); locked_ guards against re-entrant buffer checks meanwhile.
   void COneSidedContextClient::yield(void)
   {
     locked_=true ;
     context_->yield() ;
     locked_=false ;
   }
142
143   void COneSidedContextClient::synchronize(void)
144   {
145     if (context_->getServiceType()!=CServicesManager::CLIENT)
146     {
147       locked_=true ;
148       context_->synchronize() ;
149       locked_=false ;
150     }   
151   }
152
153   /*!
154   Make a new buffer for a certain connection to server with specific rank
155   \param [in] rank rank of connected server
156   */
157   void COneSidedContextClient::newBuffer(int rank)
158   {
159      if (!mapBufferSize_.count(rank))
160      {
161        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
162        mapBufferSize_[rank] = CXios::minBufferSize;
163        maxEventSizes[rank] = CXios::minBufferSize;
164      }
165
166      COneSidedClientBuffer* buffer = buffers[rank] = new COneSidedClientBuffer(interComm, rank, commSelf_, interCommMerged_, clientSize+rank );
167      if (isGrowableBuffer_) { buffer->setGrowable(growingFactor_) ; }
168      else buffer->setFixed(mapBufferSize_[rank]) ;
169 
170   }
171
172   /*!
173   Verify state of buffers. Buffer is under pending state if there is no message on it
174   \return state of buffers, pending(true), ready(false)
175   */
176   bool COneSidedContextClient::checkBuffers(void)
177   {
178      bool pending = false;
179      for (auto itBuff : buffers)
180      {
181        itBuff.second->eventLoop() ;
182        pending |= !(itBuff.second->isEmpty());
183      }
184      return pending;
185   }
186
187   //! Release all buffers
188   void COneSidedContextClient::releaseBuffers()
189   {
190      for (auto& itBuff : buffers) delete itBuff.second;
191      buffers.clear();
192   }
193
194
195   /*!
196   Verify state of buffers corresponding to a connection
197   \param [in] ranks list rank of server to which client connects to
198   \return state of buffers, pending(true), ready(false)
199   */
200   bool COneSidedContextClient::checkBuffers(list<int>& ranks)
201   {
202      bool pending = false;
203      for (auto& rank : ranks) 
204      {
205        buffers[rank]->eventLoop() ;
206        pending |= !(buffers[rank]->isEmpty()) ;
207      }
208      return pending;
209   }
210
211   /*!
212    * Set the buffer size for each connection. Warning: This function is collective.
213    *
214    * \param [in] mapSize maps the rank of the connected servers to the size of the correspoinding buffer
215    * \param [in] maxEventSize maps the rank of the connected servers to the size of the biggest event
216   */
217   void COneSidedContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
218   {
219     setFixedBuffer() ;
220     for(auto& it : mapSize)
221     {
222      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) ;
223      mapBufferSize_[it.first]=size ;
224      if (buffers.count(it.first)>0) buffers[it.first]->setFixed(size);
225     }
226   }
227
228
229   /*!
230   * Finalize context client and do some reports. Function is non-blocking.
231   */
232  void COneSidedContextClient::finalize(void)
233  {
234    bool stop = false;
235
236    int* nbServerConnectionLocal  = new int[serverSize] ;
237    int* nbServerConnectionGlobal  = new int[serverSize] ;
238    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
239    for (auto itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
240    for (auto ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;
241   
242    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);
243   
244    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
245    CMessage msg;
246
247    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
248    sendEvent(event);
249
250    delete[] nbServerConnectionLocal ;
251    delete[] nbServerConnectionGlobal ;
252
253
254    CTimer::get("Blocking time").resume();
255    checkBuffers();
256    CTimer::get("Blocking time").suspend();
257
258    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
259                                          iteMap = mapBufferSize_.end(), itMap;
260
261    StdSize totalBuf = 0;
262    for (itMap = itbMap; itMap != iteMap; ++itMap)
263    {
264      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
265                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
266      totalBuf += itMap->second;
267    }
268    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;
269
270  }
271
272
  /*!
  \return true while any client buffer still holds unprocessed data (delegates to checkBuffers()).
  */
  bool COneSidedContextClient::havePendingRequests(void)
  {
    return checkBuffers();
  }
279 
  //! Same pending check, restricted to the buffers of the given server ranks.
  bool COneSidedContextClient::havePendingRequests(list<int>& ranks)
  {
    return checkBuffers(ranks) ;
  }
284
285  bool COneSidedContextClient::isNotifiedFinalized(void)
286  {
287
288    bool finalized = true;
289    for (auto& it : buffers ) finalized &= it.second->isNotifiedFinalized();
290    return finalized;
291  }
292
293}
Note: See TracBrowser for help on using the repository browser.