Timestamp: 11/16/21 17:37:42
Author: ymipsl
Message:
One sided protocol improvement.
YM

File: 1 edited

Legend: lines prefixed with '-' were removed in r2258, lines prefixed with '+' were added, unprefixed lines are unmodified context.
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp

--- XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp (r2246)
+++ XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp (r2258)
@@ -46,7 +46,6 @@
     else  attachedMode=true ;
 
-    int clientSize ;
-    if (flag) MPI_Comm_remote_size(interComm,&clientSize);
-    else  MPI_Comm_size(interComm,&clientSize);
+    if (flag) MPI_Comm_remote_size(interComm,&clientSize_);
+    else  MPI_Comm_size(interComm,&clientSize_);
 
 
     
@@ -80,31 +79,21 @@
       MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;
 
-      // We create dummy pair of intercommunicator between clients and server
-      // Why ? Just because on openMPI, it reduce the creation time of windows otherwhise which increase quadratically
-      // We don't know the reason
+      double time ;
+      windows_.resize(clientSize_) ;
       MPI_Comm commSelf ;
       MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf) ;
-      vector<MPI_Comm> dummyComm(clientSize) ;
-      for(int rank=0; rank<clientSize ; rank++) MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &dummyComm[rank]) ;
-
-      // create windows for one sided comm
-      MPI_Comm winComm ;
-      windows.resize(2) ;
-      for(int rank=clientSize; rank<clientSize+intraCommSize; rank++)
-      {
-        if (rank==clientSize+intraCommRank)
-        {
-          MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm);
-          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]);
-          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);
-        }
-        else MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm);
-        //       ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock
-        //            Bug or not ?
-        //         MPI_Comm_free(&winComm) ;
-      }
-
-      // free dummy intercommunicator
-      for(int rank=0; rank<clientSize ; rank++)  MPI_Comm_free(&dummyComm[rank]) ;
+      MPI_Comm interComm ;
+      winComm_.resize(clientSize_) ;
+      for(int rank=0; rank<clientSize_ ; rank++)
+      {
+        time=MPI_Wtime() ;
+        MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &interComm) ;
+        MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
+        windows_[rank].resize(2) ;
+        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
+        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
+        time=MPI_Wtime()-time ;
+        info(100)<< "MPI_Win_create_dynamic : server to client rank "<<rank<<" => "<<time/1e-6<<" us"<<endl ;
+      }
       MPI_Comm_free(&commSelf) ;
       CTimer::get("create Windows").suspend() ;
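The removed block created a single pair of windows shared by all clients, plus throw-away intercommunicators that only existed to speed up window creation on OpenMPI. The new block instead builds, for each client rank, a dedicated two-rank communicator (intercommunicator create + merge) and a dedicated pair of dynamic windows, timing each creation. A minimal standalone sketch of that pattern (an illustration, not XIOS code; MPI_COMM_WORLD stands in for interCommMerged and world rank 0 plays the single server process):

    #include <mpi.h>
    #include <cstdio>
    #include <vector>

    // Standalone sketch, not XIOS code: rank 0 plays the server, every other
    // world rank plays a client. Run with at least 2 processes.
    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int worldRank, worldSize;
      MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
      MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
      bool isServer = (worldRank == 0);
      int clientSize = worldSize - 1;

      MPI_Comm commSelf;                            // single-rank communicator, as commSelf in the changeset
      MPI_Comm_split(MPI_COMM_WORLD, worldRank, 0, &commSelf);

      int nPairs = isServer ? clientSize : 1;       // server pairs with every client, a client only with the server
      std::vector<MPI_Comm> winComm(nPairs);
      std::vector<std::vector<MPI_Win>> windows(nPairs, std::vector<MPI_Win>(2));

      for (int i = 0; i < nPairs; i++)
      {
        double t = MPI_Wtime();
        int remoteLeader = isServer ? i + 1 : 0;    // world rank of the peer process
        MPI_Comm interComm;
        MPI_Intercomm_create(commSelf, 0, MPI_COMM_WORLD, remoteLeader, 0, &interComm);
        MPI_Intercomm_merge(interComm, isServer, &winComm[i]);           // two-rank communicator
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm[i], &windows[i][0]);
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm[i], &windows[i][1]);
        MPI_Comm_free(&interComm);
        if (isServer) printf("window pair for client %d created in %g us\n", i, (MPI_Wtime() - t) * 1e6);
      }

      // free the windows before the communicator they were created on
      // (cf. the Intel MPI warning about MPI_Win_lock in the removed code)
      for (int i = 0; i < nPairs; i++)
      {
        MPI_Win_free(&windows[i][0]);
        MPI_Win_free(&windows[i][1]);
        MPI_Comm_free(&winComm[i]);
      }
      MPI_Comm_free(&commSelf);
      MPI_Finalize();
      return 0;
    }

With one communicator per client, window synchronization presumably only ever involves the server process and that single client, at the cost of clientSize intercommunicator creations at startup, which the info(100) timing line above is there to measure.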
     
@@ -112,7 +101,13 @@
     else
     {
-      windows.resize(2) ;
-      windows[0]=MPI_WIN_NULL ;
-      windows[1]=MPI_WIN_NULL ;
+      winComm_.resize(clientSize_) ;
+      windows_.resize(clientSize_) ;
+      for(int rank=0; rank<clientSize_ ; rank++)
+      {
+        winComm_[rank] = MPI_COMM_NULL ;
+        windows_[rank].resize(2) ;
+        windows_[rank][0]=MPI_WIN_NULL ;
+        windows_[rank][1]=MPI_WIN_NULL ;
+      }
     }
 
     
@@ -160,83 +155,4 @@
     return finished;
   }
-/*
-  void CContextServer::listen(void)
-  {
-    int rank;
-    int flag;
-    int count;
-    char * addr;
-    MPI_Status status;
-    map<int,CServerBuffer*>::iterator it;
-    bool okLoop;
-
-    traceOff();
-    // WARNING : with intel MPI, probing crash on an intercommunicator with release library but not with release_mt
-    // ==>  source $I_MPI_ROOT/intel64/bin/mpivars.sh release_mt    needed
-    MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status);
-    traceOn();
-
-    if (flag==true)
-    {
-      rank=status.MPI_SOURCE ;
-      okLoop = true;
-      if (pendingRequest.find(rank)==pendingRequest.end())
-        okLoop = !listenPendingRequest(status) ;
-      if (okLoop)
-      {
-        for(rank=0;rank<commSize;rank++)
-        {
-          if (pendingRequest.find(rank)==pendingRequest.end())
-          {
-
-            traceOff();
-            MPI_Iprobe(rank, 20,interComm,&flag,&status);
-            traceOn();
-            if (flag==true) listenPendingRequest(status) ;
-          }
-        }
-      }
-    }
-  }
-
-  bool CContextServer::listenPendingRequest(MPI_Status& status)
-  {
-    int count;
-    char * addr;
-    map<int,CServerBuffer*>::iterator it;
-    int rank=status.MPI_SOURCE ;
-
-    it=buffers.find(rank);
-    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
-    {
-       MPI_Aint recvBuff[4] ;
-       MPI_Recv(recvBuff, 4, MPI_AINT, rank, 20, interComm, &status);
-       remoteHashId_ = recvBuff[0] ;
-       StdSize buffSize = recvBuff[1];
-       vector<MPI_Aint> winAdress(2) ;
-       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
-       mapBufferSize_.insert(std::make_pair(rank, buffSize));
-       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;
-
-       lastTimeLine[rank]=0 ;
-       itLastTimeLine=lastTimeLine.begin() ;
-
-       return true;
-    }
-    else
-    {
-      MPI_Get_count(&status,MPI_CHAR,&count);
-      if (it->second->isBufferFree(count))
-      {
-         addr=(char*)it->second->getBuffer(count);
-         MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);
-         bufferRequest[rank]=addr;
-         return true;
-       }
-      else
-        return false;
-    }
-  }
-*/
 
  void CContextServer::listen(void)
     
@@ -274,5 +190,5 @@
       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
       mapBufferSize_.insert(std::make_pair(rank, buffSize));
-      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;
+      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
       lastTimeLine[rank]=0 ;
       itLastTimeLine=lastTimeLine.begin() ;
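Some context on the two MPI_Aint values stored in winAdress: a window created with MPI_Win_create_dynamic has no memory associated with it at creation, so the process that exposes a buffer attaches it with MPI_Win_attach and must send the address obtained from MPI_Get_address to its peer, which then uses that address as the target displacement of its RMA calls. A hypothetical two-rank sketch of that exchange, independent of the CServerBuffer machinery (the use of MPI_Get and all names here are illustrative, not taken from XIOS):

    #include <mpi.h>
    #include <cstdio>

    // Hypothetical two-rank sketch, not XIOS code: rank 1 exposes a buffer in a
    // dynamic window and sends its attach address; rank 0 reads it with MPI_Get.
    // Run with at least 2 processes.
    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      MPI_Win win;
      MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);   // no memory attached yet

      if (rank == 1)
      {
        static double buffer[4] = {1.0, 2.0, 3.0, 4.0};
        MPI_Win_attach(win, buffer, sizeof(buffer));                 // expose the buffer
        MPI_Aint addr;
        MPI_Get_address(buffer, &addr);                              // address the peer must target
        MPI_Send(&addr, 1, MPI_AINT, 0, 20, MPI_COMM_WORLD);         // same idea as recvBuff[2]/recvBuff[3]
        MPI_Barrier(MPI_COMM_WORLD);                                 // keep the buffer exposed until the read is done
        MPI_Win_detach(win, buffer);
      }
      else if (rank == 0)
      {
        MPI_Aint remoteAddr;
        MPI_Recv(&remoteAddr, 1, MPI_AINT, 1, 20, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        double data[4];
        MPI_Win_lock(MPI_LOCK_SHARED, 1, 0, win);                    // passive-target epoch on rank 1
        MPI_Get(data, 4, MPI_DOUBLE, 1, remoteAddr, 4, MPI_DOUBLE, win);
        MPI_Win_unlock(1, win);                                      // completes the get
        printf("read %g %g %g %g\n", data[0], data[1], data[2], data[3]);
        MPI_Barrier(MPI_COMM_WORLD);
      }
      else MPI_Barrier(MPI_COMM_WORLD);                              // other ranks just match the collectives

      MPI_Win_free(&win);
      MPI_Finalize();
      return 0;
    }

The two addresses received in recvBuff[2] and recvBuff[3] appear to play the same role for the buffer pair managed by CServerBuffer.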
     
@@ -407,5 +323,5 @@
        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ;
        buffers.erase(rank) ;
-       buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, newSize)));
+       buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, newSize)));
      }
      else
     
@@ -500,17 +416,20 @@
   void CContextServer::releaseBuffers()
   {
-    map<int,CServerBuffer*>::iterator it;
-    bool out ;
-    do
-    {
-      out=true ;
-      for(it=buffers.begin();it!=buffers.end();++it)
-      {
-//        out = out && it->second->freeWindows() ;
-
-      }
-    } while (! out) ;
-    MPI_Win_free(&windows[0]) ;
-    MPI_Win_free(&windows[1]) ;
+    for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
+    buffers.clear() ;
+    freeWindows() ;
+  }
+
+  void CContextServer::freeWindows()
+  {
+    if (!isAttachedModeEnabled())
+    {
+      for(int rank=0; rank<clientSize_; rank++)
+      {
+        MPI_Win_free(&windows_[rank][0]);
+        MPI_Win_free(&windows_[rank][1]);
+        MPI_Comm_free(&winComm_[rank]) ;
+      }
+    }
   }
 
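Note the release order in the new freeWindows(): both MPI_Win_free calls come before the MPI_Comm_free of the communicator the windows were created on, consistent with the Intel MPI warning in the removed constructor code. A hypothetical variant, not part of the changeset, that adds null-handle guards so the same helper is also harmless in attached mode (where the handles stay MPI_WIN_NULL / MPI_COMM_NULL):

    #include <mpi.h>
    #include <vector>

    // Hypothetical helper, not part of the changeset: same release order as
    // freeWindows() above (windows first, then their communicator), with
    // null-handle guards covering the attached-mode initialization.
    void freeWindowPair(std::vector<MPI_Win>& windows, MPI_Comm& winComm)
    {
      for (auto& win : windows)
        if (win != MPI_WIN_NULL) MPI_Win_free(&win);        // must precede MPI_Comm_free (Intel MPI warning)
      if (winComm != MPI_COMM_NULL) MPI_Comm_free(&winComm);
    }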
     
@@ -538,12 +457,8 @@
       finished=true;
       info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
-//      releaseBuffers() ;
       notifyClientsFinalize() ;
       CTimer::get("receiving requests").suspend();
       context->finalize();
-
-// don't know where release windows
-      MPI_Win_free(&windows[0]) ;
-      MPI_Win_free(&windows[1]) ;
+      freeWindows() ;
 
       std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),