source: XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp @ 1482

Last change on this file since 1482 was 1374, checked in by yushan, 7 years ago

unify type : MPI_Request

File size: 6.5 KB
RevLine 
[1134]1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
[1295]11#include "ep_mpi.hpp"
[1134]12
13using namespace std;
14
[1196]15extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
16#pragma omp threadprivate(EP_PendingRequests)
17
[1134]18namespace ep_lib
19{
20
21  int Message_Check(MPI_Comm comm)
22  {
23    if(!comm.is_ep) return 0;
24
25    if(comm.is_intercomm)
26    {
27      return  Message_Check_intercomm(comm);
28    }
29
30    int flag = true;
31    ::MPI_Message message;
32    ::MPI_Status status;
33    int mpi_source;
34
35    while(flag) // loop until the end of global queue
36    {
37      Debug("Message probing for intracomm\n");
[1295]38     
[1134]39      #pragma omp critical (_mpi_call)
40      {
[1295]41        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
[1134]42        if(flag)
43        {
44          Debug("find message in mpi comm \n");
45          mpi_source = status.MPI_SOURCE;
46          int tag = status.MPI_TAG;
[1295]47          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
[1134]48
49        }
50      }
[1374]51
[1176]52     
[1134]53      if(flag)
54      {
55
[1220]56        MPI_Message *msg_block = new MPI_Message; 
[1362]57        msg_block->mpi_message = new ::MPI_Message;
58        *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message; 
[1220]59        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
60        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
61        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
62        int src_mpi       = status.MPI_SOURCE;
[1134]63             
[1220]64        msg_block->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);       
[1134]65        msg_block->mpi_status = new ::MPI_Status(status);
66
67        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
68        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
69
70
71        #pragma omp critical (_query)
72        {
73          #pragma omp flush
[1295]74          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);     
[1134]75          #pragma omp flush
76        }
77       
[1374]78        delete msg_block;
[1134]79      }
80
81    }
82
83    return MPI_SUCCESS;
84  }
85
86
87
88  int Message_Check_intercomm(MPI_Comm comm)
89  {
90    if(!comm.ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
91
92    Debug("Message probing for intercomm\n");
93
94    int flag = true;
95    ::MPI_Message message;
96    ::MPI_Status status;
97    int mpi_source;
98    int current_ep_rank;
99    MPI_Comm_rank(comm, &current_ep_rank);
100
101    while(flag) // loop until the end of global queue "comm.ep_comm_ptr->intercomm->mpi_inter_comm"
102    {
103      Debug("Message probing for intracomm\n");
[1295]104
[1134]105      #pragma omp critical (_mpi_call)
106      {
[1295]107        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
[1134]108        if(flag)
109        {
110          Debug("find message in mpi comm \n");
111          mpi_source = status.MPI_SOURCE;
112          int tag = status.MPI_TAG;
[1295]113          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
[1134]114
115        }
116      }
[1355]117     
[1347]118
[1134]119      if(flag)
120      {
121
122        MPI_Message *msg_block = new MPI_Message;
[1362]123        msg_block->mpi_message = new ::MPI_Message;
124        *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message;
[1134]125        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
126        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
127        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
128        int src_mpi       = status.MPI_SOURCE;
129        int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first;
130             
131        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc,  src_mpi);
132        msg_block->mpi_status = new ::MPI_Status(status);
133
134
135        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
136        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
137
138
139        #pragma omp critical (_query)
140        {
141          #pragma omp flush
[1295]142          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);
[1134]143          #pragma omp flush
144        }
145       
146        delete msg_block;
147       
148      }
149
150    }
151
152    flag = true;
153    while(flag) // loop until the end of global queue "comm.mpi_comm"
154    {
155      Debug("Message probing for intracomm\n");
[1295]156     
[1134]157      #pragma omp critical (_mpi_call)
158      {
[1295]159        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);
[1134]160        if(flag)
161        {
162          Debug("find message in mpi comm \n");
163          mpi_source = status.MPI_SOURCE;
164          int tag = status.MPI_TAG;
[1295]165          ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);
[1134]166
167        }
168      }
[1355]169     
[1347]170
[1134]171      if(flag)
172      {
173
174        MPI_Message *msg_block = new MPI_Message;
[1362]175        msg_block->mpi_message = new ::MPI_Message;
176        *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message;
[1134]177        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
178        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
179        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
180        int src_mpi       = status.MPI_SOURCE;
181       
182        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc, src_mpi);
183        msg_block->mpi_status = new ::MPI_Status(status);
184       
185
186        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
187        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
188
189
190        #pragma omp critical (_query)
191        {
192          #pragma omp flush
[1295]193          comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);
[1134]194          #pragma omp flush
195        }
196       
197        delete msg_block;
198       
199      }
200
201    }
202
203    return MPI_SUCCESS;
204  }
205
[1196]206  int Request_Check()
207  {
208    MPI_Status status;
209    MPI_Message message;
210    int probed = false;
211    int recv_count = 0;
212    std::list<MPI_Request* >::iterator it;
213   
214    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
215    { 
216      Message_Check((*it)->comm);
217    }
218
219
220    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
221    {
222      MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status);
223      if(probed)
224      {
225        MPI_Get_count(&status, (*it)->ep_datatype, &recv_count);
226        MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it);
227        (*it)->type = 3;
228        EP_PendingRequests->erase(it);
229        it = EP_PendingRequests->begin();
230        continue;
231      }
232      it++;
233    }
234  }
235
[1134]236}
Note: See TracBrowser for help on using the repository browser.