source: XIOS/trunk/src/context_server.cpp @ 423

Last change on this file since 423 was 401, checked in by ymipsl, 12 years ago
  • Implement mechanism for tracking memory leak
  • Correct important memory leaks
  • Add complementary report on memory consumption

YM

  • Property svn:eol-style set to native
File size: 5.8 KB
RevLine 
[300]1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
[352]5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
[300]8#include "domain.hpp"
[352]9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
[382]12#include "mpi.hpp"
[347]13#include "tracer.hpp"
14#include "timer.hpp"
[401]15#include "cxios.hpp"
[300]16
17
18
[335]19namespace xios
[300]20{
21
[345]22  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
[300]23  {
24    context=parent ;
25    intraComm=intraComm_ ;
26    MPI_Comm_size(intraComm,&intraCommSize) ;
27    MPI_Comm_rank(intraComm,&intraCommRank) ;
28    interComm=interComm_ ;
29    int flag ;
30    MPI_Comm_test_inter(interComm,&flag) ;
31    if (flag) MPI_Comm_remote_size(interComm,&commSize);
32    else  MPI_Comm_size(interComm,&commSize) ;
33    currentTimeLine=0 ;
34    finished=false ;
35  }
36  void CContextServer::setPendingEvent(void)
37  {
38    pendingEvent=true ;
39  }
40 
41  bool CContextServer::hasPendingEvent(void)
42  {
43    return pendingEvent ;
44  }
45 
46  bool CContextServer::eventLoop(void)
47  {
48    listen() ;
49    checkPendingRequest() ;
50    processEvents() ;
51    return finished ;
52  }
53
54  void CContextServer::listen(void)
55  {
56    int rank;
57    int flag ;
58    int count ;
59    char * addr ;
60    MPI_Status status; 
61    map<int,CServerBuffer*>::iterator it;
62   
63    for(rank=0;rank<commSize;rank++)
64    {
65      if (pendingRequest.find(rank)==pendingRequest.end())
66      {
[347]67        traceOff() ;
[300]68        MPI_Iprobe(rank,20,interComm,&flag,&status);     
[347]69        traceOn() ;
[300]70        if (flag==true)
71        {
72          it=buffers.find(rank) ;
73          if (it==buffers.end()) 
74            it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer))).first ;
75          MPI_Get_count(&status,MPI_CHAR,&count) ;
76          if (it->second->isBufferFree(count))
77          {
78            addr=(char*)it->second->getBuffer(count) ;
79            MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]) ;
80            bufferRequest[rank]=addr ;
81          }
82        }
83      }
84    }
85  }
86 
87  void CContextServer::checkPendingRequest(void)
88  {
89    map<int,MPI_Request>::iterator it;
90    list<int> recvRequest ;
91    list<int>::iterator itRecv;
92    int rank ;
93    int flag ;
94    int count ;
95    MPI_Status status ;
96   
97    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
98    {
99      rank=it->first ;
[347]100      traceOff() ;
[300]101      MPI_Test(& it->second, &flag, &status) ;
[347]102      traceOn() ;
[300]103      if (flag==true)
104      {
105        recvRequest.push_back(rank) ;
106        MPI_Get_count(&status,MPI_CHAR,&count) ;
107        processRequest(rank,bufferRequest[rank],count) ;
108      }
109    }
110   
111    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++) 
112    {
113      pendingRequest.erase(*itRecv) ;
114      bufferRequest.erase(*itRecv) ;
115    }
116  }
117 
118  void CContextServer::processRequest(int rank, char* buff,int count)
119  {
120   
121    CBufferIn buffer(buff,count) ;
122    char* startBuffer,endBuffer ;
123    int size, offset ;
124    size_t timeLine ;
125    map<size_t,CEventServer*>::iterator it ;
126       
127    while(count>0)
128    {
129      char* startBuffer=(char*)buffer.ptr() ;
130      CBufferIn newBuffer(startBuffer,buffer.remain()) ;
131      newBuffer>>size>>timeLine ;
132
133      it=events.find(timeLine) ;
134      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first ;
135      it->second->push(rank,buffers[rank],startBuffer,size) ;
136
137      buffer.advance(size) ;
138      count=buffer.remain() ;           
139    } 
140 
141  }
142   
143  void CContextServer::processEvents(void)
144  {
145    map<size_t,CEventServer*>::iterator it ;
146    CEventServer* event ;
147   
148    it=events.find(currentTimeLine) ;
149    if (it!=events.end()) 
150    {
151      event=it->second ;
152      if (event->isFull())
153      {
[347]154         CTimer::get("Process events").resume() ;
[300]155         dispatchEvent(*event) ;
[347]156         CTimer::get("Process events").suspend() ;
[300]157         pendingEvent=false ;
158         delete event ;
159         events.erase(it) ;
160         currentTimeLine++ ;
161       }
162     }
163   }
164       
165  CContextServer::~CContextServer()
166  {
167    map<int,CServerBuffer*>::iterator it ;
168    for(it=buffers.begin();it!=buffers.end();++it) delete it->second ; 
169  } 
170
171
172  void CContextServer::dispatchEvent(CEventServer& event)
173  {
174    string contextName ;
175    string buff ;
176    int MsgSize ;
177    int rank ;
178    list<CEventServer::SSubEvent>::iterator it ;
[346]179    CContext::setCurrent(context->getId()) ;
[300]180       
181    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
182    {
183      info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl ;
184      context->finalize() ;
185      finished=true ;
[401]186      report(0)<< " Memory report : Context <"<<context->getId()<<"> : server side : total memory used for buffer "<<buffers.size()*CXios::bufferSize<<" bytes"<<endl ;
[300]187    }
188    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event) ;
189    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event) ;
190    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event) ;
191    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event) ;
192    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event) ;
193    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event) ;
194    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event) ;
195    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event) ;
196    else if (event.classId==CField::GetType()) CField::dispatchEvent(event) ;
197    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event) ;
198    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event) ;
199    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event) ;
200    else
201    {
202      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl) ;
203    }
204  }
205}
Note: See TracBrowser for help on using the repository browser.