Timestamp:
01/22/21 12:00:29 (3 years ago)
Author:
yushan
Message:

Graph: intermediate commit to a tmp branch

File:
1 edited

  • XIOS/dev/dev_trunk_graph/src/context_client.cpp

    r1634 r2019  
    2222    */ 
    2323    CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) 
    24      : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4) 
    25     { 
     24     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4), associatedServer_(nullptr) 
     25    { 
     26       
    2627      context = parent; 
    2728      intraComm = intraComm_; 
     
    3233      int flag; 
    3334      MPI_Comm_test_inter(interComm, &flag); 
     35      if (flag) isAttached_=false ; 
     36      else  isAttached_=true ; 
     37 
     38      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
     39      if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 
     40       
     41 
     42 
    3443      if (flag) MPI_Comm_remote_size(interComm, &serverSize); 
    3544      else  MPI_Comm_size(interComm, &serverSize); 
     
    3746      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader); 
    3847 
    39       timeLine = 0; 
     48      if (flag) MPI_Intercomm_merge(interComm_,false,&interCommMerged) ; 
     49       
     50      if (!isAttachedModeEnabled()) 
     51      {   
     52        windows.resize(serverSize) ; 
     53        MPI_Comm winComm ; 
     54        for(int rank=0; rank<serverSize; rank++) 
     55        { 
     56          windows[rank].resize(2) ; 
     57          MPI_Comm_split(interCommMerged, rank, clientRank, &winComm); 
     58          int myRank ; 
     59          MPI_Comm_rank(winComm,&myRank); 
     60          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]); 
     61          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]); 
     62          MPI_Comm_free(&winComm) ; 
     63        } 
     64      } 
     65 
     66      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 
     67 
     68      timeLine = 1; 
    4069    } 
    4170 
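The constructor changes above prepare the one-sided (RMA) communication path: attached mode is detected with MPI_Comm_test_inter, the client/server inter-communicator is merged with MPI_Intercomm_merge, and, when not attached, two dynamic MPI windows are created per server rank on a communicator split off from the merged one. A minimal standalone sketch of that window-setup pattern (the function name and arguments below are illustrative, not the XIOS members):

    #include <mpi.h>
    #include <vector>

    // Create two dynamic RMA windows per server rank; each pair lives on its own
    // communicator obtained by splitting the merged intra-communicator by server rank.
    void createServerWindows(MPI_Comm interCommMerged, int clientRank, int serverSize,
                             std::vector<std::vector<MPI_Win> >& windows)
    {
      windows.resize(serverSize);
      for (int rank = 0; rank < serverSize; rank++)
      {
        windows[rank].resize(2);
        MPI_Comm winComm;
        MPI_Comm_split(interCommMerged, rank, clientRank, &winComm);
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]);
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]);
        MPI_Comm_free(&winComm);   // the windows remain valid after the communicator handle is freed
      }
    }
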
     
    116145        list<int> sizes = event.getSizes(); 
    117146 
    118         // We force the getBuffers call to be non-blocking on classical servers 
     147         // We force the getBuffers call to be non-blocking on classical servers 
    119148        list<CBufferOut*> buffList; 
    120         bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) )); 
    121 //        bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer ); 
    122  
    123         if (couldBuffer) 
    124         { 
    125           event.send(timeLine, sizes, buffList); 
    126           info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<"  sent"<<endl ; 
    127  
    128           checkBuffers(ranks); 
    129  
    130           if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
    131           { 
    132             waitEvent(ranks); 
    133             CContext::setCurrent(context->getId()); 
    134           } 
    135         } 
    136         else 
    137         { 
    138           tmpBufferedEvent.ranks = ranks; 
    139           tmpBufferedEvent.sizes = sizes; 
    140  
    141           for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++) 
    142             tmpBufferedEvent.buffers.push_back(new CBufferOut(*it)); 
    143           info(100)<<"DEBUG : temporaly event created : timeline "<<timeLine<<endl ; 
    144           event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers); 
    145           info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<"  sent"<<endl ; 
    146         } 
    147       } 
    148  
     149        getBuffers(timeLine, ranks, sizes, buffList) ; 
     150 
     151        event.send(timeLine, sizes, buffList); 
     152        
     153        //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ; 
     154 
     155        unlockBuffers(ranks) ; 
     156        info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<"  sent"<<endl ; 
     157           
     158        checkBuffers(ranks); 
     159      } 
     160       
     161      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
     162      { 
     163        waitEvent(ranks); 
     164        CContext::setCurrent(context->getId()); 
     165      } 
     166       
    149167      timeLine++; 
    150     } 
    151  
    152     /*! 
    153      * Send the temporarily buffered event (if any). 
    154      * 
    155      * \return true if a temporarily buffered event could be sent, false otherwise  
    156      */ 
    157     bool CContextClient::sendTemporarilyBufferedEvent() 
    158     { 
    159       bool couldSendTmpBufferedEvent = false; 
    160  
    161       if (hasTemporarilyBufferedEvent()) 
    162       { 
    163         list<CBufferOut*> buffList; 
    164         if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call 
    165         { 
    166           list<CBufferOut*>::iterator it, itBuffer; 
    167  
    168           for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++) 
    169             (*itBuffer)->put((char*)(*it)->start(), (*it)->count()); 
    170  
    171           info(100)<<"DEBUG : temporaly event sent "<<endl ; 
    172           checkBuffers(tmpBufferedEvent.ranks); 
    173  
    174           tmpBufferedEvent.clear(); 
    175  
    176           couldSendTmpBufferedEvent = true; 
    177         } 
    178       } 
    179  
    180       return couldSendTmpBufferedEvent; 
    181168    } 
    182169 
     
    188175    void CContextClient::waitEvent(list<int>& ranks) 
    189176    { 
     177      while (checkBuffers(ranks)) 
     178      { 
     179        CXios::getDaemonsManager()->eventLoop() ; 
     180      } 
     181 
     182      MPI_Request req ; 
     183      MPI_Status status ; 
     184 
     185      MPI_Ibarrier(intraComm,&req) ; 
     186      int flag=false ; 
     187 
     188      do   
     189      { 
     190        CXios::getDaemonsManager()->eventLoop() ; 
     191        MPI_Test(&req,&flag,&status) ; 
     192      } while (!flag) ; 
     193 
     194 
     195    } 
     196 
     197 
     198    void CContextClient::waitEvent_old(list<int>& ranks) 
     199    { 
    190200      parentServer->server->setPendingEvent(); 
    191201      while (checkBuffers(ranks)) 
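The new waitEvent above replaces the old parentServer-based wait: it first drains the buffers while running the daemons manager event loop, then synchronises the clients with a non-blocking barrier that is polled so progress can still be made while waiting. A self-contained sketch of that MPI_Ibarrier/MPI_Test polling pattern (serviceProgress is a stand-in for the XIOS event loop):

    #include <mpi.h>

    // Stand-in for CXios::getDaemonsManager()->eventLoop(): keep making progress
    // on whatever other communication this process is responsible for.
    static void serviceProgress() {}

    // Non-blocking barrier over 'comm', polled so that serviceProgress() keeps
    // running until every process in the communicator has reached the barrier.
    void pollingBarrier(MPI_Comm comm)
    {
      MPI_Request req;
      MPI_Status status;
      int flag = 0;
      MPI_Ibarrier(comm, &req);
      do
      {
        serviceProgress();
        MPI_Test(&req, &flag, &status);
      } while (!flag);
    }

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      pollingBarrier(MPI_COMM_WORLD);   // the diff uses intraComm instead of MPI_COMM_WORLD
      MPI_Finalize();
      return 0;
    }
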
     
    205215     * it is explicitly requested to be non-blocking. 
    206216     * 
     217     * 
     218     * \param [in] timeLine time line of the event which will be sent to servers 
    207219     * \param [in] serverList list of rank of connected server 
    208220     * \param [in] sizeList size of message corresponding to each connection 
     
    211223     * \return whether the already allocated buffers could be used 
    212224    */ 
    213     bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 
     225    bool CContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 
    214226                                    bool nonBlocking /*= false*/) 
    215227    { 
     
    236248        areBuffersFree = true; 
    237249        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     250        { 
    238251          areBuffersFree &= (*itBuffer)->isBufferFree(*itSize); 
     252        } 
    239253 
    240254        if (!areBuffersFree) 
    241255        { 
     256          for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 
    242257          checkBuffers(); 
    243           if (CServer::serverLevel == 0) 
    244             context->server->listen(); 
    245  
    246           else if (CServer::serverLevel == 1) 
     258           
     259          context->server->listen(); 
     260 
     261          if (context->serverPrimServer.size()>0) 
    247262          { 
    248             context->server->listen(); 
    249             for (int i = 0; i < context->serverPrimServer.size(); ++i) 
    250               context->serverPrimServer[i]->listen(); 
    251             CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 
     263            for (int i = 0; i < context->serverPrimServer.size(); ++i)  context->serverPrimServer[i]->listen(); 
     264 //ym           CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 
     265            context->globalEventLoop() ; 
    252266          } 
    253267 
    254           else if (CServer::serverLevel == 2) 
    255             context->server->listen(); 
    256  
    257268        } 
    258269      } while (!areBuffersFree && !nonBlocking); 
    259  
    260270      CTimer::get("Blocking time").suspend(); 
    261271 
     
    263273      { 
    264274        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
    265           retBuffers.push_back((*itBuffer)->getBuffer(*itSize)); 
    266       } 
    267  
     275          retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 
     276      } 
    268277      return areBuffersFree; 
    269278   } 
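getBuffers above now receives the event's timeLine and, when the requested space is not yet free, unlocks the buffers, lets the in-process servers listen and runs the global event loop before retrying; only an explicitly non-blocking call returns immediately. A toy, self-contained illustration of that acquire-or-make-progress loop (ToyBuffer, tryReserve and serviceProgress are hypothetical stand-ins, not XIOS types):

    #include <cstdio>
    #include <cstddef>

    struct ToyBuffer { size_t capacity; size_t used; };

    // Succeeds only if the buffer still has room for 'size' bytes.
    bool tryReserve(ToyBuffer& b, size_t size)
    {
      if (b.used + size > b.capacity) return false;
      b.used += size;
      return true;
    }

    // Stand-in for context->server->listen() / context->globalEventLoop():
    // making progress eventually frees previously reserved space.
    void serviceProgress(ToyBuffer& b)
    {
      if (b.used >= 16) b.used -= 16;
    }

    // Shape of the new getBuffers: retry until the space is granted, unless the
    // caller explicitly asked for a non-blocking attempt.
    bool acquire(ToyBuffer& b, size_t size, bool nonBlocking)
    {
      bool granted;
      do
      {
        granted = tryReserve(b, size);
        if (!granted) serviceProgress(b);
      } while (!granted && !nonBlocking);
      return granted;
    }

    int main()
    {
      ToyBuffer buf = { 64, 60 };
      bool ok = acquire(buf, 24, false);
      std::printf("reserved: %d, buffer %zu/%zu\n", (int)ok, buf.used, buf.capacity);
      return 0;
    }
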
     
    281290        maxEventSizes[rank] = CXios::minBufferSize; 
    282291      } 
    283       CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank], maxBufferedEvents); 
     292       
     293      vector<MPI_Win> Wins(2,MPI_WIN_NULL) ; 
     294      if (!isAttachedModeEnabled()) Wins=windows[rank] ; 
     295   
     296      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, Wins, clientRank, rank, mapBufferSize_[rank], maxEventSizes[rank]); 
    284297      // Notify the server 
    285       CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize)); 
    286       bufOut->put(mapBufferSize_[rank]); // Stupid C++ 
    287       buffer->checkBuffer(); 
     298      CBufferOut* bufOut = buffer->getBuffer(0, 3*sizeof(MPI_Aint)); 
     299      MPI_Aint sendBuff[3] ; 
     300      sendBuff[0]=mapBufferSize_[rank]; // Stupid C++ 
     301      sendBuff[1]=buffers[rank]->getWinAddress(0);  
     302      sendBuff[2]=buffers[rank]->getWinAddress(1);  
     303      info(100)<<"CContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinAddress(1)<<endl; 
     304      bufOut->put(sendBuff, 3); // Stupid C++ 
     305      buffer->checkBuffer(true); 
     306 
    288307   } 
    289308 
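newBuffer above now hands the pair of RMA windows to the CClientBuffer and notifies the server with a 3-element MPI_Aint message containing the buffer size and the addresses of the two window segments (getWinAddress(0/1)), which the server needs as target displacements for dynamic windows. A minimal sketch of how such addresses are produced on the client side (the helper names and the malloc'ed segments are illustrative; segments must stay allocated while attached):

    #include <mpi.h>
    #include <cstdlib>

    // Attach a locally allocated segment to a dynamic window and return the
    // address the remote side must use as the RMA target displacement.
    MPI_Aint attachSegment(MPI_Win win, void* base, MPI_Aint size)
    {
      MPI_Win_attach(win, base, size);
      MPI_Aint address;
      MPI_Get_address(base, &address);
      return address;
    }

    // Pack { bufferSize, address of segment 0, address of segment 1 } the way
    // the diff packs sendBuff[3] before bufOut->put(sendBuff, 3).
    void describeBuffers(MPI_Win win0, MPI_Win win1, MPI_Aint bufferSize, MPI_Aint sendBuff[3])
    {
      void* seg0 = std::malloc((size_t)bufferSize);
      void* seg1 = std::malloc((size_t)bufferSize);
      sendBuff[0] = bufferSize;
      sendBuff[1] = attachSegment(win0, seg0, bufferSize);
      sendBuff[2] = attachSegment(win1, seg1, bufferSize);
    }
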
     
    297316      bool pending = false; 
    298317      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 
    299         pending |= itBuff->second->checkBuffer(); 
     318        pending |= itBuff->second->checkBuffer(!pureOneSided); 
    300319      return pending; 
    301320   } 
     
    307326      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 
    308327      { 
    309           delete itBuff->second; 
     328         delete itBuff->second; 
    310329      } 
    311330      buffers.clear(); 
    312    } 
    313  
     331 
     332/* don't know when release windows 
     333 
     334      if (!isAttachedModeEnabled()) 
     335      {   
     336        for(int rank=0; rank<serverSize; rank++) 
     337        { 
     338          MPI_Win_free(&windows[rank][0]); 
     339          MPI_Win_free(&windows[rank][1]); 
     340        } 
     341      }  
     342*/ 
     343   } 
     344 
     345       
     346  /*! 
     347   Lock the buffers for one sided communications 
     348   \param [in] ranks list rank of server to which client connects to 
     349   */ 
     350   void CContextClient::lockBuffers(list<int>& ranks) 
     351   { 
     352      list<int>::iterator it; 
     353      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer(); 
     354   } 
     355 
     356  /*! 
     357   Unlock the buffers for one sided communications 
     358   \param [in] ranks list rank of server to which client connects to 
     359   */ 
     360   void CContextClient::unlockBuffers(list<int>& ranks) 
     361   { 
     362      list<int>::iterator it; 
     363      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer(); 
     364   } 
     365       
    314366   /*! 
    315367   Verify state of buffers corresponding to a connection 
     
    321373      list<int>::iterator it; 
    322374      bool pending = false; 
    323       for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(); 
     375      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided); 
    324376      return pending; 
    325377   } 
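lockBuffers and unlockBuffers above bracket the one-sided writes performed while an event is being sent, and checkBuffer is now told whether genuine one-sided transfers are in use (!pureOneSided). A self-contained sketch of the passive-target RMA epoch such a lock/unlock pair typically corresponds to, assuming the buffer lock maps onto MPI_Win_lock/MPI_Win_unlock (that mapping is an assumption, not something the diff shows):

    #include <mpi.h>

    // Write 'count' ints into the window exposed by 'targetRank' at the absolute
    // address 'targetAddr' (dynamic window), inside a passive-target epoch.
    void onesidedWrite(MPI_Win win, int targetRank, MPI_Aint targetAddr,
                       const int* data, int count)
    {
      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, targetRank, 0, win);   // "lock buffer": begin exclusive epoch
      MPI_Put(data, count, MPI_INT, targetRank, targetAddr, count, MPI_INT, win);
      MPI_Win_unlock(targetRank, win);                        // "unlock buffer": data visible at the target
    }
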
     
    335387     mapBufferSize_ = mapSize; 
    336388     maxEventSizes = maxEventSize; 
    337  
    338      // Compute the maximum number of events that can be safely buffered. 
    339      double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max(); 
    340      for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it) 
    341      { 
    342        double ratio = double(it->second) / maxEventSizes[it->first]; 
    343        if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio; 
    344      } 
    345      MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm); 
    346  
    347      if (minBufferSizeEventSizeRatio < 1.0) 
    348      { 
    349        ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)", 
    350              << "The buffer sizes and the maximum events sizes are incoherent."); 
    351      } 
    352      else if (minBufferSizeEventSizeRatio == std::numeric_limits<double>::max()) 
    353        minBufferSizeEventSizeRatio = 1.0; // In this case, maxBufferedEvents will never be used but we want to avoid any floating point exception 
    354  
    355      maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server 
    356                           + size_t(minBufferSizeEventSizeRatio)  // one local buffer can always be fully used 
    357                           + 1;                                   // the other local buffer might contain only one event 
    358389   } 
    359390 
     
    394425  } 
    395426 
    396   /*! 
    397    * Check if the attached mode is used. 
    398    * 
    399    * \return true if and only if attached mode is used 
    400    */ 
    401   bool CContextClient::isAttachedModeEnabled() const 
    402   { 
    403     return (parentServer != 0); 
    404   } 
    405  
    406427   /*! 
    407428   * Finalize context client and do some reports. Function is non-blocking. 
     
    410431  { 
    411432    map<int,CClientBuffer*>::iterator itBuff; 
     433    std::list<int>::iterator ItServerLeader;  
     434     
    412435    bool stop = false; 
    413436 
     437    int* nbServerConnectionLocal  = new int[serverSize] ; 
     438    int* nbServerConnectionGlobal  = new int[serverSize] ; 
     439    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ; 
     440    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ; 
     441    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ; 
     442     
     443    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm); 
     444     
     445    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 
     446    CMessage msg; 
     447 
     448    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ; 
     449    sendEvent(event); 
     450 
     451    delete[] nbServerConnectionLocal ; 
     452    delete[] nbServerConnectionGlobal ; 
     453 
     454 
    414455    CTimer::get("Blocking time").resume(); 
    415     while (hasTemporarilyBufferedEvent()) 
    416     { 
    417       checkBuffers(); 
    418       sendTemporarilyBufferedEvent(); 
    419     } 
    420     CTimer::get("Blocking time").suspend(); 
    421  
    422     CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 
    423     if (isServerLeader()) 
    424     { 
    425       CMessage msg; 
    426       const std::list<int>& ranks = getRanksServerLeader(); 
    427       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
    428       { 
    429         info(100)<<"DEBUG : Sent context Finalize event to rank "<<*itRank<<endl ; 
    430         event.push(*itRank, 1, msg); 
    431       } 
    432       sendEvent(event); 
    433     } 
    434     else sendEvent(event); 
    435  
    436     CTimer::get("Blocking time").resume(); 
    437 //    while (!stop) 
    438     { 
    439       checkBuffers(); 
    440       if (hasTemporarilyBufferedEvent()) 
    441         sendTemporarilyBufferedEvent(); 
    442  
    443       stop = true; 
    444 //      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 
    445     } 
     456    checkBuffers(); 
    446457    CTimer::get("Blocking time").suspend(); 
    447458 
     
    458469    report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 
    459470 
    460     //releaseBuffers(); // moved to CContext::finalize() 
    461471  } 
    462472 
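The new finalize above first works out, per server, how many clients of this context are connected to it: each client sets a 0/1 flag for the servers it talks to, and the flags are summed over intraComm with MPI_Allreduce, so that every finalize event can carry the number of notifications the server should expect before shutting the context down. A standalone sketch of the counting step (connectedTo is a placeholder for the client's own list of server ranks):

    #include <mpi.h>
    #include <vector>

    // Each client marks the servers it talks to with 1; summing over the client
    // intra-communicator yields, per server, how many clients will notify it.
    std::vector<int> countConnections(const std::vector<int>& connectedTo,
                                      int serverSize, MPI_Comm intraComm)
    {
      std::vector<int> local(serverSize, 0), global(serverSize, 0);
      for (size_t i = 0; i < connectedTo.size(); ++i) local[connectedTo[i]] = 1;
      MPI_Allreduce(local.data(), global.data(), serverSize, MPI_INT, MPI_SUM, intraComm);
      return global;
    }
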
     
    472482    return pending; 
    473483  } 
    474  
     484   
     485  bool CContextClient::isNotifiedFinalized(void) 
     486  { 
     487    if (isAttachedModeEnabled()) return true ; 
     488 
     489    bool finalized = true; 
     490    map<int,CClientBuffer*>::iterator itBuff; 
     491    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 
     492      finalized &= itBuff->second->isNotifiedFinalized(); 
     493    return finalized; 
     494  } 
    475495 
    476496} 