source: XIOS/dev/dev_ym/XIOS_COUPLING/src/distribution/element.cpp @ 1918

Last change on this file since 1918 was 1918, checked in by ymipsl, 4 years ago

Big update on on going work related to data distribution and transfer between clients and servers.

  • move all related file into distribution directorie
  • implement the concept of data "View"
  • implement the concept of "connector" which make the data transfer between 2 differents "Views"

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 4.4 KB
Line 
1#include "element.hpp"
2#include "distributed_view.hpp"
3#include "local_view.hpp"
4#include "local_connector.hpp"
5#include "context_client.hpp"
6#include "context_server.hpp"
7#include "mpi.hpp"
8
9
10namespace xios
11{
12
13  CDistributedElement::CDistributedElement(int globalSize, const map<int, CArray<size_t,1>>& globalIndex)
14  {
15    globalSize_= globalSize ;
16    for(auto index : globalIndex) 
17    {
18      globalIndex_[index.first].reference(index.second.copy()) ;
19      localSize_[index.first] = index.second.numElements() ;
20    }
21  }
22
23  CDistributedElement::CDistributedElement(CEventServer& event) 
24  {
25    recvFromClient(event) ;
26  }
27 
28  void CDistributedElement::addView(CElementView::type type, std::map<int, CArray<int,1>>& indexView)
29  {
30    views_[type] = new CDistributedView(this, type, indexView) ;
31  } 
32
33  void CDistributedElement::addFullView(void)
34  {
35    if (views_[CElementView::FULL]!=nullptr) return ;
36
37    std::map<int, CArray<int,1>> indexView ;
38    for(auto rank : globalIndex_)
39    {
40      auto& index = indexView[rank.first] ;
41      int size=rank.second.numElements() ;
42      index.resize(size) ;
43      for(int i=0;i<size;i++) index(i)=i ;
44    }
45    addView(CElementView::FULL, indexView) ;
46  } 
47
48  void CDistributedElement::sendToServer(CContextClient* client, CEventClient& event, const CMessage& messageHeader)
49  {
50    int remoteSize = client->getRemoteSize() ;
51    vector<int> nbSenders(remoteSize,0) ;
52    for(auto rank : globalIndex_) nbSenders[rank.first]=1 ; 
53    MPI_Allreduce(MPI_IN_PLACE, nbSenders.data(), remoteSize, MPI_INT, MPI_SUM, client->getIntraComm()) ;
54
55    list<CMessage> messages;
56    for(auto ranksData : globalIndex_)
57    {
58      int rank = ranksData.first ;
59      auto& data = ranksData.second ;
60
61      messages.push_back(CMessage(messageHeader));
62      messages.back()<<globalSize_<<data;
63      event.push(rank, nbSenders[rank], messages.back());
64    } 
65    client->sendEvent(event) ; 
66  }
67
68  void CDistributedElement::recvFromClient(CEventServer& event)
69  {
70    globalIndex_.clear();
71    for (auto& subEvent : event.subEvents)
72    {     
73      CBufferIn* buffer = subEvent.buffer;
74      int rank=subEvent.rank ;
75      *buffer>>globalSize_ ;
76      *buffer >> globalIndex_[rank];
77    }
78    localSize_.clear() ;
79    for(auto& globalIndex : globalIndex_) localSize_[globalIndex.first] = globalIndex.second.numElements() ; 
80  }
81 
82
83
84  CLocalElement::CLocalElement(int localRank, size_t globalSize, CArray<size_t,1>& globalIndex) 
85                             : CDistributedElement(globalSize, {{localRank, globalIndex}}),
86                               globalIndex_(CDistributedElement::globalIndex_[localRank]), localSize_(CDistributedElement::localSize_[localRank]), localRank_(localRank)
87  {
88   
89  }   
90
91  CLocalElement::CLocalElement(int localRank, CEventServer& event) : globalIndex_(CDistributedElement::globalIndex_[localRank]), localSize_(CDistributedElement::localSize_[localRank]), localRank_(localRank) 
92  {
93    recvFromClient(localRank, event) ;
94  }
95
96  void CLocalElement::recvFromClient(int localRank, CEventServer& event)
97  {
98    set<size_t> globalIndex ;
99
100    for (auto& subEvent : event.subEvents)
101    {     
102      CBufferIn* buffer = subEvent.buffer;
103      int rank=subEvent.rank ;
104      CArray<size_t,1> indGlo ;
105      *buffer >> globalSize_>> indGlo;
106      globalIndex.insert(indGlo.dataFirst(), indGlo.dataFirst()+indGlo.numElements()) ;
107    }
108
109    localSize_ = globalIndex.size() ;
110    globalIndex_.resize(localSize_) ;
111    int i=0 ;
112    for(auto& ind : globalIndex) { globalIndex_(i)=ind ; i++; }
113  }
114
115  void CLocalElement::addView(CElementView::type type, CArray<int,1>& indexView)
116  {
117    views_[type] = new CLocalView(this, type, indexView) ;
118  } 
119
120  void CLocalElement::addFullView(void)
121  {
122    if (views_[CElementView::FULL]!=nullptr) return ;
123
124    CArray<int,1> indexView(localSize_) ;
125    for(int i=0;i<localSize_;i++) indexView(i)=i ;
126    addView(CElementView::FULL, indexView) ;
127  } 
128
129  CLocalConnector* CLocalElement::getConnector(CElementView::type srcType, CElementView::type dstType) 
130  { 
131    auto newPair = pair<CElementView::type,CElementView::type>(srcType,dstType);
132    auto it = connectors_.find(newPair) ;
133    if (it==connectors_.end()) 
134    {
135      auto insertPair=pair<pair<CElementView::type,CElementView::type>, CLocalConnector*>(newPair,new CLocalConnector(getView(srcType),getView(dstType))) ;
136      it=connectors_.insert(insertPair).first ;
137      it->second->computeConnector() ;
138    }
139    return it->second ;
140  }
141
142}
Note: See TracBrowser for help on using the repository browser.