Skip to content

Migration from java 8 to 21, from springboot 2.7 to 3.5 #878

@nds17

Description

@nds17

Hello,
My teams is migrating from java 8 to 21 and also springboot 3.7.x to 3.5 and I have errors.

I also upgrate the libraries
from
<confluent.version>7.2.1</confluent.version>
to
<confluent.version>7.9.0</confluent.version>

and
from
<parallel-consumer.version>0.5.2.1</parallel-consumer.version>
to
<parallel-consumer.version>0.5.3.2</parallel-consumer.version>

I don't understand why I have this problem.

Thank fo yout help

My pom.xml
`

4.0.0

org.springframework.boot
spring-boot-starter-parent
3.5.0


xxx
notif_service
jar
v2025.1.1-snapshoot

<properties>
    <spring-boot.version>3.5.0</spring-boot.version>
    <java.version>21</java.version>
    <maven.compiler.target>21</maven.compiler.target>
    <maven.compiler.source>21</maven.compiler.source>
    <!--<confluent.version>7.2.1</confluent.version>-->
    <!--<parallel-consumer.version>0.5.3.2</parallel-consumer.version>-->
    <confluent.version>7.9.0</confluent.version>
    <parallel-consumer.version>0.5.2.8</parallel-consumer.version>
    <git-commit-id-plugin.version>4.9.10</git-commit-id-plugin.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <vavr.version>0.10.6</vavr.version>
    <!-- JaCoCo and sonar Properties -->
    <jacoco.version>0.8.13</jacoco.version>
    <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
    <sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>
    <sonar.coverage.jacoco.xmlReportPaths>${project.build.directory}/site/jacoco/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
    <sonar.language>java</sonar.language>
    <sonar.sources>src/main/java</sonar.sources>
    <sonar.test>src/test</sonar.test>
    <sonar.exclusions>**/src/test/**</sonar.exclusions>
    <argLine>-Duser.timezone=UTC</argLine>

</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.velocity</groupId>
        <artifactId>velocity-engine-core</artifactId>
        <version>2.3</version>
    </dependency>
    <dependency>
        <groupId>de.codecentric</groupId>
        <artifactId>spring-boot-admin-starter-client</artifactId>
        <version>3.4.5</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-streams-avro-serde</artifactId>
        <version>${confluent.version}</version>
    </dependency>
    <dependency>
        <groupId>io.confluent.parallelconsumer</groupId>
        <artifactId>parallel-consumer-core</artifactId>
        <version>${parallel-consumer.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>io.vavr</groupId>
        <artifactId>vavr</artifactId>
        <version>${vavr.version}</version>
    </dependency>


    <!-- Testing libraries -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.hsqldb</groupId>
        <artifactId>hsqldb</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- Health check -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- Micrometer Prometheus registry  -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>jul-to-slf4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-to-slf4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

<build>
    <finalName>kur-notification-service</finalName>
    <plugins>
        <plugin>
            <groupId>pl.project13.maven</groupId>
            <artifactId>git-commit-id-plugin</artifactId>
            <version>${git-commit-id-plugin.version}</version>
        </plugin>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>${spring-boot.version}</version>
            <executions>
                <execution>
                    <id>build-info</id>
                    <goals>
                        <goal>build-info</goal>
                    </goals>
                    <configuration>
                        <additionalProperties>
                            <java.version>${java.version}</java.version>
                            <kafka.version>${kafka.version}</kafka.version>
                        </additionalProperties>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.jacoco</groupId>
            <artifactId>jacoco-maven-plugin</artifactId>
            <version>${jacoco.version}</version>
            <executions>
                <execution>
                    <id>jacoco-initialize</id>
                    <goals>
                        <goal>prepare-agent</goal>
                    </goals>
                </execution>
                <!--
                <execution>
                    <id>jacoco-report</id>
                    <phase>test</phase>
                    <goals>
                        <goal>report</goal>
                    </goals>
                </execution>
                -->
                <execution>
                <id>jacoco-site</id>
                <phase>test</phase>
                <goals>
                    <goal>report</goal>
                </goals>
            </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <!--<version>3.3.0</version>-->
            <configuration>
                <appendAssemblyId>false</appendAssemblyId>
                <descriptors>
                    <descriptor>src/main/assembly/assembly_notification_service.xml</descriptor>
                </descriptors>
            </configuration>
            <executions>
                <execution>
                    <id>package</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
        </plugin>
    </plugins>
</build>

`

