Commit 0d7bcf8

Merge branch 'trunk' of https://github.com/apache/kafka into MINOR-rewrite-EligibleLeaderReplicasIntegrationTest

2 parents: 493a7c7 + 4a6a546

351 files changed: +7015 −4842 lines changed

.asf.yaml

Lines changed: 6 additions & 6 deletions

@@ -29,16 +29,16 @@ notifications:
 # Read more here: https://github.com/apache/infrastructure-asfyaml
 github:
   collaborators:
-    - brandboat
-    - FrankYang0529
-    - gongxuanzhang
     - m1a2st
-    - mingyen066
-    - ShivsundarR
     - smjn
     - TaiJuWu
-    - xijiu
+    - brandboat
     - Yunyung
+    - xijiu
+    - chirag-wadhwa5
+    - mingyen066
+    - ShivsundarR
+    - Rancho-7
   enabled_merge_buttons:
     squash: true
     squash_commit_message: PR_TITLE_AND_DESC

.github/actions/run-gradle/action.yml

Lines changed: 5 additions & 0 deletions

@@ -83,6 +83,9 @@ runs:
        RUN_FLAKY_TESTS: ${{ inputs.run-flaky-tests }}
        TEST_XML_OUTPUT_DIR: ${{ inputs.test-xml-output }}
        TEST_VERBOSE: ${{ inputs.test-verbose }}
+      # This build step is invoked by build.yml to run junit tests only,
+      # Spotbugs is being run by that workflow via the "check" task and does not need to also be run here,
+      # since that is redundant.
      run: |
        set +e
        ./.github/scripts/thread-dump.sh &
@@ -97,6 +100,8 @@ runs:
          -Pkafka.cluster.test.repeat=$TEST_REPEAT \
          -Pkafka.test.verbose=$TEST_VERBOSE \
          -PcommitId=xxxxxxxxxxxxxxxx \
+          -x spotbugsMain \
+          -x spotbugsTest \
          $TEST_TASK
        exitcode="$?"
        echo "exitcode=$exitcode" >> $GITHUB_OUTPUT

.github/workflows/build.yml

Lines changed: 5 additions & 5 deletions

@@ -127,7 +127,7 @@ jobs:
      - name: Setup Gradle
        uses: ./.github/actions/setup-gradle
        with:
-          java-version: 23
+          java-version: 17
          gradle-cache-read-only: ${{ !inputs.is-trunk }}
          gradle-cache-write-only: ${{ inputs.is-trunk }}
          develocity-access-key: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
@@ -181,7 +181,7 @@ jobs:
      fail-fast: false
      matrix:
        # If we change these, make sure to adjust ci-complete.yml
-        java: [ 23, 17 ]
+        java: [ 24, 17 ]
        run-flaky: [ true, false ]
        run-new: [ true, false ]
        exclude:
@@ -270,7 +270,7 @@ jobs:
        python .github/scripts/junit.py \
          --path build/junit-xml >> $GITHUB_STEP_SUMMARY

-  # This job downloads all the JUnit XML files and thread dumps from the JDK 23 test runs.
+  # This job downloads all the JUnit XML files and thread dumps from the JDK 24 test runs.
   # If any test job fails, we will not run this job. Also, if any thread dump artifacts
   # are present, this means there was a timeout in the tests and so we will not proceed
   # with catalog creation.
@@ -288,7 +288,7 @@ jobs:
      - name: Download Thread Dumps
        uses: actions/download-artifact@v4
        with:
-          pattern: junit-thread-dumps-23-*
+          pattern: junit-thread-dumps-24-*
          path: thread-dumps
          merge-multiple: true
      - name: Check For Thread Dump
@@ -302,7 +302,7 @@ jobs:
      - name: Download JUnit XMLs
        uses: actions/download-artifact@v4
        with:
-          pattern: junit-xml-23-* # Only look at JDK 23 tests for the test catalog
+          pattern: junit-xml-24-* # Only look at JDK 24 tests for the test catalog
          path: junit-xml
          merge-multiple: true
      - name: Collate Test Catalog

.github/workflows/ci-complete.yml

Lines changed: 1 addition & 1 deletion

@@ -44,7 +44,7 @@ jobs:
      fail-fast: false
      matrix:
        # Make sure these match build.yml
-        java: [ 23, 17 ]
+        java: [ 24, 17 ]
        run-flaky: [ true, false ]
        run-new: [ true, false ]
        exclude:

README.md

Lines changed: 1 addition & 1 deletion

@@ -13,7 +13,7 @@

 You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/index.html) installed.

-We build and test Apache Kafka with 17 and 23. The `release` parameter in javac is set to `11` for the clients
+We build and test Apache Kafka with 17 and 24. The `release` parameter in javac is set to `11` for the clients
 and streams modules, and `17` for the rest, ensuring compatibility with their respective
 minimum Java versions. Similarly, the `release` parameter in scalac is set to `11` for the streams modules and `17`
 for the rest.
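As background (not part of this commit): javac's `--release N` compiles against the public API and language level of Java N, so code using newer features fails at compile time rather than at runtime on an older JVM. A small illustration, using a hypothetical file name:

// Hypothetical example, not from the Kafka source tree.
// `javac --release 17 ReleaseDemo.java` compiles this file, but
// `javac --release 11 ReleaseDemo.java` rejects it, since records
// were only introduced in Java 16.
public record ReleaseDemo(String name) {
    public static void main(String[] args) {
        System.out.println(new ReleaseDemo("kafka").name());
    }
}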

build.gradle

Lines changed: 10 additions & 4 deletions

@@ -37,7 +37,7 @@ plugins {
   id 'org.nosphere.apache.rat' version "0.8.1"
   id "io.swagger.core.v3.swagger-gradle-plugin" version "${swaggerVersion}"

-  id "com.github.spotbugs" version '6.0.25' apply false
+  id "com.github.spotbugs" version '6.2.3' apply false
   id 'org.scoverage' version '8.0.3' apply false
   id 'com.gradleup.shadow' version '8.3.6' apply false
   id 'com.diffplug.spotless' version "6.25.0"
@@ -47,7 +47,7 @@ ext {
   gradleVersion = versions.gradle
   minClientJavaVersion = 11
   minNonClientJavaVersion = 17
-  modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams-scala", ":test-common:test-common-util"]
+  modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams:examples", ":streams-scala", ":test-common:test-common-util"]

   buildVersionFileName = "kafka-version.properties"

@@ -72,6 +72,12 @@ ext {
     "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
   )

+  if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_24)) {
+    // Spotbugs is not compatible with Java 24+ until Spotbugs 4.9.4. Disable it until we can upgrade to that version.
+    project.gradle.startParameter.excludedTaskNames.add("spotbugsMain")
+    project.gradle.startParameter.excludedTaskNames.add("spotbugsTest")
+  }
+
   maxTestForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : Runtime.runtime.availableProcessors()
   maxScalacThreads = project.hasProperty('maxScalacThreads') ? maxScalacThreads.toInteger() :
     Math.min(Runtime.runtime.availableProcessors(), 8)
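For reference, a plain-Java sketch (illustrative, not from this commit) of the same kind of feature-version gate that `JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_24)` expresses in the Groovy build script above:

// Hypothetical standalone check mirroring the build.gradle gate.
public class JavaVersionGate {
    public static void main(String[] args) {
        // Runtime.version().feature() is the major Java version, e.g. 24 on JDK 24.
        if (Runtime.version().feature() >= 24) {
            System.out.println("Java 24+ detected: spotbugsMain/spotbugsTest would be excluded");
        }
    }
}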
@@ -2917,10 +2923,10 @@ project(':streams:examples') {
   }

   dependencies {
-    // this dependency should be removed after we unify data API
-    implementation(project(':connect:json'))
     implementation project(':streams')
     implementation libs.slf4jApi
+    implementation libs.jacksonDatabind
+    implementation libs.jacksonAnnotations

     testImplementation project(':streams:test-utils')
     testImplementation project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java

Lines changed: 1 addition & 1 deletion

@@ -380,7 +380,7 @@ private Set<Integer> findCoordinators(List<String> groups) throws Exception {
         TestUtils.waitForCondition(() -> {
             FindCoordinatorResponse response = null;
             try {
-                response = IntegrationTestUtils.connectAndReceive(request, clusterInstance.boundPorts().get(0));
+                response = IntegrationTestUtils.connectAndReceive(request, clusterInstance.brokerBoundPorts().get(0));
             } catch (IOException e) {
                 return false;
             }

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 156 additions & 6 deletions

@@ -41,6 +41,8 @@
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -67,6 +69,7 @@
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;

+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -359,7 +362,7 @@ public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() throws
             return partitionOffsetsMap.containsKey(tp);
         }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to callback");

-        // We expect no exception as the acknowledgment error code is null.
+        // We expect no exception as the acknowledgement error code is null.
         assertFalse(partitionExceptionMap.containsKey(tp));
         verifyShareGroupStateTopicRecordsProduced();
     }
@@ -388,7 +391,7 @@ public void testAcknowledgementCommitCallbackOnClose() {
         shareConsumer.poll(Duration.ofMillis(1000));
         shareConsumer.close();

-        // We expect no exception as the acknowledgment error code is null.
+        // We expect no exception as the acknowledgement error code is null.
         assertFalse(partitionExceptionMap.containsKey(tp));
         verifyShareGroupStateTopicRecordsProduced();
     }
@@ -843,6 +846,144 @@ public void testExplicitAcknowledgeThrowsNotInBatch() {
         }
     }

+    @ClusterTest
+    public void testExplicitOverrideAcknowledgeCorruptedMessage() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
+                 null,
+                 mockErrorDeserializer(3))) {
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.send(record3);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(2, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            assertEquals(1L, secondRecord.offset());
+            shareConsumer.acknowledge(firstRecord);
+            shareConsumer.acknowledge(secondRecord);
+
+            RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(2, rde.offset());
+            shareConsumer.commitSync();
+
+            // The corrupted record was automatically released, so we can still obtain it.
+            rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(2, rde.offset());
+
+            // Reject this record
+            shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+            shareConsumer.commitSync();
+
+            records = shareConsumer.poll(Duration.ZERO);
+            assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testExplicitAcknowledgeOffsetThrowsNotException() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
+
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(1, records.count());
+            ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
+            assertEquals(0L, consumedRecord.offset());
+
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(tp.topic(), tp.partition(), consumedRecord.offset(), AcknowledgeType.ACCEPT));
+
+            shareConsumer.acknowledge(consumedRecord);
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testExplicitAcknowledgeOffsetThrowsParametersError() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT),
+                 null,
+                 mockErrorDeserializer(2))) {
+
+            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
+            producer.send(record1);
+            producer.send(record2);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
+            assertEquals(1, records.count());
+            Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
+
+            ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
+            assertEquals(0L, firstRecord.offset());
+            shareConsumer.acknowledge(firstRecord);
+
+            final RecordDeserializationException rde = assertThrows(RecordDeserializationException.class, () -> shareConsumer.poll(Duration.ofSeconds(60)));
+            assertEquals(1, rde.offset());
+
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge("foo", rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), 1, rde.offset(), AcknowledgeType.REJECT));
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), tp2.partition(), 0, AcknowledgeType.REJECT));
+
+            // Reject this record.
+            shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
+            shareConsumer.commitSync();
+
+            // The next acknowledge() should throw an IllegalStateException as the record has been acked.
+            assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(rde.topicPartition().topic(), rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT));
+
+            records = shareConsumer.poll(Duration.ZERO);
+            assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    private ByteArrayDeserializer mockErrorDeserializer(int recordNumber) {
+        int recordIndex = recordNumber - 1;
+        return new ByteArrayDeserializer() {
+            int i = 0;
+
+            @Override
+            public byte[] deserialize(String topic, Headers headers, ByteBuffer data) {
+                if (i == recordIndex) {
+                    throw new SerializationException();
+                } else {
+                    i++;
+                    return super.deserialize(topic, headers, data);
+                }
+            }
+        };
+    }
+
     @ClusterTest
     public void testImplicitAcknowledgeFailsExplicit() {
         alterShareAutoOffsetReset("group1", "earliest");
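The new tests above revolve around one consumption pattern: poll, acknowledge the records that deserialized, and reject by coordinates the one that did not. A minimal sketch of that pattern (illustrative only; it assumes an already constructed and subscribed share consumer, and that `acknowledge(record)` defaults to ACCEPT):

import java.time.Duration;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

class RejectCorruptRecordSketch {
    // One poll cycle in EXPLICIT acknowledgement mode.
    static void pollOnce(ShareConsumer<byte[], byte[]> shareConsumer) {
        try {
            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofSeconds(60));
            // Accept every record that deserialized cleanly.
            records.forEach(shareConsumer::acknowledge);
        } catch (RecordDeserializationException rde) {
            // The exception carries the corrupt record's coordinates; rejecting it
            // lets the share group move past it instead of redelivering it forever.
            shareConsumer.acknowledge(rde.topicPartition().topic(),
                rde.topicPartition().partition(), rde.offset(), AcknowledgeType.REJECT);
        }
        shareConsumer.commitSync();
    }
}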
@@ -1359,7 +1500,7 @@ public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed() {
         shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
         shareConsumer.subscribe(Set.of(tp.topic()));

-        // The acknowledgment commit callback will try to call a method of ShareConsumer
+        // The acknowledgement commit callback will try to call a method of ShareConsumer
         shareConsumer.poll(Duration.ofMillis(5000));
         // The second poll sends the acknowledgements implicitly.
         // The acknowledgement commit callback will be called and the exception is thrown.
@@ -1399,14 +1540,14 @@ public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup() throws I
         producer.send(record);
         producer.flush();

-        // The acknowledgment commit callback will try to call a method of ShareConsumer
+        // The acknowledgement commit callback will try to call a method of ShareConsumer
         shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
         shareConsumer.subscribe(Set.of(tp.topic()));

         TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
             DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");

-        // The second poll sends the acknowledgments implicitly.
+        // The second poll sends the acknowledgements implicitly.
         shareConsumer.poll(Duration.ofMillis(2000));

         // Till now acknowledgement commit callback has not been called, so no exception thrown yet.
@@ -2794,13 +2935,22 @@ private <K, V> ShareConsumer<K, V> createShareConsumer(String groupId) {
     private <K, V> ShareConsumer<K, V> createShareConsumer(
         String groupId,
         Map<?, ?> additionalProperties
+    ) {
+        return createShareConsumer(groupId, additionalProperties, null, null);
+    }
+
+    private <K, V> ShareConsumer<K, V> createShareConsumer(
+        String groupId,
+        Map<?, ?> additionalProperties,
+        Deserializer<K> keyDeserializer,
+        Deserializer<V> valueDeserializer
     ) {
         Properties props = new Properties();
         props.putAll(additionalProperties);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         Map<String, Object> conf = new HashMap<>();
         props.forEach((k, v) -> conf.put((String) k, v));
-        return cluster.shareConsumer(conf);
+        return cluster.shareConsumer(conf, keyDeserializer, valueDeserializer);
     }

     private void warmup() throws InterruptedException {
