source: XMLIO_V2/dev/common/src/xmlio/manager/mpi_manager.cpp @ 255

Last change on this file since 255 was 255, checked in by hozdoba, 13 years ago

Gestion du dispatch client

File size: 11.2 KB
Line 
1#include "mpi_manager.hpp"
2
3#include "impi_interface.hpp"
4
5namespace xmlioserver
6{
7   namespace comm
8   {
9      /// ////////////////////// Définitions ////////////////////// ///
10
11      void CMPIManager::Initialise(int * UNUSED(argc), char *** UNUSED(argv))
12      {
13         int error = 0;
14         bool flag = false;
15
16         mpi_initialized(&flag, &error);
17         if (error != mpi_success)
18            ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !");
19         if (!flag)
20         {
21            mpi_init(&error);
22            if (error != mpi_success)
23               ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !");
24         }
25      }
26
27      void CMPIManager::Finalize(void)
28      {
29         int error = 0;
30         mpi_finalize(&error);
31         if (error != mpi_success)
32            ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !");
33      }
34
35      ///--------------------------------------------------------------
36
37      int CMPIManager::GetCommRank(MPIComm comm)
38      {
39         int rank = 0, error = 0;
40         mpi_comm_rank(&comm, &rank, &error);
41         if (error != mpi_success)
42            ERROR("CMPIManager::GetCommRank(comm)", << " MPI Error !");
43         return (rank);
44      }
45
46      int CMPIManager::GetCommSize(MPIComm comm)
47      {
48         int size = 0, error = 0;
49         mpi_comm_size(&comm, &size, &error);
50         if (error != mpi_success)
51            ERROR("CMPIManager::GetCommSize(comm)", << " MPI Error !");
52         return (size);
53      }
54
55      MPIComm CMPIManager::GetCommWorld(void)
56      { 
57         return (mpi_comm_world); 
58      }
59
60      bool CMPIManager::IsMaster(MPIComm comm)
61      { 
62         return (CMPIManager::GetCommRank(comm) == 0); 
63      }
64
65      bool CMPIManager::IsRank(MPIComm comm, int rank)
66      { 
67         return (CMPIManager::GetCommRank(comm) == rank); 
68      }
69
70      MPIComm CMPIManager::CreateComm(MPIGroup group, MPIComm pcomm)
71      {
72         MPIComm  commu = 0;
73         int error = 0;
74         mpi_comm_create(&pcomm, &group, &commu, &error);
75         if (error != mpi_success)
76            ERROR("CMPIManager::CreateComm(group, pcomm)", << " MPI Error !");
77         return (commu);
78      }
79
80      //---------------------------------------------------------------
81
82      void CMPIManager::Barrier(MPIComm comm)
83      {
84         int error = 0;
85         mpi_barrier(&comm, &error);
86         if (error != mpi_success)
87            ERROR("CMPIManager::Barrier(comm)", << " MPI Error !");
88      }
89
90      bool CMPIManager::DispatchClient(bool      is_server,
91                                       MPIComm & comm_client,
92                                       MPIComm & comm_client_server,
93                                       MPIComm & comm_server,
94                                       MPIComm   comm_parent)
95      {
96         int value = (is_server) ? 1 : 2;
97         StdSize nbClient = 0, nbServer = 0, nbClientByServer = 0;
98         std::vector<int> info, rank_client, rank_server;
99         CMPIManager::AllGather(value, info, comm_parent);
100
101         for (StdSize s = 0;  s < info.size(); s++)
102         {
103            if (info[s] == 1) rank_server.push_back(s);
104            else rank_client.push_back(s);
105         }
106         nbClient = rank_client.size();
107         nbServer = rank_server.size();
108         
109
110         comm_client = CMPIManager::CreateComm(CMPIManager::CreateSubGroup(
111                       CMPIManager::GetGroupWorld(), rank_client), comm_parent);
112
113         if (nbServer != 0)
114         {
115            StdSize currentServer = 0;
116            nbClientByServer = nbClient/nbServer;
117            comm_server = CMPIManager::CreateComm(CMPIManager::CreateSubGroup(
118                          CMPIManager::GetGroupWorld(), rank_server), comm_parent);
119
120            //std::cout << nbClient << "," << nbServer  << "," << nbClientByServer << std::endl;
121
122            for (StdSize mm = 0; mm < nbClient; mm += nbClientByServer)
123            {
124               std::vector<int> group_rank;
125               group_rank.push_back(rank_server[currentServer++]);
126               for (StdSize nn = 0; nn < nbClientByServer; nn++)
127                  group_rank.push_back(rank_client[nn+mm]);
128               MPIComm comm_client_server_ = CMPIManager::CreateComm(CMPIManager::CreateSubGroup(
129                                             CMPIManager::GetGroupWorld(), group_rank), comm_parent);
130
131               if (std::find(group_rank.begin(), group_rank.end(), CMPIManager::GetCommRank()) != group_rank.end())
132               {
133                  comm_client_server = comm_client_server_;
134               }
135               
136               group_rank.clear();
137            }
138            return (true);
139         }
140         else
141         {
142            comm_server = comm_client;
143            return (false);
144         }
145      }
146
147      //---------------------------------------------------------------
148
149      MPIGroup CMPIManager::GetGroupWorld(void)
150      {
151         MPIGroup group = 0;
152         int error = 0;
153         MPIComm  commu = CMPIManager::GetCommWorld();
154         mpi_comm_group(&commu, &group, &error);
155         if (error != mpi_success)
156            ERROR("CMPIManager::GetGroupWorld()", << " MPI Error !");
157         return (group);
158      }
159
160      MPIGroup CMPIManager::CreateSubGroup(MPIGroup pgroup, const std::vector<int> & ranks)
161      {
162         MPIGroup group = 0;
163         int size = ranks.size();
164         int error = 0;
165         mpi_group_incl(&pgroup, &size, &(ranks[0]), &group, &error);
166         if (error != mpi_success)
167            ERROR("CMPIManager::CreateSubGroup(pgroup, ranks)", << " MPI Error !");
168         return (group);
169      }
170
171      MPIGroup CMPIManager::CreateSubGroup(MPIGroup pgroup, int min_rank, int max_rank, int intval)
172      {
173         std::vector<int> ranks;
174         for (int i = min_rank; i <= max_rank; i += intval)
175            ranks.push_back(i);
176         return (CMPIManager::CreateSubGroup(pgroup, ranks));
177      }
178
179      //---------------------------------------------------------------
180
181      void CMPIManager::AllocMem(void * data, StdSize size)
182      {
183         if (MPI_Alloc_mem(sizeof(char) * size, MPI_INFO_NULL, data) != MPI_SUCCESS)
184            ERROR("CMPIManager::AllocMem(data, size)", << " MPI Error !");
185      }
186
187      void CMPIManager::FreeMem(void * data)
188      { 
189         MPI_Free_mem(data);
190      }
191
192      //--------------------------------------------------------------
193
194      void CMPIManager::Send (MPIComm comm, int dest_rank, char * data,
195                              StdSize size, MPIRequest & request)
196      {
197         MPIDataType type = mpi_char;
198         int nsize = size;
199         int tag = 0, error = 0;
200         mpi_issend(data, &nsize, &type, &dest_rank, &tag, &comm, &request, &error);
201         if (error != mpi_success)
202            ERROR("CMPIManager::Send (comm, dest_rank, data, size, request)", << " MPI Error !");
203      }
204
205      void CMPIManager::Wait (MPIRequest & request)
206      {
207         MPIStatus status = new int[mpi_status_size]();
208         int error = 0;
209         mpi_wait(&request, status, &error);
210         if (error != mpi_success)
211            ERROR("CMPIManager::Wait (request)", << " MPI Error !");
212         delete [] status;
213      }
214
215      bool CMPIManager::Test (MPIRequest & request)
216      {
217         MPIStatus status = new int[mpi_status_size]();
218         bool flag = false;
219         int error = 0;
220         mpi_test(&request, &flag, status, &error);
221         if (error != mpi_success)
222            ERROR("CMPIManager::Test (request)", << " MPI Error !");
223         delete [] status;
224         return (flag);
225      }
226
227      bool CMPIManager::HasReceivedData(MPIComm comm, int src_rank)
228      {
229         MPIStatus status = new int[mpi_status_size]();
230         bool flag = false;
231         int error = 0, tag = mpi_any_tag;
232         mpi_iprobe(&src_rank, &tag, &comm, &flag, status, &error);
233         if (error != mpi_success)
234            ERROR("CMPIManager::hasReceivedData (comm, rank)", << " MPI Error !");
235         delete [] status;
236         return (flag);
237      }
238
239      StdSize CMPIManager::GetReceivedDataSize(MPIComm comm, int src_rank)
240      {
241         MPIDataType type = mpi_char;
242         MPIStatus status = new int[mpi_status_size]();
243         bool flag = false;
244         int error = 0, size = 0, tag = mpi_any_tag;
245
246         mpi_iprobe(&src_rank, &tag, &comm, &flag, status, &error);
247         if (error != mpi_success)
248            ERROR("CMPIManager::getReceivedDataSize (comm, rank)", << " MPI Error !");
249         if (flag == false) return (0);       
250         mpi_get_count(status, &type, &size, &error);
251         if (error != mpi_success)
252            ERROR("CMPIManager::getReceivedDataSize (comm, rank)", << " MPI Error !");
253         delete [] status;
254         return (size);
255      }
256
257      void CMPIManager::Receive(MPIComm comm, int src_rank, char * data)
258      {
259         MPIRequest req = 0;
260         MPIDataType type = mpi_char;
261         int error = 0, tag = mpi_any_tag;
262         int size = CMPIManager::GetReceivedDataSize(comm, src_rank);
263
264         mpi_irecv(data, &size, &type, &src_rank, &tag, &comm, &req, &error);
265         if (error != mpi_success)
266            ERROR("CMPIManager::Receive (comm, src_rank, data)", << " MPI Error !");
267         CMPIManager::Wait (req); // Temporaire
268      }
269
270      void CMPIManager::AllGather(int indata, std::vector<int> & outdata, MPIComm comm)
271      {
272         std::vector<int> data; data.push_back(indata);
273         CMPIManager::AllGather(data, outdata, comm);
274      }
275
276      void  CMPIManager::AllGather(std::vector<int> & indata,
277                                   std::vector<int> & outdata, MPIComm comm)
278      {
279         int error = 0;
280         int sendcount = indata.size(), recvcount = indata.size() * CMPIManager::GetCommSize(comm);
281         outdata.resize(recvcount);
282         mpi_allgather(&(indata[0]), &sendcount, &(outdata[0]), &sendcount, &comm, &error);
283         if (error != mpi_success)
284            ERROR("CMPIManager::AllGather (indata, outdata, comm)", << " MPI Error !");
285      }
286
287      //--------------------------------------------------------------
288
289      void CMPIManager::SendLinearBuffer
290         (MPIComm comm, int dest_rank, CLinearBuffer & buff, MPIRequest & request)
291      {
292         CMPIManager::Send(comm, dest_rank, buff, buff.getUsedSize(), request);
293         buff.clear();
294      }
295
296      void CMPIManager::ReceiveLinearBuffer(MPIComm comm, int src_rank, CLinearBuffer & buff)
297      {
298         CMPIManager::Receive(comm, src_rank, buff);
299         buff.computeBufferData();
300      }
301
302      boost::shared_ptr<CLinearBuffer> CMPIManager::ReceiveLinearBuffer(MPIComm comm, int src_rank)
303      {
304         boost::shared_ptr<CLinearBuffer> buff_ptr
305            (new CLinearBuffer(CMPIManager::GetReceivedDataSize(comm, src_rank)));
306         CMPIManager::ReceiveLinearBuffer(comm, src_rank, *buff_ptr);
307         return (buff_ptr);
308      }
309
310      void CMPIManager::ReceiveCircularBuffer(MPIComm comm, int src_rank, CCircularBuffer & buff)
311      {
312         StdSize data_size  = CMPIManager::GetReceivedDataSize(comm, src_rank);
313         StdSize data_begin = buff.prepareNextDataPosition(data_size);
314         CMPIManager::Receive(comm, src_rank, buff.getData(data_begin));
315         buff.updateNbRequests(data_begin, data_begin + data_size);
316      }
317
318      ///--------------------------------------------------------------
319
320   } // namespace comm
321} // namespace xmlioserver
Note: See TracBrowser for help on using the repository browser.