source: XIOS/dev/dev_trunk_omp/extern/src_ep_dev2/ep_recv.cpp @ 1710

Last change on this file since 1710 was 1651, checked in by yushan, 5 years ago

dev on EP for tracing with itac. Tested on ADA with test_omp

File size: 8.8 KB
Line 
1#ifdef _usingEP
2/*!
3   \file ep_recv.cpp
4   \since 2 may 2016
5
6   \brief Definitions of MPI receive functions: MPI_Recv, MPI_Mrecv, MPI_Irecv, MPI_Imrecv
7 */
8
9
10#include "ep_lib.hpp"
11#include <mpi.h>
12#include "ep_declaration.hpp"
13#include "ep_mpi.hpp"
14
15using namespace std;
16
17extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
18#pragma omp threadprivate(EP_PendingRequests)
19
20namespace ep_lib
21{
22
23  int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status *status)
24  {
25
26    if(src>=0) status->ep_src = src;
27   
28    if(comm->is_intercomm)
29    {
30      if(src>=0) src = comm->inter_rank_map->at(src);
31    }
32
33    ::MPI_Status mpi_status;
34    if(src>=0)
35    {
36
37      int mpi_src = comm->ep_rank_map->at(src).second;
38      int mpi_tag = tag_combine(tag, comm->ep_rank_map->at(src).first, comm->ep_comm_ptr->size_rank_info[1].first);
39
40      //printf("MPI_Recv with src\n");
41      //printf("proc %d recv from proc %d\n", comm->ep_comm_ptr->size_rank_info[0].first, src);
42      ::MPI_Recv(buf, count, to_mpi_type(datatype), mpi_src, mpi_tag, to_mpi_comm(comm->mpi_comm), &mpi_status);
43      status->mpi_status = new ::MPI_Status(mpi_status);
44      return MPI_SUCCESS;
45    }
46    else
47    {
48      //printf("MPI_Recv with ANY_SOURCE\n");
49      int probed=false;
50      int mpi_tag;
51      while(!probed)
52      {
53        //for(int i=0; i<comm->ep_comm_ptr->size_rank_info[1].second; i++)
54        for(int i=0; i<4; i++)
55        {
56          mpi_tag = tag_combine(tag, i, comm->ep_comm_ptr->size_rank_info[1].first);
57          ::MPI_Iprobe(MPI_ANY_SOURCE, mpi_tag, to_mpi_comm(comm->mpi_comm), &probed, &mpi_status);
58          if(probed) 
59          {
60            //printf("proc %d recv from proc %d\n", comm->ep_comm_ptr->size_rank_info[0].first, get_ep_rank(comm, i, mpi_status.MPI_SOURCE));
61            ::MPI_Recv(buf, count, to_mpi_type(datatype), mpi_status.MPI_SOURCE, mpi_status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &mpi_status);
62            //printf("get_ep_rank in MPI_Recv\n");
63            status->ep_src = get_ep_rank(comm, i, mpi_status.MPI_SOURCE);
64            status->mpi_status = new ::MPI_Status(mpi_status);
65            return MPI_SUCCESS;
66          }
67        }
68      }
69    }
70   
71  }
72
73  int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request *request)
74  {
75    if(comm->is_intercomm)
76    {
77      if(src>=0) src = comm->inter_rank_map->at(src);
78    }
79
80    *request = new ep_request;
81    //printf("MPI_Irecv : *request = new ep_request => *request = %p\n", *request);
82
83    (*request)->mpi_request = new ::MPI_Request;
84    //printf("MPI_Irecv : (*request)->mpi_request = new ::MPI_Request => (*request)->mpi_request = %p\n", (*request)->mpi_request);
85
86
87    if(src>=0)
88    {
89      int mpi_src = comm->ep_rank_map->at(src).second;
90      int mpi_tag = tag_combine(tag, comm->ep_rank_map->at(src).first, comm->ep_comm_ptr->size_rank_info[1].first);
91      //printf("MPI_Irecv with src\n");
92      //printf("proc %d recv from proc %d\n", comm->ep_comm_ptr->size_rank_info[0].first, src);
93      return ::MPI_Irecv(buf, count, to_mpi_type(datatype), mpi_src, mpi_tag, to_mpi_comm(comm->mpi_comm), to_mpi_request_ptr(*request));
94    }
95    else
96    {
97      //printf("MPI_Irecv with ANY_SOURCE\n");
98      int probed=false;
99      int mpi_tag;
100      ::MPI_Status mpi_status;
101      while(!probed)
102      {
103        //for(int i=0; i<comm->ep_comm_ptr->size_rank_info[1].second; i++)
104        for(int i=0; i<4; i++)
105        {
106          mpi_tag = tag_combine(tag, i, comm->ep_comm_ptr->size_rank_info[1].first);
107          ::MPI_Iprobe(MPI_ANY_SOURCE, mpi_tag, to_mpi_comm(comm->mpi_comm), &probed, &mpi_status);
108          if(probed)
109          {
110            //printf("proc %d recv from proc %d\n", comm->ep_comm_ptr->size_rank_info[0].first, get_ep_rank(comm, i, mpi_status.MPI_SOURCE));
111            return ::MPI_Irecv(buf, count, to_mpi_type(datatype), mpi_status.MPI_SOURCE, mpi_status.MPI_TAG, to_mpi_comm(comm->mpi_comm), to_mpi_request_ptr(*request));
112          }
113        }
114      }
115    }
116   
117
118  }
119
120 
121
122
123  int MPI_Irecv2(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request *request)
124  {
125    if(!comm->is_ep) return MPI_Irecv_mpi(buf, count, datatype, src, tag, comm, request);
126
127    if(comm->is_intercomm)
128    {
129      if(src>=0) src = comm->inter_rank_map->at(src);
130    }
131   
132    Debug("MPI_Irecv with EP");
133    int dest_rank;
134    MPI_Comm_rank(comm, &dest_rank);
135   
136    *request = new ep_request;   
137    memcheck("new " << *request <<" : in ep_lib::MPI_Irecv, *request = new ep_request");
138
139    (*request)->mpi_request = new ::MPI_Request;
140    memcheck("new " << (*request)->mpi_request << " : in ep_lib::MPI_Irecv, (*request)->mpi_request = new ::MPI_Request");
141   
142    (*request)->buf = buf;
143    (*request)->comm = comm;
144    (*request)->type = 2;
145    (*request)->probed = false;
146    (*request)->state = 0;
147
148    (*request)->ep_src = src;
149    (*request)->ep_tag = tag;
150    (*request)->ep_datatype = datatype;
151 
152    if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >;
153
154    EP_PendingRequests->push_back(request); 
155   
156    //memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());   
157   
158    show_EP_PendingRequests(EP_PendingRequests);
159
160#ifdef _showinfo
161    if(comm->is_intercomm)
162    {
163      int ep_dest_loc  = comm->ep_rank_map->at(dest_rank).first;
164      int ep_src_loc = comm->ep_comm_ptr->intercomm->intercomm_rank_map->at(src).first;
165      int mpi_tag     = tag_combine(tag, ep_src_loc, ep_dest_loc);
166      int mpi_dest    = comm->ep_comm_ptr->intercomm->intercomm_rank_map->at(src).second.first;
167
168      printf("Irecv : ep_src_loc = %d, ep_dest_loc = %d, mpi_src = %d, mpi_dest = %d, mpi_tag = %d\n", ep_src_loc, ep_dest_loc, comm->ep_comm_ptr->size_rank_info[2].first, mpi_dest, mpi_tag);
169    }                                                         
170#endif                                     
171
172    return Request_Check();
173  }
174 
175  int MPI_Mrecv(void *buf, int count, MPI_Datatype datatype, MPI_Message *message, MPI_Status *status)
176  {
177    Debug("MPI_Mrecv with MPI/EP");
178
179    status->mpi_status = new ::MPI_Status;
180    memcheck("new " << status->mpi_status << " : in ep_lib::MPI_Mrecv, status->mpi_status = new ::MPI_Status");
181   
182    ::MPI_Mrecv(buf, count, to_mpi_type(datatype), static_cast< ::MPI_Message* >((*message)->mpi_message), to_mpi_status_ptr(*status));
183
184   
185    status->ep_src = (*message)->ep_src;
186    status->ep_datatype = datatype;
187    status->ep_tag = (*message)->ep_tag;
188
189    memcheck("delete " << (*message)->mpi_message << " : in ep_lib::MPI_Mrecv, delete (*message)->mpi_message");
190    delete (*message)->mpi_message;
191    memcheck("delete " << *message << " : in ep_lib::MPI_Imrecv, delete *message");
192    delete *message;
193
194#ifdef _check_sum
195    check_sum_recv(buf, count, datatype, message->ep_src, message->ep_tag);
196#endif
197
198    return Request_Check();
199  }
200
201
202  int MPI_Imrecv(void *buf, int count, MPI_Datatype datatype, MPI_Message *message, MPI_Request *request)
203  {
204    Debug("MPI_Imrecv with MPI/EP");
205
206    (*request)->type = 3;
207    (*request)->ep_datatype = datatype;
208    (*request)->ep_tag = (*message)->ep_tag;
209    (*request)->ep_src = (*message)->ep_src;
210   
211    (*request)->probed = true;
212    (*request)->state = 1;
213       
214    ::MPI_Imrecv(buf, count, to_mpi_type(datatype), to_mpi_message_ptr(*message), to_mpi_request_ptr(*request));               
215   
216    memcheck("delete " << (*message)->mpi_message << " : in ep_lib::MPI_Imrecv, delete (*message)->mpi_message");
217    delete (*message)->mpi_message;
218
219
220#ifdef _check_sum
221    check_sum_recv(buf, count, datatype, message->ep_src, message->ep_tag);
222#endif
223
224   
225    //return Request_Check();
226  }
227
228
229   int MPI_Recv_mpi(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status *status)
230  {
231    Debug("MPI_Recv with MPI");
232    status->ep_src = src;
233    status->ep_tag = tag;
234    status->ep_datatype = datatype;
235   
236    return ::MPI_Recv(buf, count, to_mpi_type(datatype), src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), to_mpi_status_ptr(*status)); 
237  }
238 
239  int MPI_Irecv_mpi(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request *request)
240  {
241    Debug("MPI_Irecv with MPI");
242    int dest_rank;
243    MPI_Comm_rank(comm, &dest_rank);
244   
245    *request = new ep_request;
246    memcheck("new "<< *request <<" : in ep_lib::MPI_Irecv, *request = new ep_request");
247
248    (*request)->mpi_request = new ::MPI_Request;
249    memcheck("new "<< (*request)->mpi_request <<" : in ep_lib::MPI_Irecv, (*request)->mpi_request = new ::MPI_Request");
250     
251    (*request)->ep_src = src;
252    (*request)->ep_datatype = datatype;
253    (*request)->type = 2;
254    (*request)->ep_tag = tag;
255   
256    return ::MPI_Irecv(buf, count, to_mpi_type(datatype), src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), to_mpi_request_ptr(*request));
257  }
258}
259
260
261
262#endif
Note: See TracBrowser for help on using the repository browser.