Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public void onNext(AppendEntriesReplyProto reply) {
AppendEntriesRequest request = pendingRequests.remove(reply);
if (request != null) {
request.stopRequestTimer(); // Update completion time
getFollower().updateLastRpcSendTimeWithResponse(request.getSendTimeWithResponse());
}

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -587,6 +588,8 @@ static class AppendEntriesRequest {

private final TermIndex lastEntry;

private final Timestamp sendTimeWithResponse;

AppendEntriesRequest(AppendEntriesRequestProto proto, RaftPeerId followerId, GrpcServerMetrics grpcServerMetrics) {
this.callId = proto.getServerRequest().getCallId();
this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null;
Expand All @@ -595,12 +598,17 @@ static class AppendEntriesRequest {

this.timer = grpcServerMetrics.getGrpcLogAppenderLatencyTimer(followerId.toString(), isHeartbeat());
grpcServerMetrics.onRequestCreate(isHeartbeat());
this.sendTimeWithResponse = Timestamp.currentTime();
}

long getCallId() {
return callId;
}

Timestamp getSendTimeWithResponse() {
return sendTimeWithResponse;
}

TermIndex getPreviousLog() {
return previousLog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ default int minRpcTimeoutMs() {
return minRpcTimeout().toIntExact(TimeUnit.MILLISECONDS);
}

TimeDuration leaderLeaseTimeout();

default int leaderLeaseTimeoutMs() {
return leaderLeaseTimeout().toIntExact(TimeUnit.MILLISECONDS);
}

/** @return the maximum rpc timeout. */
TimeDuration maxRpcTimeout();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,19 @@ static void setTimeoutMin(RaftProperties properties, TimeDuration minDuration) {
setTimeDuration(properties::setTimeDuration, TIMEOUT_MIN_KEY, minDuration);
}

String LEADER_LEASE_TIMEOUT_RATIO_KEY = PREFIX + ".leader.lease.timeout.ratio";
int LEADER_LEASE_TIMEOUT_RATIO_DEFAULT = 90;

static int getLeaderLeaseTimeoutRatio(RaftProperties properties) {
return getInt(properties::getInt, LEADER_LEASE_TIMEOUT_RATIO_KEY,
LEADER_LEASE_TIMEOUT_RATIO_DEFAULT, getDefaultLog(),
requireMin(0), requireMax(100));
}

static void setLeaderLeaseTimeoutRatio(RaftProperties properties, int ratio) {
setInt(properties::setInt, LEADER_LEASE_TIMEOUT_RATIO_KEY, ratio);
}

String TIMEOUT_MAX_KEY = PREFIX + ".timeout.max";
TimeDuration TIMEOUT_MAX_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS);
static TimeDuration timeoutMax(RaftProperties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public interface FollowerInfo {
/** Update lastRpcSendTime to the current time. */
void updateLastRpcSendTime();

void updateLastRpcSendTimeWithResponse(Timestamp time);

Timestamp getLastRpcSendTimeWithResponse();

/** @return the latest of the lastRpcSendTime and the lastRpcResponseTime . */
Timestamp getLastRpcTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
class DivisionPropertiesImpl implements DivisionProperties {
private final TimeDuration rpcTimeoutMin;
private final TimeDuration rpcTimeoutMax;
private final TimeDuration leaderLeaseTimeout;
private final TimeDuration rpcSleepTime;
private final TimeDuration rpcSlownessTimeout;

Expand All @@ -35,6 +36,11 @@ class DivisionPropertiesImpl implements DivisionProperties {
Preconditions.assertTrue(rpcTimeoutMax.compareTo(rpcTimeoutMin) >= 0,
"rpcTimeoutMax = %s < rpcTimeoutMin = %s", rpcTimeoutMax, rpcTimeoutMin);

double ratio = RaftServerConfigKeys.Rpc.getLeaderLeaseTimeoutRatio(properties) * 1.0 / 100;
this.leaderLeaseTimeout = this.rpcTimeoutMin.multiply(ratio);
Preconditions.assertTrue(rpcTimeoutMin.compareTo(leaderLeaseTimeout) >= 0,
"rpcTimeoutMin = %s < leaderLeaseTimeout = %s", rpcTimeoutMin, leaderLeaseTimeout);

this.rpcSleepTime = RaftServerConfigKeys.Rpc.sleepTime(properties);
this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
}
Expand All @@ -44,6 +50,11 @@ public TimeDuration minRpcTimeout() {
return rpcTimeoutMin;
}

@Override
public TimeDuration leaderLeaseTimeout() {
return leaderLeaseTimeout;
}

@Override
public TimeDuration maxRpcTimeout() {
return rpcTimeoutMax;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class FollowerInfoImpl implements FollowerInfo {
private final RaftPeer peer;
private final AtomicReference<Timestamp> lastRpcResponseTime;
private final AtomicReference<Timestamp> lastRpcSendTime;
private final AtomicReference<Timestamp> lastRpcSendTimeWithResponse;
private final RaftLogIndex nextIndex;
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L);
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
Expand All @@ -49,6 +50,7 @@ class FollowerInfoImpl implements FollowerInfo {
this.peer = peer;
this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
this.lastRpcSendTimeWithResponse = new AtomicReference<>(lastRpcTime);
this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
this.attendVote = attendVote;
}
Expand Down Expand Up @@ -151,6 +153,16 @@ public void updateLastRpcSendTime() {
lastRpcSendTime.set(Timestamp.currentTime());
}

@Override
public void updateLastRpcSendTimeWithResponse(Timestamp time) {
lastRpcSendTimeWithResponse.set(time);
}

@Override
public Timestamp getLastRpcSendTimeWithResponse() {
return lastRpcSendTimeWithResponse.get();
}

@Override
public Timestamp getLastRpcTime() {
return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
Expand Down Expand Up @@ -543,6 +544,9 @@ private synchronized void sendStartLeaderElectionToHigherPriorityPeer(RaftPeerId
return;
}

// leader must step down to support leader lease.
stepDown(currentTerm, StepDownReason.HIGHER_PRIORITY);

final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto(
server.getMemberId(), follower, lastEntry);
CompletableFuture.supplyAsync(() -> {
Expand Down Expand Up @@ -905,6 +909,49 @@ private void yieldLeaderToHigherPriorityPeer() {
}
}

boolean isLeaderLeaseValid() {
if (checkLeaderLease()) {
return true;
}

updateLeaderLease();
return checkLeaderLease();
}

private boolean checkLeaderLease() {
return lastLeaderTimestamp.get().elapsedTimeMs() < server.properties().leaderLeaseTimeoutMs();
}

private void updateLeaderLease() {
List<RaftPeerId> activePeers = new ArrayList<>();
List<FollowerInfo> followerInfos = getFollowerInfos();
List<Timestamp> lastRpcSendTimeWithResponseList = new ArrayList<>();
for (final FollowerInfo info : followerInfos) {
Timestamp lastRpcSendTimeWithResponse = info.getLastRpcSendTimeWithResponse();
lastRpcSendTimeWithResponseList.add(lastRpcSendTimeWithResponse);
if (lastRpcSendTimeWithResponse.elapsedTimeMs() <= server.properties().leaderLeaseTimeoutMs()) {
activePeers.add(info.getPeer().getId());
}
}

final RaftConfigurationImpl conf = server.getRaftConf();
if (conf.hasMajority(activePeers, server.getId())) {
// leader lease check passed
if (lastRpcSendTimeWithResponseList.size() > 0) {
Collections.sort(lastRpcSendTimeWithResponseList);
Timestamp startLease = lastRpcSendTimeWithResponseList.get(lastRpcSendTimeWithResponseList.size() / 2);
lastLeaderTimestamp.set(startLease);
} else {
lastLeaderTimestamp.set(Timestamp.currentTime());
}
} else {
LOG.warn("{} Can not update leader lease on term: {} lease timeout: {}ms conf: {}",
this, currentTerm, server.properties().leaderLeaseTimeoutMs(), conf);
}
}

private AtomicReference<Timestamp> lastLeaderTimestamp = new AtomicReference<>(Timestamp.currentTime());

/**
* See the thesis section 6.2: A leader in Raft steps down
* if an election timeout elapses without a successful
Expand Down Expand Up @@ -938,10 +985,8 @@ public boolean checkLeadership() {
return true;
}

LOG.warn(this + ": Lost leadership on term: " + currentTerm
+ ". Election timeout: " + server.getMaxTimeoutMs() + "ms"
+ ". In charge for: " + server.getRole().getRoleElapsedTimeMs() + "ms"
+ ". Conf: " + conf);
LOG.warn("{} Lost leadership on term: {} Election timeout: {}ms. In charge for: {}ms. Conf: {}",
this, currentTerm, server.getMaxTimeoutMs(), server.getRole().getRoleElapsedTimeMs(), conf);
senders.stream().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f));

// step down as follower
Expand Down Expand Up @@ -1017,6 +1062,12 @@ List<RaftPeer> getFollowers() {
.collect(Collectors.toList()));
}

List<FollowerInfo> getFollowerInfos() {
return Collections.unmodifiableList(senders.stream()
.map(sender -> sender.getFollower())
.collect(Collectors.toList()));
}

Stream<LogAppender> getLogAppenders() {
return senders.stream();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ RaftPeerRole getCurrentRole() {
}

boolean isLeaderReady() {
return getLeaderState().map(LeaderStateImpl::isReady).orElse(false);
return getLeaderState().map(LeaderStateImpl::isReady).orElse(false)
&& getLeaderState().map(LeaderStateImpl::isLeaderLeaseValid).orElse(false);
}

Optional<LeaderStateImpl> getLeaderState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,33 @@ public void testTransferLeader() throws Exception {
}
}

@Test
public void testLeaderLease() throws Exception {
TimeDuration rpcTimeoutMin = RaftServerConfigKeys.Rpc.timeoutMin(getProperties());
double ratio = RaftServerConfigKeys.Rpc.getLeaderLeaseTimeoutRatio(getProperties()) * 1.0 / 100;
TimeDuration leaderLeaseTimeout = rpcTimeoutMin.multiply(ratio);

try (final MiniRaftCluster cluster = newCluster(3)) {
cluster.start();

final RaftServer.Division leader = waitForLeader(cluster);
try (RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("message"));

Assert.assertTrue(leader.getInfo().isLeader());
Assert.assertTrue(leader.getInfo().isLeaderReady());

isolate(cluster, leader.getId());
Thread.sleep(leaderLeaseTimeout.toLong(TimeUnit.MILLISECONDS));

Assert.assertTrue(leader.getInfo().isLeader());
Assert.assertFalse(leader.getInfo().isLeaderReady());
}

cluster.shutdown();
}
}

@Test
public void testTransferLeaderTimeout() throws Exception {
try(final MiniRaftCluster cluster = newCluster(3)) {
Expand Down