Ignore:
Timestamp:
05/26/15 16:13:46 (9 years ago)
Author:
rlacroix
Message:

Allow using more servers than clients.

This will be useful later when implementing server to client communications.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/context_client.cpp

    r591 r595  
    2121    \cxtSer [in] cxtSer Pointer to context of server side. (It is only used on case of attached mode) 
    2222    */ 
    23     CContextClient::CContextClient(CContext* parent,MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) 
     23    CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) 
    2424     : mapBufferSize_(), parentServer(cxtSer) 
    2525    { 
    26       context=parent ; 
    27       intraComm=intraComm_ ; 
    28       interComm=interComm_ ; 
    29       MPI_Comm_rank(intraComm,&clientRank) ; 
    30       MPI_Comm_size(intraComm,&clientSize) ; 
    31  
    32       int flag ; 
    33       MPI_Comm_test_inter(interComm,&flag) ; 
    34       if (flag) MPI_Comm_remote_size(interComm,&serverSize); 
    35       else  MPI_Comm_size(interComm,&serverSize) ; 
    36  
    37       timeLine=0 ; 
    38  
     26      context = parent; 
     27      intraComm = intraComm_; 
     28      interComm = interComm_; 
     29      MPI_Comm_rank(intraComm, &clientRank); 
     30      MPI_Comm_size(intraComm, &clientSize); 
     31 
     32      int flag; 
     33      MPI_Comm_test_inter(interComm, &flag); 
     34      if (flag) MPI_Comm_remote_size(interComm, &serverSize); 
     35      else  MPI_Comm_size(interComm, &serverSize); 
     36 
     37      if (clientSize < serverSize) 
     38      { 
     39        int serverByClient = serverSize / clientSize; 
     40        int remain = serverSize % clientSize; 
     41        int rankStart = serverByClient * clientRank; 
     42 
     43        if (clientRank < remain) 
     44        { 
     45          serverByClient++; 
     46          rankStart += clientRank; 
     47        } 
     48        else 
     49          rankStart += remain; 
     50 
     51        for (int i = 0; i < serverByClient; i++) 
     52          ranksServerLeader.push_back(rankStart + i); 
     53      } 
     54      else 
     55      { 
     56        int clientByServer = clientSize / serverSize; 
     57        int remain = clientSize % serverSize; 
     58 
     59        if (clientRank < (clientByServer + 1) * remain) 
     60        { 
     61          if (clientRank % (clientByServer + 1) == 0) 
     62            ranksServerLeader.push_back(clientRank / (clientByServer + 1)); 
     63        } 
     64        else 
     65        { 
     66          int rank = clientRank - (clientByServer + 1) * remain; 
     67          if (rank % clientByServer == 0) 
     68            ranksServerLeader.push_back(remain + rank / clientByServer); 
     69        } 
     70      } 
     71 
     72      timeLine = 0; 
    3973    } 
    4074 
     
    4579    void CContextClient::sendEvent(CEventClient& event) 
    4680    { 
    47       list<int>::iterator itServer ; 
    48       list<int> ranks ; 
    49       list<int> sizes ; 
    50       list<int>::iterator itSize ; 
    51  
    52       ranks=event.getRanks() ; 
    53       if (! event.isEmpty()) 
    54       { 
    55         sizes=event.getSizes() ; 
    56         CMessage msg ; 
    57  
    58         msg<<*(sizes.begin())<<timeLine ; 
    59         for(list<int>::iterator it=sizes.begin();it!=sizes.end();it++) *it+=msg.size() ; 
    60         list<CBufferOut*> buffList=getBuffers(ranks,sizes) ; 
    61  
    62         list<CBufferOut*>::iterator it ; 
    63         for(it=buffList.begin(),itSize=sizes.begin();it!=buffList.end();++it,++itSize) 
    64         { 
    65           **it<<*itSize<<timeLine ; 
    66         } 
    67         event.send(buffList) ; 
    68         checkBuffers(ranks) ; 
    69       } 
    70  
    71 //      if (context->hasServer) 
    72       if (0 != parentServer) 
     81      list<int>::iterator itServer; 
     82      list<int> ranks; 
     83      list<int> sizes; 
     84      list<int>::iterator itSize; 
     85 
     86      ranks = event.getRanks(); 
     87      if (!event.isEmpty()) 
     88      { 
     89        sizes = event.getSizes(); 
     90        CMessage msg; 
     91 
     92        msg << *(sizes.begin()) << timeLine; 
     93        for (list<int>::iterator it = sizes.begin(); it != sizes.end(); it++) *it += msg.size(); 
     94        list<CBufferOut*> buffList = getBuffers(ranks, sizes); 
     95 
     96        list<CBufferOut*>::iterator it; 
     97        for (it = buffList.begin(), itSize = sizes.begin(); it != buffList.end(); ++it, ++itSize) 
     98        { 
     99          **it << *itSize << timeLine; 
     100        } 
     101        event.send(buffList); 
     102        checkBuffers(ranks); 
     103      } 
     104 
     105      if (0 != parentServer) // context->hasServer 
    73106      { 
    74107        waitEvent(ranks); 
     
    76109      } 
    77110 
    78       timeLine++ ; 
     111      timeLine++; 
    79112    } 
    80113 
     
    85118    void CContextClient::sendBufferSizeEvent() 
    86119    { 
    87       std::map<int, CClientBuffer*>::iterator it, itE; 
    88       std::map<int, StdSize>::const_iterator itMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end(); 
     120      std::map<int,CClientBuffer*>::iterator it, itE; 
     121      std::map<int,StdSize>::const_iterator itMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end(); 
    89122 
    90123      if (itMap == iteMap) 
    91          ERROR("CBufferOut*  CContextClient::sendBufferSizeEvent() ;", 
     124         ERROR("void CContextClient::sendBufferSizeEvent()", 
    92125              <<"No information about server buffer, that should not happen..."); 
    93126 
     
    115148    void CContextClient::waitEvent(list<int>& ranks) 
    116149    { 
    117 //      context->server->setPendingEvent() ; 
    118 //      while(checkBuffers(ranks)) 
     150//      context->server->setPendingEvent(); 
     151//      while (checkBuffers(ranks)) 
    119152//      { 
    120 //        context->server->listen() ; 
    121 //        context->server->checkPendingRequest() ; 
     153//        context->server->listen(); 
     154//        context->server->checkPendingRequest(); 
    122155//      } 
    123156// 
    124 //      while(context->server->hasPendingEvent()) 
     157//      while (context->server->hasPendingEvent()) 
    125158//      { 
    126 //       context->server->eventLoop() ; 
     159//       context->server->eventLoop(); 
    127160//      } 
    128161 
    129       parentServer->server->setPendingEvent() ; 
    130       while(checkBuffers(ranks)) 
    131       { 
    132         parentServer->server->listen() ; 
    133         parentServer->server->checkPendingRequest() ; 
    134       } 
    135  
    136       while(parentServer->server->hasPendingEvent()) 
    137       { 
    138        parentServer->server->eventLoop() ; 
     162      parentServer->server->setPendingEvent(); 
     163      while (checkBuffers(ranks)) 
     164      { 
     165        parentServer->server->listen(); 
     166        parentServer->server->checkPendingRequest(); 
     167      } 
     168 
     169      while (parentServer->server->hasPendingEvent()) 
     170      { 
     171       parentServer->server->eventLoop(); 
    139172      } 
    140173    } 
     
    148181    list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList) 
    149182    { 
    150       list<int>::iterator itServer,itSize ; 
    151       list<CClientBuffer*> bufferList ; 
    152       map<int,CClientBuffer*>::iterator it ; 
    153       list<CClientBuffer*>::iterator itBuffer ; 
    154       list<CBufferOut*>  retBuffer ; 
    155       bool free ; 
    156  
    157       for(itServer=serverList.begin();itServer!=serverList.end();itServer++) 
    158       { 
    159         it=buffers.find(*itServer) ; 
    160         if (it==buffers.end()) 
    161         { 
    162           newBuffer(*itServer) ; 
    163           it=buffers.find(*itServer) ; 
    164         } 
    165         bufferList.push_back(it->second) ; 
    166       } 
    167       free=false ; 
     183      list<int>::iterator itServer, itSize; 
     184      list<CClientBuffer*> bufferList; 
     185      map<int,CClientBuffer*>::iterator it; 
     186      list<CClientBuffer*>::iterator itBuffer; 
     187      list<CBufferOut*>  retBuffer; 
     188      bool free; 
     189 
     190      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) 
     191      { 
     192        it = buffers.find(*itServer); 
     193        if (it == buffers.end()) 
     194        { 
     195          newBuffer(*itServer); 
     196          it = buffers.find(*itServer); 
     197        } 
     198        bufferList.push_back(it->second); 
     199      } 
     200      free = false; 
    168201 
    169202      CTimer::get("Blocking time").resume(); 
    170       while(!free) 
    171       { 
    172         free=true ; 
    173         for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++) 
    174         { 
    175           (*itBuffer)->checkBuffer() ; 
    176          free&=(*itBuffer)->isBufferFree(*itSize) ; 
     203      while (!free) 
     204      { 
     205        free = true; 
     206        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     207        { 
     208          (*itBuffer)->checkBuffer(); 
     209         free &= (*itBuffer)->isBufferFree(*itSize); 
    177210        } 
    178211      } 
    179212      CTimer::get("Blocking time").suspend(); 
    180213 
    181       for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++) 
    182       { 
    183         retBuffer.push_back((*itBuffer)->getBuffer(*itSize)) ; 
    184       } 
    185       return retBuffer ; 
    186  
     214      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     215      { 
     216        retBuffer.push_back((*itBuffer)->getBuffer(*itSize)); 
     217      } 
     218      return retBuffer; 
    187219   } 
    188220 
     
    193225   void CContextClient::newBuffer(int rank) 
    194226   { 
    195       buffers[rank]=new CClientBuffer(interComm,rank, mapBufferSize_[rank]) ; 
     227      buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank]); 
    196228   } 
    197229 
     
    202234   bool CContextClient::checkBuffers(void) 
    203235   { 
    204       map<int,CClientBuffer*>::iterator itBuff ; 
    205       bool pending=false ; 
    206       for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) pending|=itBuff->second->checkBuffer() ; 
    207       return pending ; 
     236      map<int,CClientBuffer*>::iterator itBuff; 
     237      bool pending = false; 
     238      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) pending |= itBuff->second->checkBuffer(); 
     239      return pending; 
    208240   } 
    209241 
     
    211243   void CContextClient::releaseBuffers(void) 
    212244   { 
    213       map<int,CClientBuffer*>::iterator itBuff ; 
    214       for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) delete itBuff->second ; 
     245      map<int,CClientBuffer*>::iterator itBuff; 
     246      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) delete itBuff->second; 
    215247   } 
    216248 
     
    222254   bool CContextClient::checkBuffers(list<int>& ranks) 
    223255   { 
    224       list<int>::iterator it ; 
    225       bool pending=false ; 
    226       for(it=ranks.begin();it!=ranks.end();it++) pending|=buffers[*it]->checkBuffer() ; 
    227       return pending ; 
     256      list<int>::iterator it; 
     257      bool pending = false; 
     258      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(); 
     259      return pending; 
    228260   } 
    229261 
     
    232264   \param [in] mapSize mapping rank of connected server to size of allocated buffer 
    233265   */ 
    234    void CContextClient::setBufferSize(const std::map<int, StdSize>& mapSize) 
     266   void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize) 
    235267   { 
    236268     mapBufferSize_ = mapSize; 
     
    238270   } 
    239271 
    240    /*! 
    241    Get leading server in the group of connected server 
    242    \return rank of leading server 
    243    */ 
    244    int CContextClient::getServerLeader(void) 
    245    { 
    246      int clientByServer=clientSize/serverSize ; 
    247      int remain=clientSize%serverSize ; 
    248  
    249      if (clientRank<(clientByServer+1)*remain) 
    250      { 
    251        return clientRank/(clientByServer+1) ; 
    252      } 
    253      else 
    254      { 
    255        int rank=clientRank-(clientByServer+1)*remain ; 
    256        int nbServer=serverSize-remain ; 
    257        return remain+rank/clientByServer ; 
    258      } 
    259    } 
    260  
    261    /*! 
    262    Check if client connects to leading server 
    263    \return connected(true), not connected (false) 
    264    */ 
    265    bool CContextClient::isServerLeader(void) 
    266    { 
    267      int clientByServer=clientSize/serverSize ; 
    268      int remain=clientSize%serverSize ; 
    269  
    270      if (clientRank<(clientByServer+1)*remain) 
    271      { 
    272        if (clientRank%(clientByServer+1)==0) return true ; 
    273        else return false ; 
    274      } 
    275      else 
    276      { 
    277        int rank=clientRank-(clientByServer+1)*remain ; 
    278        int nbServer=serverSize-remain ; 
    279        if  (rank%clientByServer==0) return true ; 
    280        else return false ; 
    281      } 
    282    } 
     272  /*! 
     273  Get leading server in the group of connected server 
     274  \return ranks of leading servers 
     275  */ 
     276  const std::list<int>& CContextClient::getRanksServerLeader(void) const 
     277  { 
     278    return ranksServerLeader; 
     279  } 
     280 
     281  /*! 
     282  Check if client connects to leading server 
     283  \return connected(true), not connected (false) 
     284  */ 
     285  bool CContextClient::isServerLeader(void) const 
     286  { 
     287    return !ranksServerLeader.empty(); 
     288  } 
    283289 
    284290   /*! 
     
    287293   void CContextClient::finalize(void) 
    288294   { 
    289      map<int,CClientBuffer*>::iterator itBuff ; 
    290      bool stop=true ; 
    291  
    292      CEventClient event(CContext::GetType(),CContext::EVENT_ID_CONTEXT_FINALIZE) ; 
     295     map<int,CClientBuffer*>::iterator itBuff; 
     296     bool stop = true; 
     297 
     298     CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 
    293299     if (isServerLeader()) 
    294300     { 
    295        CMessage msg ; 
    296        event.push(getServerLeader(),1,msg) ; 
    297        sendEvent(event) ; 
     301       CMessage msg; 
     302       const std::list<int>& ranks = getRanksServerLeader(); 
     303       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
     304         event.push(*itRank, 1, msg); 
     305       sendEvent(event); 
    298306     } 
    299      else sendEvent(event) ; 
     307     else sendEvent(event); 
    300308 
    301309     CTimer::get("Blocking time").resume(); 
    302      while(stop) 
     310     while (stop) 
    303311     { 
    304        checkBuffers() ; 
    305        stop=false ; 
    306        for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) stop|=itBuff->second->hasPendingRequest() ; 
     312       checkBuffers(); 
     313       stop = false; 
     314       for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop |= itBuff->second->hasPendingRequest(); 
    307315     } 
    308316     CTimer::get("Blocking time").suspend(); 
    309317 
    310      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 
    311                                             iteMap = mapBufferSize_.end(), itMap; 
     318     std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 
     319                                           iteMap = mapBufferSize_.end(), itMap; 
    312320     StdSize totalBuf = 0; 
    313321     for (itMap = itbMap; itMap != iteMap; ++itMap) 
    314322     { 
    315        report(10)<< " Memory report : Context <"<<context->getId()<<"> : client side : memory used for buffer of each connection to server" << endl 
    316                  << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 
     323       report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl 
     324                  << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 
    317325       totalBuf += itMap->second; 
    318326     } 
    319      report(0)<< " Memory report : Context <"<<context->getId()<<"> : client side : total memory used for buffer "<<totalBuf<<" bytes"<<endl ; 
    320  
    321      releaseBuffers() ; 
     327     report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 
     328 
     329     releaseBuffers(); 
    322330   } 
    323331} 
Note: See TracChangeset for help on using the changeset viewer.