-
Notifications
You must be signed in to change notification settings - Fork 14.5k
MINOR: Migrate EligibleLeaderReplicasIntegrationTest to use new test infra #20199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
MINOR: Migrate EligibleLeaderReplicasIntegrationTest to use new test infra #20199
Conversation
…ScalaBuffer with CollectionConverters.asScala
…ibleLeaderReplicasIntegrationTest
…ibleLeaderReplicasIntegrationTest
…ibleLeaderReplicasIntegrationTest
…import-control-server.xml, and fix the bug in restartDeadBrokers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jim0987795064 thanks for this patch. a couple of comments remain. PTAL
checkstyle/import-control-server.xml
Outdated
|
||
<!-- no one depends on the server --> | ||
<disallow pkg="kafka" /> | ||
<allow pkg="scala" /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use var
to avoid importing those dependencies explicitly
killBroker(initialReplicas.get(0).id()); | ||
killBroker(initialReplicas.get(1).id()); | ||
clusterInstance.shutdownBroker(initialReplicas.get(0).id()); | ||
clusterInstance.shutdownBroker(initialReplicas.get(1).id()); | ||
|
||
waitForIsrAndElr((isrSize, elrSize) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3);
return b.config().brokerId() == brokerToBeUncleanShutdown; | ||
}).get(); | ||
KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown) | ||
.findFirst().get(); | ||
Seq<File> dirs = broker.logManager().liveLogDirs(); | ||
assertEquals(1, dirs.size()); | ||
CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); | ||
assertTrue(handler.exists()); | ||
assertDoesNotThrow(() -> handler.delete()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertDoesNotThrow(handler::delete);
throw new IllegalArgumentException("Must supply at least one server config."); | ||
} | ||
brokers().entrySet().forEach(entry -> { | ||
if (!entry.getValue().isShutdown()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems the method is used to restart "dead" brokers, so the condition should be if (entry.getValue().isShutdown())
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the feedback. I've addressed issues mentioned above.
@jim0987795064 could you please fix the conflicts |
Thanks for this comment, I've already fixed this conflicts |
0d7bcf8
to
493a7c7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jim0987795064 thanks for this patch!
waitForIsrAndElr((isrSize, elrSize) -> { | ||
return isrSize == 2 && elrSize == 1; | ||
}); | ||
waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1); | ||
|
||
// Now the partition is under min ISR. HWM should not advance. | ||
producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); | ||
Thread.sleep(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use TimeUnit
instead
@@ -447,4 +447,15 @@ default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin, | |||
throw new AssertionError("Timing out after " + timeoutMs + | |||
" ms since a leader was not elected for partition " + topicPartition); | |||
} | |||
|
|||
default void restartDeadBrokers() throws ExecutionException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExecutionException
is unnecessary
if (brokers().isEmpty()) { | ||
throw new RuntimeException("Must supply at least one server config."); | ||
} | ||
brokers().entrySet().forEach(entry -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
brokers().forEach((key, value) -> {
if (value.isShutdown()) value.startup();
});
); | ||
} finally { | ||
restartDeadBrokers(false); | ||
clusterInstance.restartDeadBrokers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This helper method is very simple, so perhaps we could move it into the test class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chia7712
Thanks for the suggestions. I've addressed problems mentioned above.
09a8771
to
4c17b97
Compare
…licasIntegrationTest
…write-EligibleLeaderReplicasIntegrationTest
Changes: Use ClusterTest to rewrite EligibleLeaderReplicasIntegrationTest.