diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 1c8e4ea97d..e2632ea614 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -276,6 +276,7 @@ public void onNext(AppendEntriesReplyProto reply) { AppendEntriesRequest request = pendingRequests.remove(reply); if (request != null) { request.stopRequestTimer(); // Update completion time + getFollower().updateLastRpcSendTimeWithResponse(request.getSendTimeWithResponse()); } if (LOG.isDebugEnabled()) { @@ -587,6 +588,8 @@ static class AppendEntriesRequest { private final TermIndex lastEntry; + private final Timestamp sendTimeWithResponse; + AppendEntriesRequest(AppendEntriesRequestProto proto, RaftPeerId followerId, GrpcServerMetrics grpcServerMetrics) { this.callId = proto.getServerRequest().getCallId(); this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null; @@ -595,12 +598,17 @@ static class AppendEntriesRequest { this.timer = grpcServerMetrics.getGrpcLogAppenderLatencyTimer(followerId.toString(), isHeartbeat()); grpcServerMetrics.onRequestCreate(isHeartbeat()); + this.sendTimeWithResponse = Timestamp.currentTime(); } long getCallId() { return callId; } + Timestamp getSendTimeWithResponse() { + return sendTimeWithResponse; + } + TermIndex getPreviousLog() { return previousLog; } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionProperties.java b/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionProperties.java index a7b8c9d6b6..ed2521b04c 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionProperties.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionProperties.java @@ -39,6 +39,12 @@ default int minRpcTimeoutMs() { return minRpcTimeout().toIntExact(TimeUnit.MILLISECONDS); } + TimeDuration leaderLeaseTimeout(); + + default int leaderLeaseTimeoutMs() { + return leaderLeaseTimeout().toIntExact(TimeUnit.MILLISECONDS); + } + /** @return the maximum rpc timeout. */ TimeDuration maxRpcTimeout(); diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index f2f0596136..512af252d8 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -461,6 +461,19 @@ static void setTimeoutMin(RaftProperties properties, TimeDuration minDuration) { setTimeDuration(properties::setTimeDuration, TIMEOUT_MIN_KEY, minDuration); } + String LEADER_LEASE_TIMEOUT_RATIO_KEY = PREFIX + ".leader.lease.timeout.ratio"; + int LEADER_LEASE_TIMEOUT_RATIO_DEFAULT = 90; + + static int getLeaderLeaseTimeoutRatio(RaftProperties properties) { + return getInt(properties::getInt, LEADER_LEASE_TIMEOUT_RATIO_KEY, + LEADER_LEASE_TIMEOUT_RATIO_DEFAULT, getDefaultLog(), + requireMin(0), requireMax(100)); + } + + static void setLeaderLeaseTimeoutRatio(RaftProperties properties, int ratio) { + setInt(properties::setInt, LEADER_LEASE_TIMEOUT_RATIO_KEY, ratio); + } + String TIMEOUT_MAX_KEY = PREFIX + ".timeout.max"; TimeDuration TIMEOUT_MAX_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS); static TimeDuration timeoutMax(RaftProperties properties) { diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java index 487576fd17..1cb9e229d5 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java @@ -76,6 +76,10 @@ public interface FollowerInfo { /** Update lastRpcSendTime to the current time. */ void updateLastRpcSendTime(); + void updateLastRpcSendTimeWithResponse(Timestamp time); + + Timestamp getLastRpcSendTimeWithResponse(); + /** @return the latest of the lastRpcSendTime and the lastRpcResponseTime . */ Timestamp getLastRpcTime(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java index 63cbc02ed6..888049b1f0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java @@ -26,6 +26,7 @@ class DivisionPropertiesImpl implements DivisionProperties { private final TimeDuration rpcTimeoutMin; private final TimeDuration rpcTimeoutMax; + private final TimeDuration leaderLeaseTimeout; private final TimeDuration rpcSleepTime; private final TimeDuration rpcSlownessTimeout; @@ -35,6 +36,11 @@ class DivisionPropertiesImpl implements DivisionProperties { Preconditions.assertTrue(rpcTimeoutMax.compareTo(rpcTimeoutMin) >= 0, "rpcTimeoutMax = %s < rpcTimeoutMin = %s", rpcTimeoutMax, rpcTimeoutMin); + double ratio = RaftServerConfigKeys.Rpc.getLeaderLeaseTimeoutRatio(properties) * 1.0 / 100; + this.leaderLeaseTimeout = this.rpcTimeoutMin.multiply(ratio); + Preconditions.assertTrue(rpcTimeoutMin.compareTo(leaderLeaseTimeout) >= 0, + "rpcTimeoutMin = %s < leaderLeaseTimeout = %s", rpcTimeoutMin, leaderLeaseTimeout); + this.rpcSleepTime = RaftServerConfigKeys.Rpc.sleepTime(properties); this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties); } @@ -44,6 +50,11 @@ public TimeDuration minRpcTimeout() { return rpcTimeoutMin; } + @Override + public TimeDuration leaderLeaseTimeout() { + return leaderLeaseTimeout; + } + @Override public TimeDuration maxRpcTimeout() { return rpcTimeoutMax; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java index 0f6c1ab0f9..b345d58313 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java @@ -35,6 +35,7 @@ class FollowerInfoImpl implements FollowerInfo { private final RaftPeer peer; private final AtomicReference lastRpcResponseTime; private final AtomicReference lastRpcSendTime; + private final AtomicReference lastRpcSendTimeWithResponse; private final RaftLogIndex nextIndex; private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L); private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX); @@ -49,6 +50,7 @@ class FollowerInfoImpl implements FollowerInfo { this.peer = peer; this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime); this.lastRpcSendTime = new AtomicReference<>(lastRpcTime); + this.lastRpcSendTimeWithResponse = new AtomicReference<>(lastRpcTime); this.nextIndex = new RaftLogIndex("nextIndex", nextIndex); this.attendVote = attendVote; } @@ -151,6 +153,16 @@ public void updateLastRpcSendTime() { lastRpcSendTime.set(Timestamp.currentTime()); } + @Override + public void updateLastRpcSendTimeWithResponse(Timestamp time) { + lastRpcSendTimeWithResponse.set(time); + } + + @Override + public Timestamp getLastRpcSendTimeWithResponse() { + return lastRpcSendTimeWithResponse.get(); + } + @Override public Timestamp getLastRpcTime() { return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get()); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 1a38734bbb..51c42e3c14 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -71,6 +71,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -543,6 +544,9 @@ private synchronized void sendStartLeaderElectionToHigherPriorityPeer(RaftPeerId return; } + // leader must step down to support leader lease. + stepDown(currentTerm, StepDownReason.HIGHER_PRIORITY); + final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto( server.getMemberId(), follower, lastEntry); CompletableFuture.supplyAsync(() -> { @@ -905,6 +909,49 @@ private void yieldLeaderToHigherPriorityPeer() { } } + boolean isLeaderLeaseValid() { + if (checkLeaderLease()) { + return true; + } + + updateLeaderLease(); + return checkLeaderLease(); + } + + private boolean checkLeaderLease() { + return lastLeaderTimestamp.get().elapsedTimeMs() < server.properties().leaderLeaseTimeoutMs(); + } + + private void updateLeaderLease() { + List activePeers = new ArrayList<>(); + List followerInfos = getFollowerInfos(); + List lastRpcSendTimeWithResponseList = new ArrayList<>(); + for (final FollowerInfo info : followerInfos) { + Timestamp lastRpcSendTimeWithResponse = info.getLastRpcSendTimeWithResponse(); + lastRpcSendTimeWithResponseList.add(lastRpcSendTimeWithResponse); + if (lastRpcSendTimeWithResponse.elapsedTimeMs() <= server.properties().leaderLeaseTimeoutMs()) { + activePeers.add(info.getPeer().getId()); + } + } + + final RaftConfigurationImpl conf = server.getRaftConf(); + if (conf.hasMajority(activePeers, server.getId())) { + // leader lease check passed + if (lastRpcSendTimeWithResponseList.size() > 0) { + Collections.sort(lastRpcSendTimeWithResponseList); + Timestamp startLease = lastRpcSendTimeWithResponseList.get(lastRpcSendTimeWithResponseList.size() / 2); + lastLeaderTimestamp.set(startLease); + } else { + lastLeaderTimestamp.set(Timestamp.currentTime()); + } + } else { + LOG.warn("{} Can not update leader lease on term: {} lease timeout: {}ms conf: {}", + this, currentTerm, server.properties().leaderLeaseTimeoutMs(), conf); + } + } + + private AtomicReference lastLeaderTimestamp = new AtomicReference<>(Timestamp.currentTime()); + /** * See the thesis section 6.2: A leader in Raft steps down * if an election timeout elapses without a successful @@ -938,10 +985,8 @@ public boolean checkLeadership() { return true; } - LOG.warn(this + ": Lost leadership on term: " + currentTerm - + ". Election timeout: " + server.getMaxTimeoutMs() + "ms" - + ". In charge for: " + server.getRole().getRoleElapsedTimeMs() + "ms" - + ". Conf: " + conf); + LOG.warn("{} Lost leadership on term: {} Election timeout: {}ms. In charge for: {}ms. Conf: {}", + this, currentTerm, server.getMaxTimeoutMs(), server.getRole().getRoleElapsedTimeMs(), conf); senders.stream().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f)); // step down as follower @@ -1017,6 +1062,12 @@ List getFollowers() { .collect(Collectors.toList())); } + List getFollowerInfos() { + return Collections.unmodifiableList(senders.stream() + .map(sender -> sender.getFollower()) + .collect(Collectors.toList())); + } + Stream getLogAppenders() { return senders.stream(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java index 617b617cbf..b283872487 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java @@ -66,7 +66,8 @@ RaftPeerRole getCurrentRole() { } boolean isLeaderReady() { - return getLeaderState().map(LeaderStateImpl::isReady).orElse(false); + return getLeaderState().map(LeaderStateImpl::isReady).orElse(false) + && getLeaderState().map(LeaderStateImpl::isLeaderLeaseValid).orElse(false); } Optional getLeaderState() { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index a6753d215e..7e98598155 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -155,6 +155,33 @@ public void testTransferLeader() throws Exception { } } + @Test + public void testLeaderLease() throws Exception { + TimeDuration rpcTimeoutMin = RaftServerConfigKeys.Rpc.timeoutMin(getProperties()); + double ratio = RaftServerConfigKeys.Rpc.getLeaderLeaseTimeoutRatio(getProperties()) * 1.0 / 100; + TimeDuration leaderLeaseTimeout = rpcTimeoutMin.multiply(ratio); + + try (final MiniRaftCluster cluster = newCluster(3)) { + cluster.start(); + + final RaftServer.Division leader = waitForLeader(cluster); + try (RaftClient client = cluster.createClient(leader.getId())) { + client.io().send(new RaftTestUtil.SimpleMessage("message")); + + Assert.assertTrue(leader.getInfo().isLeader()); + Assert.assertTrue(leader.getInfo().isLeaderReady()); + + isolate(cluster, leader.getId()); + Thread.sleep(leaderLeaseTimeout.toLong(TimeUnit.MILLISECONDS)); + + Assert.assertTrue(leader.getInfo().isLeader()); + Assert.assertFalse(leader.getInfo().isLeaderReady()); + } + + cluster.shutdown(); + } + } + @Test public void testTransferLeaderTimeout() throws Exception { try(final MiniRaftCluster cluster = newCluster(3)) {