diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java index 10c59c8b19..6d3f68d5ce 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java @@ -41,11 +41,9 @@ public class ConfigurationManager { * The current raft configuration. If configurations is not empty, should be * the last entry of the map. Otherwise is initialConf. */ - @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type - private volatile RaftConfigurationImpl currentConf; + private RaftConfigurationImpl currentConf; /** Cache the peer corresponding to {@link #id}. */ - @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type - private volatile RaftPeer currentPeer; + private RaftPeer currentPeer; ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) { this.id = id; @@ -78,11 +76,11 @@ private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl conf) } } - RaftConfigurationImpl getCurrent() { + synchronized RaftConfigurationImpl getCurrent() { return currentConf; } - RaftPeer getCurrentPeer() { + synchronized RaftPeer getCurrentPeer() { return currentPeer; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index e980daede5..fa61e90883 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.ToIntFunction; @@ -62,6 +63,7 @@ int update(AtomicInteger outstanding) { @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type private volatile Timestamp lastRpcTime = creationTime; private volatile boolean isRunning = true; + private final CompletableFuture stopped = new CompletableFuture<>(); private final AtomicInteger outstandingOp = new AtomicInteger(); FollowerState(RaftServerImpl server, Object reason) { @@ -93,8 +95,10 @@ boolean isCurrentLeaderValid() { return lastRpcTime.elapsedTime().compareTo(server.properties().minRpcTimeout()) < 0; } - void stopRunning() { + CompletableFuture stopRunning() { this.isRunning = false; + interrupt(); + return stopped; } boolean lostMajorityHeartbeatsRecently() { @@ -122,6 +126,14 @@ private boolean shouldRun() { @Override public void run() { + try { + runImpl(); + } finally { + stopped.complete(null); + } + } + + private void runImpl() { final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold(); while (shouldRun()) { final TimeDuration electionTimeout = server.getRandomElectionTimeout(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index af25ae9126..a5bfba7bec 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -46,6 +46,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -183,6 +184,7 @@ public String toString() { private final String name; private final LifeCycle lifeCycle; private final Daemon daemon; + private final CompletableFuture stopped = new CompletableFuture<>(); private final RaftServerImpl server; private final boolean skipPreVote; @@ -223,8 +225,10 @@ private void startIfNew(Runnable starter) { } } - void shutdown() { + CompletableFuture shutdown() { lifeCycle.checkStateAndClose(); + stopped.complete(null); + return stopped; } @VisibleForTesting @@ -234,6 +238,14 @@ LifeCycle.State getCurrentState() { @Override public void run() { + try { + runImpl(); + } finally { + stopped.complete(null); + } + } + + private void runImpl() { if (!lifeCycle.compareAndTransition(STARTING, RUNNING)) { final LifeCycle.State state = lifeCycle.getCurrentState(); LOG.info("{}: skip running since this is already {}", this, state); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 68121cef6a..3d8bc22195 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -223,11 +223,8 @@ boolean removeAll(Collection c) { } CompletableFuture stopAll() { - final CompletableFuture[] futures = new CompletableFuture[senders.size()]; - for(int i = 0; i < futures.length; i++) { - futures[i] = senders.get(i).stopAsync(); - } - return CompletableFuture.allOf(futures); + return CompletableFuture.allOf(senders.stream(). + map(LogAppender::stopAsync).toArray(CompletableFuture[]::new)); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 0473564b39..ae158ad75e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -132,6 +132,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -565,20 +566,23 @@ void setFirstElection(Object reason) { * @param force Force to start a new {@link FollowerState} even if this server is already a follower. * @return if the term/votedFor should be updated to the new term */ - private synchronized boolean changeToFollower( - long newTerm, - boolean force, - boolean allowListener, - Object reason) { + private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) { + final AtomicReference metadataUpdated = new AtomicReference<>(); + changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join(); + return metadataUpdated.get(); + } + + private synchronized CompletableFuture changeToFollowerAsync( + long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference metadataUpdated) { final RaftPeerRole old = role.getCurrentRole(); if (old == RaftPeerRole.LISTENER && !allowListener) { throw new IllegalStateException("Unexpected role " + old); } - boolean metadataUpdated; + CompletableFuture future = CompletableFuture.completedFuture(null); if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) { setRole(RaftPeerRole.FOLLOWER, reason); if (old == RaftPeerRole.LEADER) { - role.shutdownLeaderState(false) + future = role.shutdownLeaderState(false) .exceptionally(e -> { if (e != null) { if (!getInfo().isAlive()) { @@ -587,21 +591,21 @@ private synchronized boolean changeToFollower( } } throw new CompletionException("Failed to shutdownLeaderState: " + this, e); - }) - .join(); + }); state.setLeader(null, reason); } else if (old == RaftPeerRole.CANDIDATE) { - role.shutdownLeaderElection(); + future = role.shutdownLeaderElection(); } else if (old == RaftPeerRole.FOLLOWER) { - role.shutdownFollowerState(); + future = role.shutdownFollowerState(); } - metadataUpdated = state.updateCurrentTerm(newTerm); + + metadataUpdated.set(state.updateCurrentTerm(newTerm)); role.startFollowerState(this, reason); setFirstElection(reason); } else { - metadataUpdated = state.updateCurrentTerm(newTerm); + metadataUpdated.set(state.updateCurrentTerm(newTerm)); } - return metadataUpdated; + return future; } synchronized void changeToFollowerAndPersistMetadata( diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java index 5eb01a9d6b..a5cd7da665 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java @@ -47,7 +47,7 @@ class RoleInfo { public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class); private final RaftPeerId id; - private volatile RaftPeerRole role; + private final AtomicReference role = new AtomicReference<>(); /** Used when the peer is leader */ private final AtomicReference leaderState = new AtomicReference<>(); /** Used when the peer is follower, to monitor election timeout */ @@ -64,7 +64,7 @@ class RoleInfo { } void transitionRole(RaftPeerRole newRole) { - this.role = newRole; + this.role.set(newRole); this.transitionTime.set(Timestamp.currentTime()); } @@ -73,7 +73,7 @@ long getRoleElapsedTimeMs() { } RaftPeerRole getCurrentRole() { - return role; + return role.get(); } boolean isLeaderReady() { @@ -113,13 +113,13 @@ void startFollowerState(RaftServerImpl server, Object reason) { updateAndGet(followerState, new FollowerState(server, reason)).start(); } - void shutdownFollowerState() { + CompletableFuture shutdownFollowerState() { final FollowerState follower = followerState.getAndSet(null); - if (follower != null) { - LOG.info("{}: shutdown {}", id, follower); - follower.stopRunning(); - follower.interrupt(); + if (follower == null) { + return CompletableFuture.completedFuture(null); } + LOG.info("{}: shutdown {}", id, follower); + return follower.stopRunning(); } void startLeaderElection(RaftServerImpl server, boolean force) { @@ -133,13 +133,13 @@ void setLeaderElectionPause(boolean pause) { pauseLeaderElection.set(pause); } - void shutdownLeaderElection() { + CompletableFuture shutdownLeaderElection() { final LeaderElection election = leaderElection.getAndSet(null); - if (election != null) { - LOG.info("{}: shutdown {}", id, election); - election.shutdown(); - // no need to interrupt the election thread + if (election == null) { + return CompletableFuture.completedFuture(null); } + LOG.info("{}: shutdown {}", id, election); + return election.shutdown(); } private T updateAndGet(AtomicReference ref, T current) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index 958cc6fa81..5a27cda510 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -124,7 +124,7 @@ public void start() { @Override public boolean isRunning() { - return daemon.isWorking(); + return daemon.isWorking() && server.getInfo().isLeader(); } @Override