Ошибка CompletionException в KafkaListener при прослушивании событий формата KafkaAvro

Я получаю эту ошибку CompletionError при использовании KafkaListener для прослушивания событий формата Avro из Azure EventHub. Журналы ошибок:

java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: com/azure/core/models/MessageContent
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:842) ~[na:na]
Caused by: java.lang.NoClassDefFoundError: com/azure/core/models/MessageContent
    at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:81) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:na]
    at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:28) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) ~[kafka-clients-3.6.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1649) ~[spring-kafka-3.1.1.jar:3.1.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1624) ~[spring-kafka-3.1.1.jar:3.1.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1421) ~[spring-kafka-3.1.1.jar:3.1.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1313) ~[spring-kafka-3.1.1.jar:3.1.1]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    ... 1 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.azure.core.models.MessageContent
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[na:na]
    ... 16 common frames omitted

Конфигурация прослушивателя. Конфигурации, которые я использовал для десериализации:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class);

В файле pom.xml у меня была эта зависимость, которая предоставляет класс десериализатора Kafka Avro.

<dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>azure-schemaregistry-kafka-avro</artifactId>
<!--            <version>1.0.0-beta.9</version>-->
            <version>1.1.1</version>
        </dependency>

Когда я использовал версию этой зависимости «1.0.0-beta.9», я мог плавно читать события, но после изменения ее на версию «1.1.1». Я вижу эту ошибку.

Я попробовал заменить KafkaAvroDeserializer на ByteDeserizlier в конфигурациях Kafkalitener, но это не сработало. Я ожидаю увидеть десериализованное событие/сообщение, которое можно прочитать.

Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
В предыдущей статье мы завершили установку базы данных, для тех, кто не знает.
Как установить LAMP Stack 1/2 на Azure Linux VM
Как установить LAMP Stack 1/2 на Azure Linux VM
В дополнение к нашему предыдущему сообщению о намерении Azure прекратить поддержку Azure Database для MySQL в качестве единого сервера после 16...
0
0
62
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий
  • Зависимости и конфигурация, которыми вы поделились, похоже, правильно настраивают потребительские свойства Kafka. NoClassDefFoundError вместо com.azure.core.models.MessageContent указывает, что транзитивная зависимость, требуемая azure-schemaregistry-kafka-avro, отсутствует/неправильная версия.

В новой версии библиотеки azure-schemaregistry-kafka-avro есть дополнительные зависимости, которые необходимо четко указать. Знак NoClassDefFoundError вместо com.azure.core.models.MessageContent предполагает, что для версии azure-core необходима библиотека 1.1.1.

Пожалуйста, следуйте обновленному pom.xml с необходимыми зависимостями ниже.

Пом.xml:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-schemaregistry-kafka-avro</artifactId>
    <version>1.1.1</version>
</dependency>

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-core</artifactId>
    <version>1.31.0</version>
</dependency>

Кроме того, добавьте зависимость azure-core к необходимому файлу pom.xml.

Здесь я привел пример десериализованной обработки событий.

@KafkaListener(topics = "your-topic", groupId = "your-group-id")
public void listen(ConsumerRecord<String, SpecificRecord> record) {
    String key = record.key();
    SpecificRecord value = record.value();

    System.out.println("Received message with key: " + key + " and value: " + value);

    // Process the event
    processEvent(key, value);
}

public void processEvent(String key, SpecificRecord value) {
    // write event processing logic here
}

Если приложение правильно настроено и вы устранили проблемы с зависимостями, как я указал выше, вы сможете успешно получать и обрабатывать сообщения.

Журналы выполнения:

2024-05-21 12:00:00 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 - Starting Kafka listener endpoint container [KafkaListenerEndpointContainer#0-0-C-1]
2024-05-21 12:00:00 INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [your-bootstrap-servers]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = your-group-id
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    ...
    value.deserializer = class com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer
2024-05-21 12:00:00 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.6.1
2024-05-21 12:00:00 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: abc123def456
2024-05-21 12:00:00 INFO  org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-your-group-id-1, groupId=your-group-id] Subscribed to topic(s): your-topic
2024-05-21 12:00:01 INFO  com.example.YourKafkaListener - Received message with key: key1 and value: {"field1": "value1", "field2": 123}
2024-05-21 12:00:01 INFO  com.example.YourKafkaListener - Processing event with key: key1 and value: {"field1": "value1", "field2": 123}
2024-05-21 12:00:02 INFO  com.example.YourKafkaListener - Received message with key: key2 and value: {"field1": "value2", "field2": 456}
2024-05-21 12:00:02 INFO  com.example.YourKafkaListener - Processing event with key: key2 and value: {"field1": "value2", "field2": 456}

Я внес изменения, как было предложено, и включил пакет «azure-core» и указанную вами версию, но получил Spring Kafka error: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' Итак, я попробовал использовать относительно последнюю версию: <version>1.46.0</version>, и это сработало. Большое спасибо.

Sai Madhav 21.05.2024 18:27

Другие вопросы по теме