Ignore:
Timestamp:
10/18/19 14:55:57 (5 years ago)
Author:
ymipsl
Message:

Implement one sided communication in client/server protocol to avoid dead-lock when some buffer are full.

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.cpp

    r1639 r1757  
    3333    int flag; 
    3434    MPI_Comm_test_inter(interComm,&flag); 
     35 
     36    if (flag) attachedMode=false ; 
     37    else  attachedMode=true ; 
     38     
    3539    if (flag) MPI_Comm_remote_size(interComm,&commSize); 
    3640    else  MPI_Comm_size(interComm,&commSize); 
    3741 
    38     currentTimeLine=0; 
     42      
     43    currentTimeLine=1; 
    3944    scheduled=false; 
    4045    finished=false; 
     
    4449    else 
    4550      hashId=hashString(context->getId()); 
    46   } 
    47  
     51 
     52    if (!isAttachedModeEnabled()) 
     53    { 
     54      MPI_Intercomm_merge(interComm_,true,&interCommMerged) ; 
     55// create windows for one sided comm 
     56      int interCommMergedRank; 
     57      MPI_Comm winComm ; 
     58      MPI_Comm_rank(intraComm, &interCommMergedRank); 
     59      windows.resize(2) ; 
     60      for(int rank=commSize; rank<commSize+intraCommSize; rank++) 
     61      { 
     62        if (rank==commSize+interCommMergedRank)  
     63        { 
     64          MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 
     65          int myRank ; 
     66          MPI_Comm_rank(winComm,&myRank); 
     67          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]); 
     68          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);       
     69        } 
     70        else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 
     71        MPI_Comm_free(&winComm) ; 
     72      } 
     73    } 
     74    else  
     75    { 
     76      windows.resize(2) ; 
     77      windows[0]=MPI_WIN_NULL ; 
     78      windows[1]=MPI_WIN_NULL ; 
     79    } 
     80 
     81 
     82     
     83    MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ; 
     84    itLastTimeLine=lastTimeLine.begin() ; 
     85 
     86    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
     87    if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 
     88       
     89  } 
     90 
     91//! Attached mode is used ? 
     92//! \return true if attached mode is used, false otherwise 
     93  bool CContextServer::isAttachedModeEnabled() const 
     94  { 
     95    return attachedMode ; 
     96  } 
     97   
    4898  void CContextServer::setPendingEvent(void) 
    4999  { 
     
    65115    listen(); 
    66116    checkPendingRequest(); 
    67     if (enableEventsProcessing) 
    68       processEvents(); 
     117    if (enableEventsProcessing)  processEvents(); 
    69118    return finished; 
    70119  } 
     
    117166    if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
    118167    { 
    119        StdSize buffSize = 0; 
    120        MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); 
     168       MPI_Aint recvBuff[3] ; 
     169       MPI_Recv(recvBuff, 3, MPI_AINT, rank, 20, interComm, &status); 
     170       StdSize buffSize = recvBuff[0]; 
     171       vector<MPI_Aint> winAdress(2) ; 
     172       winAdress[0]=recvBuff[1] ; winAdress[1]=recvBuff[2] ; 
    121173       mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
    122        it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 
     174       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first; 
     175      /* 
     176       if (!isAttachedModeEnabled()) 
     177       { 
     178         MPI_Comm OneSidedInterComm, oneSidedComm ; 
     179         MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0, &OneSidedInterComm ); 
     180         MPI_Intercomm_merge(OneSidedInterComm,true,&oneSidedComm); 
     181         buffers[rank]->createWindows(oneSidedComm) ; 
     182       } 
     183       */ 
     184       lastTimeLine[rank]=0 ; 
     185       itLastTimeLine=lastTimeLine.begin() ; 
     186 
    123187       return true; 
    124188    } 
     
    157221      if (flag==true) 
    158222      { 
     223        buffers[rank]->updateCurrentWindows() ; 
    159224        recvRequest.push_back(rank); 
    160225        MPI_Get_count(&status,MPI_CHAR,&count); 
     
    170235  } 
    171236 
     237  void CContextServer::getBufferFromClient(size_t timeLine) 
     238  { 
     239    if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 
     240    {   
     241      int rank ; 
     242      char *buffer ; 
     243      size_t count ;  
     244 
     245      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 
     246      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 
     247      { 
     248        rank=itLastTimeLine->first ; 
     249        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0) 
     250        { 
     251          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) 
     252          { 
     253            processRequest(rank, buffer, count); 
     254            break ; 
     255          } 
     256        } 
     257      } 
     258    } 
     259  } 
     260          
     261        
    172262  void CContextServer::processRequest(int rank, char* buff,int count) 
    173263  { 
     
    176266    char* startBuffer,endBuffer; 
    177267    int size, offset; 
    178     size_t timeLine; 
     268    size_t timeLine=0; 
    179269    map<size_t,CEventServer*>::iterator it; 
    180270 
     271     
    181272    CTimer::get("Process request").resume(); 
    182273    while(count>0) 
     
    185276      CBufferIn newBuffer(startBuffer,buffer.remain()); 
    186277      newBuffer>>size>>timeLine; 
    187  
    188278      it=events.find(timeLine); 
    189279      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first; 
     
    193283      count=buffer.remain(); 
    194284    } 
     285 
     286    if (timeLine>0) lastTimeLine[rank]=timeLine ; 
     287     
    195288    CTimer::get("Process request").suspend(); 
    196289  } 
     
    230323        } 
    231324      } 
    232     } 
     325      else getBufferFromClient(currentTimeLine) ; 
     326    } 
     327    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line 
    233328  } 
    234329 
     
    237332    map<int,CServerBuffer*>::iterator it; 
    238333    for(it=buffers.begin();it!=buffers.end();++it) delete it->second; 
     334  } 
     335 
     336  void CContextServer::releaseBuffers() 
     337  { 
     338    map<int,CServerBuffer*>::iterator it; 
     339    bool out ; 
     340    do 
     341    { 
     342      out=true ; 
     343      for(it=buffers.begin();it!=buffers.end();++it) 
     344      { 
     345//        out = out && it->second->freeWindows() ; 
     346 
     347      } 
     348    } while (! out) ;  
     349  } 
     350 
     351  void CContextServer::notifyClientsFinalize(void) 
     352  { 
     353    for(auto it=buffers.begin();it!=buffers.end();++it) 
     354    { 
     355      it->second->notifyClientFinalize() ; 
     356    } 
    239357  } 
    240358 
     
    254372      finished=true; 
    255373      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl; 
     374//      releaseBuffers() ; 
     375      notifyClientsFinalize() ; 
    256376      context->finalize(); 
     377 
     378/* don't know where release windows 
     379      MPI_Win_free(&windows[0]) ; 
     380      MPI_Win_free(&windows[1]) ; 
     381*/      
    257382      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 
    258383                           iteMap = mapBufferSize_.end(), itMap; 
Note: See TracChangeset for help on using the changeset viewer.