XIOS  1.0
Xml I/O Server
context_server.cpp
1 #include "context_server.hpp"
2 #include "buffer_in.hpp"
3 #include "type.hpp"
4 #include "context.hpp"
5 #include "object_template.hpp"
6 #include "group_template.hpp"
7 #include "attribute_template.hpp"
8 #include "domain.hpp"
9 #include "field.hpp"
10 #include "file.hpp"
11 #include "grid.hpp"
12 #include "mpi.hpp"
13 #include "tracer.hpp"
14 #include "timer.hpp"
15 #include "cxios.hpp"
16 #include "event_scheduler.hpp"
17 #include "server.hpp"
18 #include <boost/functional/hash.hpp>
19 
20 
21 
22 namespace xios
23 {
24 
25  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
26  {
27  context=parent;
28  intraComm=intraComm_;
29  MPI_Comm_size(intraComm,&intraCommSize);
30  MPI_Comm_rank(intraComm,&intraCommRank);
31 
32  interComm=interComm_;
33  int flag;
34  MPI_Comm_test_inter(interComm,&flag);
35  if (flag) MPI_Comm_remote_size(interComm,&commSize);
36  else MPI_Comm_size(interComm,&commSize);
37 
38  currentTimeLine=0;
39  scheduled=false;
40  finished=false;
41  boost::hash<string> hashString;
42  if (CServer::serverLevel == 1)
43  hashId=hashString(context->getId() + boost::lexical_cast<string>(context->clientPrimServer.size()));
44  else
45  hashId=hashString(context->getId());
46  }
47 
48  void CContextServer::setPendingEvent(void)
49  {
50  pendingEvent=true;
51  }
52 
53  bool CContextServer::hasPendingEvent(void)
54  {
55  return pendingEvent;
56  }
57 
58  bool CContextServer::hasFinished(void)
59  {
60  return finished;
61  }
62 
63  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
64  {
65  listen();
66  checkPendingRequest();
67  if (enableEventsProcessing)
68  processEvents();
69  return finished;
70  }
71 
72  void CContextServer::listen(void)
73  {
74  int rank;
75  int flag;
76  int count;
77  char * addr;
78  MPI_Status status;
79  map<int,CServerBuffer*>::iterator it;
80  bool okLoop;
81 
82  traceOff();
83  MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status);
84  traceOn();
85 
86  if (flag==true)
87  {
88  rank=status.MPI_SOURCE ;
89  okLoop = true;
90  if (pendingRequest.find(rank)==pendingRequest.end())
91  okLoop = !listenPendingRequest(status) ;
92  if (okLoop)
93  {
94  for(rank=0;rank<commSize;rank++)
95  {
96  if (pendingRequest.find(rank)==pendingRequest.end())
97  {
98 
99  traceOff();
100  MPI_Iprobe(rank, 20,interComm,&flag,&status);
101  traceOn();
102  if (flag==true) listenPendingRequest(status) ;
103  }
104  }
105  }
106  }
107  }
108 
109  bool CContextServer::listenPendingRequest(MPI_Status& status)
110  {
111  int count;
112  char * addr;
113  map<int,CServerBuffer*>::iterator it;
114  int rank=status.MPI_SOURCE ;
115 
116  it=buffers.find(rank);
117  if (it==buffers.end()) // Receive the buffer size and allocate the buffer
118  {
119  StdSize buffSize = 0;
120  MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status);
121  mapBufferSize_.insert(std::make_pair(rank, buffSize));
122  it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first;
123  return true;
124  }
125  else
126  {
127  MPI_Get_count(&status,MPI_CHAR,&count);
128  if (it->second->isBufferFree(count))
129  {
130  addr=(char*)it->second->getBuffer(count);
131  MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);
132  bufferRequest[rank]=addr;
133  return true;
134  }
135  else
136  return false;
137  }
138  }
139 
140 
141  void CContextServer::checkPendingRequest(void)
142  {
143  map<int,MPI_Request>::iterator it;
144  list<int> recvRequest;
145  list<int>::iterator itRecv;
146  int rank;
147  int flag;
148  int count;
149  MPI_Status status;
150 
151  for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
152  {
153  rank=it->first;
154  traceOff();
155  MPI_Test(& it->second, &flag, &status);
156  traceOn();
157  if (flag==true)
158  {
159  recvRequest.push_back(rank);
160  MPI_Get_count(&status,MPI_CHAR,&count);
161  processRequest(rank,bufferRequest[rank],count);
162  }
163  }
164 
165  for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
166  {
167  pendingRequest.erase(*itRecv);
168  bufferRequest.erase(*itRecv);
169  }
170  }
171 
172  void CContextServer::processRequest(int rank, char* buff,int count)
173  {
174 
175  CBufferIn buffer(buff,count);
176  char* startBuffer,endBuffer;
177  int size, offset;
178  size_t timeLine;
179  map<size_t,CEventServer*>::iterator it;
180 
181  CTimer::get("Process request").resume();
182  while(count>0)
183  {
184  char* startBuffer=(char*)buffer.ptr();
185  CBufferIn newBuffer(startBuffer,buffer.remain());
186  newBuffer>>size>>timeLine;
187 
188  it=events.find(timeLine);
189  if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first;
190  it->second->push(rank,buffers[rank],startBuffer,size);
191 
192  buffer.advance(size);
193  count=buffer.remain();
194  }
195  CTimer::get("Process request").suspend();
196  }
197 
198  void CContextServer::processEvents(void)
199  {
200  map<size_t,CEventServer*>::iterator it;
201  CEventServer* event;
202 
203  it=events.find(currentTimeLine);
204  if (it!=events.end())
205  {
206  event=it->second;
207 
208  if (event->isFull())
209  {
210  if (!scheduled && CServer::eventScheduler) // Skip event scheduling for attached mode and reception on client side
211  {
212  CServer::eventScheduler->registerEvent(currentTimeLine,hashId);
213  scheduled=true;
214  }
215  else if (!CServer::eventScheduler || CServer::eventScheduler->queryEvent(currentTimeLine,hashId))
216  {
217  // When using attached mode, synchronise the processes to prevent different events from being scheduled by different processes.
218  // The best way to properly solve this problem would be to use the event scheduler in attached mode as well;
219  // for now, just set up an MPI barrier.
220  if (!CServer::eventScheduler && CXios::isServer) MPI_Barrier(intraComm) ;
221 
222  CTimer::get("Process events").resume();
223  dispatchEvent(*event);
224  CTimer::get("Process events").suspend();
225  pendingEvent=false;
226  delete event;
227  events.erase(it);
228  currentTimeLine++;
229  scheduled = false;
230  }
231  }
232  }
233  }
234 
235  CContextServer::~CContextServer()
236  {
237  map<int,CServerBuffer*>::iterator it;
238  for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
239  }
240 
241  void CContextServer::dispatchEvent(CEventServer& event)
242  {
243  string contextName;
244  string buff;
245  int MsgSize;
246  int rank;
247  list<CEventServer::SSubEvent>::iterator it;
248  StdString ctxId = context->getId();
249  CContext::setCurrent(ctxId);
250  StdSize totalBuf = 0;
251 
252  if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
253  {
254  finished=true;
255  info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
256  context->finalize();
257  std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
258  iteMap = mapBufferSize_.end(), itMap;
259  for (itMap = itbMap; itMap != iteMap; ++itMap)
260  {
261  rank = itMap->first;
262  report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
263  << " +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
264  totalBuf += itMap->second;
265  }
266  report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
267  }
268  else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
269  else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
270  else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
271  else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
272  else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
273  else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
274  else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
275  else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
276  else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
277  else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
278  else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
279  else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
280  else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
281  else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
282  else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
283  else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
284  else
285  {
286  ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
287  }
288  }
289 }
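A note on the framing that processRequest decodes: each chunk read from a client buffer starts with an int size followed by a size_t timeLine, and size is the full chunk length in bytes, header included (which is why the outer buffer is advanced by size from the start of the chunk). Chunks carrying the same timeLine are pushed into the same CEventServer until the event is full. The following standalone sketch, which is not XIOS code, reproduces that grouping loop with plain memcpy; it assumes the header values are laid out as contiguous native binary data, an assumption made only for this illustration (CBufferIn handles the real serialisation).

#include <cstring>
#include <cstddef>
#include <iostream>
#include <map>
#include <vector>

// One decoded chunk: a pointer into the receive buffer plus its length.
struct Chunk { const char* data; int size; };

// Walk a receive buffer and group chunks by timeline, mirroring the
// decoding loop of CContextServer::processRequest (framing as assumed above).
std::map<std::size_t, std::vector<Chunk> > decode(const char* buff, int count)
{
  std::map<std::size_t, std::vector<Chunk> > events;
  int offset = 0;
  while (offset < count)
  {
    const char* start = buff + offset;
    int size = 0;               // total chunk length in bytes, header included
    std::size_t timeLine = 0;   // timeline this chunk belongs to
    std::memcpy(&size, start, sizeof size);
    std::memcpy(&timeLine, start + sizeof size, sizeof timeLine);
    events[timeLine].push_back(Chunk{start, size});
    offset += size;             // jump to the start of the next chunk
  }
  return events;
}

int main()
{
  // Build a buffer holding two chunks for timeline 1 and one for timeline 2.
  std::vector<char> buf;
  const std::size_t header = sizeof(int) + sizeof(std::size_t);
  struct Msg { std::size_t timeLine; const char* payload; int n; };
  const Msg msgs[] = { {1, "abc", 3}, {1, "de", 2}, {2, "xyz", 3} };
  for (const Msg& m : msgs)
  {
    int size = static_cast<int>(header) + m.n;
    std::size_t pos = buf.size();
    buf.resize(pos + size);
    std::memcpy(&buf[pos], &size, sizeof size);
    std::memcpy(&buf[pos] + sizeof size, &m.timeLine, sizeof m.timeLine);
    std::memcpy(&buf[pos] + header, m.payload, m.n);
  }

  std::map<std::size_t, std::vector<Chunk> > events = decode(buf.data(), static_cast<int>(buf.size()));
  for (std::map<std::size_t, std::vector<Chunk> >::const_iterator it = events.begin(); it != events.end(); ++it)
    std::cout << "timeline " << it->first << ": " << it->second.size() << " chunk(s)" << std::endl;
  return 0;
}

In the server itself the chunks stay in the per-rank CServerBuffer, grouping is done by CEventServer::push, and the event registered under currentTimeLine is only dispatched once event->isFull() reports that all expected contributions have arrived.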