Changeset 2560


Ignore:
Timestamp:
09/11/23 14:31:18 (16 months ago)
Author:
jderouillat
Message:

Forced the request processing, on servers, to be ordered in the P2P protocol

Location:
XIOS3/trunk/src/transport
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/transport/p2p_context_server.cpp

    r2559 r2560  
    101101      if (flag==true) 
    102102      { 
    103         requests_.push_back(new CRequest(interCommMerged_, status)) ; 
    104         if (requests_.back()->test())  
     103        int rank=status.MPI_SOURCE ; 
     104        requests_[rank].push_back(new CRequest(interCommMerged_, status)) ; 
     105        // Test 1st request of the list, request treatment must be ordered  
     106        if (requests_[rank].front()->test()) 
    105107        { 
    106           if ( processRequest( *(requests_.back()) ) ) 
    107           { 
    108             delete requests_.back(); 
    109             requests_.pop_back() ; 
    110           } 
     108          processRequest( *(requests_[rank].front()) ); 
     109          delete requests_[rank].front(); 
     110          requests_[rank].pop_front() ; 
    111111        } 
    112112      } 
     
    116116  void CP2pContextServer::listenPendingRequest(void) 
    117117  { 
    118     auto it = requests_.begin() ; 
    119     while (it != requests_.end()) 
    120     { 
    121       if ((*it)->test()) 
    122       { 
    123         if (processRequest(*(*it))) 
    124         { 
    125           delete (*it); 
    126           auto it2=it ; 
    127           ++it ; 
    128           requests_.erase(it2) ; 
    129         } 
    130       } 
    131       else ++it ; 
    132     } 
    133   } 
    134  
    135   bool CP2pContextServer::processRequest(CRequest& request) 
     118    for(auto it_rank=requests_.begin() ; it_rank!=requests_.end() ; ++it_rank) 
     119    { 
     120      int rank = it_rank->first; 
     121      while ( (!requests_[rank].empty()) && (requests_[rank].front()->test()) ) 
     122      { 
     123        processRequest( *(requests_[rank].front()) ); 
     124        delete requests_[rank].front(); 
     125        requests_[rank].pop_front() ; 
     126      } 
     127    } 
     128  } 
     129 
     130  void CP2pContextServer::processRequest(CRequest& request) 
    136131  { 
    137132    int rank = request.getRank() ; 
    138133    auto it=buffers_.find(rank); 
    139     // getCount(new CP2pServerBuffer) = sizeof(MPI_AINT) 
    140     // getCount(RESIZE) : size_t timeline, size_t size + size_t EVENT_BUFFER_RESIZE = 24 
    141     // getCount(HEADER) : size_t timeline, int nbSenders, int nbBlocs 
    142     //                     + nbBlocs * (sizeof(MPI_Aint) addr + int count + int window) = 16 + nbBlocs * 16 
    143     if ((it==buffers_.end())&&(request.getCount() < 3*sizeof(size_t))) 
     134    if (it==buffers_.end()) 
    144135    { 
    145136      buffers_[rank] = new CP2pServerBuffer(rank, commSelf_, interCommMerged_, pendingEvents_, completedEvents_, request.getBuffer()) ; 
    146       return true; 
    147     } 
    148     else if (it!=buffers_.end()) { 
     137    } 
     138    else 
     139    { 
    149140      it->second->receivedRequest(request.getBuffer()) ; 
    150       return true; 
    151     } 
    152     else 
    153       return false; 
     141    } 
    154142  } 
    155143 
  • XIOS3/trunk/src/transport/p2p_context_server.hpp

    r2559 r2560  
    6363    void listen(void) ; 
    6464    void listenPendingRequest(void) ; 
    65     bool processRequest(CRequest& request) ; 
     65    void processRequest(CRequest& request) ; 
    6666    void checkBuffers(void) ; 
    6767    void processEvents(bool enableEventsProcessing) ; 
     
    106106      MPI_Request processEventRequest_ ; 
    107107 
    108       std::list<CRequest*> requests_ ; 
     108      std::map<int,std::list<CRequest*> >requests_ ; 
    109109 
    110110      std::map<size_t, SPendingEvent> pendingEvents_   ; 
Note: See TracChangeset for help on using the changeset viewer.