source: XIOS3/trunk/src/transport/one_sided_context_client.cpp

Last change on this file was 2629, checked in by jderouillat, 2 months ago

Delete boost dependencies; the few features used are replaced by functions stored in extern/boost_extraction

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.6 KB
Line 
#include "xios_spl.hpp"
#include "one_sided_context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "type.hpp"
#include "event_client.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"
#include "services.hpp"
#include <chrono>
#include <random>
#include <vector>
16
17namespace xios
18{
    /*!
    Construct the one-sided transport client for a context: set up the merged
    inter-communicator and the per-rank self communicator backing the RMA windows.
    \param [in] parent Pointer to context on client side
    \param [in] intraComm_ communicator of group client
    \param [in] interComm_ communicator of group server
    \param [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode --> obsolete ).
    */
    COneSidedContextClient::COneSidedContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : CContextClient(parent, intraComm_, interComm_, cxtSer),
       mapBufferSize_(), maxBufferedEvents(4)
    {
     
      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)

      // Merge client and server groups into one intra-communicator; freeing is
      // deferred to the MPI garbage collector at finalization.
      xios::MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;
      CXios::getMpiGarbageCollector().registerCommunicator(interCommMerged_) ;
     
      // color == key == clientRank: every rank ends up alone in its own communicator.
      xios::MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows
      CXios::getMpiGarbageCollector().registerCommunicator(commSelf_) ;
      eventScheduler_ = parent->getEventScheduler() ; 
      timeLine = 1;
    }
40
41
    /*!
    Send an event to the connected servers: write each per-rank part of the
    event into the corresponding one-sided buffer, yielding to other contexts
    between retry passes, then synchronize and advance the timeline.
    \param [in] event Event sent to server
    */
    void COneSidedContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();
 
//      ostringstream str ;
//      for(auto& rank : ranks) str<<rank<<" ; " ;
//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;

      if (CXios::checkEventSync)
      {
        // Debug mode: verify all clients send a coherent event. Summing the
        // local values over intraComm and dividing by clientSize must give
        // back the local value on every rank if they all agree.
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        long long timeLine_in( timeLine );
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ; 
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void COneSidedContextClient::sendEvent(CEventClient& event)",
               << "Event are not coherent between client for timeline = "<<timeLine);
        }
       
        // Also verify that every server rank is targeted by at least one client.
        vector<int> servers(serverSize,0) ;
        auto ranks=event.getRanks() ;
        for(auto& rank : ranks) servers[rank]=1 ;
        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
        ostringstream osstr ;
        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
        if (!osstr.str().empty())
        {
          ERROR("void COneSidedContextClient::sendEvent(CEventClient& event)",
                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
                 <<"Servers are : "<<osstr.str()) ;
        }


      }
     
      // Cycle over the remaining per-rank parts until all have been buffered.
      event.setFirst() ;
      while(!event.isEmpty())
      {
        int rank=event.getRank() ;
        auto itBuffer=buffers.find(rank) ;
        if (itBuffer==buffers.end()) 
        { 
          // Lazily create the buffer on first communication with this rank.
          newBuffer(rank) ;
          itBuffer=buffers.find(rank) ;
        }
        itBuffer->second->eventLoop() ;
        double time=CTimer::getTime() ;
        bool succed = itBuffer->second->writeEvent(timeLine, event)  ;
        if (succed) 
        {
          // Successful write: subtract the time spent from "Blocking time",
          // it was useful work rather than a stall.
          time=CTimer::getTime()-time ;
          if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").minus(time) ;
        }

        if (succed) event.remove() ;
        else event.next() ;
        if (event.isFirst())
        {
          // Wrapped around to the first pending part: a whole pass made no
          // progress draining the event, so count this as blocking time and
          // yield to let other contexts (and the servers) progress.
          if (CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").resume() ;
          yield() ;
        } 
      }
      if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").suspend() ;


      synchronize() ;
     
      timeLine++;
    }
