diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 1c8e4ea97d..455d0aaf36 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -143,7 +143,6 @@ public void run() throws IOException { } appendLog(installSnapshotRequired || haveTooManyPendingRequests()); - } getLeaderState().checkHealth(getFollower()); } @@ -235,7 +234,10 @@ private void sendRequest(AppendEntriesRequest request, AppendEntriesRequestProto scheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()), LOG, () -> "Timeout check failed for append entry request: " + request); - getFollower().updateLastRpcSendTime(); + request.recordSendTime(); + if (request.isHeartbeat()) { + getFollower().updateLastHeartBeatSendTime(); + } } private void timeoutAppendRequest(long cid, boolean heartbeat) { @@ -285,16 +287,17 @@ public void onNext(AppendEntriesReplyProto reply) { } try { - onNextImpl(reply); + Timestamp sendTime = request != null ? request.getSendTime() : Timestamp.currentTime(); + onNextImpl(reply, sendTime); } catch(Exception t) { LOG.error("Failed onNext request=" + request + ", reply=" + ServerStringUtils.toAppendEntriesReplyString(reply), t); } } - private void onNextImpl(AppendEntriesReplyProto reply) { + private void onNextImpl(AppendEntriesReplyProto reply, Timestamp sendTime) { // update the last rpc time - getFollower().updateLastRpcResponseTime(); + getFollower().updateLastRpcResponseTime(sendTime); if (!firstResponseReceived) { firstResponseReceived = true; @@ -358,7 +361,7 @@ private synchronized void updateNextIndex(long replyNextIndex) { private class InstallSnapshotResponseHandler implements StreamObserver { private final String name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass()); - private final Queue pending; + private final Map pending = new ConcurrentHashMap<>(); private final AtomicBoolean done = new AtomicBoolean(false); private final boolean isNotificationOnly; @@ -367,18 +370,19 @@ private class InstallSnapshotResponseHandler implements StreamObserver(); this.isNotificationOnly = notifyOnly; } synchronized void addPending(InstallSnapshotRequestProto request) { - pending.offer(request.getSnapshotChunk().getRequestIndex()); + InstallSnapshotRequest installSnapshotRequest = new InstallSnapshotRequest(request); + installSnapshotRequest.recordSendTime(); + pending.put(request.getSnapshotChunk().getRequestIndex(), installSnapshotRequest); } synchronized void removePending(InstallSnapshotReplyProto reply) { - final Integer index = pending.poll(); - Objects.requireNonNull(index, "index == null"); - Preconditions.assertTrue(index == reply.getRequestIndex()); + InstallSnapshotRequest request = pending.remove(reply.getRequestIndex()); + Objects.requireNonNull(request, "index == null"); + Preconditions.assertTrue(request.getProto().getSnapshotChunk().getRequestIndex() == reply.getRequestIndex()); } boolean isDone() { @@ -402,7 +406,10 @@ public void onNext(InstallSnapshotReplyProto reply) { } // update the last rpc time - getFollower().updateLastRpcResponseTime(); + InstallSnapshotRequest request = pending.get(reply.getRequestIndex()); + + Timestamp sendTime = request != null ? request.getSendTime() : Timestamp.currentTime(); + getFollower().updateLastRpcResponseTime(sendTime); if (!firstResponseReceived) { firstResponseReceived = true; @@ -484,7 +491,6 @@ private void installSnapshot(SnapshotInfo snapshot) { for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) { if (isRunning()) { snapshotRequestObserver.onNext(request); - getFollower().updateLastRpcSendTime(); responseHandler.addPending(request); } else { break; @@ -534,7 +540,6 @@ private void installSnapshot(TermIndex firstAvailableLogTermIndex) { try { snapshotRequestObserver = getClient().installSnapshot(responseHandler); snapshotRequestObserver.onNext(request); - getFollower().updateLastRpcSendTime(); responseHandler.addPending(request); snapshotRequestObserver.onCompleted(); } catch (Exception e) { @@ -577,6 +582,27 @@ private TermIndex shouldNotifyToInstallSnapshot() { return null; } + static class InstallSnapshotRequest { + private final InstallSnapshotRequestProto proto; + private Timestamp sendTime; + + InstallSnapshotRequest(InstallSnapshotRequestProto proto) { + this.proto = proto; + } + + InstallSnapshotRequestProto getProto() { + return proto; + } + + void recordSendTime() { + this.sendTime = Timestamp.currentTime(); + } + + Timestamp getSendTime() { + return sendTime; + } + } + static class AppendEntriesRequest { private final Timer timer; private volatile Timer.Context timerContext; @@ -587,6 +613,8 @@ static class AppendEntriesRequest { private final TermIndex lastEntry; + private Timestamp sendTime; + AppendEntriesRequest(AppendEntriesRequestProto proto, RaftPeerId followerId, GrpcServerMetrics grpcServerMetrics) { this.callId = proto.getServerRequest().getCallId(); this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null; @@ -601,6 +629,14 @@ long getCallId() { return callId; } + Timestamp getSendTime() { + return sendTime; + } + + void recordSendTime() { + this.sendTime = Timestamp.currentTime(); + } + TermIndex getPreviousLog() { return previousLog; } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java index 487576fd17..4db1336352 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java @@ -71,11 +71,14 @@ public interface FollowerInfo { Timestamp getLastRpcResponseTime(); /** Update lastRpcResponseTime to the current time. */ - void updateLastRpcResponseTime(); + void updateLastRpcResponseTime(Timestamp sendTime); - /** Update lastRpcSendTime to the current time. */ - void updateLastRpcSendTime(); + /** Update lastHeartBeatSendTime to the current time. */ + void updateLastHeartBeatSendTime(); - /** @return the latest of the lastRpcSendTime and the lastRpcResponseTime . */ - Timestamp getLastRpcTime(); + /** @return the lastRpcSendTimeWithResponse . */ + Timestamp getLastRpcSendTimeWithResponse(); + + /** @return the lastHeartBeatSendTime . */ + Timestamp getLastHeartBeatSendTime(); } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java index 382f48a274..1204f93eea 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java @@ -152,7 +152,9 @@ default boolean shouldHeartbeat() { * @return the time in milliseconds that the leader should send a heartbeat. */ default long getHeartbeatRemainingTimeMs() { - return getServer().properties().minRpcTimeoutMs()/2 - getFollower().getLastRpcTime().elapsedTimeMs(); + return getServer().properties().minRpcTimeoutMs()/2 - + Math.min(getFollower().getLastRpcSendTimeWithResponse().elapsedTimeMs(), + getFollower().getLastHeartBeatSendTime().elapsedTimeMs()); } /** Handle the event that the follower has replied a term. */ diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java index 0f6c1ab0f9..5af214fe29 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java @@ -34,7 +34,8 @@ class FollowerInfoImpl implements FollowerInfo { private final RaftPeer peer; private final AtomicReference lastRpcResponseTime; - private final AtomicReference lastRpcSendTime; + private final AtomicReference lastRpcSendTimeWithResponse; + private final AtomicReference lastHeartBeatSendTime; private final RaftLogIndex nextIndex; private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L); private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX); @@ -48,7 +49,8 @@ class FollowerInfoImpl implements FollowerInfo { this.peer = peer; this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime); - this.lastRpcSendTime = new AtomicReference<>(lastRpcTime); + this.lastRpcSendTimeWithResponse = new AtomicReference<>(lastRpcTime); + this.lastHeartBeatSendTime = new AtomicReference<>(lastRpcTime); this.nextIndex = new RaftLogIndex("nextIndex", nextIndex); this.attendVote = attendVote; } @@ -119,7 +121,8 @@ public String getName() { public String toString() { return name + "(c" + getCommitIndex() + ",m" + getMatchIndex() + ",n" + getNextIndex() + ", attendVote=" + attendVote + - ", lastRpcSendTime=" + lastRpcSendTime.get().elapsedTimeMs() + + ", lastHeartBeatSendTime=" + lastHeartBeatSendTime.get().elapsedTimeMs() + + ", lastRpcSendTimeWithResponse=" + lastRpcSendTimeWithResponse.get().elapsedTimeMs() + ", lastRpcResponseTime=" + lastRpcResponseTime.get().elapsedTimeMs() + ")"; } @@ -137,8 +140,11 @@ public RaftPeer getPeer() { } @Override - public void updateLastRpcResponseTime() { + public void updateLastRpcResponseTime(Timestamp sendTime) { lastRpcResponseTime.set(Timestamp.currentTime()); + if (sendTime.compareTo(lastRpcSendTimeWithResponse.get()) > 0) { + lastRpcSendTimeWithResponse.set(sendTime); + } } @Override @@ -147,12 +153,17 @@ public Timestamp getLastRpcResponseTime() { } @Override - public void updateLastRpcSendTime() { - lastRpcSendTime.set(Timestamp.currentTime()); + public Timestamp getLastRpcSendTimeWithResponse() { + return lastRpcSendTimeWithResponse.get(); } @Override - public Timestamp getLastRpcTime() { - return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get()); + public void updateLastHeartBeatSendTime() { + lastHeartBeatSendTime.set(Timestamp.currentTime()); + } + + @Override + public Timestamp getLastHeartBeatSendTime() { + return lastHeartBeatSendTime.get(); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 1a38734bbb..58470bbebb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -482,7 +482,8 @@ Collection addSenders(Collection newPeers, long nextIndex LogAppender logAppender = server.newLogAppender(this, f); peerIdFollowerInfoMap.put(peer.getId(), f); raftServerMetrics.addFollower(peer.getId()); - logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime); + logAppenderMetrics.addFollowerGauges( + peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcResponseTime); return logAppender; }).collect(Collectors.toList()); senders.addAll(newAppenders); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index 07fc7c2aca..f43f346a0d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -27,6 +27,7 @@ import org.apache.ratis.server.raftlog.RaftLogIOException; import org.apache.ratis.server.util.ServerStringUtils; import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.util.Timestamp; import java.io.IOException; import java.io.InterruptedIOException; @@ -60,9 +61,12 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries() return null; } - getFollower().updateLastRpcSendTime(); + if (request.getEntriesCount() == 0) { + getFollower().updateLastHeartBeatSendTime(); + } + Timestamp sendTime = Timestamp.currentTime(); final AppendEntriesReplyProto r = getServerRpc().appendEntries(request); - getFollower().updateLastRpcResponseTime(); + getFollower().updateLastRpcResponseTime(sendTime); getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit()); return r; @@ -87,9 +91,9 @@ private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InstallSnapshotReplyProto reply = null; try { for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) { - getFollower().updateLastRpcSendTime(); + Timestamp sendTime = Timestamp.currentTime(); reply = getServerRpc().installSnapshot(request); - getFollower().updateLastRpcResponseTime(); + getFollower().updateLastRpcResponseTime(sendTime); if (!reply.getServerReply().getSuccess()) { return reply;