source: XIOS/dev/dev_trunk_omp/extern/src_ep_dev2/ep_probe.cpp @ 1838

Last change on this file since 1838 was 1651, checked in by yushan, 5 years ago

dev on EP for tracing with itac. Tested on ADA with test_omp

File size: 10.1 KB
Line 
1#ifdef _usingEP
2#include "ep_lib.hpp"
3#include <mpi.h>
4#include "ep_declaration.hpp"
5#include "ep_mpi.hpp"
6
7namespace ep_lib
8{
9  int MPI_Iprobe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
10  {
11    ::MPI_Status mpi_status;
12
13    ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
14
15    status->mpi_status = new ::MPI_Status(mpi_status);
16    status->ep_src = src;
17    status->ep_tag = tag;
18  }
19
20
21  int MPI_Improbe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
22  {
23    ::MPI_Status mpi_status;
24    ::MPI_Message mpi_message;
25
26    #ifdef _openmpi
27    #pragma omp critical (_mpi_call)
28    {
29      ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
30      if(*flag)
31      {
32        ::MPI_Mprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), &mpi_message, &mpi_status);
33      }
34    }
35    #elif _intelmpi
36    ::MPI_Improbe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_message, &mpi_status);
37    #endif
38     
39    status->mpi_status = new ::MPI_Status(mpi_status);
40    status->ep_src = src;
41    status->ep_tag = tag;
42
43    (*message)->mpi_message = &message;
44    (*message)->ep_src = src;
45    (*message)->ep_tag = tag;
46  }
47
48
49  int MPI_Iprobe(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
50  {
51    *flag = false;
52    if(src>=0) status->ep_src = src;
53    if(comm->is_intercomm)
54    {
55      if(src>=0) src = comm->inter_rank_map->at(src);
56    }
57
58    ::MPI_Status mpi_status;
59   
60    if(src>=0)
61    {
62
63      int mpi_src = comm->ep_rank_map->at(src).second;
64      int mpi_tag = tag_combine(tag, comm->ep_rank_map->at(src).first, comm->ep_comm_ptr->size_rank_info[1].first);
65      //printf("MPI_Iprobe with src\n");
66     
67      ::MPI_Iprobe(mpi_src, mpi_tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
68      if(*flag)
69      {
70        //printf("proc %d probe from proc %d\n", comm->ep_comm_ptr->size_rank_info[0].first, src);
71        status->mpi_status = new ::MPI_Status(mpi_status);
72      }
73      return MPI_SUCCESS;
74    }
75    else
76    {
77      //printf("MPI_Iprobe with ANY_SOURCE\n");
78      //for(int i=0; i<comm->ep_comm_ptr->size_rank_info[1].second; i++)
79      for(int i=0; i<4; i++)
80      {
81        int mpi_tag = tag_combine(tag, i, comm->ep_comm_ptr->size_rank_info[1].first);
82        ::MPI_Iprobe(MPI_ANY_SOURCE, mpi_tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
83        if(*flag) 
84        {
85          //printf("proc %d probe from proc %d\n", comm->ep_comm_ptr->size_rank_info[0].first, get_ep_rank(comm, i, mpi_status.MPI_SOURCE));
86          status->mpi_status = new ::MPI_Status(mpi_status);
87          //printf("get_ep_rank in MPI_Iprobe\n");
88          status->ep_src = get_ep_rank(comm, i, mpi_status.MPI_SOURCE);
89          return MPI_SUCCESS;
90        }
91   
92      }
93     
94    }
95
96  }
97
98
99
100  int MPI_Iprobe2(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
101  {
102    if(!comm->is_ep)
103    {
104      Debug("MPI_Iprobe with MPI\n");
105      return MPI_Iprobe_mpi(src, tag, comm, flag, status);
106    }
107
108    if(comm->is_intercomm)
109    {
110      if(src>=0) src = comm->inter_rank_map->at(src);
111    } 
112   
113    return MPI_Iprobe_endpoint(src, tag, comm, flag, status);
114  }
115
116  int MPI_Iprobe_endpoint(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
117  {
118    Debug("MPI_Iprobe with EP\n");
119
120    *flag = false;
121   
122    #pragma omp critical (_query)
123    for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
124    {
125      bool src_matched = src<0? true: (*it)->ep_src == src;
126      bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
127     
128      if(src_matched && tag_matched)       
129      {
130        Debug("find message\n");
131         
132        status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
133        status->ep_src = (*it)->ep_src;
134        status->ep_tag = (*it)->ep_tag;
135       
136        if(comm->is_intercomm)
137        {
138          for(INTER_RANK_MAP::iterator iter = comm->inter_rank_map->begin(); iter != comm->inter_rank_map->end(); iter++)
139          {
140            if(iter->second == (*it)->ep_src) status->ep_src=iter->first;
141          }
142        }
143
144        *flag = true;
145        break;
146      }
147    }
148    if(*flag) return 0;
149   
150    Message_Check(comm);
151
152    #pragma omp flush
153
154    #pragma omp critical (_query)
155    for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
156    {
157      bool src_matched = src<0? true: (*it)->ep_src == src;
158      bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
159     
160      if(src_matched && tag_matched)       
161      {
162        Debug("find message\n");
163         
164        status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
165        status->ep_src = (*it)->ep_src;
166        status->ep_tag = (*it)->ep_tag;
167       
168        if(comm->is_intercomm)
169        {
170          for(INTER_RANK_MAP::iterator iter = comm->inter_rank_map->begin(); iter != comm->inter_rank_map->end(); iter++)
171          {
172            if(iter->second == (*it)->ep_src) status->ep_src=iter->first;
173          }
174        }
175
176        *flag = true;
177        break;
178      }
179    }
180    if(*flag) return 0;
181  }
182
183 
184
185  int MPI_Improbe(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
186  {
187    if(!comm->is_ep)
188    {
189      Debug("MPI_Iprobe with MPI\n");
190      return MPI_Improbe_mpi(src, tag, comm, flag, message, status);
191    }
192
193    if(comm->is_intercomm)
194    {
195      src = comm->inter_rank_map->at(src);
196      *message = new ep_message;
197      printf("============= new *message = %p\n", *message);
198    } 
199   
200    return MPI_Improbe_endpoint(src, tag, comm, flag, message, status);
201  }
202
203
204
205  int MPI_Improbe_endpoint(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
206  {
207    int ep_rank_loc = comm->ep_comm_ptr->size_rank_info[1].first;
208    int mpi_rank    = comm->ep_comm_ptr->size_rank_info[2].first;
209
210    *flag = false;
211   
212    #pragma omp critical (_query)
213    if(! comm->ep_comm_ptr->message_queue->empty())
214    {
215      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
216      {
217                                         
218        bool src_matched = src<0? true: (*it)->ep_src == src;
219        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
220       
221        if(src_matched && tag_matched)
222        {
223          *flag = true;
224
225          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
226          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
227          status->ep_src = (*it)->ep_src;
228          status->ep_tag = (*it)->ep_tag;
229
230          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
231          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
232          (*message)->ep_src = (*it)->ep_src;
233          (*message)->ep_tag = (*it)->ep_tag;
234                                     
235
236          #pragma omp critical (_query2)
237          {             
238            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
239            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
240            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
241           
242           
243            delete (*it)->mpi_message;     
244            delete (*it)->mpi_status; 
245            delete *it;
246           
247                       
248            comm->ep_comm_ptr->message_queue->erase(it);
249            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
250            #pragma omp flush
251          }
252         
253          break;
254        }
255
256      }
257    }
258
259    if(*flag) return 0;
260   
261    Message_Check(comm);
262   
263    #pragma omp flush
264
265    #pragma omp critical (_query)
266    if(! comm->ep_comm_ptr->message_queue->empty())
267    {
268      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
269      {
270                                         
271        bool src_matched = src<0? true: (*it)->ep_src == src;
272        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
273       
274        if(src_matched && tag_matched)
275        {
276          *flag = true;
277
278          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
279          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
280          status->ep_src = (*it)->ep_src;
281          status->ep_tag = (*it)->ep_tag;
282
283          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
284          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
285          (*message)->ep_src = (*it)->ep_src;
286          (*message)->ep_tag = (*it)->ep_tag;
287                                     
288
289          #pragma omp critical (_query2)
290          {             
291            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
292            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
293            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
294           
295           
296            delete (*it)->mpi_message;     
297            delete (*it)->mpi_status; 
298            delete *it;
299           
300                       
301            comm->ep_comm_ptr->message_queue->erase(it);
302            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
303            #pragma omp flush
304          }
305         
306          break;
307        }
308
309      }
310    }
311
312    if(*flag) return 0;
313
314  }
315
316}
317
318
319#endif
Note: See TracBrowser for help on using the repository browser.