source: XIOS/dev/dev_ym/XIOS_COUPLING/src/transport/one_sided_context_client.cpp @ 2343

Last change on this file since 2343 was 2343, checked in by ymipsl, 2 years ago
  • Implement new infrastructure for the transfer protocol.
  • A new, purely one-sided protocol is now available; the previous protocol (legacy, mixing send/recv and one-sided) is still available. Other specific protocols can be implemented more easily in the future.
  • The protocol is selected with the "transport_protocol" variable in the XIOS context:

ex:
<variable id="transport_protocol" type="string">one_sided</variable>

Available protocols are: one_sided, legacy or default. The default protocol is "legacy".
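For example, a minimal sketch of how this could look in the iodef.xml configuration file, assuming the variable is declared like other XIOS configuration variables (the variable_definition wrapper shown here is illustrative, not prescribed by this changeset):

<context id="xios">
  <variable_definition>
    <variable id="transport_protocol" type="string">one_sided</variable>
  </variable_definition>
</context>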

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.1 KB
1#include "xios_spl.hpp"
2#include "one_sided_context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "type.hpp"
7#include "event_client.hpp"
8#include "context.hpp"
9#include "mpi.hpp"
10#include "timer.hpp"
11#include "cxios.hpp"
12#include "server.hpp"
13#include "services.hpp"
14#include <boost/functional/hash.hpp>
15#include <random>
16#include <chrono>
17
namespace xios
{
    /*!
    \param [in] parent Pointer to context on client side
    \param [in] intraComm_ communicator of group client
    \param [in] interComm_ communicator of group server
    \param [in] cxtSer Pointer to context on server side (only used in attached mode).
    */
    COneSidedContextClient::COneSidedContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : CContextClient(parent, intraComm_, interComm_, cxtSer),
       mapBufferSize_(), maxBufferedEvents(4)
    {

      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
      if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode

      if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;

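      // commSelf_ contains only the local process (color = key = clientRank); it is later
      // passed to each COneSidedClientBuffer, where the one-sided (RMA) windows are set up.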
      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows

      timeLine = 1;
    }


    /*!
    In attached mode, the current context must be reset to the client context.
    \param [in] event Event sent to the server
    */
    void COneSidedContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

//      ostringstream str ;
//      for(auto& rank : ranks) str<<rank<<" ; " ;
//      info(100)<<"Event "<<timeLine<<" of context "<<context_->getId()<<"  for ranks : "<<str.str()<<endl ;

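      // Optional coherence check: every client sums its event type, class id and timeline over
      // the intra-communicator; each sum divided by the number of clients must equal the local
      // value, otherwise the clients are not sending the same event at the same timeline.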
      if (CXios::checkEventSync)
      {
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        long long timeLine_in( timeLine );
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void COneSidedContextClient::sendEvent(CEventClient& event)",
               << "Events are not coherent between clients for timeline = "<<timeLine);
        }

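        // Check that every server receives the event from at least one client: a server rank
        // whose summed count is zero would never receive it, which is reported as an error.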
        vector<int> servers(serverSize,0) ;
        auto ranks=event.getRanks() ;
        for(auto& rank : ranks) servers[rank]=1 ;
        MPI_Allreduce(MPI_IN_PLACE, servers.data(), serverSize,MPI_INT,MPI_SUM,intraComm) ;
        ostringstream osstr ;
        for(int i=0;i<serverSize;i++)  if (servers[i]==0) osstr<<i<<" , " ;
        if (!osstr.str().empty())
        {
          ERROR("void COneSidedContextClient::sendEvent(CEventClient& event)",
                 <<" Some servers will not receive the message for timeline = "<<timeLine<<endl
                 <<"Servers are : "<<osstr.str()) ;
        }


      }

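      // Dispatch loop: for each destination server rank, try to write the event into the
      // corresponding one-sided buffer (created on first use). A rank is removed from the event
      // once its write succeeds; each time the iteration wraps back to the first remaining rank,
      // the global event loop is called so that pending communications can progress.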
      event.setFirst() ;
      while(!event.isEmpty())
      {
        int rank=event.getRank() ;
        auto itBuffer=buffers.find(rank) ;
        if (itBuffer==buffers.end())
        {
          newBuffer(rank) ;
          itBuffer=buffers.find(rank) ;
        }
        itBuffer->second->eventLoop() ;
        bool succeeded = itBuffer->second->writeEvent(timeLine, event)  ;
        if (succeeded) event.remove() ;
        else event.next() ;
        if (event.isFirst()) callGlobalEventLoop() ;
      }


      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
      {
        while (checkBuffers(ranks)) callGlobalEventLoop() ;

        CXios::getDaemonsManager()->scheduleContext(hashId_) ;
        while (CXios::getDaemonsManager()->isScheduledContext(hashId_)) callGlobalEventLoop() ;
      }

      timeLine++;
    }


   void COneSidedContextClient::eventLoop(void)
   {
      if (!locked_) checkBuffers() ;
   }

   void COneSidedContextClient::callGlobalEventLoop(void)
   {
     locked_=true ;
     context_->globalEventLoop() ;
     locked_=false ;
   }
   /*!
   Create a new buffer for the connection to the server with the given rank
   \param [in] rank rank of the connected server
   */
   void COneSidedContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
        maxEventSizes[rank] = CXios::minBufferSize;
      }

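      // In interCommMerged_ the client group was merged first (high = false in the constructor's
      // MPI_Intercomm_merge), so server <rank> corresponds to rank clientSize + rank there.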
      COneSidedClientBuffer* buffer = buffers[rank] = new COneSidedClientBuffer(interComm, rank, commSelf_, interCommMerged_, clientSize+rank );
      if (isGrowableBuffer_) { buffer->setGrowable(growingFactor_) ; }
      else buffer->setFixed(mapBufferSize_[rank]) ;

   }

   /*!
   Check the state of the buffers. A buffer is considered pending if there is no message in it.
   \return state of the buffers: pending (true), ready (false)
   */
   bool COneSidedContextClient::checkBuffers(void)
   {
      bool pending = false;
      for (auto itBuff : buffers)
      {
        itBuff.second->eventLoop() ;
        pending |= itBuff.second->isEmpty();
      }
      return pending;
   }

   //! Release all buffers
   void COneSidedContextClient::releaseBuffers()
   {
      for (auto& itBuff : buffers) delete itBuff.second;
      buffers.clear();
   }


   /*!
   Check the state of the buffers corresponding to a set of connections
   \param [in] ranks list of ranks of the servers to which the client is connected
   \return state of the buffers: pending (true), ready (false)
   */
   bool COneSidedContextClient::checkBuffers(list<int>& ranks)
   {
      bool pending = false;
      for (auto& rank : ranks)
      {
        buffers[rank]->eventLoop() ;
        pending |= buffers[rank]->isEmpty() ;
      }
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: this function is collective.
    *
    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
   */
   void COneSidedContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
   {
     setFixedBuffer() ;
     for(auto& it : mapSize)
     {
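      // Scale the requested size by CXios::bufferSizeFactor (plus a 1% margin), clamp it between
      // CXios::minBufferSize and CXios::maxBufferSize, then convert to bytes (8 bytes per double).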
      size_t size=std::max(CXios::minBufferSize*1.0,std::min(it.second*CXios::bufferSizeFactor*1.01,CXios::maxBufferSize*1.0)) * 8 ; // double
      mapBufferSize_[it.first]=size ;
      if (buffers.count(it.first)>0) buffers[it.first]->setFixed(size);
     }
   }


   /*!
   * Finalize the context client and report buffer usage. This function is non-blocking.
   */
  void COneSidedContextClient::finalize(void)
  {
    bool stop = false;

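    // Count, over all clients, how many clients are connected to each server: the Allreduce
    // gives, for each server rank, the number of clients that will send it the finalize event,
    // and this count is passed along with the event so the server knows how many to expect.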
    int* nbServerConnectionLocal  = new int[serverSize] ;
    int* nbServerConnectionGlobal  = new int[serverSize] ;
    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
    for (auto itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
    for (auto ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;

    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    CMessage msg;

    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
    sendEvent(event);

    delete[] nbServerConnectionLocal ;
    delete[] nbServerConnectionGlobal ;


    CTimer::get("Blocking time").resume();
    checkBuffers();
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context_->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context_->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

  }


  /*!
  Return whether the buffers are still in a pending state (see checkBuffers)
  */
  bool COneSidedContextClient::havePendingRequests(void)
  {
    return checkBuffers();
  }

  bool COneSidedContextClient::havePendingRequests(list<int>& ranks)
  {
    return checkBuffers(ranks) ;
  }

  bool COneSidedContextClient::isNotifiedFinalized(void)
  {
    if (isAttachedModeEnabled()) return true ;

    bool finalized = true;
    for (auto& it : buffers ) finalized &= it.second->isNotifiedFinalized();
    return finalized;
  }

}