121
122
123   void COneSidedContextClient::eventLoop(void)
124   {
125      if (!locked_) checkBuffers() ;
126   }
127
128   void COneSidedContextClient::callGlobalEventLoop(void)
129   {
130     locked_=true ;
131     context_->globalEventLoop() ;
132     locked_=false ;
133   }
134
135   void COneSidedContextClient::yield(void)
136   {
137     locked_=true ;
138     context_->yield() ;
139     locked_=false ;
140   }
141
142   void COneSidedContextClient::synchronize(void)
143   {
144     if (context_->getServiceType()!=CServicesManager::CLIENT)
145     {
146       locked_=true ;
147       context_->synchronize() ;
148       locked_=false ;
149     }   
150   }
151
152   /*!
153   Make a new buffer for a certain connection to server with specific rank
154   \param [in] rank rank of connected server
155   */
156   void COneSidedContextClient::newBuffer(int rank)
157   {
158      if (!mapBufferSize_.count(rank))
159      {
160        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
161        mapBufferSize_[rank] = CXios::minBufferSize;
162        maxEventSizes[rank] = CXios::minBufferSize;
163      }
164
165      COneSidedClientBuffer* buffer = buffers[rank] = new COneSidedClientBuffer(interComm, rank, commSelf_, interCommMerged_, clientSize+rank );
166      if (isGrowableBuffer_) { buffer->setGrowable(growingFactor_) ; }
167      else buffer->setFixed(mapBufferSize_[rank]) ;
168 
169   }
170
171   /*!
172   Verify state of buffers. Buffer is under pending state if there is no message on it
173   \return state of buffers, pending(true), ready(false)
174   */
175   bool COneSidedContextClient::checkBuffers(void)
176   {
177      bool pending = false;
178      for (auto itBuff : buffers)
179      {
180        itBuff.second->eventLoop() ;
181        pending |= !(itBuff.second->isEmpty());
182      }
183      return pending;
184   }
185
186   //! Release all buffers
187   void COneSidedContextClient::releaseBuffers()
188   {
189      for (auto& itBuff : buffers) delete itBuff.second;
190      buffers.clear();
191   }
192
193
194   /*!
195   Verify state of buffers corresponding to a connection
196   \param [in] ranks list rank of server to which client connects to
197   \return state of buffers, pending(true), ready(false)
198   */
199   bool COneSidedContextClient::checkBuffers(list<int>& ranks)
200   {
201      bool pending = false;
202      for (auto& rank : ranks) 
203      {
204        buffers[rank]->eventLoop() ;
205        pending |= !(buffers[rank]->isEmpty()) ;
206      }
207      return pending;
208   }
209
210   /*!
211    * Set the buffer size for each connection. Warning: This function is collective.
212    *
213    * \param [in] mapSize maps the rank of the connected servers to the size of the correspoinding buffer
214    * \param [in] maxEventSize maps the rank of the connected servers to the size of the biggest event
215   */
216   void COneSidedContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
217   {
218     setFixedBuffer() ;
219     for(auto& it : mapSize)
220     {
221      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) ;
222      mapBufferSize_[it.first]=size ;
223      if (buffers.count(it.first)>0) buffers[it.first]->setFixed(size);
224     }
225   }
226
227
228   /*!
229   * Finalize context client and do some reports. Function is non-blocking.
230   */
231  void COneSidedContextClient::finalize(void)
232  {
233    bool stop = false;
234
235    int* nbServerConnectionLocal  = new int[serverSize] ;
236    int* nbServerConnectionGlobal  = new int[serverSize] ;
237    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
238    for (auto itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
239    for (auto ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;
240   
241    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);
242   
243    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
244    CMessage msg;
245
246    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
247    sendEvent(event);
248
249    delete[] nbServerConnectionLocal ;
250    delete[] nbServerConnectionGlobal ;
251
252
253    CTimer::get("Blocking time").resume();
254    checkBuffers();
255    CTimer::get("Blocking time").suspend();
256
257    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
258                                          iteMap = mapBufferSize_.end(), itMap;
259
260    StdSize totalBuf = 0;
261    for (itMap = itbMap; itMap != iteMap; ++itMap)
262    {
263      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
264                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
265      totalBuf += itMap->second;
266    }
267    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;
268
269  }
270
271
272  /*!
273  */
274  bool COneSidedContextClient::havePendingRequests(void)
275  {
276    return checkBuffers();
277  }
278 
279  bool COneSidedContextClient::havePendingRequests(list<int>& ranks)
280  {
281    return checkBuffers(ranks) ;
282  }
283
284  bool COneSidedContextClient::isNotifiedFinalized(void)
285  {
286
287    bool finalized = true;
288    for (auto& it : buffers ) finalized &= it.second->isNotifiedFinalized();
289    return finalized;
290  }
291
292}
Note: See TracBrowser for help on using the repository browser.