source: XIOS/trunk/src/context_client.cpp @ 511

Last change on this file since 511 was 511, checked in by mhnguyen, 10 years ago

Seperating database of context on "client" side and "server" side

+) Add one more context in contex client in case of attached mode
+) Do some minor changements to make sure everything fine in case of attached mode
+) Replace buffer group with the new options

Test
+) On Curie
+) Connection mode: Attached and seperated
+) File mode: one and multiple
+) All tests passed

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 7.8 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "buffer_client.hpp"
7#include "type.hpp"
8#include "message.hpp"
9#include "event_client.hpp"
10#include "context.hpp"
11#include "mpi.hpp"
12#include "timer.hpp"
13#include "cxios.hpp"
14
15namespace xios
16{
17
18
19    CContextClient::CContextClient(CContext* parent,MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
20     : mapBufferSize_(), parentServer(cxtSer)
21    {
22      context=parent ;
23      intraComm=intraComm_ ;
24      interComm=interComm_ ;
25      MPI_Comm_rank(intraComm,&clientRank) ;
26      MPI_Comm_size(intraComm,&clientSize) ;
27
28      int flag ;
29      MPI_Comm_test_inter(interComm,&flag) ;
30      if (flag) MPI_Comm_remote_size(interComm,&serverSize);
31      else  MPI_Comm_size(interComm,&serverSize) ;
32
33      timeLine=0 ;
34
35    }
36
37
38    void CContextClient::sendEvent(CEventClient& event)
39    {
40      list<int>::iterator itServer ;
41      list<int> ranks ;
42      list<int> sizes ;
43      list<int>::iterator itSize ;
44
45      ranks=event.getRanks() ;
46      if (! event.isEmpty())
47      {
48        sizes=event.getSizes() ;
49        CMessage msg ;
50
51        msg<<*(sizes.begin())<<timeLine ;
52        for(list<int>::iterator it=sizes.begin();it!=sizes.end();it++) *it+=msg.size() ;
53        list<CBufferOut*> buffList=getBuffers(ranks,sizes) ;
54
55        list<CBufferOut*>::iterator it ;
56        for(it=buffList.begin(),itSize=sizes.begin();it!=buffList.end();++it,++itSize)
57        {
58          **it<<*itSize<<timeLine ;
59        }
60        event.send(buffList) ;
61        checkBuffers(ranks) ;
62      }
63
64//      if (context->hasServer)
65      if (0 != parentServer)
66      {
67        waitEvent(ranks);
68        CContext::setCurrent(context->getId());
69      }
70
71      timeLine++ ;
72    }
73
74    void CContextClient::sendBufferSizeEvent()
75    {
76      std::map<int, CClientBuffer*>::iterator it, itE;
77      std::map<int, StdSize>::const_iterator itMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end();
78
79      if (itMap == iteMap)
80         ERROR("CBufferOut*  CContextClient::sendBufferSizeEvent() ;",
81              <<"No information about server buffer, that should not happen...");
82
83      for (; itMap != iteMap; ++iteMap)
84      {
85        if (buffers.end() == buffers.find(itMap->first))
86          newBuffer(itMap->first);
87      }
88
89      CBufferOut* bufOut(NULL);
90      itE = buffers.end();
91      for (it = buffers.begin(); it != itE; ++it)
92      {
93        bufOut = (it->second)->getBuffer(sizeof(StdSize));
94        bufOut->put(mapBufferSize_[it->first]);  // Stupid C++
95        (it->second)->checkBuffer();
96      }
97    }
98
99    void CContextClient::waitEvent(list<int>& ranks)
100    {
101//      context->server->setPendingEvent() ;
102//      while(checkBuffers(ranks))
103//      {
104//        context->server->listen() ;
105//        context->server->checkPendingRequest() ;
106//      }
107//
108//      while(context->server->hasPendingEvent())
109//      {
110//       context->server->eventLoop() ;
111//      }
112
113      parentServer->server->setPendingEvent() ;
114      while(checkBuffers(ranks))
115      {
116        parentServer->server->listen() ;
117        parentServer->server->checkPendingRequest() ;
118      }
119
120      while(parentServer->server->hasPendingEvent())
121      {
122       parentServer->server->eventLoop() ;
123      }
124
125    }
126
127    list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList)
128    {
129      list<int>::iterator itServer,itSize ;
130      list<CClientBuffer*> bufferList ;
131      map<int,CClientBuffer*>::iterator it ;
132      list<CClientBuffer*>::iterator itBuffer ;
133      list<CBufferOut*>  retBuffer ;
134      bool free ;
135
136      for(itServer=serverList.begin();itServer!=serverList.end();itServer++)
137      {
138        it=buffers.find(*itServer) ;
139        if (it==buffers.end())
140        {
141          newBuffer(*itServer) ;
142          it=buffers.find(*itServer) ;
143        }
144        bufferList.push_back(it->second) ;
145      }
146      free=false ;
147
148      CTimer::get("Blocking time").resume();
149      while(!free)
150      {
151        free=true ;
152        for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
153        {
154          (*itBuffer)->checkBuffer() ;
155         free&=(*itBuffer)->isBufferFree(*itSize) ;
156        }
157      }
158      CTimer::get("Blocking time").suspend();
159
160      for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
161      {
162        retBuffer.push_back((*itBuffer)->getBuffer(*itSize)) ;
163      }
164      return retBuffer ;
165
166   }
167
168   void CContextClient::newBuffer(int rank)
169   {
170//     buffers[rank]=new CClientBuffer(interComm,rank);
171      buffers[rank]=new CClientBuffer(interComm,rank, mapBufferSize_[rank]) ;
172   }
173
174   bool CContextClient::checkBuffers(void)
175   {
176      map<int,CClientBuffer*>::iterator itBuff ;
177      bool pending=false ;
178      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) pending|=itBuff->second->checkBuffer() ;
179      return pending ;
180   }
181
182   void CContextClient::releaseBuffers(void)
183   {
184      map<int,CClientBuffer*>::iterator itBuff ;
185      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) delete itBuff->second ;
186   }
187
188   bool CContextClient::checkBuffers(list<int>& ranks)
189   {
190      list<int>::iterator it ;
191      bool pending=false ;
192      for(it=ranks.begin();it!=ranks.end();it++) pending|=buffers[*it]->checkBuffer() ;
193      return pending ;
194   }
195
196   void CContextClient::setBufferSize(const std::map<int, StdSize>& mapSize)
197   {
198     mapBufferSize_ = mapSize;
199     sendBufferSizeEvent();
200   }
201
202   int CContextClient::getServerLeader(void)
203   {
204     int clientByServer=clientSize/serverSize ;
205     int remain=clientSize%serverSize ;
206
207     if (clientRank<(clientByServer+1)*remain)
208     {
209       return clientRank/(clientByServer+1) ;
210     }
211     else
212     {
213       int rank=clientRank-(clientByServer+1)*remain ;
214       int nbServer=serverSize-remain ;
215       return remain+rank/clientByServer ;
216     }
217   }
218
219   bool CContextClient::isServerLeader(void)
220   {
221     int clientByServer=clientSize/serverSize ;
222     int remain=clientSize%serverSize ;
223
224     if (clientRank<(clientByServer+1)*remain)
225     {
226       if (clientRank%(clientByServer+1)==0) return true ;
227       else return false ;
228     }
229     else
230     {
231       int rank=clientRank-(clientByServer+1)*remain ;
232       int nbServer=serverSize-remain ;
233       if  (rank%clientByServer==0) return true ;
234       else return false ;
235     }
236   }
237
238   void CContextClient::finalize(void)
239   {
240
241     map<int,CClientBuffer*>::iterator itBuff ;
242     bool stop=true ;
243
244     CEventClient event(CContext::GetType(),CContext::EVENT_ID_CONTEXT_FINALIZE) ;
245     if (isServerLeader())
246     {
247       CMessage msg ;
248       event.push(getServerLeader(),1,msg) ;
249       sendEvent(event) ;
250     }
251     else sendEvent(event) ;
252
253     CTimer::get("Blocking time").resume();
254     while(stop)
255     {
256       checkBuffers() ;
257       stop=false ;
258       for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) stop|=itBuff->second->hasPendingRequest() ;
259     }
260     CTimer::get("Blocking time").suspend();
261//     report(0)<< " Memory report : Context <"<<context->getId()<<"> : client side : total memory used for buffer "<<buffers.size()*CXios::bufferSize<<" bytes"<<endl ;
262
263     std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
264                                            iteMap = mapBufferSize_.end(), itMap;
265     StdSize totalBuf = 0;
266     for (itMap = itbMap; itMap != iteMap; ++itMap)
267     {
268       report(10)<< " Memory report : Context <"<<context->getId()<<"> : client side : memory used for buffer of each connection to server" << endl
269                 << "  +)To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
270       totalBuf += itMap->second;
271     }
272     report(0)<< " Memory report : Context <"<<context->getId()<<"> : client side : total memory used for buffer "<<totalBuf<<" bytes"<<endl ;
273
274     releaseBuffers() ;
275   }
276}
Note: See TracBrowser for help on using the repository browser.