From 0cf74cd2f02b679f5ca10c66808ef38abc8dcdbb Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Fri, 18 Jul 2025 23:19:50 +0200 Subject: [PATCH 01/18] refactor: rewrite EligibleLeaderReplicasIntegrationTest with new test infra --- ...EligibleLeaderReplicasIntegrationTest.java | 159 ++++++++---------- .../kafka/common/test/ClusterInstance.java | 17 ++ 2 files changed, 85 insertions(+), 91 deletions(-) diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java index 28c12cf6bceea..87c9339980f8b 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java @@ -15,10 +15,7 @@ * limitations under the License. */ package kafka.server.integration; -import kafka.integration.KafkaServerTestHarness; import kafka.server.KafkaBroker; -import kafka.server.KafkaConfig; -import kafka.utils.Logging; import kafka.utils.TestUtils; import org.apache.kafka.clients.CommonClientConfigs; @@ -41,16 +38,20 @@ import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.Type; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; import 
org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.TestInfo; import java.io.File; @@ -65,9 +66,7 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; -import scala.collection.JavaConverters; import scala.collection.Seq; -import scala.collection.mutable.HashMap; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -75,58 +74,35 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarness implements Logging { +public class EligibleLeaderReplicasIntegrationTest { private String bootstrapServer; private String testTopicName; private Admin adminClient; - @Override - public MetadataVersion metadataVersion() { - return MetadataVersion.IBP_4_0_IV1; - } + private final ClusterInstance clusterInstance; - @Override - public Seq generateConfigs() { - List brokerConfigs = new ArrayList<>(); - brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs( - 5, // The tests require 4 brokers to host the partition. However, we need the 5th broker to handle the admin client requests. 
- true, - true, - scala.Option.empty(), - scala.Option.empty(), - scala.Option.empty(), - true, - false, - false, - false, - new HashMap<>(), - 1, - false, - 1, - (short) 4, - 0, - false - ))); - List configs = new ArrayList<>(); - for (Properties props : brokerConfigs) { - configs.add(KafkaConfig.fromProps(props)); - } - return JavaConverters.asScalaBuffer(configs).toSeq(); - } + @ClusterTest( + types = {Type.KRAFT}, + metadataVersion = MetadataVersion.IBP_4_0_IV1, + brokers = 5, + serverProperties = { + @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4"), + } + ) @BeforeEach - @Override public void setUp(TestInfo info) { - super.setUp(info); // create adminClient Properties props = new Properties(); - bootstrapServer = bootstrapServers(listenerName()); + //bootstrapServer = bootstrapServers(listenerName()); + bootstrapServer = clusterInstance.bootstrapServers(listenerName()); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); adminClient = Admin.create(props); adminClient.updateFeatures( - Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, - new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions() ); testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test"); } @@ -136,11 +112,12 @@ public void close() throws Exception { if (adminClient != null) adminClient.close(); } - @Test - public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { + @ClusterTest + public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(ClusterInstance clusterInstance) throws 
ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -180,8 +157,8 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get(); waitUntilOneMessageIsConsumed(consumer); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 2 && elrSize == 1; @@ -193,15 +170,15 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count()); // Restore the min ISR and the previous log should be visible. 
- startBroker(initialReplicas.get(1).id()); - startBroker(initialReplicas.get(0).id()); + clusterInstance.startBroker(initialReplicas.get(1).id()); + clusterInstance.startBroker(initialReplicas.get(0).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 4 && elrSize == 0; }); waitUntilOneMessageIsConsumed(consumer); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); if (consumer != null) consumer.close(); if (producer != null) producer.close(); } @@ -222,11 +199,12 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { ); } - @Test - public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { + @ClusterTest + public void testElrMemberCanBeElected(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -244,15 +222,15 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx assertEquals(0, topicPartitionInfo.elr().size()); assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 1 && elrSize == 2; }); - killBroker(initialReplicas.get(3).id()); + 
clusterInstance.shutdownBroker(initialReplicas.get(3).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 0 && elrSize == 3; @@ -270,7 +248,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx int expectLeader = topicPartitionInfo.elr().stream() .filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id(); - startBroker(expectLeader); + clusterInstance.startBroker(expectLeader); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 1 && elrSize == 2; }); @@ -282,7 +260,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx // Start another 2 brokers and the ELR fields should be cleaned. topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2) - .forEach(node -> startBroker(node.id())); + .forEach(node -> clusterInstance.startBroker(node.id())); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 3 && elrSize == 0; @@ -293,15 +271,16 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); } } - @Test - public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { + @ClusterTest + public void testElrMemberShouldBeKickOutWhenUncleanShutdown(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource 
configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -319,10 +298,10 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx assertEquals(0, topicPartitionInfo.elr().size()); assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); - killBroker(initialReplicas.get(3).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 0 && elrSize == 3; @@ -331,9 +310,8 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx .allTopicNames().get().get(testTopicName).partitions().get(0); int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); - KafkaBroker broker = brokers().find(b -> { - return b.config().brokerId() == brokerToBeUncleanShutdown; - }).get(); + KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown).findFirst() + .orElseThrow(() -> new RuntimeException("No broker found")); Seq dirs = broker.logManager().liveLogDirs(); assertEquals(1, dirs.size()); CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); @@ -341,7 +319,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx assertDoesNotThrow(() -> handler.delete()); // After remove the clean shutdown file, the broker should report unclean shutdown during restart. 
- startBroker(brokerToBeUncleanShutdown); + clusterInstance.startBroker(brokerToBeUncleanShutdown); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 0 && elrSize == 2; }); @@ -350,18 +328,19 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx assertNull(topicPartitionInfo.leader()); assertEquals(1, topicPartitionInfo.lastKnownElr().size()); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); } } /* This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. */ - @Test - public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { + @ClusterTest + public void testLastKnownLeaderShouldBeElectedIfEmptyElr(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -379,10 +358,10 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep assertEquals(0, topicPartitionInfo.elr().size()); assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); - killBroker(initialReplicas.get(3).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); 
waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 0 && elrSize == 3; @@ -392,20 +371,18 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); Set initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet()); - brokers().foreach(broker -> { + for (KafkaBroker broker : clusterInstance.brokers().values()) { if (initialReplicaSet.contains(broker.config().brokerId())) { Seq dirs = broker.logManager().liveLogDirs(); assertEquals(1, dirs.size()); CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); assertDoesNotThrow(() -> handler.delete()); } - return true; - }); - + } // After remove the clean shutdown file, the broker should report unclean shutdown during restart. topicPartitionInfo.replicas().forEach(replica -> { - if (replica.id() != lastKnownLeader) startBroker(replica.id()); + if (replica.id() != lastKnownLeader) clusterInstance.startBroker(replica.id()); }); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 0 && elrSize == 1; @@ -416,7 +393,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep assertEquals(1, topicPartitionInfo.lastKnownElr().size()); // Now if the last known leader goes through unclean shutdown, it will still be elected. 
- startBroker(lastKnownLeader); + clusterInstance.startBroker(lastKnownLeader); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize > 0 && elrSize == 0; }); @@ -436,7 +413,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep DEFAULT_MAX_WAIT_MS, 100L ); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); } } diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index ceb30af6e97b6..299c4c3a8d856 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -71,7 +71,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import scala.collection.Seq; import scala.jdk.javaapi.CollectionConverters; +import scala.collection.JavaConverters; import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; @@ -444,6 +446,21 @@ default List boundPorts() { .map(KafkaBroker::socketServer) .map(s -> s.boundPort(clientListener())) .toList(); + } + + default void restartDeadBrokers() { + for (Map.Entry entry : brokers().entrySet()) { + int brokerId = entry.getKey(); + KafkaBroker broker = entry.getValue(); + + if (broker.isShutdown()) { + startBroker(brokerId); + } + } + } + default String bootstrapServers(ListenerName listenerName){ + Seq brokerSeq = new ArrayList<>(brokers().values()).asScala().toSeq(); + kafka.utils.TestUtils.bootstrapServers(brokerSeq, listenerName); } } From 9dd08e5a35ee5ea3db5c41bbaae92a902ff24fdf Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Mon, 21 Jul 2025 21:06:42 +0200 Subject: [PATCH 02/18] rewrite the logic of ClusterTestDefaults and restartDeadBrokers --- 
...EligibleLeaderReplicasIntegrationTest.java | 43 ++++++++++--------- .../kafka/common/test/ClusterInstance.java | 17 +++----- 2 files changed, 27 insertions(+), 33 deletions(-) diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java index 87c9339980f8b..00b945707c2c5 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java @@ -43,15 +43,14 @@ import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterConfigProperty; import org.apache.kafka.common.test.api.ClusterTest; -import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; -import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.config.ReplicationConfigs; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; import java.io.File; @@ -74,6 +73,15 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +@ClusterTestDefaults( + brokers = 5, + serverProperties = { + @ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ServerConfigs.BROKER_RACK_DOC, value = "new HashMap<>()"), + @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4"), + } +) public class 
EligibleLeaderReplicasIntegrationTest { private String bootstrapServer; private String testTopicName; @@ -81,28 +89,21 @@ public class EligibleLeaderReplicasIntegrationTest { private final ClusterInstance clusterInstance; - @ClusterTest( - types = {Type.KRAFT}, - metadataVersion = MetadataVersion.IBP_4_0_IV1, - brokers = 5, - serverProperties = { - @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"), - @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4"), - } - ) + EligibleLeaderReplicasIntegrationTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } @BeforeEach public void setUp(TestInfo info) { // create adminClient Properties props = new Properties(); - //bootstrapServer = bootstrapServers(listenerName()); - bootstrapServer = clusterInstance.bootstrapServers(listenerName()); + bootstrapServer = clusterInstance.bootstrapServers(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); adminClient = Admin.create(props); adminClient.updateFeatures( - Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, - new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions() ); testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test"); } @@ -113,7 +114,7 @@ public void close() throws Exception { } @ClusterTest - public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); Seq 
brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); @@ -200,7 +201,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { } @ClusterTest - public void testElrMemberCanBeElected(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); @@ -276,7 +277,7 @@ public void testElrMemberCanBeElected(ClusterInstance clusterInstance) throws Ex } @ClusterTest - public void testElrMemberShouldBeKickOutWhenUncleanShutdown(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); @@ -336,7 +337,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown(ClusterInstance clus This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. 
*/ @ClusterTest - public void testLastKnownLeaderShouldBeElectedIfEmptyElr(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 299c4c3a8d856..11bcd09628924 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -35,6 +35,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AccessControlEntry; @@ -71,9 +72,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import scala.collection.Seq; import scala.jdk.javaapi.CollectionConverters; -import scala.collection.JavaConverters; import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; @@ -449,18 +448,12 @@ default List boundPorts() { } default void restartDeadBrokers() { + if (brokers().isEmpty()) + throw new KafkaException("Must supply at least one server config."); for (Map.Entry entry : brokers().entrySet()) { - int brokerId = entry.getKey(); - KafkaBroker broker = entry.getValue(); - - if 
(broker.isShutdown()) { - startBroker(brokerId); + if (entry.getValue().isShutdown()) { + startBroker(entry.getKey()); } } } - - default String bootstrapServers(ListenerName listenerName){ - Seq brokerSeq = new ArrayList<>(brokers().values()).asScala().toSeq(); - kafka.utils.TestUtils.bootstrapServers(brokerSeq, listenerName); - } } From 9aa594e749398fd4eabc458081739d59622269aa Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Tue, 22 Jul 2025 08:29:07 +0200 Subject: [PATCH 03/18] Rewrite the logic of restartDeadBrokers and replace JavaConverters.asScalaBuffer with CollectionConverters.asScala --- .../EligibleLeaderReplicasIntegrationTest.java | 9 +++++---- .../org/apache/kafka/common/test/ClusterInstance.java | 11 ++++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java index 00b945707c2c5..fca1a0b8bc979 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java @@ -66,6 +66,7 @@ import java.util.stream.Collectors; import scala.collection.Seq; +import scala.jdk.javaapi.CollectionConverters; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -117,7 +118,7 @@ public void close() throws Exception { public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); 
TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); @@ -204,7 +205,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); @@ -280,7 +281,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); @@ -340,7 +341,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + Seq brokerSeq = 
CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 11bcd09628924..1d3a86e0abb42 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -448,12 +448,13 @@ default List boundPorts() { } default void restartDeadBrokers() { - if (brokers().isEmpty()) + if (brokers().isEmpty()){ throw new KafkaException("Must supply at least one server config."); - for (Map.Entry entry : brokers().entrySet()) { - if (entry.getValue().isShutdown()) { - startBroker(entry.getKey()); - } } + brokers().entrySet().foreach(entry -> { + if (!entry.getValue().isShutdown()) { + startBroker(id); + } + }); } } From eb25348a2e170aaca62b739b1bd9bbed9f836ac7 Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Tue, 22 Jul 2025 23:49:30 +0200 Subject: [PATCH 04/18] Move EligibleLeaderReplicasIntegrationTest to server module, rewrite import-control-server.xml, and fix the bug in restartDeadBrokers --- checkstyle/import-control-server.xml | 5 +- ...EligibleLeaderReplicasIntegrationTest.java | 52 +++++++++---------- .../kafka/common/test/ClusterInstance.java | 4 +- 3 files changed, 28 insertions(+), 33 deletions(-) rename {core/src/test/java/kafka/server/integration => server/src/test/java/org/apache/kafka/server}/EligibleLeaderReplicasIntegrationTest.java (92%) diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml index b3d1b928cc6db..fe659cc9d38fe 100644 --- a/checkstyle/import-control-server.xml +++ 
b/checkstyle/import-control-server.xml @@ -32,9 +32,8 @@ - - - + + diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java similarity index 92% rename from core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java rename to server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index fca1a0b8bc979..165f79b3a5c8b 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.server.integration; +package org.apache.kafka.server; + import kafka.server.KafkaBroker; -import kafka.utils.TestUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; @@ -48,6 +48,7 @@ import org.apache.kafka.server.config.ReplicationConfigs; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -66,7 +67,6 @@ import java.util.stream.Collectors; import scala.collection.Seq; -import scala.jdk.javaapi.CollectionConverters; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -118,8 +118,7 @@ public void close() throws Exception { public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = CollectionConverters.asScala(new 
ArrayList<>(clusterInstance.brokers().values())).toSeq(); - TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -186,8 +185,8 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc } } - void waitUntilOneMessageIsConsumed(Consumer consumer) { - TestUtils.waitUntilTrue( + void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedException { + TestUtils.waitForCondition( () -> { try { ConsumerRecords record = consumer.poll(Duration.ofMillis(100L)); @@ -196,8 +195,8 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { return false; } }, - () -> "fail to consume messages", - DEFAULT_MAX_WAIT_MS, 100L + DEFAULT_MAX_WAIT_MS, + () -> "fail to consume messages" ); } @@ -205,8 +204,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); - TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -281,8 +279,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); - 
TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -312,8 +309,8 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx .allTopicNames().get().get(testTopicName).partitions().get(0); int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); - KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown).findFirst() - .orElseThrow(() -> new RuntimeException("No broker found")); + KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown) + .findFirst().get(); Seq dirs = broker.logManager().liveLogDirs(); assertEquals(1, dirs.size()); CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); @@ -341,8 +338,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); - TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -373,14 +369,14 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); Set initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet()); - for (KafkaBroker broker : 
clusterInstance.brokers().values()) { - if (initialReplicaSet.contains(broker.config().brokerId())) { + clusterInstance.brokers().forEach((id, broker) -> { + if (initialReplicaSet.contains(id)) { Seq dirs = broker.logManager().liveLogDirs(); assertEquals(1, dirs.size()); CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); assertDoesNotThrow(() -> handler.delete()); } - } + }); // After remove the clean shutdown file, the broker should report unclean shutdown during restart. topicPartitionInfo.replicas().forEach(replica -> { @@ -399,8 +395,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep waitForIsrAndElr((isrSize, elrSize) -> { return isrSize > 0 && elrSize == 0; }); - - TestUtils.waitUntilTrue( + TestUtils.waitForCondition( () -> { try { TopicPartitionInfo partition = adminClient.describeTopics(List.of(testTopicName)) @@ -411,16 +406,16 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep return false; } }, - () -> String.format("Partition metadata for %s is not correct", testTopicName), - DEFAULT_MAX_WAIT_MS, 100L + DEFAULT_MAX_WAIT_MS, + () -> String.format("Partition metadata for %s is not correct", testTopicName) ); } finally { clusterInstance.restartDeadBrokers(); } } - void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied) { - TestUtils.waitUntilTrue( + void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied) throws InterruptedException { + TestUtils.waitForCondition( () -> { try { TopicDescription topicDescription = adminClient.describeTopics(List.of(testTopicName)) @@ -431,7 +426,8 @@ void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatis return false; } }, - () -> String.format("Partition metadata for %s is not propagated", testTopicName), - DEFAULT_MAX_WAIT_MS, 100L); + DEFAULT_MAX_WAIT_MS, + () -> String.format("Partition metadata for %s is not propagated", testTopicName) + ); } } diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 15a3cfcbd6136..96710f4caa616 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -451,9 +451,9 @@ default void restartDeadBrokers() { if (brokers().isEmpty()){ throw new KafkaException("Must supply at least one server config."); } - brokers().entrySet().foreach(entry -> { + brokers().entrySet().forEach(entry -> { if (!entry.getValue().isShutdown()) { - startBroker(id); + startBroker(entry.getKey()); } }); } From f3d61ef8ca6a3d8f537a2f54b1e6485bad80caf6 Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Wed, 23 Jul 2025 12:29:05 +0200 Subject: [PATCH 05/18] Fix checkstyleTest error --- .../java/org/apache/kafka/common/test/api/ClusterTest.java | 3 +-- .../java/org/apache/kafka/common/test/ClusterInstance.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java index f81f2739907be..cabd2fbf60802 100644 --- a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java +++ b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java @@ -39,8 +39,7 @@ @TestTemplate @Timeout(60) @Tag("integration") -public @interface ClusterTest { - Type[] types() default {}; +public @interface ClusterTest { Type[] types() default {}; int brokers() default 0; int controllers() default 0; int disksPerBroker() default 0; diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 96710f4caa616..1f2cfa08bc26f 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -448,7 +448,7 @@ default List boundPorts() { } default void restartDeadBrokers() { - if (brokers().isEmpty()){ + if (brokers().isEmpty()) { throw new KafkaException("Must supply at least one server config."); } brokers().entrySet().forEach(entry -> { From e68ee42810e0608f69b2f7bb88efeb203df029d9 Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Wed, 23 Jul 2025 12:44:22 +0200 Subject: [PATCH 06/18] Add metadataVersion to ClusterTest --- .../server/EligibleLeaderReplicasIntegrationTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index 165f79b3a5c8b..d4b748dc22c52 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; +import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.config.ReplicationConfigs; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; @@ -114,7 +115,7 @@ public void close() throws Exception { if (adminClient != null) adminClient.close(); } - @ClusterTest + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void 
testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); @@ -200,7 +201,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedExceptio ); } - @ClusterTest + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); @@ -275,7 +276,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx } } - @ClusterTest + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); @@ -334,7 +335,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx /* This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. 
*/ - @ClusterTest + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); From 4fd332343031b2c4c37a518066345d1b0e6c56b5 Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Wed, 23 Jul 2025 13:13:58 +0200 Subject: [PATCH 07/18] replace KafkaException with InterruptedException --- .../java/org/apache/kafka/common/test/api/ClusterTest.java | 3 ++- .../java/org/apache/kafka/common/test/ClusterInstance.java | 5 ++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java index cabd2fbf60802..f81f2739907be 100644 --- a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java +++ b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java @@ -39,7 +39,8 @@ @TestTemplate @Timeout(60) @Tag("integration") -public @interface ClusterTest { Type[] types() default {}; +public @interface ClusterTest { + Type[] types() default {}; int brokers() default 0; int controllers() default 0; int disksPerBroker() default 0; diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 1f2cfa08bc26f..a1d36ab44dcf9 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -35,7 +35,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; 
import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AccessControlEntry; @@ -447,9 +446,9 @@ default List boundPorts() { .toList(); } - default void restartDeadBrokers() { + default void restartDeadBrokers() throws InterruptedException { if (brokers().isEmpty()) { - throw new KafkaException("Must supply at least one server config."); + throw new IllegalArgumentException("Must supply at least one server config."); } brokers().entrySet().forEach(entry -> { if (!entry.getValue().isShutdown()) { From 493a7c7ad52fa572c4171ac4018e0fbf3aacf628 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Thu, 24 Jul 2025 22:22:01 +0200 Subject: [PATCH 08/18] refactor: Replace Scala Seq with Java List --- checkstyle/import-control-server.xml | 5 +- ...EligibleLeaderReplicasIntegrationTest.java | 64 ++++++------------- .../kafka/common/test/ClusterInstance.java | 2 +- 3 files changed, 24 insertions(+), 47 deletions(-) diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml index fe659cc9d38fe..b3d1b928cc6db 100644 --- a/checkstyle/import-control-server.xml +++ b/checkstyle/import-control-server.xml @@ -32,8 +32,9 @@ - - + + + diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index d4b748dc22c52..913362e82eb3a 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.server; -import kafka.server.KafkaBroker; - import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AlterConfigOp; @@ -67,8
+65,6 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; -import scala.collection.Seq; - import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -162,9 +158,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc clusterInstance.shutdownBroker(initialReplicas.get(0).id()); clusterInstance.shutdownBroker(initialReplicas.get(1).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 2 && elrSize == 1; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1); // Now the partition is under min ISR. HWM should not advance. producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); @@ -174,9 +168,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc // Restore the min ISR and the previous log should be visible. clusterInstance.startBroker(initialReplicas.get(1).id()); clusterInstance.startBroker(initialReplicas.get(0).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 4 && elrSize == 0; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 4 && elrSize == 0); waitUntilOneMessageIsConsumed(consumer); } finally { @@ -227,15 +219,11 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx clusterInstance.shutdownBroker(initialReplicas.get(1).id()); clusterInstance.shutdownBroker(initialReplicas.get(2).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 1 && elrSize == 2; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) 
.allTopicNames().get().get(testTopicName).partitions().get(0); @@ -250,9 +238,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx .filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id(); clusterInstance.startBroker(expectLeader); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 1 && elrSize == 2; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -263,9 +249,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2) .forEach(node -> clusterInstance.startBroker(node.id())); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 3 && elrSize == 0; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 3 && elrSize == 0); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -303,26 +287,23 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx clusterInstance.shutdownBroker(initialReplicas.get(2).id()); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); - KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown) + var broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown) .findFirst().get(); - Seq dirs = 
broker.logManager().liveLogDirs(); + List dirs = new ArrayList<>(); + broker.logManager().liveLogDirs().foreach(dirs::add); assertEquals(1, dirs.size()); - CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); + CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.get(0).toString()); assertTrue(handler.exists()); - assertDoesNotThrow(() -> handler.delete()); + assertDoesNotThrow(handler::delete); // After remove the clean shutdown file, the broker should report unclean shutdown during restart. clusterInstance.startBroker(brokerToBeUncleanShutdown); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 2; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 2); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertNull(topicPartitionInfo.leader()); @@ -362,9 +343,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep clusterInstance.shutdownBroker(initialReplicas.get(2).id()); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); @@ -372,10 +351,11 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep Set initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet()); clusterInstance.brokers().forEach((id, broker) -> { if (initialReplicaSet.contains(id)) { - Seq dirs = broker.logManager().liveLogDirs(); + List dirs = new ArrayList<>(); + broker.logManager().liveLogDirs().foreach(dirs::add); assertEquals(1, dirs.size()); - 
CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); - assertDoesNotThrow(() -> handler.delete()); + CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.get(0).toString()); + assertDoesNotThrow(handler::delete); } }); @@ -383,9 +363,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep topicPartitionInfo.replicas().forEach(replica -> { if (replica.id() != lastKnownLeader) clusterInstance.startBroker(replica.id()); }); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 1; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 1); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertNull(topicPartitionInfo.leader()); @@ -393,9 +371,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep // Now if the last known leader goes through unclean shutdown, it will still be elected. 
clusterInstance.startBroker(lastKnownLeader); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize > 0 && elrSize == 0; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize > 0 && elrSize == 0); TestUtils.waitForCondition( () -> { try { diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index a1d36ab44dcf9..32871e5e2926e 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -451,7 +451,7 @@ default void restartDeadBrokers() throws InterruptedException { throw new IllegalArgumentException("Must supply at least one server config."); } brokers().entrySet().forEach(entry -> { - if (!entry.getValue().isShutdown()) { + if (entry.getValue().isShutdown()) { startBroker(entry.getKey()); } }); From 4c17b974846ab3bb53233ff4feda071208ef2e75 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Wed, 6 Aug 2025 20:44:19 +0200 Subject: [PATCH 09/18] refactor: Replace RuntimeException with IllegalArgumentException --- .../java/org/apache/kafka/common/test/ClusterInstance.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 05b6c4735aba3..6ecc075d2a59b 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -448,9 +448,9 @@ default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin, " ms since a leader was not elected for partition " + topicPartition); } - default void 
restartDeadBrokers() throws InterruptedException { + default void restartDeadBrokers() throws ExecutionException { if (brokers().isEmpty()) { - throw new IllegalArgumentException("Must supply at least one server config."); + throw new RuntimeException("Must supply at least one server config."); } brokers().entrySet().forEach(entry -> { if (entry.getValue().isShutdown()) { From 927bf6554ee2bc95350e177bad796107c89b469c Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Thu, 7 Aug 2025 21:32:48 +0200 Subject: [PATCH 10/18] refactor: Refactor restartDeadBrokers and move it to EligibleLeaderReplicasIntegrationTest --- ...EligibleLeaderReplicasIntegrationTest.java | 20 ++++++++++++++----- .../kafka/common/test/ClusterInstance.java | 11 ---------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index 913362e82eb3a..3106be2fe84ea 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -62,6 +62,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -162,7 +163,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc // Now the partition is under min ISR. HWM should not advance. producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); - Thread.sleep(100); + TimeUnit.MILLISECONDS.sleep(100); assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count()); // Restore the min ISR and the previous log should be visible.
@@ -172,7 +173,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc waitUntilOneMessageIsConsumed(consumer); } finally { - clusterInstance.restartDeadBrokers(); + restartDeadBrokers(); if (consumer != null) consumer.close(); if (producer != null) producer.close(); } @@ -256,7 +257,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); } finally { - clusterInstance.restartDeadBrokers(); + restartDeadBrokers(); } } @@ -309,7 +310,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx assertNull(topicPartitionInfo.leader()); assertEquals(1, topicPartitionInfo.lastKnownElr().size()); } finally { - clusterInstance.restartDeadBrokers(); + restartDeadBrokers(); } } @@ -387,7 +388,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep () -> String.format("Partition metadata for %s is not correct", testTopicName) ); } finally { - clusterInstance.restartDeadBrokers(); + restartDeadBrokers(); } } @@ -407,4 +408,13 @@ void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatis () -> String.format("Partition metadata for %s is not propagated", testTopicName) ); } + + void restartDeadBrokers() { + if (clusterInstance.brokers().isEmpty()) { + throw new RuntimeException("Must supply at least one server config."); + } + clusterInstance.brokers().forEach((key, value) -> { + if (value.isShutdown()) value.startup(); + }); + } } diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 6ecc075d2a59b..7662eeda7a30c 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -447,15 +447,4 @@ default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin, throw new AssertionError("Timing out after " + timeoutMs + " ms since a leader was not elected for partition " + topicPartition); } - - default void restartDeadBrokers() throws ExecutionException { - if (brokers().isEmpty()) { - throw new RuntimeException("Must supply at least one server config."); - } - brokers().entrySet().forEach(entry -> { - if (entry.getValue().isShutdown()) { - startBroker(entry.getKey()); - } - }); - } } From 2055c18ab07451bcbdd4962bddbeb25c44c62011 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Mon, 11 Aug 2025 19:18:52 +0200 Subject: [PATCH 11/18] refactor: Rewrite EligibleLeaderReplicasIntegrationTest by try-with-resource --- ...EligibleLeaderReplicasIntegrationTest.java | 259 ++++++++---------- 1 file changed, 113 insertions(+), 146 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index 3106be2fe84ea..c13bceb76849d 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.server; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.ConfigEntry; @@ -27,9 +26,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import 
org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; @@ -49,17 +45,12 @@ import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestInfo; - import java.io.File; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -77,57 +68,50 @@ serverProperties = { @ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"), @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"), - @ClusterConfigProperty(key = ServerConfigs.BROKER_RACK_DOC, value = "new HashMap<>()"), - @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4"), + @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4") } ) public class EligibleLeaderReplicasIntegrationTest { - private String bootstrapServer; - private String testTopicName; - private Admin adminClient; - private final ClusterInstance clusterInstance; + private String testTopicName; EligibleLeaderReplicasIntegrationTest(ClusterInstance clusterInstance) { this.clusterInstance = clusterInstance; } - @BeforeEach - public void setUp(TestInfo info) { - // create adminClient - Properties props = new Properties(); - bootstrapServer = clusterInstance.bootstrapServers(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - adminClient = Admin.create(props); - adminClient.updateFeatures( - Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, - new 
FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() - ); - testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test"); - } - - @AfterEach - public void close() throws Exception { - if (adminClient != null) adminClient.close(); - } - @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { - adminClient.createTopics( - List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - clusterInstance.waitTopicCreation(testTopicName, 1); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Collection ops = new ArrayList<>(); - ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); - Map> configOps = Map.of(configResource, ops); - // alter configs on target cluster - adminClient.incrementalAlterConfigs(configOps).all().get(); - Producer producer = null; - Consumer consumer = null; - try { - TopicDescription testTopicDescription = adminClient.describeTopics(List.of(testTopicName)) + try (var admin = clusterInstance.admin(); + var producer = clusterInstance.producer(Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(), + ProducerConfig.ACKS_CONFIG, "1")); + var consumer = clusterInstance.consumer(Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, "test", + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + 
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()))) { + testTopicName = String.format("%s-%s", "testHighWatermarkShouldNotAdvanceIfUnderMinIsr", "ELR-test"); + admin.updateFeatures( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions() + ); + + admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + clusterInstance.waitTopicCreation(testTopicName, 1); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map> configOps = Map.of(configResource, ops); + // alter configs on target cluster + admin.incrementalAlterConfigs(configOps).all().get(); + + TopicDescription testTopicDescription = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName); TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); List initialReplicas = topicPartitionInfo.replicas(); @@ -135,31 +119,14 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc assertEquals(0, topicPartitionInfo.elr().size()); assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - Properties producerProps = new Properties(); - producerProps.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProps.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - // Use Ack=1 for the producer. 
- producerProps.put(ProducerConfig.ACKS_CONFIG, "1"); - producer = new KafkaProducer(producerProps); - - Properties consumerProps = new Properties(); - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); - consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10"); - consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProps.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProps.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Set.of(testTopicName)); - producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get(); waitUntilOneMessageIsConsumed(consumer); clusterInstance.shutdownBroker(initialReplicas.get(0).id()); clusterInstance.shutdownBroker(initialReplicas.get(1).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1, admin); // Now the partition is under min ISR. HWM should not advance. producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); @@ -169,17 +136,13 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc // Restore the min ISR and the previous log should be visible. 
clusterInstance.startBroker(initialReplicas.get(1).id()); clusterInstance.startBroker(initialReplicas.get(0).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 4 && elrSize == 0); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 4 && elrSize == 0, admin); waitUntilOneMessageIsConsumed(consumer); - } finally { - restartDeadBrokers(); - if (consumer != null) consumer.close(); - if (producer != null) producer.close(); } } - void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedException { + void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedException { TestUtils.waitForCondition( () -> { try { @@ -196,19 +159,25 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedExceptio @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { - adminClient.createTopics( - List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - clusterInstance.waitTopicCreation(testTopicName, 1); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Collection ops = new ArrayList<>(); - ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); - Map> configOps = Map.of(configResource, ops); - // alter configs on target cluster - adminClient.incrementalAlterConfigs(configOps).all().get(); - - try { - TopicDescription testTopicDescription = adminClient.describeTopics(List.of(testTopicName)) + try (var admin = clusterInstance.admin()) { + testTopicName = String.format("%s-%s", "testElrMemberCanBeElected", "ELR-test"); + + admin.updateFeatures( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions() + ); + admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + 
clusterInstance.waitTopicCreation(testTopicName, 1); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map> configOps = Map.of(configResource, ops); + // alter configs on target cluster + admin.incrementalAlterConfigs(configOps).all().get(); + + TopicDescription testTopicDescription = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName); TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); List initialReplicas = topicPartitionInfo.replicas(); @@ -220,13 +189,13 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx clusterInstance.shutdownBroker(initialReplicas.get(1).id()); clusterInstance.shutdownBroker(initialReplicas.get(2).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2, admin); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertEquals(1, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); int expectLastKnownLeader = initialReplicas.get(3).id(); @@ -239,9 +208,9 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx .filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id(); clusterInstance.startBroker(expectLeader); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 
1 && elrSize == 2, admin); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); @@ -250,32 +219,36 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2) .forEach(node -> clusterInstance.startBroker(node.id())); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 3 && elrSize == 0); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 3 && elrSize == 0, admin); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); - } finally { - restartDeadBrokers(); } } @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { - adminClient.createTopics( - List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - clusterInstance.waitTopicCreation(testTopicName, 1); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Collection ops = new ArrayList<>(); - ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); - Map> configOps = Map.of(configResource, ops); - // alter configs on target cluster - adminClient.incrementalAlterConfigs(configOps).all().get(); - - try { - TopicDescription 
testTopicDescription = adminClient.describeTopics(List.of(testTopicName)) + try (var admin = clusterInstance.admin()) { + testTopicName = String.format("%s-%s", "testElrMemberShouldBeKickOutWhenUncleanShutdown", "ELR-test"); + + admin.updateFeatures( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions() + ); + admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + clusterInstance.waitTopicCreation(testTopicName, 1); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map> configOps = Map.of(configResource, ops); + // alter configs on target cluster + admin.incrementalAlterConfigs(configOps).all().get(); + + TopicDescription testTopicDescription = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName); TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); List initialReplicas = topicPartitionInfo.replicas(); @@ -288,8 +261,8 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx clusterInstance.shutdownBroker(initialReplicas.get(2).id()); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin); + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); @@ -304,13 +277,11 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws 
ExecutionEx // After remove the clean shutdown file, the broker should report unclean shutdown during restart. clusterInstance.startBroker(brokerToBeUncleanShutdown); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 2); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 2, admin); + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertNull(topicPartitionInfo.leader()); assertEquals(1, topicPartitionInfo.lastKnownElr().size()); - } finally { - restartDeadBrokers(); } } @@ -319,19 +290,26 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx */ @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { - adminClient.createTopics( - List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - clusterInstance.waitTopicCreation(testTopicName, 1); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Collection ops = new ArrayList<>(); - ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); - Map> configOps = Map.of(configResource, ops); - // alter configs on target cluster - adminClient.incrementalAlterConfigs(configOps).all().get(); - - try { - TopicDescription testTopicDescription = adminClient.describeTopics(List.of(testTopicName)) + try (var admin = clusterInstance.admin()) { + testTopicName = String.format("%s-%s", "testLastKnownLeaderShouldBeElectedIfEmptyElr", "ELR-test"); + + admin.updateFeatures( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions() + ); + admin.createTopics(List.of(new 
NewTopic(testTopicName, 1, (short) 4))).all().get(); + clusterInstance.waitTopicCreation(testTopicName, 1); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map> configOps = Map.of(configResource, ops); + // alter configs on target cluster + admin.incrementalAlterConfigs(configOps).all().get(); + + + TopicDescription testTopicDescription = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName); TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); List initialReplicas = topicPartitionInfo.replicas(); @@ -344,8 +322,8 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep clusterInstance.shutdownBroker(initialReplicas.get(2).id()); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin); + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); @@ -364,19 +342,19 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep topicPartitionInfo.replicas().forEach(replica -> { if (replica.id() != lastKnownLeader) clusterInstance.startBroker(replica.id()); }); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 1); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 1, admin); + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) 
.allTopicNames().get().get(testTopicName).partitions().get(0); assertNull(topicPartitionInfo.leader()); assertEquals(1, topicPartitionInfo.lastKnownElr().size()); // Now if the last known leader goes through unclean shutdown, it will still be elected. clusterInstance.startBroker(lastKnownLeader); - waitForIsrAndElr((isrSize, elrSize) -> isrSize > 0 && elrSize == 0); + waitForIsrAndElr((isrSize, elrSize) -> isrSize > 0 && elrSize == 0, admin); TestUtils.waitForCondition( () -> { try { - TopicPartitionInfo partition = adminClient.describeTopics(List.of(testTopicName)) + TopicPartitionInfo partition = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); if (partition.leader() == null) return false; return partition.lastKnownElr().isEmpty() && partition.elr().isEmpty() && partition.leader().id() == lastKnownLeader; @@ -387,16 +365,14 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep DEFAULT_MAX_WAIT_MS, () -> String.format("Partition metadata for %s is not correct", testTopicName) ); - } finally { - restartDeadBrokers(); } } - void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied) throws InterruptedException { + void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied, Admin admin) throws InterruptedException { TestUtils.waitForCondition( () -> { try { - TopicDescription topicDescription = adminClient.describeTopics(List.of(testTopicName)) + TopicDescription topicDescription = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName); TopicPartitionInfo partition = topicDescription.partitions().get(0); return isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size()); @@ -408,13 +384,4 @@ void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatis () -> String.format("Partition metadata for %s is not propagated", testTopicName) ); } - - void restartDeadBrokers() { - if (clusterInstance.brokers().isEmpty()) { - throw new 
RuntimeException("Must supply at least one server config."); - } - clusterInstance.brokers().forEach((key, value) -> { - if (value.isShutdown()) value.startup(); - }); - } } From 18096883b715b9447c14b901feccd4de51cadd26 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Wed, 13 Aug 2025 21:19:04 +0200 Subject: [PATCH 12/18] refactor: Refactor waitUntilOneMessageIsConsumed and Restrict testHighWatermarkShouldNotAdvanceIfUnderMinIsr to Type = KRAFT to avoid running unnecessary test types --- .../server/EligibleLeaderReplicasIntegrationTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index c13bceb76849d..e25bf61a8d492 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.clients.admin.UpdateFeaturesOptions; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; @@ -38,6 +37,7 @@ import org.apache.kafka.common.test.api.ClusterConfigProperty; import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.config.ReplicationConfigs; @@ -79,7 +79,7 @@ public class EligibleLeaderReplicasIntegrationTest { this.clusterInstance = clusterInstance; } - @ClusterTest(metadataVersion = 
MetadataVersion.IBP_4_0_IV1) + @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { try (var admin = clusterInstance.admin(); var producer = clusterInstance.producer(Map.of( @@ -146,8 +146,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedEx TestUtils.waitForCondition( () -> { try { - ConsumerRecords record = consumer.poll(Duration.ofMillis(100L)); - return record.count() >= 1; + return consumer.poll(Duration.ofMillis(100L)).count() >= 1; } catch (Exception e) { return false; } From 4cef671afe65ee3baea8dd680060546500a1b563 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Thu, 14 Aug 2025 10:35:32 +0200 Subject: [PATCH 13/18] refacotr: Restrict tests to Type = KRAFT to avoid running unnecessary test types --- .../kafka/server/EligibleLeaderReplicasIntegrationTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index e25bf61a8d492..f55990d179192 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -156,7 +156,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedEx ); } - @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) + @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { try (var admin = clusterInstance.admin()) { testTopicName = String.format("%s-%s", "testElrMemberCanBeElected", "ELR-test"); @@ -227,7 +227,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx } } - 
@ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) + @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { try (var admin = clusterInstance.admin()) { testTopicName = String.format("%s-%s", "testElrMemberShouldBeKickOutWhenUncleanShutdown", "ELR-test"); @@ -287,7 +287,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx /* This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. */ - @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) + @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { try (var admin = clusterInstance.admin()) { testTopicName = String.format("%s-%s", "testLastKnownLeaderShouldBeElectedIfEmptyElr", "ELR-test"); From 181f45418996fc96b244f379396c5c9c46432103 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Thu, 14 Aug 2025 17:57:45 +0200 Subject: [PATCH 14/18] refacotr: Make testTopicName a local variable --- ...EligibleLeaderReplicasIntegrationTest.java | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index f55990d179192..97939ebd9678f 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -73,7 +73,6 @@ ) public class EligibleLeaderReplicasIntegrationTest { private final ClusterInstance clusterInstance; - private String testTopicName; EligibleLeaderReplicasIntegrationTest(ClusterInstance clusterInstance) { 
this.clusterInstance = clusterInstance; @@ -94,7 +93,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()))) { - testTopicName = String.format("%s-%s", "testHighWatermarkShouldNotAdvanceIfUnderMinIsr", "ELR-test"); + String testTopicName = String.format("%s-%s", "testHighWatermarkShouldNotAdvanceIfUnderMinIsr", "ELR-test"); admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), @@ -126,7 +125,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc clusterInstance.shutdownBroker(initialReplicas.get(0).id()); clusterInstance.shutdownBroker(initialReplicas.get(1).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1, admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1, admin, testTopicName); // Now the partition is under min ISR. HWM should not advance. producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); @@ -136,7 +135,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc // Restore the min ISR and the previous log should be visible. 
clusterInstance.startBroker(initialReplicas.get(1).id()); clusterInstance.startBroker(initialReplicas.get(0).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 4 && elrSize == 0, admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 4 && elrSize == 0, admin, testTopicName); waitUntilOneMessageIsConsumed(consumer); } @@ -159,7 +158,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedEx @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { try (var admin = clusterInstance.admin()) { - testTopicName = String.format("%s-%s", "testElrMemberCanBeElected", "ELR-test"); + String testTopicName = String.format("%s-%s", "testElrMemberCanBeElected", "ELR-test"); admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, @@ -188,11 +187,11 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx clusterInstance.shutdownBroker(initialReplicas.get(1).id()); clusterInstance.shutdownBroker(initialReplicas.get(2).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2, admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2, admin, testTopicName); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin, testTopicName); topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -207,7 +206,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx .filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id(); clusterInstance.startBroker(expectLeader); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2, admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 
1 && elrSize == 2, admin, testTopicName); topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -218,7 +217,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2) .forEach(node -> clusterInstance.startBroker(node.id())); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 3 && elrSize == 0, admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 3 && elrSize == 0, admin, testTopicName); topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -230,7 +229,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { try (var admin = clusterInstance.admin()) { - testTopicName = String.format("%s-%s", "testElrMemberShouldBeKickOutWhenUncleanShutdown", "ELR-test"); + String testTopicName = String.format("%s-%s", "testElrMemberShouldBeKickOutWhenUncleanShutdown", "ELR-test"); admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, @@ -260,7 +259,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx clusterInstance.shutdownBroker(initialReplicas.get(2).id()); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin, testTopicName); topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -276,7 +275,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx // After remove the 
clean shutdown file, the broker should report unclean shutdown during restart. clusterInstance.startBroker(brokerToBeUncleanShutdown); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 2, admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 2, admin, testTopicName); topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertNull(topicPartitionInfo.leader()); @@ -290,7 +289,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { try (var admin = clusterInstance.admin()) { - testTopicName = String.format("%s-%s", "testLastKnownLeaderShouldBeElectedIfEmptyElr", "ELR-test"); + String testTopicName = String.format("%s-%s", "testLastKnownLeaderShouldBeElectedIfEmptyElr", "ELR-test"); admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, @@ -321,7 +320,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep clusterInstance.shutdownBroker(initialReplicas.get(2).id()); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin, testTopicName); topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); @@ -341,7 +340,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep topicPartitionInfo.replicas().forEach(replica -> { if (replica.id() != lastKnownLeader) clusterInstance.startBroker(replica.id()); }); - waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 1, 
admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 1, admin, testTopicName); topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertNull(topicPartitionInfo.leader()); @@ -349,7 +348,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep // Now if the last known leader goes through unclean shutdown, it will still be elected. clusterInstance.startBroker(lastKnownLeader); - waitForIsrAndElr((isrSize, elrSize) -> isrSize > 0 && elrSize == 0, admin); + waitForIsrAndElr((isrSize, elrSize) -> isrSize > 0 && elrSize == 0, admin, testTopicName); TestUtils.waitForCondition( () -> { try { @@ -367,7 +366,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep } } - void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied, Admin admin) throws InterruptedException { + void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied, Admin admin, String testTopicName) throws InterruptedException { TestUtils.waitForCondition( () -> { try { From 35eebcdb29e341f1fd70df4d75ad9a7619e51f0a Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Fri, 15 Aug 2025 23:24:59 +0200 Subject: [PATCH 15/18] refactor: Add get() to updateFeatures for future result retrieval, add sleep function comment, and create TopicPartition in test to verify consumer.currentLag. 
--- .../EligibleLeaderReplicasIntegrationTest.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index 97939ebd9678f..ceafd43d26519 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; @@ -97,8 +98,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() - ); + new UpdateFeaturesOptions()).all().get(); admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); clusterInstance.waitTopicCreation(testTopicName, 1); @@ -129,8 +129,10 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc // Now the partition is under min ISR. HWM should not advance. producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); + // We use a short sleep here to give the broker time to process the ISR change. + // To ensure the consumer sees the correct HWM state to avoid flakiness. 
TimeUnit.MILLISECONDS.sleep(100); - assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count()); + assertEquals(0L, consumer.currentLag(new TopicPartition(testTopicName, 0)).orElse(-1L)); // Restore the min ISR and the previous log should be visible. clusterInstance.startBroker(initialReplicas.get(1).id()); @@ -163,8 +165,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() - ); + new UpdateFeaturesOptions()).all().get(); admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); clusterInstance.waitTopicCreation(testTopicName, 1); @@ -234,8 +235,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() - ); + new UpdateFeaturesOptions()).all().get(); admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); clusterInstance.waitTopicCreation(testTopicName, 1); @@ -294,8 +294,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep admin.updateFeatures( Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() - ); + new UpdateFeaturesOptions()).all().get(); admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); clusterInstance.waitTopicCreation(testTopicName, 1); From 455704c21eafa0a34da1bd33976040654fa9f54e Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Fri, 22 Aug 2025 13:37:02 +0200 Subject: [PATCH 16/18] Replace sleep with listOffsets --- 
.../server/EligibleLeaderReplicasIntegrationTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index ceafd43d26519..03513651cfcdb 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.UpdateFeaturesOptions; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -54,7 +55,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -127,12 +127,13 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1, admin, testTopicName); + TopicPartition partition = new TopicPartition(testTopicName, 0); + long leoBeforeSend = admin.listOffsets(Map.of(partition, OffsetSpec.latest())).partitionResult(partition).get().offset(); // Now the partition is under min ISR. HWM should not advance. producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); - // We use a short sleep here to give the broker time to process the ISR change. - // To ensure the consumer sees the correct HWM state to avoid flakiness. 
- TimeUnit.MILLISECONDS.sleep(100); - assertEquals(0L, consumer.currentLag(new TopicPartition(testTopicName, 0)).orElse(-1L)); + long leoAfterSend = admin.listOffsets(Map.of(partition, OffsetSpec.latest())).partitionResult(partition).get().offset(); + assertEquals(leoBeforeSend, leoAfterSend); + assertEquals(0L, consumer.currentLag(partition).orElse(-1L)); // Restore the min ISR and the previous log should be visible. clusterInstance.startBroker(initialReplicas.get(1).id()); From b58181646f73446749debe351e49c08ce005ebdc Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Fri, 22 Aug 2025 13:49:15 +0200 Subject: [PATCH 17/18] Apply spotless and checkstyle fixes --- .../kafka/server/EligibleLeaderReplicasIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index 03513651cfcdb..e53894a9c9ede 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -21,9 +21,9 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.FeatureUpdate; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.UpdateFeaturesOptions; -import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; From 8a1f3ddbda069d77c979554350ff5b84f431c004 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Fri, 22 Aug 2025 20:38:02 +0200 Subject: [PATCH 18/18] Remove consumer.currentLag in testHighWatermarkShouldNotAdvanceIfUnderMinIsr --- 
.../kafka/server/EligibleLeaderReplicasIntegrationTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index e53894a9c9ede..61863688f2e42 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -133,7 +133,6 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); long leoAfterSend = admin.listOffsets(Map.of(partition, OffsetSpec.latest())).partitionResult(partition).get().offset(); assertEquals(leoBeforeSend, leoAfterSend); - assertEquals(0L, consumer.currentLag(partition).orElse(-1L)); // Restore the min ISR and the previous log should be visible. clusterInstance.startBroker(initialReplicas.get(1).id());