My code :
`import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.avro.specific.SpecificRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class SerdesUtil {

public static <S extends SpecificRecord> SpecificAvroSerde<S> getValueSerdes(Class<S> type, Properties props, boolean isKey) {
    Map<String, String> registry = getPropertiesMap(props);
    registry.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicNameStrategy.class.getName());
    SpecificAvroSerde<S> dataAvroSerde = new SpecificAvroSerde<>();
    dataAvroSerde.configure(registry, isKey);
    return dataAvroSerde;
}

private static Map<String, String> getPropertiesMap(Properties props) {
    Map<String, String> originals = new HashMap<>();
    for (final String name : props.stringPropertyNames()) {
        originals.put(name, String.valueOf(props.get(name)));
    }
    return originals;
}

}`

`import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serdes;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Properties;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static java.lang.Integer.parseInt;
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;

public class GenericParallelConsumer {

private final Properties properties;
private final String inputTopic;

private static final String PROPERTY_INPUT_TOPIC_PREFIX = "kafka.topic.input.";
private static final String PROPERTY_MAX_CONCURRENCY = "maxConcurrency";

public GenericParallelConsumer(Properties properties, String inputTopic) {
    this.properties = properties;
    this.inputTopic = inputTopic;
}

public ParallelStreamProcessor<String, SpecificRecord> createParallelConsumer(String groupId) {
    try {
        properties.put(TRANSACTIONAL_ID_CONFIG, properties.getProperty(groupId) + "-" + InetAddress.getLocalHost().getHostName());
    } catch (UnknownHostException e) {
        throw new RuntimeException(e);
    }

    Consumer<String, SpecificRecord> consumer = new KafkaConsumer<>(properties, null, SerdesUtil.getValueSerdes(SpecificRecord.class, properties, false).deserializer());
    Producer<String, SpecificRecord> producer = new KafkaProducer<>(properties, Serdes.String().serializer(), SerdesUtil.getValueSerdes(SpecificRecord.class, properties, false).serializer());
    ParallelConsumerOptions<String, SpecificRecord> options =
            ParallelConsumerOptions.<String, SpecificRecord>builder()
                    .ordering(KEY)
                    .maxConcurrency(parseInt(properties.getProperty(PROPERTY_MAX_CONCURRENCY)))
                    .consumer(consumer)
                    .producer(producer)
                    .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
                    .build();

    ParallelStreamProcessor<String, SpecificRecord> parallelConsumer =
            ParallelStreamProcessor.createEosStreamProcessor(options);

    parallelConsumer.subscribe(Collections.singleton(properties.getProperty(PROPERTY_INPUT_TOPIC_PREFIX.concat(inputTopic))));
    return parallelConsumer;
}

}`

The values of my options :
`
"compression.type" -> "zstd"

"value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"

"group.id.email" -> "EmailPreparationService"

"auto.register.schemas" -> "false"

"group.id.report" -> "EmailPreparationServiceReport"

"group.id" -> "EmailPreparationServiceReport"

"kafka.topic.input.report" -> "KureReportRequested_V1"

"bootstrap.servers" -> "server:8080"

"schema.registry.ssl.truststore.location" -> "C:/Dev/Configs/CertifKure/kafka.client.truststore.jks"

"kafka.topic.input.notification" -> "KureNotificationPrepared_V1"

"maxConcurrency" -> "1"

"transactional.id" -> "EmailPreparationServiceReport-L0009228"

"schema.registry.url" -> "https://schema.com"

"outputTopic" -> "KureEmailPrepared_V1"

"enable.auto.commit" -> "false"

"sasl.mechanism" -> "SCRAM-SHA-512"

"schema.registry.basic.auth.credentials.source" -> "USER_INFO"

"sasl.jaas.config" -> "org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="pwd";"

"group.id.reminder" -> "eminderPreparationService"

"ssl.truststore.password" -> "pwd"

"ssl.endpoint.identification.algorithm" -> "https"

"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"

"kafka.topic.input.reminder" -> "CommentPublished_V1"

"schema.registry.ssl.truststore.password" -> "pwd"

"errorTopic" -> "GlobalError_DLQ_V1"

"security.protocol" -> "SASL_SSL"

"avro.remove.java.properties" -> "true"

"ssl.truststore.location" -> "C:/Dev/Configs/CertifKure/kafka.client.truststore.jks"

"isolation.level" -> "read_committed"

"schema.registry.basic.auth.user.info" -> "user:pwd
`

