Changeset 2399 for XIOS3/trunk/src/transport/one_sided_server_buffer.cpp
- Timestamp:
- 09/09/22 17:23:16 (22 months ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk/src/transport/one_sided_server_buffer.cpp
r2346 r2399 61 61 else // receive standard event 62 62 { 63 63 info(logProtocol)<<"received request from rank : "<<clientRank_<<" with timeline : "<<timeline 64 <<" at time : "<<CTimer::get("XIOS server").getTime()<<endl ; 64 65 bufferIn>> nbSenders ; 65 66 nbSenders_[timeline] = nbSenders ; … … 101 102 { 102 103 info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ; 104 if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ; 103 105 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ; 104 106 MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ; 105 107 MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ; 106 108 MPI_Win_unlock(windowRank_,winControl_) ; 109 if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ; 107 110 lastBlocToFree_ = 0 ; 108 111 } … … 126 129 if (!pendingRmaRequests_.empty()) 127 130 { 128 int flag ; 131 int flag ; 132 133 if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ; 129 134 MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ; 135 if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ; 136 130 137 if (flag==true) 131 138 { … … 134 141 { 135 142 info(logProtocol)<<"unlock window "<<win<<endl ; 143 if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ; 136 144 MPI_Win_unlock(windowRank_,windows_[win]) ; 145 if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ; 137 146 } 138 147 windowsLocked_.clear() ; 139 148 149 150 if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ; 151 if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ; 152 153 size_t transferedSize = 0 ; 154 for(auto& count : pendingRmaCount_) transferedSize+=count ; 155 156 if (info.isActive(logProtocol)) 157 { 158 double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ; 159 info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<" nbBlocs : "<< pendingRmaStatus_.size() 160 << " total count = "<<transferedSize<<" duration : "<<time<<" s" 161 << " Bandwith : "<< transferedSize/time<< "byte/s"<<endl ; 162 CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ; 163 } 164 140 165 isLocked_=false ; 141 166 pendingRmaRequests_.clear() ; 142 167 pendingRmaStatus_.clear() ; 168 pendingRmaCount_.clear() ; 143 169 completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ; 144 170 … … 195 221 if (bufferResize_.front().first==timeline) 196 222 { 197 currentBufferSize_=bufferResize_.front().second ;223 currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 198 224 info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<" at timeline="<<timeline<<endl ; 199 225 bufferResize_.pop_front() ; … … 216 242 217 243 if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked"); 244 245 if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ; 246 if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ; 218 247 for(auto& bloc : blocs) 219 248 { … … 221 250 if (windowsLocked_.count(win)==0) 222 251 { 252 info(logProtocol)<<"lock window "<<win<<endl ; 253 if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ; 223 254 MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ; 255 if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ; 224 256 windowsLocked_.insert(win) ; 225 257 } … … 233 265 pendingBlocs_.erase(pendingBlocs_.begin()) ; 234 266 235 //break ; // transfering just one event temporary => to remove267 // break ; // transfering just one event temporary => to remove 236 268 237 269 if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop … … 244 276 if (bufferResize_.front().first==timeline) 245 277 { 246 currentBufferSize_=bufferResize_.front().second ;278 currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 247 279 info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<" at timeline="<<timeline<<endl ; 248 280 bufferResize_.pop_front() ; … … 259 291 if (windowsLocked_.count(win)==0) 260 292 { 293 info(logProtocol)<<"lock window "<<win<<endl ; 294 if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ; 261 295 MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ; 296 if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ; 262 297 windowsLocked_.insert(win) ; 263 298 } … … 331 366 MPI_Request request ; 332 367 MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ; 333 info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<" addr="<<addr<<" count="<<count<<" buffer="<<buffer<<" start="<<start<<endl ; 334 info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<" end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1) 335 <<" start="<<static_cast<void*>(buffer->getBuffer()+start)<<" end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ; 368 if (info.isActive(logProtocol)) 369 { 370 info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<" addr="<<addr<<" count="<<count<<" buffer="<<buffer<<" start="<<start<<endl ; 371 info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<" end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1) 372 <<" start="<<static_cast<void*>(buffer->getBuffer()+start)<<" end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ; 373 } 374 if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ; 336 375 MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ; 376 if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ; 337 377 pendingRmaRequests_.push_back(request) ; 378 pendingRmaCount_.push_back(count) ; 338 379 onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ; 339 380 } … … 348 389 349 390 ostringstream outStr ; 350 outStr<<"Received Event from client "<<clientRank_<<" timeline="<<timeline<<" nbBlocs="<<completedEvent.size()<<endl ; 391 if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<" timeline="<<timeline 392 <<" nbBlocs="<<completedEvent.size()<<endl ; 351 393 int i=0 ; 352 394 MPI_Aint addr ;
Note: See TracChangeset
for help on using the changeset viewer.