source: XMLIO_V2/dev/dev_rv/src/xmlio/manager/mpi_manager.cpp @ 267

Last change on this file since 267 was 267, checked in by hozdoba, 13 years ago
File size: 10.6 KB
Line 
1/* ************************************************************************** *
2 *      Copyright © IPSL/LSCE, XMLIOServer, Avril 2010 - Octobre 2011         *
3 * ************************************************************************** */
4 
5 /**
6 * \file    mpi_interface.cpp
7 * \brief   Gestion des communications MPI via une surcouche interne (implémentation).
8 * \author  Hervé Ozdoba
9 * \version 0.4
10 * \date    28 Juin 2011
11 */
12 
13// XMLIOServer headers
14#include "mpi_manager.hpp"
15
16
17// /////////////////////////////// Définitions ////////////////////////////// //
18
19namespace xmlioserver {
20namespace comm {
21
22   // ---------------------- Initialisation & Finalisation ---------------------
23
24   void CMPIManager::Initialise(int * _argc, char *** _argv)
25   {
26      int flag = 0;
27      if (MPI_Initialized(&flag) != MPI_SUCCESS)
28         ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !");
29      if (!flag)
30      {
31         if (MPI_Init(_argc, _argv) != MPI_SUCCESS)
32            ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !");
33      }
34
35   }
36   
37   void CMPIManager::Finalize(void)
38   {
39      if (MPI_Finalize() != MPI_SUCCESS)
40         ERROR("CMPIManager::Finalize(void)", << " MPI Error !");
41   }
42   
43   // ------------------------------ Communicateurs ----------------------------
44   
45   int CMPIManager::GetCommRank(MPI_Comm _comm)
46   {
47      int rank = 0;
48      if (MPI_Comm_rank(_comm, &rank) != MPI_SUCCESS)
49         ERROR("CMPIManager::GetCommRank(comm)", << " MPI Error !");
50      return (rank);
51   }
52   
53   int CMPIManager::GetCommSize(MPI_Comm _comm)
54   {
55      int size = 0;
56      if (MPI_Comm_size(_comm, &size) != MPI_SUCCESS)
57         ERROR("CMPIManager::GetCommSize(comm)", << " MPI Error !");
58      return (size);
59   }
60   
61   MPI_Comm CMPIManager::CreateComm(MPI_Group _group, MPI_Comm _pcomm)
62   {
63      MPI_Comm commu;     
64      if (MPI_Comm_create(_pcomm, _group, &commu) != MPI_SUCCESS)
65         ERROR("CMPIManager::CreateComm(group, pcomm)", << " MPI Error !");
66      return (commu);
67   }
68   
69   //MPI_Comm CMPIManager::GetCommWorld(void)
70   //{
71   //   return (MPI_COMM_WORLD);
72   //}
73   
74   // ---------------------------------- Autre ---------------------------------
75         
76   void CMPIManager::Barrier(MPI_Comm _comm)
77   {
78      if (MPI_Barrier(_comm) != MPI_SUCCESS)
79         ERROR("CMPIManager::Barrier(comm)", << " MPI Error !");
80   }
81   
82   bool CMPIManager::DispatchClient(bool       _is_server,
83                                    MPI_Comm & _comm_client,
84                                    MPI_Comm & _comm_client_server,
85                                    MPI_Comm & _comm_server,
86                                    MPI_Comm   _comm_parent)
87   {
88      int value = (_is_server) ? 1 : 2;
89      std::size_t nbClient = 0, nbServer = 0, nbClientByServer = 0;
90      std::vector<int> info, rank_client, rank_server;
91      CMPIManager::AllGather(value, info, _comm_parent);
92
93      for (std::size_t s = 0;  s < info.size(); s++)
94      {
95         if (info[s] == 1) rank_server.push_back(s);
96         else rank_client.push_back(s);
97      }
98      nbClient = rank_client.size();
99      nbServer = rank_server.size();
100     
101      if (nbClient == 0)
102         ERROR("CMPIManager::DispatchClient()", << " Aucun client disponible !");
103         
104
105      _comm_client = CMPIManager::CreateComm(CMPIManager::CreateSubGroup(
106                     CMPIManager::GetGroupWorld(), rank_client), _comm_parent);
107
108      if (nbServer != 0)
109      {
110         std::size_t currentServer = 0;
111         nbClientByServer = nbClient/nbServer;
112         _comm_server = CMPIManager::CreateComm(CMPIManager::CreateSubGroup(
113                        CMPIManager::GetGroupWorld(), rank_server), _comm_parent);
114
115         //std::cout << nbClient << "," << nbServer  << "," << nbClientByServer << std::endl;
116
117         for (std::size_t mm = 0; mm < nbClient; mm += nbClientByServer)
118         {
119            std::vector<int> group_rank;
120            group_rank.push_back(rank_server[currentServer++]);
121            for (std::size_t nn = 0; nn < nbClientByServer; nn++)
122               group_rank.push_back(rank_client[nn+mm]);
123            MPI_Comm comm_client_server_ = CMPIManager::CreateComm(CMPIManager::CreateSubGroup(
124                                           CMPIManager::GetGroupWorld(), group_rank), _comm_parent);
125
126            if (std::find(group_rank.begin(), group_rank.end(),
127                         CMPIManager::GetCommRank(_comm_parent)) != group_rank.end())
128            {
129               _comm_client_server = comm_client_server_;
130            }
131               
132            group_rank.clear();
133         }
134         return (true);
135      }
136      else
137      {
138         _comm_server = _comm_client;
139         return (false);
140      }
141   }
142   
143
144   // --------------------------------- Groupes --------------------------------
145         
146   MPI_Group CMPIManager::GetGroupWorld(void)
147   {
148      MPI_Group group = 0;
149      if (MPI_Comm_group(MPI_COMM_WORLD, &group) != MPI_SUCCESS)
150         ERROR("CMPIManager::GetGroupWorld()", << " MPI Error !");
151      return (group);
152   }
153   
154   MPI_Group CMPIManager::CreateSubGroup(MPI_Group _pgroup, const std::vector<int> & _ranks)
155   {
156      MPI_Group group = 0;
157      if (MPI_Group_incl(_pgroup, _ranks.size(), const_cast<int*>(&(_ranks[0])), &group) != MPI_SUCCESS)
158         ERROR("CMPIManager::CreateSubGroup(pgroup, ranks)", << " MPI Error !");
159      return (group);
160   }
161   
162   MPI_Group CMPIManager::CreateSubGroup
163      (MPI_Group _pgroup, int _min_rank, int _max_rank, int _intval)
164   {
165      std::vector<int> ranks;
166      for (int i = _min_rank; i <= _max_rank; i += _intval)
167         ranks.push_back(i);
168      return (CMPIManager::CreateSubGroup(_pgroup, ranks));
169   }
170
171   // ----------------------------------- Tests --------------------------------
172         
173   bool CMPIManager::IsMaster(MPI_Comm _comm)
174   {
175      return (CMPIManager::GetCommRank(_comm) == 0); 
176   }
177   
178   bool CMPIManager::IsRank(int _rank, MPI_Comm _comm)
179   {
180      return (CMPIManager::GetCommRank(_comm) == _rank); 
181   }
182
183   // --------------------------- Communication simple -------------------------
184         
185   void CMPIManager::Send (MPI_Comm _comm, int _dest_rank, char * _data,
186                           std::size_t _size, MPI_Request & _request)
187   {
188      int nsize = _size;   
189      if (MPI_Issend(_data, nsize, MPI_CHAR, _dest_rank, 0, _comm, &_request) != MPI_SUCCESS)
190         ERROR("CMPIManager::Send (comm, dest_rank, data, size, request)", << " MPI Error !");
191   }
192   
193   void CMPIManager::Wait (MPI_Request & _request)
194   {
195      MPI_Status status;
196      if (MPI_Wait(&_request, &status) != MPI_SUCCESS)
197         ERROR("CMPIManager::Wait (request)", << " MPI Error !");
198   }
199   
200   bool CMPIManager::Test (MPI_Request & _request)
201   {
202      MPI_Status status;
203      int flag = 0;
204      if (MPI_Test(&_request, &flag, &status) != MPI_SUCCESS)
205         ERROR("CMPIManager::Test (request)", << " MPI Error !");
206      return (flag);
207   }
208
209   bool CMPIManager::HasReceivedData(MPI_Comm _comm, int _src_rank)
210   {
211      MPI_Status status;
212      int flag = 0;
213      if (MPI_Iprobe(_src_rank, MPI_ANY_TAG, _comm, &flag, &status) != MPI_SUCCESS)
214         ERROR("CMPIManager::HasReceivedData (comm, rank)", << " MPI Error !");
215      return (flag);
216   }
217   
218   std::size_t CMPIManager::GetReceivedDataSize(MPI_Comm _comm, int _src_rank)
219   {
220      MPI_Status status;
221      int flag = 0, size = 0;
222      if (MPI_Iprobe(_src_rank, MPI_ANY_TAG, _comm, &flag, &status) != MPI_SUCCESS)
223         ERROR("CMPIManager::getReceivedDataSize (comm, rank)", << " MPI Error !");
224      if (!flag) return (0); 
225      if (MPI_Get_count(&status, MPI_CHAR, &size) != MPI_SUCCESS)
226         ERROR("CMPIManager::getReceivedDataSize (comm, rank)", << " MPI Error !");
227
228      return (size);
229   }
230   
231   void CMPIManager::Receive(MPI_Comm _comm, int _src_rank, char * _data)
232   {
233      MPI_Request request = 0;
234      int size = CMPIManager::GetReceivedDataSize(_comm, _src_rank);
235      if (MPI_Irecv(_data, size, MPI_CHAR, _src_rank, MPI_ANY_TAG, _comm, &request) != MPI_SUCCESS)
236         ERROR("CMPIManager::Receive (comm, src_rank, data)", << " MPI Error !");
237      CMPIManager::Wait (request); // Temporaire
238   }
239   
240   void CMPIManager::AllGather(int _indata, std::vector<int> & _outdata, MPI_Comm _comm)
241   {
242      std::vector<int> data; data.push_back(_indata);
243      CMPIManager::AllGather(data, _outdata, _comm);
244   }
245
246   void  CMPIManager::AllGather(const std::vector<int> & _indata,
247                                      std::vector<int> & _outdata, MPI_Comm _comm)
248   {
249      int sendcount = _indata.size(),
250          recvcount = _indata.size() * CMPIManager::GetCommSize(_comm);
251      _outdata.resize(recvcount);     
252      if (MPI_Allgather ( const_cast<int*>(&(_indata[0])), sendcount, MPI_INTEGER,
253                                          &(_outdata[0]) , recvcount, MPI_INTEGER, _comm) != MPI_SUCCESS)
254         ERROR("CMPIManager::AllGather (indata, outdata, comm)", << " MPI Error !");
255   }
256         
257   // ------------------------- Communication 'complexe' -----------------------
258         
259   void CMPIManager::SendLinearBuffer(MPI_Comm _comm, int _dest_rank, CLinearBuffer & _lbuffer, MPI_Request & _request)
260   {
261      CMPIManager::Send(_comm, _dest_rank, _lbuffer, _lbuffer.getUsedSize(), _request);
262      _lbuffer.clear();
263   }
264   
265   void CMPIManager::ReceiveLinearBuffer(MPI_Comm _comm, int _src_rank, CLinearBuffer & _lbuffer)
266   {
267      CMPIManager::Receive(_comm, _src_rank, _lbuffer);
268      _lbuffer.computeBufferData();
269   }
270   
271   boost::shared_ptr<CLinearBuffer> CMPIManager::ReceiveLinearBuffer(MPI_Comm _comm, int _src_rank)
272   {
273      boost::shared_ptr<CLinearBuffer> buff_ptr
274         (new CLinearBuffer(CMPIManager::GetReceivedDataSize(_comm, _src_rank)));
275      CMPIManager::ReceiveLinearBuffer(_comm, _src_rank, *buff_ptr);
276      return (buff_ptr);
277   }
278   
279   void CMPIManager::ReceiveCircularBuffer(MPI_Comm _comm, int _src_rank, CCircularBuffer & _cbuffer)
280   {
281      std::size_t data_size  = CMPIManager::GetReceivedDataSize(_comm, _src_rank);
282      std::size_t data_begin = _cbuffer.prepareNextDataPosition(data_size);
283      CMPIManager::Receive(_comm, _src_rank, _cbuffer.getData(data_begin));
284      _cbuffer.updateNbRequests(data_begin, data_begin + data_size);
285   }
286
287   // ---------------------- Mémoire (non fonctionnel ....) --------------------
288         
289   void CMPIManager::AllocMemory(void * _data, std::size_t _size)
290   {
291      if (MPI_Alloc_mem(sizeof(char) * _size, MPI_INFO_NULL, _data) != MPI_SUCCESS)
292         ERROR("CMPIManager::AllocMem(data, size)", << " MPI Error !");
293   }
294   
295   void CMPIManager::FreeMemory (void * _data)
296   {
297      MPI_Free_mem(_data);
298   }
299
300} // namespace comm
301} // namespace xmlioserver
302
Note: See TracBrowser for help on using the repository browser.