Ignore:
Timestamp:
10/11/21 14:41:56 (3 years ago)
Author:
ymipsl
Message:
  • Update of the tranfer protocol using one sided communication
  • Introduce MPI_Improb/MPI_mrecv to listen incomming request
  • Introducing latency when looping over managers

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.cpp

    r2240 r2246  
    5050      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader); 
    5151 
    52       if (flag)  
    53       { 
    54         MPI_Intercomm_merge(interComm_,false, &interCommMerged) ; 
    55         int interCommMergedRank; 
    56         MPI_Comm_rank(interComm_, &interCommMergedRank); 
    57         MPI_Comm_rank(interCommMerged, &interCommMergedRank); 
    58         MPI_Comm_rank(intraComm, &interCommMergedRank); 
    59       } 
     52      if (flag) MPI_Intercomm_merge(interComm_,false, &interCommMerged) ; 
    6053       
    6154      if (!isAttachedModeEnabled()) 
    6255      {   
     56 
     57        CTimer::get("create Windows").resume() ; 
     58 
     59        // We create dummy pair of intercommunicator between clients and server 
     60        // Why ? Just because on openMPI, it reduce the creation time of windows otherwhise which increase quadratically 
     61        // We don't know the reason 
     62       
     63        MPI_Comm commSelf ; 
     64        MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 
     65        vector<MPI_Comm> dummyComm(serverSize) ; 
     66        for(int rank=0; rank<serverSize; rank++) MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &dummyComm[rank]) ; 
     67 
     68        // create windows for one-sided 
    6369        windows.resize(serverSize) ; 
    6470        MPI_Comm winComm ; 
     
    6773          windows[rank].resize(2) ; 
    6874          MPI_Comm_split(interCommMerged, rank, clientRank, &winComm); 
    69           int myRank ; 
    70           MPI_Comm_rank(winComm,&myRank); 
    7175          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]); 
    7276          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]); 
    7377//       ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock 
    7478//            Bug or not ?           
    75 //        MPI_Comm_free(&winComm) ; 
    76         } 
    77       } 
    78  
    79       MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 
     79//          MPI_Comm_free(&winComm) ; 
     80        } 
     81         
     82        // free dummy intercommunicator => take times ? 
     83        for(int rank=0; rank<serverSize; rank++)  MPI_Comm_free(&dummyComm[rank]) ; 
     84        MPI_Comm_free(&commSelf) ; 
     85 
     86        CTimer::get("create Windows").resume() ; 
     87     } 
    8088 
    8189      auto time=chrono::system_clock::now().time_since_epoch().count() ; 
     
    281289      } 
    282290 
     291      double lastTimeBuffersNotFree=0. ; 
     292      double time ; 
     293      bool doUnlockBuffers ; 
    283294      CTimer::get("Blocking time").resume(); 
    284295      do 
    285296      { 
    286297        areBuffersFree = true; 
    287         for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
    288         { 
    289           areBuffersFree &= (*itBuffer)->isBufferFree(*itSize); 
    290         } 
     298        doUnlockBuffers=false ; 
     299        time=MPI_Wtime() ; 
     300        if (time-lastTimeBuffersNotFree > latency_) 
     301        { 
     302          for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     303          { 
     304            areBuffersFree &= (*itBuffer)->isBufferFree(*itSize); 
     305          } 
     306          if (!areBuffersFree) 
     307          { 
     308            lastTimeBuffersNotFree = time ; 
     309            doUnlockBuffers=true ; 
     310          }           
     311        } 
     312        else areBuffersFree = false ; 
    291313 
    292314        if (!areBuffersFree) 
    293315        { 
    294           for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 
     316          if (doUnlockBuffers) for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 
    295317          checkBuffers(); 
    296 /*           
    297           context->server->listen(); 
    298  
    299           if (context->serverPrimServer.size()>0) 
    300           { 
    301             for (int i = 0; i < context->serverPrimServer.size(); ++i)  context->serverPrimServer[i]->listen(); 
    302  //ym           CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 
    303             context->globalEventLoop() ; 
    304           } 
    305 */ 
    306            context_->globalEventLoop() ; 
     318 
     319          context_->globalEventLoop() ; 
    307320        } 
    308321 
     
    383396        } 
    384397      }  
    385  
    386398   } 
    387399 
Note: See TracChangeset for help on using the changeset viewer.