source: XIOS/dev/branch_yushan_merged/src/client_client_dht_template_impl.hpp @ 1196

Last change on this file was revision 1196, checked in by yushan, 7 years ago

Commit message: add request_check; test client and complete OK

File size: 34.0 KB
/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hashed table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"
#ifdef _usingEP
#include "ep_declaration.hpp"
#include "ep_lib.hpp"
#endif

namespace xios
{
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
}

/*!
  Constructor with initial distribution information and the corresponding index
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  Index2VectorInfoTypeMap indexToVecInfoMap;
  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
}

/*!
  Constructor with initial distribution information and the corresponding index
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}
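
// Typical usage, as a minimal sketch: this assumes the CClientClientDHTInt typedef
// (CClientClientDHTTemplate<int>) and the public Index2InfoTypeMap typedef declared in
// client_client_dht_template.hpp; the variable names below are placeholders for illustration.
//
//   CClientClientDHTInt::Index2InfoTypeMap indexInfo;     // global index -> info (e.g. server rank)
//   indexInfo[someGlobalIndex] = someServerRank;          // local contribution to the table
//   CClientClientDHTInt dht(indexInfo, clientIntraComm);  // redistributes over hashed size_t space
//   dht.computeIndexInfoMapping(queriedIndices);          // asks the DHT which info matches these indices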

/*!
  Compute mapping between indices and information corresponding to these indices
  \param [in] indices indices a proc has
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}

/*!
  Compute mapping between indices and the information corresponding to these indices
for each level of the hierarchical DHT. Recursive function.
  \param [in] indices indices a proc has
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const ep_lib::MPI_Comm& commLevel,
                                                                 int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  ep_lib::MPI_Barrier(commLevel);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose mapping information is held by other clients
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  boost::unordered_map<size_t,int> nbIndices;
  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (0 == nbIndices.count(index))
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      ++sendNbIndexBuff[indexClient];
      nbIndices[index] = 1;
    }
  }
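
  // The first pass above only counted, per destination client, how many distinct indices
  // will be sent (the owner of an index is found by binary search on the hash partition).
  // The loop below allocates one send buffer per destination and resets sendNbIndexBuff
  // so it can be reused as a write cursor during the second pass.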

  boost::unordered_map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (1 == nbIndices[index])
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
      ++sendNbIndexBuff[indexClient];
      ++nbIndices[index];
    }
  }

  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  int request_size = 0;

  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();

  int position = 0;

  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      request_size++;
    }
  }

  request_size += client2ClientIndex.size();

  std::vector<ep_lib::MPI_Request> request(request_size);

  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                             iteRecvIndex = recvRankClient.end(),
                             itbRecvNbIndex = recvNbIndexClientCount.begin(),
                             itRecvNbIndex;

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
  {
    MPI_Isend(itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], MPI_UNSIGNED_LONG,
              itIndex->first, MPI_DHT_INDEX, commLevel, &request[position]);
    position++;
    //sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  }

  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      MPI_Irecv(recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], MPI_UNSIGNED_LONG,
                recvRankClient[idx], MPI_DHT_INDEX, commLevel, &request[position]);
      position++;
      //recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> status(request_size);
  MPI_Waitall(request.size(), &request[0], &status[0]);

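  // Every queried index targeted at this client has now arrived in recvIndexBuff; it is
  // wrapped below in a CArray (referencing the buffer without copying, via neverDeleteData)
  // so that the same routine can be applied recursively at the next level.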
  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // Go down one level and recurse on the indices received at this level
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // Last level: the necessary mappings are stored here
    indexToInfoMappingLevel_ = index2InfoMapping_;

  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap)
        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
    }
    currentIndex += recvNbIndexClientCount[idx];
  }
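
  // sendNbIndexOnReturn[idx] now holds, for each requesting client, the number of
  // (index, info) pairs that will be sent back; a single received index can contribute
  // several pairs since the mapping stores a vector of info values per index.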

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn;
  unsigned char* recvInfoBuffOnReturn;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }
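
  // On the return path each active peer contributes two requests: one for the index
  // buffer (tag MPI_DHT_INDEX) and one for the packed info buffer (tag MPI_DHT_INFO),
  // hence the request count computed below.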

  request_size = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      request_size += 2;
    }
  }

  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      request_size += 2;
    }
  }

  std::vector<ep_lib::MPI_Request> requestOnReturn(request_size);
  currentIndex = 0;
  position = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      //recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
      MPI_Irecv(recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], MPI_UNSIGNED_LONG,
                recvRankOnReturn[idx], MPI_DHT_INDEX, commLevel, &requestOnReturn[position]);
      position++;
      //recvInfoFromClients(recvRankOnReturn[idx],
      //                    recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
      //                    recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
      //                    commLevel, requestOnReturn);
      MPI_Irecv(recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), MPI_CHAR,
                recvRankOnReturn[idx], MPI_DHT_INFO, commLevel, &requestOnReturn[position]);
      position++;
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }
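
  // Build the replies: for every index received from client idx that is present in the
  // local mapping, append the index and its packed info value(s) to the buffers that
  // are then sent back with matching MPI_DHT_INDEX / MPI_DHT_INFO messages.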

  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
          for (int k = 0; k < infoTmp.size(); ++k)
          {
            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
            ++nb;
          }
        }
      }

      //sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
      //                   sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
      MPI_Isend(client2ClientIndexOnReturn[rank], sendNbIndexOnReturn[idx], MPI_UNSIGNED_LONG,
                rank, MPI_DHT_INDEX, commLevel, &requestOnReturn[position]);
      position++;
      //sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
      //                  sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
      MPI_Isend(client2ClientInfoOnReturn[rank], sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), MPI_CHAR,
                rank, MPI_DHT_INFO, commLevel, &requestOnReturn[position]);
      position++;
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> statusOnReturn(requestOnReturn.size());
  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
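
  // Unpack the replies: each received index is associated with the info value decoded
  // from the corresponding position of the packed info buffer.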

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
  int infoIndex = 0;
  InfoType unpackedInfo;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                         it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                                it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                                         it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}

/*!
  Compute the hash index distribution of the whole size_t space; each client owns a range of this distribution
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute range of hash index for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}
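
// In effect, client i of a group is responsible for the hash range
// [hashedIndex[i], hashedIndex[i+1]); callers locate the owner of a hashed value
// with std::upper_bound on this vector.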

/*!
  Compute the distribution of global indices for servers
  Each client already holds a piece of information and its associated index.
This information will be redistributed among processes by projecting the indices into size_t space,
and the corresponding information is distributed along with them.
After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
                                                            const ep_lib::MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);
  ep_lib::MPI_Barrier(commLevel);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(), it,
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute size of the sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    sendNbIndexBuff[indexClient] += it->second.size();
  }

  boost::unordered_map<int, size_t*> client2ClientIndex;
  boost::unordered_map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    const std::vector<InfoType>& infoTmp = it->second;
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    for (int idx = 0; idx < infoTmp.size(); ++idx)
    {
      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
      ++sendNbIndexBuff[indexClient];
    }
  }
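
  // client2ClientIndex[rank] now holds the indices destined for 'rank' and
  // client2ClientInfo[rank] the corresponding packed info values; sendNbIndexBuff counts
  // the indices per destination while sendNbInfo counts the packed bytes.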

  // Calculate from how many clients each client receives messages,
  // and the size of the buffers needed to receive them
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
  {
    recvNbIndexCount += recvNbIndexClientCount[idx];
  }

  unsigned long* recvIndexBuff;
  unsigned char* recvInfoBuff;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds indices and the corresponding information that do not belong to it,
  // it sends them to the correct clients.
  // The contents of the message are the indices and their corresponding information.
  int request_size = 0;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  int current_pos = 0;

  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      request_size += 2;
    }
    //currentIndex += recvNbIndexClientCount[idx];
  }

  request_size += client2ClientIndex.size();
  request_size += client2ClientInfo.size();

  std::vector<ep_lib::MPI_Request> request(request_size);
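
  // All non-blocking index sends are posted first, then the info sends, then the matching
  // receives; the MPI_Waitall below blocks until every transfer in 'request' has completed.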

  //unsigned long* tmp_send_buf_long[client2ClientIndex.size()];
  //unsigned char* tmp_send_buf_char[client2ClientInfo.size()];

  int info_position = 0;
  int index_position = 0;

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
  {
    //sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);

    //tmp_send_buf_long[index_position] = new unsigned long[sendNbIndexBuff[itIndex->first-groupRankBegin]];
    //for(int i=0; i<sendNbIndexBuff[itIndex->first-groupRankBegin]; i++)
    //{
    //  tmp_send_buf_long[index_position][i] = (static_cast<unsigned long * >(itIndex->second))[i];
    //}
    //MPI_Isend(tmp_send_buf_long[current_pos], sendNbIndexBuff[itIndex->first-groupRankBegin], MPI_UNSIGNED_LONG,
    MPI_Isend(itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], MPI_UNSIGNED_LONG,
              itIndex->first, MPI_DHT_INDEX, commLevel, &request[current_pos]);
    current_pos++;
    index_position++;
  }

  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                      iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
  {
    //sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);

    //tmp_send_buf_char[info_position] = new unsigned char[sendNbInfo[itInfo->first-groupRankBegin]];
    //for(int i=0; i<sendNbInfo[itInfo->first-groupRankBegin]; i++)
    //{
    //  tmp_send_buf_char[info_position][i] = (static_cast<unsigned char * >(itInfo->second))[i];
    //}

    MPI_Isend(itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], MPI_CHAR,
              itInfo->first, MPI_DHT_INFO, commLevel, &request[current_pos]);

    current_pos++;
    info_position++;
  }

  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      //recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
      MPI_Irecv(recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], MPI_UNSIGNED_LONG,
                recvRankClient[idx], MPI_DHT_INDEX, commLevel, &request[current_pos]);
      current_pos++;

      MPI_Irecv(recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                MPI_CHAR, recvRankClient[idx], MPI_DHT_INFO, commLevel, &request[current_pos]);
      current_pos++;

      // recvInfoFromClients(recvRankClient[idx],
      //                     recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
      //                     recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
      //                     commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> status(request.size());

  MPI_Waitall(request.size(), &request[0], &status[0]);

  //for(int i=0; i<client2ClientInfo.size(); i++)
  //  delete[] tmp_send_buf_char[i];

  //for(int i=0; i<client2ClientIndex.size(); i++)
  //  delete[] tmp_send_buf_long[i];

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    size_t index;
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                                it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                         it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // Now recurse on the redistributed mapping at the next level
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_.swap(indexToInfoMapping);
}

/*!
  Send a message containing indices to clients
  \param [in] clientDestRank rank of destination client
  \param [in] indices index array to send
  \param [in] indiceSize size of the index array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const ep_lib::MPI_Comm& clientIntraComm,
                                                       std::vector<ep_lib::MPI_Request>& requestSendIndex)
{
  ep_lib::MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Receive a message containing indices from another client
  \param [in] clientSrcRank rank of source client
  \param [in] indices index array to receive
  \param [in] indiceSize size of the index array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvIndex list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const ep_lib::MPI_Comm& clientIntraComm,
                                                         std::vector<ep_lib::MPI_Request>& requestRecvIndex)
{
  ep_lib::MPI_Request request;
  requestRecvIndex.push_back(request);
  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Send a message containing information to clients
  \param [in] clientDestRank rank of destination client
  \param [in] info info array to send
  \param [in] infoSize size of the info array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const ep_lib::MPI_Comm& clientIntraComm,
                                                      std::vector<ep_lib::MPI_Request>& requestSendInfo)
{
  ep_lib::MPI_Request request;
  requestSendInfo.push_back(request);
  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Receive a message containing information from another client
  \param [in] clientSrcRank rank of source client
  \param [in] info info array to receive
  \param [in] infoSize size of the info array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvInfo list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const ep_lib::MPI_Comm& clientIntraComm,
                                                        std::vector<ep_lib::MPI_Request>& requestRecvInfo)
{
  ep_lib::MPI_Request request;
  requestRecvInfo.push_back(request);

  MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive.
  This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes per level).
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

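  // [a,b] is the hash range owned by this rank inside its own group; below it is
  // intersected with the ranges [c,d] of the ranks of the parent groups to decide which
  // ranks this process must listen to (recvRank_), while [e,f], the range this rank owns
  // inside its parent group, determines the ranks it must send to (sendRank_).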
  int currentGroup, offset;
  size_t e,f;

  // Simple interval arithmetic: check whether [a,b] intersects [c,d]
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset < nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}

/*!
  Compute the number of clients as well as the corresponding number of elements each client will receive when the search results are returned
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each of these clients
  \param [in] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each of these clients
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<ep_lib::MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<ep_lib::MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive the number of processes each process needs to listen to, as well as the number
  of indices it will receive during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each of these clients
  \param [out] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each of these clients
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

  std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
  //ep_lib::MPI_Request request[sendBuffSize+recvBuffSize];
  //ep_lib::MPI_Status requestStatus[sendBuffSize+recvBuffSize];

  int my_rank;
  MPI_Comm_rank(this->internalComm_, &my_rank);

  int nRequest = 0;
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    MPI_Irecv(&recvBuff[2*idx], 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  //MPI_Barrier(this->internalComm_);

  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
  //MPI_Waitall(sendBuffSize+recvBuffSize, request, requestStatus);

  int nbRecvRank = 0, nbRecvElements = 0;
  recvNbRank.clear();
  recvNbElements.clear();
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}

}