Ignore:
Timestamp:
10/11/21 14:41:56 (3 years ago)
Author:
ymipsl
Message:
  • Update of the tranfer protocol using one sided communication
  • Introduce MPI_Improb/MPI_mrecv to listen incomming request
  • Introducing latency when looping over managers

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.cpp

    r2130 r2246  
    22#include "exception.hpp" 
    33#include "buffer_server.hpp" 
     4#include "timer.hpp" 
    45 
    56 
     
    3132  } 
    3233 
    33 /* 
    34   void CServerBuffer::createWindows(MPI_Comm oneSidedComm) 
    35   { 
    36     MPI_Barrier(oneSidedComm) ; 
    37     MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
    38     MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
    39     hasWindows=true ; 
    40     updateCurrentWindows() ; 
    41     MPI_Barrier(oneSidedComm) ; 
    42   } 
    43 */ 
    44  
    45 /* 
    46   bool CServerBuffer::freeWindows() 
    47   { 
    48     if (hasWindows) 
    49     { 
    50       size_t header[3] ; 
    51       size_t& control=header[2] ; 
    52       MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows_[0]) ; 
    53       MPI_Get(&control, 1, MPI_LONG_LONG_INT, windowsRank , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ; 
    54       MPI_Win_unlock(0,windows[0]) ; 
    55       if (control==2)  // ok for free windows 
    56       { 
    57         MPI_Win_free( &(windows[0])) ; 
    58         MPI_Win_free( &(windows[1])) ; 
    59         hasWindows=false ; 
    60         return true ; 
    61       } 
    62       else return false ; 
    63     } 
    64     else return true ; 
    65   } 
    66 */ 
    6734 
    6835  bool CServerBuffer::isBufferFree(size_t count) 
     
    222189  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 
    223190  { 
     191    count = -1 ; 
    224192    if (!hasWindows || resizingBuffer_) return false ; 
    225  
    226      
    227     size_t header[3] ; 
    228     size_t& clientTimeline=header[0] ; 
    229     size_t& clientCount=header[1] ; 
    230     size_t& control=header[2] ; 
     193    double time=MPI_Wtime() ; 
     194    if (time-bufferFromClientTime_ < bufferFromClientLatency_ ) return false; 
     195    bufferFromClientTime_ = time ; 
     196    CTimer::get("getBufferFromClient").resume() ;    
     197    size_t clientTimeline ; 
     198    size_t clientCount ; 
    231199    bool ok=false ; 
    232200     
     
    238206     
    239207    lockBuffer();  
    240  
     208    CTimer::get("getBufferFromClient_locked").resume() ;    
    241209// lock is acquired 
    242210 
    243     MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],0), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    244     MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     211    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     212    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    245213    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
    246214 
    247 //    control=1 ; 
    248 //    MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    249     
    250 //    MPI_Win_unlock(windowsRank_, windows_[currentWindows]) ; 
    251     MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
    252 //    info(100)<<"getBufferFromClient : windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ; 
    253215    if (timeLine==clientTimeline) 
    254216    { 
    255 //      info(50)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ; 
    256   
    257 //      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
    258217      buffer=(char*)getBuffer(clientCount) ; 
    259218      count=clientCount ; 
     
    261220      clientTimeline = 0 ; 
    262221      clientCount = 0 ; 
    263 //      control=0 ; 
    264       MPI_Put(&header[0], 2, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],0) , 2, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     222      MPI_Put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     223      MPI_Put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    265224 
    266225// release lock 
    267      unlockBuffer() ; 
     226      CTimer::get("getBufferFromClient_locked").suspend() ;    
     227      unlockBuffer() ; 
    268228 
    269229      ok=true ; 
     
    283243    else 
    284244    { 
    285       //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
    286       //control=0 ; 
    287       //MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     245      count=0 ; 
    288246  
    289247 // release lock 
     248      CTimer::get("getBufferFromClient_locked").suspend() ;  
    290249      unlockBuffer() ; 
    291250    } 
    292  
     251    CTimer::get("getBufferFromClient").suspend() ;    
    293252    if (ok) return true ; 
    294253 
     
    299258  { 
    300259    if (!hasWindows) return ; 
    301  
    302     long long int lock=1 ; 
    303     long long int zero=0, one=1 ; 
    304 //    control=1 ; 
    305260    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
    306     while(lock!=0) 
    307     { 
    308       MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 
    309                            windows_[currentWindows]) ; 
    310       MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
    311     } 
    312261  } 
    313262 
     
    315264  { 
    316265    if (!hasWindows) return ; 
    317     long long int lock=1 ; 
    318     long long int zero=0, one=1 ; 
    319      
    320     MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 
    321                           windows_[currentWindows]) ; 
    322     MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;  
    323266    MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ; 
    324267  } 
     
    327270  { 
    328271    if (!hasWindows) return ; 
    329     size_t finalize=1 ; 
     272    size_t notify=notifyFinalize_ ; 
    330273    lockBuffer();  
    331274// lock is acquired 
    332     MPI_Put(&finalize, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],3*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     275    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    333276    unlockBuffer() ; 
    334277  } 
     278 
     279  void CServerBuffer::notifyBufferResizing(void) 
     280  { 
     281    resizingBuffer_=true ; 
     282    if (!hasWindows) return ; 
     283    size_t notify=notifyResizeBuffer_ ; 
     284    lockBuffer();  
     285// lock is acquired 
     286    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     287    unlockBuffer() ; 
     288  } 
    335289} 
Note: See TracChangeset for help on using the changeset viewer.