The error messages :
16-06-2025 10:48:50.147 [Thread-3] INFO org.apache.kafka.common.utils.AppInfoParser.<init> - Kafka version: 3.9.1 16-06-2025 10:48:50.147 [Thread-3] INFO org.apache.kafka.common.utils.AppInfoParser.<init> - Kafka commitId: f745dfdcee2b9851 16-06-2025 10:48:50.147 [Thread-3] INFO org.apache.kafka.common.utils.AppInfoParser.<init> - Kafka startTimeMs: 1750063730146 16-06-2025 10:48:50.156 [Thread-3] WARN io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.getAutoCommitEnabled - Encountered unknown consumer delegate class org.apache.kafka.clients.consumer.KafkaConsumer 16-06-2025 10:48:50.156 [Thread-1] WARN io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.getAutoCommitEnabled - Encountered unknown consumer delegate class org.apache.kafka.clients.consumer.KafkaConsumer 16-06-2025 10:48:50.156 [Thread-2] WARN io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.getAutoCommitEnabled - Encountered unknown consumer delegate class org.apache.kafka.clients.consumer.KafkaConsumer Exception in thread "Thread-3" Exception in thread "Thread-1" Exception in thread "Thread-2" io.confluent.parallelconsumer.ParallelConsumerException: Unable to check whether auto commit is enabled for consumer type class org.apache.kafka.clients.consumer.KafkaConsumer. This exception can be ignored by enabling the ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option. at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:501) at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.validateConfiguration(AbstractParallelEoSStreamProcessor.java:336) at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:289) at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:272) at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:46) at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:26) at xxx.service.common.consumer.GenericParallelConsumer.createParallelConsumer(GenericParallelConsumer.java:54) at xxx.service.reportpreparation.ReportPreparationServiceExecutor.run(ReportPreparationServiceExecutor.java:40) at java.base/java.lang.Thread.run(Thread.java:1583) io.confluent.parallelconsumer.ParallelConsumerException: Unable to check whether auto commit is enabled for consumer type class org.apache.kafka.clients.consumer.KafkaConsumer. This exception can be ignored by enabling the ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option. at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:501) at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.validateConfiguration(AbstractParallelEoSStreamProcessor.java:336) at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:289) at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:272) at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:46) at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:26) at xxx.service.common.consumer.GenericParallelConsumer.createParallelConsumer(GenericParallelConsumer.java:54) at xxx.service.reminder.ReminderPreparationServiceExecutor.run(ReminderPreparationServiceExecutor.java:44) at java.base/java.lang.Thread.run(Thread.java:1583) io.confluent.parallelconsumer.ParallelConsumerException: Unable to check whether auto commit is enabled for consumer type class org.apache.kafka.clients.consumer.KafkaConsumer. This exception can be ignored by enabling the ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option. at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:501) at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.validateConfiguration(AbstractParallelEoSStreamProcessor.java:336) at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:289) at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:272) at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:46) at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:26) at xxx.service.common.consumer.GenericParallelConsumer.createParallelConsumer(GenericParallelConsumer.java:54) at xxx.service.email.EmailPreparationServiceExecutor.run(EmailPreparationServiceExecutor.java:56) at java.base/java.lang.Thread.run(Thread.java:1583)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions