Description
Hello,
My team is migrating from Java 8 to Java 21 and from Spring Boot 2.7.x to 3.5, and I am running into errors.
I also upgraded the libraries from `<confluent.version>7.2.1</confluent.version>` to `<confluent.version>7.9.0</confluent.version>` and from `<parallel-consumer.version>0.5.2.1</parallel-consumer.version>` to `<parallel-consumer.version>0.5.3.2</parallel-consumer.version>`.
I don't understand why I am getting this problem.
Thanks for your help.
My pom.xml:
`
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.0</version>
</parent>
<groupId>xxx</groupId>
<artifactId>notif_service</artifactId>
<packaging>jar</packaging>
<version>v2025.1.1-snapshoot</version>
<properties>
<spring-boot.version>3.5.0</spring-boot.version>
<java.version>21</java.version>
<maven.compiler.target>21</maven.compiler.target>
<maven.compiler.source>21</maven.compiler.source>
<!--<confluent.version>7.2.1</confluent.version>-->
<!--<parallel-consumer.version>0.5.3.2</parallel-consumer.version>-->
<confluent.version>7.9.0</confluent.version>
<parallel-consumer.version>0.5.2.8</parallel-consumer.version>
<git-commit-id-plugin.version>4.9.10</git-commit-id-plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<vavr.version>0.10.6</vavr.version>
<!-- JaCoCo and sonar Properties -->
<jacoco.version>0.8.13</jacoco.version>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>
<sonar.coverage.jacoco.xmlReportPaths>${project.build.directory}/site/jacoco/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
<sonar.language>java</sonar.language>
<sonar.sources>src/main/java</sonar.sources>
<sonar.test>src/test</sonar.test>
<sonar.exclusions>**/src/test/**</sonar.exclusions>
<argLine>-Duser.timezone=UTC</argLine>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-engine-core</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent.parallelconsumer</groupId>
<artifactId>parallel-consumer-core</artifactId>
<version>${parallel-consumer.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.vavr</groupId>
<artifactId>vavr</artifactId>
<version>${vavr.version}</version>
</dependency>
<!-- Testing libraries -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>test</scope>
</dependency>
<!-- Health check -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Micrometer Prometheus registry -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<finalName>kur-notification-service</finalName>
<plugins>
<plugin>
<groupId>pl.project13.maven</groupId>
<artifactId>git-commit-id-plugin</artifactId>
<version>${git-commit-id-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<id>build-info</id>
<goals>
<goal>build-info</goal>
</goals>
<configuration>
<additionalProperties>
<java.version>${java.version}</java.version>
<kafka.version>${kafka.version}</kafka.version>
</additionalProperties>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.version}</version>
<executions>
<execution>
<id>jacoco-initialize</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<!--
<execution>
<id>jacoco-report</id>
<phase>test</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
-->
<execution>
<id>jacoco-site</id>
<phase>test</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<!--<version>3.3.0</version>-->
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>src/main/assembly/assembly_notification_service.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>package</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>
`
My code:
`import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.avro.specific.SpecificRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class SerdesUtil {

    /** Builds a SpecificAvroSerde configured from the given Kafka / Schema Registry properties. */
    public static <S extends SpecificRecord> SpecificAvroSerde<S> getValueSerdes(Class<S> type, Properties props, boolean isKey) {
        Map<String, String> registry = getPropertiesMap(props);
        registry.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicNameStrategy.class.getName());
        SpecificAvroSerde<S> dataAvroSerde = new SpecificAvroSerde<>();
        dataAvroSerde.configure(registry, isKey);
        return dataAvroSerde;
    }

    /** Copies java.util.Properties into the Map<String, String> expected by Serde#configure. */
    private static Map<String, String> getPropertiesMap(Properties props) {
        Map<String, String> originals = new HashMap<>();
        for (final String name : props.stringPropertyNames()) {
            originals.put(name, String.valueOf(props.get(name)));
        }
        return originals;
    }
}`
`import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serdes;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Properties;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static java.lang.Integer.parseInt;
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
public class GenericParallelConsumer {

    private final Properties properties;
    private final String inputTopic;

    private static final String PROPERTY_INPUT_TOPIC_PREFIX = "kafka.topic.input.";
    private static final String PROPERTY_MAX_CONCURRENCY = "maxConcurrency";

    public GenericParallelConsumer(Properties properties, String inputTopic) {
        this.properties = properties;
        this.inputTopic = inputTopic;
    }

    public ParallelStreamProcessor<String, SpecificRecord> createParallelConsumer(String groupId) {
        // Make the transactional.id unique per host so several instances can run in parallel.
        try {
            properties.put(TRANSACTIONAL_ID_CONFIG, properties.getProperty(groupId) + "-" + InetAddress.getLocalHost().getHostName());
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        // Plain Kafka clients wrapped by the parallel consumer; the key deserializer comes from the properties.
        Consumer<String, SpecificRecord> consumer = new KafkaConsumer<>(properties, null, SerdesUtil.getValueSerdes(SpecificRecord.class, properties, false).deserializer());
        Producer<String, SpecificRecord> producer = new KafkaProducer<>(properties, Serdes.String().serializer(), SerdesUtil.getValueSerdes(SpecificRecord.class, properties, false).serializer());
        ParallelConsumerOptions<String, SpecificRecord> options =
                ParallelConsumerOptions.<String, SpecificRecord>builder()
                        .ordering(KEY)
                        .maxConcurrency(parseInt(properties.getProperty(PROPERTY_MAX_CONCURRENCY)))
                        .consumer(consumer)
                        .producer(producer)
                        .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
                        .build();
        // This is the call that fails during validation (see the error messages below).
        ParallelStreamProcessor<String, SpecificRecord> parallelConsumer =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        parallelConsumer.subscribe(Collections.singleton(properties.getProperty(PROPERTY_INPUT_TOPIC_PREFIX.concat(inputTopic))));
        return parallelConsumer;
    }
}`
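The executor classes named in the stack traces below (ReportPreparationServiceExecutor, ReminderPreparationServiceExecutor, EmailPreparationServiceExecutor) each call createParallelConsumer from their run() method; their bodies are not included here. Just to show how the returned processor is driven, here is a minimal hypothetical sketch (the class name, the "report" keys and the callback body are placeholders based on the option values listed below, not our real executor code):
`
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.avro.specific.SpecificRecord;
import java.util.Properties;

// Hypothetical driver, for illustration only.
public class ReportDriverSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // ... filled with the entries listed under "The values of my options" below ...

        GenericParallelConsumer factory = new GenericParallelConsumer(props, "report");
        // "group.id.report" and "kafka.topic.input.report" are keys present in the options below.
        ParallelStreamProcessor<String, SpecificRecord> processor = factory.createParallelConsumer("group.id.report");

        // Records are handed to the callback concurrently, ordered per key (ProcessingOrder.KEY).
        processor.poll(context -> {
            // placeholder: real record handling goes here
        });
    }
}
`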
The values of my options:
`
"compression.type" -> "zstd"
"value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
"group.id.email" -> "EmailPreparationService"
"auto.register.schemas" -> "false"
"group.id.report" -> "EmailPreparationServiceReport"
"group.id" -> "EmailPreparationServiceReport"
"kafka.topic.input.report" -> "KureReportRequested_V1"
"bootstrap.servers" -> "server:8080"
"schema.registry.ssl.truststore.location" -> "C:/Dev/Configs/CertifKure/kafka.client.truststore.jks"
"kafka.topic.input.notification" -> "KureNotificationPrepared_V1"
"maxConcurrency" -> "1"
"transactional.id" -> "EmailPreparationServiceReport-L0009228"
"schema.registry.url" -> "https://schema.com"
"outputTopic" -> "KureEmailPrepared_V1"
"enable.auto.commit" -> "false"
"sasl.mechanism" -> "SCRAM-SHA-512"
"schema.registry.basic.auth.credentials.source" -> "USER_INFO"
"sasl.jaas.config" -> "org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="pwd";"
"group.id.reminder" -> "eminderPreparationService"
"ssl.truststore.password" -> "pwd"
"ssl.endpoint.identification.algorithm" -> "https"
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
"kafka.topic.input.reminder" -> "CommentPublished_V1"
"schema.registry.ssl.truststore.password" -> "pwd"
"errorTopic" -> "GlobalError_DLQ_V1"
"security.protocol" -> "SASL_SSL"
"avro.remove.java.properties" -> "true"
"ssl.truststore.location" -> "C:/Dev/Configs/CertifKure/kafka.client.truststore.jks"
"isolation.level" -> "read_committed"
"schema.registry.basic.auth.user.info" -> "user:pwd
`
The error messages:
`
16-06-2025 10:48:50.147 [Thread-3] INFO org.apache.kafka.common.utils.AppInfoParser.<init> - Kafka version: 3.9.1
16-06-2025 10:48:50.147 [Thread-3] INFO org.apache.kafka.common.utils.AppInfoParser.<init> - Kafka commitId: f745dfdcee2b9851
16-06-2025 10:48:50.147 [Thread-3] INFO org.apache.kafka.common.utils.AppInfoParser.<init> - Kafka startTimeMs: 1750063730146
16-06-2025 10:48:50.156 [Thread-3] WARN io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.getAutoCommitEnabled - Encountered unknown consumer delegate class org.apache.kafka.clients.consumer.KafkaConsumer
16-06-2025 10:48:50.156 [Thread-1] WARN io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.getAutoCommitEnabled - Encountered unknown consumer delegate class org.apache.kafka.clients.consumer.KafkaConsumer
16-06-2025 10:48:50.156 [Thread-2] WARN io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.getAutoCommitEnabled - Encountered unknown consumer delegate class org.apache.kafka.clients.consumer.KafkaConsumer
Exception in thread "Thread-3" Exception in thread "Thread-1" Exception in thread "Thread-2" io.confluent.parallelconsumer.ParallelConsumerException: Unable to check whether auto commit is enabled for consumer type class org.apache.kafka.clients.consumer.KafkaConsumer. This exception can be ignored by enabling the ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option.
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:501)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.validateConfiguration(AbstractParallelEoSStreamProcessor.java:336)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:289)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:272)
    at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:46)
    at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:26)
    at xxx.service.common.consumer.GenericParallelConsumer.createParallelConsumer(GenericParallelConsumer.java:54)
    at xxx.service.reportpreparation.ReportPreparationServiceExecutor.run(ReportPreparationServiceExecutor.java:40)
    at java.base/java.lang.Thread.run(Thread.java:1583)
io.confluent.parallelconsumer.ParallelConsumerException: Unable to check whether auto commit is enabled for consumer type class org.apache.kafka.clients.consumer.KafkaConsumer. This exception can be ignored by enabling the ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option.
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:501)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.validateConfiguration(AbstractParallelEoSStreamProcessor.java:336)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:289)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:272)
    at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:46)
    at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:26)
    at xxx.service.common.consumer.GenericParallelConsumer.createParallelConsumer(GenericParallelConsumer.java:54)
    at xxx.service.reminder.ReminderPreparationServiceExecutor.run(ReminderPreparationServiceExecutor.java:44)
    at java.base/java.lang.Thread.run(Thread.java:1583)
io.confluent.parallelconsumer.ParallelConsumerException: Unable to check whether auto commit is enabled for consumer type class org.apache.kafka.clients.consumer.KafkaConsumer. This exception can be ignored by enabling the ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option.
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:501)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.validateConfiguration(AbstractParallelEoSStreamProcessor.java:336)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:289)
    at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:272)
    at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:46)
    at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:26)
    at xxx.service.common.consumer.GenericParallelConsumer.createParallelConsumer(GenericParallelConsumer.java:54)
    at xxx.service.email.EmailPreparationServiceExecutor.run(EmailPreparationServiceExecutor.java:56)
    at java.base/java.lang.Thread.run(Thread.java:1583)
`
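Is enabling the option named in the exception message the intended workaround here? A minimal sketch of what I assume that would look like (same options as built in createParallelConsumer above; the builder setter name is an assumption taken from the option name in the log, and the helper class is only for illustration):
`
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;

public class OptionsWorkaroundSketch {

    // Same builder chain as in createParallelConsumer, plus the flag suggested by the
    // ParallelConsumerException message. The setter name is assumed to match that option name.
    static ParallelConsumerOptions<String, SpecificRecord> buildOptions(
            Consumer<String, SpecificRecord> consumer,
            Producer<String, SpecificRecord> producer,
            int maxConcurrency) {
        return ParallelConsumerOptions.<String, SpecificRecord>builder()
                .ordering(KEY)
                .maxConcurrency(maxConcurrency)
                .consumer(consumer)
                .producer(producer)
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
                .ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck(true)
                .build();
    }
}
`
If that flag only silences the check, I would still like to understand why it starts failing after the upgrade, since enable.auto.commit is already set to false in the options above.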