XMLIOSERVER 0.4
Serveur d'Entrées/Sorties parallèles
|
00001 /* ************************************************************************** * 00002 * Copyright © IPSL/LSCE, XMLIOServer, Avril 2010 - Octobre 2011 * 00003 * ************************************************************************** */ 00004 00013 // XMLIOServer headers 00014 #include "mpi_interface.hpp" 00015 00016 // /////////////////////////////// Définitions ////////////////////////////// // 00017 00018 namespace xmlioserver { 00019 namespace comm { 00020 00021 // ---------------------- Initialisation & Finalisation --------------------- 00022 00023 void CMPIManager::Initialise(int * _argc, char *** _argv) 00024 { 00025 int flag = 0; 00026 if (MPI_Initialized(&flag) != MPI_SUCCESS) 00027 XIOS_ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !"); 00028 if (!flag) 00029 { 00030 if (MPI_Init(_argc, _argv) != MPI_SUCCESS) 00031 XIOS_ERROR("CMPIManager::Initialise(arc, argv)", << " MPI Error !"); 00032 } 00033 00034 } 00035 00036 void CMPIManager::Finalize(void) 00037 { 00038 if (MPI_Finalize() != MPI_SUCCESS) 00039 XIOS_ERROR("CMPIManager::Finalize(void)", << " MPI Error !"); 00040 } 00041 00042 // ------------------------------ Communicateurs ---------------------------- 00043 00044 int CMPIManager::GetCommRank(MPI_Comm _comm) 00045 { 00046 int rank = 0; 00047 if (MPI_Comm_rank(_comm, &rank) != MPI_SUCCESS) 00048 XIOS_ERROR("CMPIManager::GetCommRank(comm)", << " MPI Error !"); 00049 return (rank); 00050 } 00051 00052 int CMPIManager::GetCommSize(MPI_Comm _comm) 00053 { 00054 int size = 0; 00055 if (MPI_Comm_size(_comm, &size) != MPI_SUCCESS) 00056 XIOS_ERROR("CMPIManager::GetCommSize(comm)", << " MPI Error !"); 00057 return (size); 00058 } 00059 00060 MPI_Comm CMPIManager::CreateComm(MPI_Group _group, MPI_Comm _pcomm) 00061 { 00062 MPI_Comm commu; 00063 if (MPI_Comm_create(_pcomm, _group, &commu) != MPI_SUCCESS) 00064 XIOS_ERROR("CMPIManager::CreateComm(group, pcomm)", << " MPI Error !"); 00065 return (commu); 00066 } 00067 00068 MPI_Comm CMPIManager::GetCommWorld(void) 00069 { 00070 return (MPI_COMM_WORLD); 00071 } 00072 00073 // ---------------------------------- Autre --------------------------------- 00074 00075 void CMPIManager::Barrier(MPI_Comm _comm) 00076 { 00077 if (MPI_Barrier(_comm) != MPI_SUCCESS) 00078 XIOS_ERROR("CMPIManager::Barrier(comm)", << " MPI Error !"); 00079 } 00080 00081 bool CMPIManager::DispatchClient(bool _is_server, 00082 MPI_Comm & _comm_client, 00083 MPI_Comm & _comm_client_server, 00084 MPI_Comm & _comm_server, 00085 MPI_Comm _comm_parent) 00086 { 00087 int value = (_is_server) ? 1 : 2; 00088 std::size_t nbClient = 0, nbServer = 0, nbClientByServer = 0; 00089 std::vector<int> info, rank_client, rank_server; 00090 CMPIManager::AllGather(value, info, _comm_parent); 00091 00092 for (std::size_t s = 0; s < info.size(); s++) 00093 { 00094 if (info[s] == 1) rank_server.push_back(s); 00095 else rank_client.push_back(s); 00096 } 00097 nbClient = rank_client.size(); 00098 nbServer = rank_server.size(); 00099 00100 00101 _comm_client = CMPIManager::CreateComm(CMPIManager::CreateSubGroup( 00102 CMPIManager::GetGroupWorld(), rank_client), _comm_parent); 00103 00104 if (nbServer != 0) 00105 { 00106 std::size_t currentServer = 0; 00107 nbClientByServer = nbClient/nbServer; 00108 _comm_server = CMPIManager::CreateComm(CMPIManager::CreateSubGroup( 00109 CMPIManager::GetGroupWorld(), rank_server), _comm_parent); 00110 00111 //std::cout << nbClient << "," << nbServer << "," << nbClientByServer << std::endl; 00112 00113 for (std::size_t mm = 0; mm < nbClient; mm += nbClientByServer) 00114 { 00115 std::vector<int> group_rank; 00116 group_rank.push_back(rank_server[currentServer++]); 00117 for (std::size_t nn = 0; nn < nbClientByServer; nn++) 00118 group_rank.push_back(rank_client[nn+mm]); 00119 MPI_Comm comm_client_server_ = CMPIManager::CreateComm(CMPIManager::CreateSubGroup( 00120 CMPIManager::GetGroupWorld(), group_rank), _comm_parent); 00121 00122 if (std::find(group_rank.begin(), group_rank.end(), 00123 CMPIManager::GetCommRank(_comm_parent)) != group_rank.end()) 00124 { 00125 _comm_client_server = comm_client_server_; 00126 } 00127 00128 group_rank.clear(); 00129 } 00130 return (true); 00131 } 00132 else 00133 { 00134 _comm_server = _comm_client; 00135 return (false); 00136 } 00137 } 00138 00139 00140 // --------------------------------- Groupes -------------------------------- 00141 00142 MPI_Group CMPIManager::GetGroupWorld(void) 00143 { 00144 MPI_Group group = 0; 00145 if (MPI_Comm_group(CMPIManager::GetCommWorld(), &group) != MPI_SUCCESS) 00146 XIOS_ERROR("CMPIManager::GetGroupWorld()", << " MPI Error !"); 00147 return (group); 00148 } 00149 00150 MPI_Group CMPIManager::CreateSubGroup(MPI_Group _pgroup, const std::vector<int> & _ranks) 00151 { 00152 MPI_Group group = 0; 00153 if (MPI_Group_incl(_pgroup, _ranks.size(), const_cast<int*>(&(_ranks[0])), &group) != MPI_SUCCESS) 00154 XIOS_ERROR("CMPIManager::CreateSubGroup(pgroup, ranks)", << " MPI Error !"); 00155 return (group); 00156 } 00157 00158 MPI_Group CMPIManager::CreateSubGroup 00159 (MPI_Group _pgroup, int _min_rank, int _max_rank, int _intval) 00160 { 00161 std::vector<int> ranks; 00162 for (int i = _min_rank; i <= _max_rank; i += _intval) 00163 ranks.push_back(i); 00164 return (CMPIManager::CreateSubGroup(_pgroup, ranks)); 00165 } 00166 00167 // ----------------------------------- Tests -------------------------------- 00168 00169 bool CMPIManager::IsMaster(MPI_Comm _comm) 00170 { 00171 return (CMPIManager::GetCommRank(_comm) == 0); 00172 } 00173 00174 bool CMPIManager::IsRank(int _rank, MPI_Comm _comm) 00175 { 00176 return (CMPIManager::GetCommRank(_comm) == _rank); 00177 } 00178 00179 // --------------------------- Communication simple ------------------------- 00180 00181 void CMPIManager::Send (MPI_Comm _comm, int _dest_rank, char * _data, 00182 std::size_t _size, MPI_Request & _request) 00183 { 00184 int nsize = _size; 00185 if (MPI_Issend(_data, nsize, MPI_CHAR, _dest_rank, 0, _comm, &_request) != MPI_SUCCESS) 00186 XIOS_ERROR("CMPIManager::Send (comm, dest_rank, data, size, request)", << " MPI Error !"); 00187 } 00188 00189 void CMPIManager::Wait (MPI_Request & _request) 00190 { 00191 MPI_Status status; 00192 if (MPI_Wait(&_request, &status) != MPI_SUCCESS) 00193 XIOS_ERROR("CMPIManager::Wait (request)", << " MPI Error !"); 00194 } 00195 00196 bool CMPIManager::Test (MPI_Request & _request) 00197 { 00198 MPI_Status status; 00199 int flag = 0; 00200 if (MPI_Test(&_request, &flag, &status) != MPI_SUCCESS) 00201 XIOS_ERROR("CMPIManager::Test (request)", << " MPI Error !"); 00202 return (flag); 00203 } 00204 00205 bool CMPIManager::HasReceivedData(MPI_Comm _comm, int _src_rank) 00206 { 00207 MPI_Status status; 00208 int flag = 0; 00209 if (MPI_Iprobe(_src_rank, MPI_ANY_TAG, _comm, &flag, &status) != MPI_SUCCESS) 00210 XIOS_ERROR("CMPIManager::HasReceivedData (comm, rank)", << " MPI Error !"); 00211 return (flag); 00212 } 00213 00214 std::size_t CMPIManager::GetReceivedDataSize(MPI_Comm _comm, int _src_rank) 00215 { 00216 MPI_Status status; 00217 int flag = 0, size = 0; 00218 if (MPI_Iprobe(_src_rank, MPI_ANY_TAG, _comm, &flag, &status) != MPI_SUCCESS) 00219 XIOS_ERROR("CMPIManager::getReceivedDataSize (comm, rank)", << " MPI Error !"); 00220 if (!flag) return (0); 00221 if (MPI_Get_count(&status, MPI_CHAR, &size) != MPI_SUCCESS) 00222 XIOS_ERROR("CMPIManager::getReceivedDataSize (comm, rank)", << " MPI Error !"); 00223 00224 return (size); 00225 } 00226 00227 void CMPIManager::Receive(MPI_Comm _comm, int _src_rank, char * _data) 00228 { 00229 MPI_Request request = 0; 00230 int size = CMPIManager::GetReceivedDataSize(_comm, _src_rank); 00231 if (MPI_Irecv(_data, size, MPI_CHAR, _src_rank, MPI_ANY_TAG, _comm, &request) != MPI_SUCCESS) 00232 XIOS_ERROR("CMPIManager::Receive (comm, src_rank, data)", << " MPI Error !"); 00233 CMPIManager::Wait (request); // Temporaire 00234 } 00235 00236 void CMPIManager::AllGather(int _indata, std::vector<int> & _outdata, MPI_Comm _comm) 00237 { 00238 std::vector<int> data; data.push_back(_indata); 00239 CMPIManager::AllGather(data, _outdata, _comm); 00240 } 00241 00242 void CMPIManager::AllGather(const std::vector<int> & _indata, 00243 std::vector<int> & _outdata, MPI_Comm _comm) 00244 { 00245 int sendcount = _indata.size(), 00246 recvcount = _indata.size() * CMPIManager::GetCommSize(_comm); 00247 _outdata.resize(recvcount); 00248 if (MPI_Allgather ( const_cast<int*>(&(_indata[0])), sendcount, MPI_INTEGER, 00249 &(_outdata[0]) , recvcount, MPI_INTEGER, _comm) != MPI_SUCCESS) 00250 XIOS_ERROR("CMPIManager::AllGather (indata, outdata, comm)", << " MPI Error !"); 00251 } 00252 00253 // ------------------------- Communication 'complexe' ----------------------- 00254 00255 //void SendLinearBuffer(MPIComm comm, int dest_rank, CLinearBuffer & buff, MPIRequest & request); 00256 //void ReceiveLinearBuffer(MPIComm comm, int src_rank, CLinearBuffer & buff); 00257 //boost::shared_ptr<CLinearBuffer> ReceiveLinearBuffer(MPIComm comm, int src_rank); 00258 //void ReceiveCircularBuffer(MPIComm comm, int src_rank, CCircularBuffer & buff); 00259 00260 // ---------------------- Mémoire (non fonctionnel ....) -------------------- 00261 00262 void CMPIManager::AllocMemory(void * _data, std::size_t _size) 00263 { 00264 if (MPI_Alloc_mem(sizeof(char) * _size, MPI_INFO_NULL, _data) != MPI_SUCCESS) 00265 XIOS_ERROR("CMPIManager::AllocMem(data, size)", << " MPI Error !"); 00266 } 00267 00268 void CMPIManager::FreeMemory (void * _data) 00269 { 00270 MPI_Free_mem(_data); 00271 } 00272 00273 } // namespace comm 00274 } // namespace xmlioserver 00275