From 858aa1c12feb9d9b5b31812ffecc5b1342fc8064 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Fri, 7 Feb 2025 08:58:47 -0800 Subject: [PATCH 01/10] RATIS-2245. Ratis should wait for all apply transaction futures before taking snapshot and group remove --- .../server/impl/StateMachineUpdater.java | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 64fabfa2b0..7d7d7054a1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -37,8 +37,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -182,6 +180,7 @@ public String toString() { @Override public void run() { + CompletableFuture applyLogFutures = CompletableFuture.completedFuture(null); for(; state != State.STOP; ) { try { waitForCommit(); @@ -190,11 +189,11 @@ public void run() { reload(); } - final MemoizedSupplier>> futures = applyLog(); - checkAndTakeSnapshot(futures); + applyLogFutures = applyLog(applyLogFutures); + checkAndTakeSnapshot(applyLogFutures); if (shouldStop()) { - checkAndTakeSnapshot(futures); + checkAndTakeSnapshot(applyLogFutures); stop(); } } catch (Throwable t) { @@ -239,8 +238,7 @@ private void reload() throws IOException { state = State.RUNNING; } - private MemoizedSupplier>> applyLog() throws RaftLogIOException { - final MemoizedSupplier>> futures = MemoizedSupplier.valueOf(ArrayList::new); + private CompletableFuture applyLog(CompletableFuture applyLogFutures) throws RaftLogIOException { final long committed = raftLog.getLastCommittedIndex(); for(long applied; (applied = getLastAppliedIndex()) < committed && state == State.RUNNING && !shouldStop(); ) { final long nextIndex = applied + 1; @@ -263,7 +261,7 @@ private MemoizedSupplier>> applyLog() throws Raf final long incremented = appliedIndex.incrementAndGet(debugIndexChange); Preconditions.assertTrue(incremented == nextIndex); if (f != null) { - futures.get().add(f); + applyLogFutures = applyLogFutures.thenCombine(f, (previous, current) -> previous); f.thenAccept(m -> notifyAppliedIndex(incremented)); } else { notifyAppliedIndex(incremented); @@ -272,17 +270,14 @@ private MemoizedSupplier>> applyLog() throws Raf next.release(); } } - return futures; + return applyLogFutures; } - private void checkAndTakeSnapshot(MemoizedSupplier>> futures) + private void checkAndTakeSnapshot(CompletableFuture futures) throws ExecutionException, InterruptedException { // check if need to trigger a snapshot if (shouldTakeSnapshot()) { - if (futures.isInitialized()) { - JavaUtils.allOf(futures.get()).get(); - } - + futures.get(); takeSnapshot(); } } From ef35f590a8e7659fb9fd293e4b06246077be2e59 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Fri, 7 Feb 2025 12:26:42 -0800 Subject: [PATCH 02/10] RATIS-2245. Address review comments --- .../java/org/apache/ratis/server/impl/StateMachineUpdater.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 7d7d7054a1..4d2ba43e0a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -193,7 +193,6 @@ public void run() { checkAndTakeSnapshot(applyLogFutures); if (shouldStop()) { - checkAndTakeSnapshot(applyLogFutures); stop(); } } catch (Throwable t) { From 7626c7909d344c5455779f37d82648e19820febc Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Sat, 8 Feb 2025 09:18:19 -0800 Subject: [PATCH 03/10] RATIS-2245. Add futures.get() on stop --- .../java/org/apache/ratis/server/impl/StateMachineUpdater.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 4d2ba43e0a..21c766710e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -193,6 +193,7 @@ public void run() { checkAndTakeSnapshot(applyLogFutures); if (shouldStop()) { + applyLogFutures.get(); stop(); } } catch (Throwable t) { @@ -260,7 +261,7 @@ private CompletableFuture applyLog(CompletableFuture applyLogFutures final long incremented = appliedIndex.incrementAndGet(debugIndexChange); Preconditions.assertTrue(incremented == nextIndex); if (f != null) { - applyLogFutures = applyLogFutures.thenCombine(f, (previous, current) -> previous); + applyLogFutures = applyLogFutures.thenCombine(f, (v, message) -> null); f.thenAccept(m -> notifyAppliedIndex(incremented)); } else { notifyAppliedIndex(incremented); From e25ba08e383a574cc3fa99ce0684e82b7f92c116 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Sat, 8 Feb 2025 10:57:03 -0800 Subject: [PATCH 04/10] RATIS-2245. Wait for apply log futures before taking snapshot --- .../ratis/server/impl/StateMachineUpdater.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 21c766710e..be5e298532 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -183,7 +183,7 @@ public void run() { CompletableFuture applyLogFutures = CompletableFuture.completedFuture(null); for(; state != State.STOP; ) { try { - waitForCommit(); + waitForCommit(applyLogFutures); if (state == State.RELOAD) { reload(); @@ -209,14 +209,14 @@ public void run() { } } - private void waitForCommit() throws InterruptedException { + private void waitForCommit(CompletableFuture applyLogFutures) throws InterruptedException, ExecutionException { // When a peer starts, the committed is initialized to 0. // It will be updated only after the leader contacts other peers. // Thus it is possible to have applied > committed initially. final long applied = getLastAppliedIndex(); for(; applied >= raftLog.getLastCommittedIndex() && state == State.RUNNING && !shouldStop(); ) { if (server.getSnapshotRequestHandler().shouldTriggerTakingSnapshot()) { - takeSnapshot(); + takeSnapshot(applyLogFutures); } if (awaitForSignal.await(100, TimeUnit.MILLISECONDS)) { return; @@ -277,13 +277,13 @@ private void checkAndTakeSnapshot(CompletableFuture futures) throws ExecutionException, InterruptedException { // check if need to trigger a snapshot if (shouldTakeSnapshot()) { - futures.get(); - takeSnapshot(); + takeSnapshot(futures); } } - private void takeSnapshot() { + private void takeSnapshot(CompletableFuture applyLogFutures) throws ExecutionException, InterruptedException { final long i; + applyLogFutures.get(); try { try(UncheckedAutoCloseable ignored = Timekeeper.start(stateMachineMetrics.get().getTakeSnapshotTimer())) { i = stateMachine.takeSnapshot(); From e76749effaa343d8d1f965a3e8268079d918d469 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Mon, 10 Feb 2025 22:34:33 -0800 Subject: [PATCH 05/10] RATIS-2245. Add test case --- .../server/impl/StateMachineUpdater.java | 7 +- .../impl/StateMachineShutdownTests.java | 128 ++++++++++++++---- 2 files changed, 106 insertions(+), 29 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index be5e298532..6583c65508 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -197,6 +197,11 @@ public void run() { stop(); } } catch (Throwable t) { + try { + applyLogFutures.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.info("{}: interrupted while waiting for apply transactions", this, t); + } if (t instanceof InterruptedException && state == State.STOP) { Thread.currentThread().interrupt(); LOG.info("{} was interrupted. Exiting ...", this); @@ -209,7 +214,7 @@ public void run() { } } - private void waitForCommit(CompletableFuture applyLogFutures) throws InterruptedException, ExecutionException { + private void waitForCommit(CompletableFuture applyLogFutures) throws InterruptedException, ExecutionException { // When a peer starts, the committed is initialized to 0. // It will be updated only after the leader contacts other peers. // Thus it is possible to have applied > committed initially. diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java index 246abb99f0..f9373940fc 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java @@ -28,47 +28,106 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; -import org.junit.Assert; -import org.junit.Test; - -import java.util.concurrent.CompletableFuture; +import org.junit.*; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; public abstract class StateMachineShutdownTests extends BaseTest implements MiniRaftCluster.Factory.Get { - + public static Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class); + private static MockedStatic mocked; protected static class StateMachineWithConditionalWait extends SimpleStateMachine4Testing { + boolean unblockAllTxns = false; + final Set blockTxns = ConcurrentHashMap.newKeySet(); + private final ExecutorService executor = Executors.newFixedThreadPool(10); + public static Map>> futures = new ConcurrentHashMap<>(); + public static Map numTxns = new ConcurrentHashMap<>(); + private final Map appliedTxns = new ConcurrentHashMap<>(); + + private synchronized void updateTxns() { + long appliedIndex = this.getLastAppliedTermIndex().getIndex() + 1; + Long appliedTerm = null; + while (appliedTxns.containsKey(appliedIndex)) { + appliedTerm = appliedTxns.remove(appliedIndex); + appliedIndex += 1; + } + if (appliedTerm != null) { + updateLastAppliedTermIndex(appliedTerm, appliedIndex - 1); + } + } - private final Long objectToWait = 0L; - volatile boolean blockOnApply = true; + @Override + public void notifyTermIndexUpdated(long term, long index) { + appliedTxns.put(index, term); + updateTxns(); + } @Override public CompletableFuture applyTransaction(TransactionContext trx) { - if (blockOnApply) { - synchronized (objectToWait) { - try { - objectToWait.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(); + final RaftProtos.LogEntryProto entry = trx.getLogEntryUnsafe(); + + CompletableFuture future = new CompletableFuture<>(); + futures.computeIfAbsent(Thread.currentThread().getId(), k -> new HashSet<>()).add(future); + executor.submit(() -> { + synchronized (blockTxns) { + if (!unblockAllTxns) { + blockTxns.add(entry.getIndex()); + } + while (!unblockAllTxns && blockTxns.contains(entry.getIndex())) { + try { + blockTxns.wait(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } + numTxns.computeIfAbsent(getId(), (k) -> new AtomicLong()).incrementAndGet(); + appliedTxns.put(entry.getIndex(), entry.getTerm()); + updateTxns(); + future.complete(new RaftTestUtil.SimpleMessage("done")); + }); + return future; + } + + public void unBlockApplyTxn(long txnId) { + synchronized (blockTxns) { + blockTxns.remove(txnId); + blockTxns.notifyAll(); } - final RaftProtos.LogEntryProto entry = trx.getLogEntryUnsafe(); - updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); - return CompletableFuture.completedFuture(new RaftTestUtil.SimpleMessage("done")); } - public void unBlockApplyTxn() { - blockOnApply = false; - synchronized (objectToWait) { - objectToWait.notifyAll(); + public void unblockAllTxns() { + unblockAllTxns = true; + synchronized (blockTxns) { + for (Long txnId : blockTxns) { + blockTxns.remove(txnId); + } + blockTxns.notifyAll(); } } } + @Before + public void setup() { + mocked = Mockito.mockStatic(CompletableFuture.class, Mockito.CALLS_REAL_METHODS); + } + + @After + public void tearDownClass() { + if (mocked != null) { + mocked.close(); + } + + } + @Test public void testStateMachineShutdownWaitsForApplyTxn() throws Exception { final RaftProperties prop = getProperties(); @@ -82,10 +141,9 @@ public void testStateMachineShutdownWaitsForApplyTxn() throws Exception { //Unblock leader and one follower ((StateMachineWithConditionalWait)leader.getStateMachine()) - .unBlockApplyTxn(); + .unblockAllTxns(); ((StateMachineWithConditionalWait)cluster. - getFollowers().get(0).getStateMachine()).unBlockApplyTxn(); - + getFollowers().get(0).getStateMachine()).unblockAllTxns(); cluster.getLeaderAndSendFirstMessage(true); try (final RaftClient client = cluster.createClient(leaderId)) { @@ -107,16 +165,30 @@ public void testStateMachineShutdownWaitsForApplyTxn() throws Exception { final Thread t = new Thread(secondFollower::close); t.start(); - // The second follower should still be blocked in apply transaction - Assert.assertTrue(secondFollower.getInfo().getLastAppliedIndex() < logIndex); + // Now unblock the second follower - ((StateMachineWithConditionalWait) secondFollower.getStateMachine()) - .unBlockApplyTxn(); + long minIndex = ((StateMachineWithConditionalWait) secondFollower.getStateMachine()).blockTxns.stream() + .min(Comparator.naturalOrder()).get(); + Assert.assertEquals(2, StateMachineWithConditionalWait.numTxns.values().stream() + .filter(val -> val.get() == 3).count()); + // The second follower should still be blocked in apply transaction + Assert.assertTrue(secondFollower.getInfo().getLastAppliedIndex() < minIndex); + for (long index : ((StateMachineWithConditionalWait) secondFollower.getStateMachine()).blockTxns) { + if (minIndex != index) { + ((StateMachineWithConditionalWait) secondFollower.getStateMachine()).unBlockApplyTxn(index); + } + } + Assert.assertEquals(2, StateMachineWithConditionalWait.numTxns.values().stream() + .filter(val -> val.get() == 3).count()); + Assert.assertTrue(secondFollower.getInfo().getLastAppliedIndex() < minIndex); + ((StateMachineWithConditionalWait) secondFollower.getStateMachine()).unBlockApplyTxn(minIndex); // Now wait for the thread t.join(5000); Assert.assertEquals(logIndex, secondFollower.getInfo().getLastAppliedIndex()); + Assert.assertEquals(3, StateMachineWithConditionalWait.numTxns.values().stream() + .filter(val -> val.get() == 3).count()); cluster.shutdown(); } From b635b137329f011e715c9083e93c7bb7f22b221f Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Tue, 11 Feb 2025 13:56:06 -0800 Subject: [PATCH 06/10] RATIS-2245. Fix exception handling --- .../ratis/server/impl/StateMachineUpdater.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 6583c65508..8d6a140501 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -197,10 +197,12 @@ public void run() { stop(); } } catch (Throwable t) { - try { - applyLogFutures.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.info("{}: interrupted while waiting for apply transactions", this, t); + if (!(t instanceof InterruptedException) && !(t instanceof ExecutionException)) { + try { + applyLogFutures.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.info("Exception thrown : {} while waiting for apply transactions", e, t); + } } if (t instanceof InterruptedException && state == State.STOP) { Thread.currentThread().interrupt(); @@ -266,7 +268,11 @@ private CompletableFuture applyLog(CompletableFuture applyLogFutures final long incremented = appliedIndex.incrementAndGet(debugIndexChange); Preconditions.assertTrue(incremented == nextIndex); if (f != null) { - applyLogFutures = applyLogFutures.thenCombine(f, (v, message) -> null); + applyLogFutures = applyLogFutures.thenCombine(f.exceptionally(ex -> { + LOG.error("Exception while {}: applying txn index={}, nextLog={}", this, nextIndex, + LogProtoUtils.toLogEntryString(entry)); + return null; + }), (v, message) -> null); f.thenAccept(m -> notifyAppliedIndex(incremented)); } else { notifyAppliedIndex(incremented); From fcbf2136afa55b4df03ca8ffeb251beded11d02f Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Mon, 17 Feb 2025 18:24:40 -0800 Subject: [PATCH 07/10] RATIS-2245. Fix exception handling and group removal --- .../apache/ratis/statemachine/StateMachine.java | 8 ++++++++ .../apache/ratis/server/impl/RaftServerImpl.java | 8 +++++++- .../ratis/server/impl/StateMachineUpdater.java | 14 +++++++------- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 3960ab8287..c638d789cc 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -244,6 +244,14 @@ default void notifyConfigurationChanged(long term, long index, RaftConfiguration */ default void notifyGroupRemove() {} + /** + * Notify the {@link StateMachine} a group removal event. + * @param applyTransactionFailure true if there was a failure in applying transaction. + */ + default void notifyGroupRemove(boolean applyTransactionFailure) { + notifyGroupRemove(); + } + /** * Notify the {@link StateMachine} that a log operation failed. * diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c67329cae1..2127353075 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -481,6 +481,12 @@ void groupRemove(boolean deleteDirectory, boolean renameDirectory) { /* Shutdown is triggered here inorder to avoid any locked files. */ state.getStateMachineUpdater().setRemoving(); close(); + if (state.getStateMachineUpdater().getError() != null) { + LOG.error("{}: groupRemove failed: {}. Falling back to directory rename.", + getGroup().getGroupId(), getMemberId(), state.getStateMachineUpdater().getError()); + deleteDirectory = false; + renameDirectory = true; + } try { closeFinishedLatch.await(); } catch (InterruptedException e) { @@ -488,7 +494,7 @@ void groupRemove(boolean deleteDirectory, boolean renameDirectory) { LOG.warn("{}: Waiting closing interrupted, will not continue to remove group locally", getMemberId()); return; } - getStateMachine().event().notifyGroupRemove(); + getStateMachine().event().notifyGroupRemove(state.getStateMachineUpdater().getError() != null); if (deleteDirectory) { for (int i = 0; i < FileUtils.NUM_ATTEMPTS; i ++) { try { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 8d6a140501..f2c3aa1074 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -21,6 +21,7 @@ import org.apache.ratis.metrics.Timekeeper; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.exceptions.RaftException; import org.apache.ratis.protocol.exceptions.StateMachineException; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; @@ -84,6 +85,7 @@ enum State { private final RaftLogIndex snapshotIndex; private final AtomicReference stopIndex = new AtomicReference<>(); private volatile State state = State.RUNNING; + private Throwable error; private final SnapshotRetentionPolicy snapshotRetentionPolicy; @@ -197,18 +199,12 @@ public void run() { stop(); } } catch (Throwable t) { - if (!(t instanceof InterruptedException) && !(t instanceof ExecutionException)) { - try { - applyLogFutures.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.info("Exception thrown : {} while waiting for apply transactions", e, t); - } - } if (t instanceof InterruptedException && state == State.STOP) { Thread.currentThread().interrupt(); LOG.info("{} was interrupted. Exiting ...", this); } else { state = State.EXCEPTION; + error = t; LOG.error(this + " caught a Throwable.", t); server.close(); } @@ -382,4 +378,8 @@ long getStateMachineLastAppliedIndex() { .map(TermIndex::getIndex) .orElse(RaftLog.INVALID_LOG_INDEX); } + + public Throwable getError() { + return error; + } } From f956ea64bfcdde6e95a2ec515bbb774ac8767475 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Wed, 19 Feb 2025 09:54:45 -0800 Subject: [PATCH 08/10] RATIS-2245. Fix checkstyle --- .../java/org/apache/ratis/server/impl/StateMachineUpdater.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index f2c3aa1074..5a517ee3eb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -21,7 +21,6 @@ import org.apache.ratis.metrics.Timekeeper; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.exceptions.RaftException; import org.apache.ratis.protocol.exceptions.StateMachineException; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; From c6f5fe3e520aa00319a43b98e09bc79d37c6edf5 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Wed, 19 Feb 2025 14:12:16 -0800 Subject: [PATCH 09/10] RATIS-2245. Address review comments --- .../org/apache/ratis/statemachine/StateMachine.java | 8 -------- .../org/apache/ratis/server/impl/RaftServerImpl.java | 8 +------- .../apache/ratis/server/impl/StateMachineUpdater.java | 11 +++-------- 3 files changed, 4 insertions(+), 23 deletions(-) diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java index c638d789cc..3960ab8287 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -244,14 +244,6 @@ default void notifyConfigurationChanged(long term, long index, RaftConfiguration */ default void notifyGroupRemove() {} - /** - * Notify the {@link StateMachine} a group removal event. - * @param applyTransactionFailure true if there was a failure in applying transaction. - */ - default void notifyGroupRemove(boolean applyTransactionFailure) { - notifyGroupRemove(); - } - /** * Notify the {@link StateMachine} that a log operation failed. * diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 2127353075..c67329cae1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -481,12 +481,6 @@ void groupRemove(boolean deleteDirectory, boolean renameDirectory) { /* Shutdown is triggered here inorder to avoid any locked files. */ state.getStateMachineUpdater().setRemoving(); close(); - if (state.getStateMachineUpdater().getError() != null) { - LOG.error("{}: groupRemove failed: {}. Falling back to directory rename.", - getGroup().getGroupId(), getMemberId(), state.getStateMachineUpdater().getError()); - deleteDirectory = false; - renameDirectory = true; - } try { closeFinishedLatch.await(); } catch (InterruptedException e) { @@ -494,7 +488,7 @@ void groupRemove(boolean deleteDirectory, boolean renameDirectory) { LOG.warn("{}: Waiting closing interrupted, will not continue to remove group locally", getMemberId()); return; } - getStateMachine().event().notifyGroupRemove(state.getStateMachineUpdater().getError() != null); + getStateMachine().event().notifyGroupRemove(); if (deleteDirectory) { for (int i = 0; i < FileUtils.NUM_ATTEMPTS; i ++) { try { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 5a517ee3eb..c561625c4c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -84,7 +84,6 @@ enum State { private final RaftLogIndex snapshotIndex; private final AtomicReference stopIndex = new AtomicReference<>(); private volatile State state = State.RUNNING; - private Throwable error; private final SnapshotRetentionPolicy snapshotRetentionPolicy; @@ -203,7 +202,6 @@ public void run() { LOG.info("{} was interrupted. Exiting ...", this); } else { state = State.EXCEPTION; - error = t; LOG.error(this + " caught a Throwable.", t); server.close(); } @@ -263,11 +261,12 @@ private CompletableFuture applyLog(CompletableFuture applyLogFutures final long incremented = appliedIndex.incrementAndGet(debugIndexChange); Preconditions.assertTrue(incremented == nextIndex); if (f != null) { - applyLogFutures = applyLogFutures.thenCombine(f.exceptionally(ex -> { + CompletableFuture exceptionHandledFuture = f.exceptionally(ex -> { LOG.error("Exception while {}: applying txn index={}, nextLog={}", this, nextIndex, LogProtoUtils.toLogEntryString(entry)); return null; - }), (v, message) -> null); + }); + applyLogFutures = applyLogFutures.thenCombine(exceptionHandledFuture, (v, message) -> null); f.thenAccept(m -> notifyAppliedIndex(incremented)); } else { notifyAppliedIndex(incremented); @@ -377,8 +376,4 @@ long getStateMachineLastAppliedIndex() { .map(TermIndex::getIndex) .orElse(RaftLog.INVALID_LOG_INDEX); } - - public Throwable getError() { - return error; - } } From 17be3939eae53b6b271ef8db213deecae3dab448 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Wed, 19 Feb 2025 14:16:15 -0800 Subject: [PATCH 10/10] RATIS-2245. Add log for debuggability --- .../java/org/apache/ratis/server/impl/StateMachineUpdater.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index c561625c4c..d0d5442663 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -263,7 +263,7 @@ private CompletableFuture applyLog(CompletableFuture applyLogFutures if (f != null) { CompletableFuture exceptionHandledFuture = f.exceptionally(ex -> { LOG.error("Exception while {}: applying txn index={}, nextLog={}", this, nextIndex, - LogProtoUtils.toLogEntryString(entry)); + LogProtoUtils.toLogEntryString(entry), ex); return null; }); applyLogFutures = applyLogFutures.thenCombine(exceptionHandledFuture, (v, message) -> null);