source: XIOS/dev/dev_trunk_omp/extern/src_ep_dev/ep_probe.cpp @ 1646

Last change on this file since 1646 was 1646, checked in by yushan, 5 years ago

branch merged with trunk @1645. arch file (ep&mpi) added for ADA

File size: 8.5 KB
Line 
1#ifdef _usingEP
2#include "ep_lib.hpp"
3#include <mpi.h>
4#include "ep_declaration.hpp"
5#include "ep_mpi.hpp"
6
7namespace ep_lib
8{
9  int MPI_Iprobe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
10  {
11    ::MPI_Status mpi_status;
12
13    ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
14
15    status->mpi_status = new ::MPI_Status(mpi_status);
16    status->ep_src = src;
17    status->ep_tag = tag;
18  }
19
20
21  int MPI_Improbe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
22  {
23    ::MPI_Status mpi_status;
24    ::MPI_Message mpi_message;
25
26    #ifdef _openmpi
27    #pragma omp critical (_mpi_call)
28    {
29      ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
30      if(*flag)
31      {
32        ::MPI_Mprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), &mpi_message, &mpi_status);
33      }
34    }
35    #elif _intelmpi
36    ::MPI_Improbe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_message, &mpi_status);
37    #endif
38     
39    status->mpi_status = new ::MPI_Status(mpi_status);
40    status->ep_src = src;
41    status->ep_tag = tag;
42
43    (*message)->mpi_message = &message;
44    (*message)->ep_src = src;
45    (*message)->ep_tag = tag;
46  }
47
48
49  int MPI_Iprobe(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
50  {
51    if(!comm->is_ep)
52    {
53      Debug("MPI_Iprobe with MPI\n");
54      return MPI_Iprobe_mpi(src, tag, comm, flag, status);
55    }
56
57    if(comm->is_intercomm)
58    {
59      if(src>=0) src = comm->inter_rank_map->at(src);
60    } 
61   
62    return MPI_Iprobe_endpoint(src, tag, comm, flag, status);
63  }
64
65  int MPI_Iprobe_endpoint(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
66  {
67    Debug("MPI_Iprobe with EP\n");
68
69    *flag = false;
70   
71    #pragma omp critical (_query)
72    for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
73    {
74      bool src_matched = src<0? true: (*it)->ep_src == src;
75      bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
76     
77      if(src_matched && tag_matched)       
78      {
79        Debug("find message\n");
80         
81        status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
82        status->ep_src = (*it)->ep_src;
83        status->ep_tag = (*it)->ep_tag;
84       
85        if(comm->is_intercomm)
86        {
87          for(INTER_RANK_MAP::iterator iter = comm->inter_rank_map->begin(); iter != comm->inter_rank_map->end(); iter++)
88          {
89            if(iter->second == (*it)->ep_src) status->ep_src=iter->first;
90          }
91        }
92
93        *flag = true;
94        break;
95      }
96    }
97    if(*flag) return 0;
98   
99    Message_Check(comm);
100
101    #pragma omp flush
102
103    #pragma omp critical (_query)
104    for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
105    {
106      bool src_matched = src<0? true: (*it)->ep_src == src;
107      bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
108     
109      if(src_matched && tag_matched)       
110      {
111        Debug("find message\n");
112         
113        status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
114        status->ep_src = (*it)->ep_src;
115        status->ep_tag = (*it)->ep_tag;
116       
117        if(comm->is_intercomm)
118        {
119          for(INTER_RANK_MAP::iterator iter = comm->inter_rank_map->begin(); iter != comm->inter_rank_map->end(); iter++)
120          {
121            if(iter->second == (*it)->ep_src) status->ep_src=iter->first;
122          }
123        }
124
125        *flag = true;
126        break;
127      }
128    }
129    if(*flag) return 0;
130  }
131
132 
133
134  int MPI_Improbe(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
135  {
136    if(!comm->is_ep)
137    {
138      Debug("MPI_Iprobe with MPI\n");
139      return MPI_Improbe_mpi(src, tag, comm, flag, message, status);
140    }
141
142    if(comm->is_intercomm)
143    {
144      src = comm->inter_rank_map->at(src);
145      *message = new ep_message;
146      printf("============= new *message = %p\n", *message);
147    } 
148   
149    return MPI_Improbe_endpoint(src, tag, comm, flag, message, status);
150  }
151
152
153
154  int MPI_Improbe_endpoint(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
155  {
156    int ep_rank_loc = comm->ep_comm_ptr->size_rank_info[1].first;
157    int mpi_rank    = comm->ep_comm_ptr->size_rank_info[2].first;
158
159    *flag = false;
160   
161    #pragma omp critical (_query)
162    if(! comm->ep_comm_ptr->message_queue->empty())
163    {
164      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
165      {
166                                         
167        bool src_matched = src<0? true: (*it)->ep_src == src;
168        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
169       
170        if(src_matched && tag_matched)
171        {
172          *flag = true;
173
174          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
175          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
176          status->ep_src = (*it)->ep_src;
177          status->ep_tag = (*it)->ep_tag;
178
179          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
180          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
181          (*message)->ep_src = (*it)->ep_src;
182          (*message)->ep_tag = (*it)->ep_tag;
183                                     
184
185          #pragma omp critical (_query2)
186          {             
187            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
188            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
189            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
190           
191           
192            delete (*it)->mpi_message;     
193            delete (*it)->mpi_status; 
194            delete *it;
195           
196                       
197            comm->ep_comm_ptr->message_queue->erase(it);
198            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
199            #pragma omp flush
200          }
201         
202          break;
203        }
204
205      }
206    }
207
208    if(*flag) return 0;
209   
210    Message_Check(comm);
211   
212    #pragma omp flush
213
214    #pragma omp critical (_query)
215    if(! comm->ep_comm_ptr->message_queue->empty())
216    {
217      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
218      {
219                                         
220        bool src_matched = src<0? true: (*it)->ep_src == src;
221        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
222       
223        if(src_matched && tag_matched)
224        {
225          *flag = true;
226
227          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
228          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
229          status->ep_src = (*it)->ep_src;
230          status->ep_tag = (*it)->ep_tag;
231
232          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
233          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
234          (*message)->ep_src = (*it)->ep_src;
235          (*message)->ep_tag = (*it)->ep_tag;
236                                     
237
238          #pragma omp critical (_query2)
239          {             
240            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
241            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
242            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
243           
244           
245            delete (*it)->mpi_message;     
246            delete (*it)->mpi_status; 
247            delete *it;
248           
249                       
250            comm->ep_comm_ptr->message_queue->erase(it);
251            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
252            #pragma omp flush
253          }
254         
255          break;
256        }
257
258      }
259    }
260
261    if(*flag) return 0;
262
263  }
264
265}
266
267
268#endif
Note: See TracBrowser for help on using the repository browser.