source: XIOS/dev/dev_trunk_omp/extern/src_ep_dev/ep_probe.cpp @ 1746

Last change on this file since 1746 was 1746, checked in by yushan, 5 years ago

Generic_testcase: remove _openmpi _intelmpi flags. EP library is no longer sensitive to the underlying MPI library. Tested on Irene with generic_testcase

File size: 8.6 KB
Line 
1#ifdef _usingEP
2#include "ep_lib.hpp"
3#include <mpi.h>
4#include "ep_declaration.hpp"
5#include "ep_mpi.hpp"
6
7namespace ep_lib
8{
9  int MPI_Iprobe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
10  {
11    ::MPI_Status mpi_status;
12
13    ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
14
15    status->mpi_status = new ::MPI_Status(mpi_status);
16    status->ep_src = src;
17    status->ep_tag = tag;
18  }
19
20
21  int MPI_Improbe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
22  {
23    ::MPI_Status mpi_status;
24    ::MPI_Message mpi_message;
25
26    //#ifdef _openmpi
27    ::MPI_Improbe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_message, &mpi_status);
28    /*#pragma omp critical (_mpi_call)
29    {
30      ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
31      if(*flag)
32      {
33        ::MPI_Mprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), &mpi_message, &mpi_status);
34      }
35    }*/
36    //#elif _intelmpi
37    //::MPI_Improbe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_message, &mpi_status);
38    //#endif
39     
40    status->mpi_status = new ::MPI_Status(mpi_status);
41    status->ep_src = src;
42    status->ep_tag = tag;
43
44    (*message)->mpi_message = &message;
45    (*message)->ep_src = src;
46    (*message)->ep_tag = tag;
47  }
48
49
50  int MPI_Iprobe(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
51  {
52    if(!comm->is_ep)
53    {
54      Debug("MPI_Iprobe with MPI\n");
55      return MPI_Iprobe_mpi(src, tag, comm, flag, status);
56    }
57
58    if(comm->is_intercomm)
59    {
60      if(src>=0) src = comm->inter_rank_map->at(src);
61    } 
62   
63    return MPI_Iprobe_endpoint(src, tag, comm, flag, status);
64  }
65
66  int MPI_Iprobe_endpoint(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
67  {
68    Debug("MPI_Iprobe with EP\n");
69
70    *flag = false;
71   
72    #pragma omp critical (_query)
73    for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
74    {
75      bool src_matched = src<0? true: (*it)->ep_src == src;
76      bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
77     
78      if(src_matched && tag_matched)       
79      {
80        Debug("find message\n");
81         
82        status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
83        status->ep_src = (*it)->ep_src;
84        status->ep_tag = (*it)->ep_tag;
85       
86        if(comm->is_intercomm)
87        {
88          for(INTER_RANK_MAP::iterator iter = comm->inter_rank_map->begin(); iter != comm->inter_rank_map->end(); iter++)
89          {
90            if(iter->second == (*it)->ep_src) status->ep_src=iter->first;
91          }
92        }
93
94        *flag = true;
95        break;
96      }
97    }
98    if(*flag) return 0;
99   
100    Message_Check(comm);
101
102    #pragma omp flush
103
104    #pragma omp critical (_query)
105    for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
106    {
107      bool src_matched = src<0? true: (*it)->ep_src == src;
108      bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
109     
110      if(src_matched && tag_matched)       
111      {
112        Debug("find message\n");
113         
114        status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
115        status->ep_src = (*it)->ep_src;
116        status->ep_tag = (*it)->ep_tag;
117       
118        if(comm->is_intercomm)
119        {
120          for(INTER_RANK_MAP::iterator iter = comm->inter_rank_map->begin(); iter != comm->inter_rank_map->end(); iter++)
121          {
122            if(iter->second == (*it)->ep_src) status->ep_src=iter->first;
123          }
124        }
125
126        *flag = true;
127        break;
128      }
129    }
130    if(*flag) return 0;
131  }
132
133 
134
135  int MPI_Improbe(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
136  {
137    if(!comm->is_ep)
138    {
139      Debug("MPI_Iprobe with MPI\n");
140      return MPI_Improbe_mpi(src, tag, comm, flag, message, status);
141    }
142
143    if(comm->is_intercomm)
144    {
145      src = comm->inter_rank_map->at(src);
146      *message = new ep_message;
147      printf("============= new *message = %p\n", *message);
148    } 
149   
150    return MPI_Improbe_endpoint(src, tag, comm, flag, message, status);
151  }
152
153
154
155  int MPI_Improbe_endpoint(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
156  {
157    int ep_rank_loc = comm->ep_comm_ptr->size_rank_info[1].first;
158    int mpi_rank    = comm->ep_comm_ptr->size_rank_info[2].first;
159
160    *flag = false;
161   
162    #pragma omp critical (_query)
163    if(! comm->ep_comm_ptr->message_queue->empty())
164    {
165      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
166      {
167                                         
168        bool src_matched = src<0? true: (*it)->ep_src == src;
169        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
170       
171        if(src_matched && tag_matched)
172        {
173          *flag = true;
174
175          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
176          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
177          status->ep_src = (*it)->ep_src;
178          status->ep_tag = (*it)->ep_tag;
179
180          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
181          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
182          (*message)->ep_src = (*it)->ep_src;
183          (*message)->ep_tag = (*it)->ep_tag;
184                                     
185
186          #pragma omp critical (_query2)
187          {             
188            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
189            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
190            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
191           
192           
193            delete (*it)->mpi_message;     
194            delete (*it)->mpi_status; 
195            delete *it;
196           
197                       
198            comm->ep_comm_ptr->message_queue->erase(it);
199            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
200            #pragma omp flush
201          }
202         
203          break;
204        }
205
206      }
207    }
208
209    if(*flag) return 0;
210   
211    Message_Check(comm);
212   
213    #pragma omp flush
214
215    #pragma omp critical (_query)
216    if(! comm->ep_comm_ptr->message_queue->empty())
217    {
218      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
219      {
220                                         
221        bool src_matched = src<0? true: (*it)->ep_src == src;
222        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
223       
224        if(src_matched && tag_matched)
225        {
226          *flag = true;
227
228          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
229          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
230          status->ep_src = (*it)->ep_src;
231          status->ep_tag = (*it)->ep_tag;
232
233          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
234          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
235          (*message)->ep_src = (*it)->ep_src;
236          (*message)->ep_tag = (*it)->ep_tag;
237                                     
238
239          #pragma omp critical (_query2)
240          {             
241            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
242            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
243            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
244           
245           
246            delete (*it)->mpi_message;     
247            delete (*it)->mpi_status; 
248            delete *it;
249           
250                       
251            comm->ep_comm_ptr->message_queue->erase(it);
252            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
253            #pragma omp flush
254          }
255         
256          break;
257        }
258
259      }
260    }
261
262    if(*flag) return 0;
263
264  }
265
266}
267
268
269#endif
Note: See TracBrowser for help on using the repository browser.