Spring Kafka Класс не входит в доверенные пакеты

В моем приложении Spring Boot / Kafka перед обновлением библиотеки я использовал следующий класс org.telegram.telegrambots.api.objects.Update, чтобы отправлять сообщения в тему Kafka. Сейчас я использую следующий org.telegram.telegrambots.meta.api.objects.Update. Как видите - у них разные пакеты.

После перезапуска приложения я столкнулся со следующей проблемой:

[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

Это мой конфиг:

@EnableAsync
@Configuration
public class ApplicationConfig {

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

}

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return props;
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }

}

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.max.poll.interval.ms}")
    private String kafkaConsumerMaxPollIntervalMs;

    @Value("${kafka.consumer.max.poll.records}")
    private String kafkaConsumerMaxPollRecords;

    @Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
    private Integer updateConsumerConcurrency;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Update.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
        factory.setConcurrency(updateConsumerConcurrency);

        return factory;
    }

}

application.properties

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Как решить эту проблему и позволить Kafka десериализовать старые сообщения в новые?

ОБНОВЛЕНО

Это мой слушатель

@Component
public class UpdateConsumer {

    @KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
    public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {

        //do some logic here

        ack.acknowledge();
    }

}
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
24
0
31 168
6
Перейти к ответу Данный вопрос помечен как решенный

Ответы 6

Ответ принят как подходящий

См. документация.

Starting with version 2.1, type information can be conveyed in record Headers, allowing the handling of multiple types. In addition, the serializer/deserializer can be configured using Kafka properties.

JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).

JsonDeserializer.KEY_DEFAULT_TYPE; fallback type for deserialization of keys if no header information is present.

JsonDeserializer.VALUE_DEFAULT_TYPE; fallback type for deserialization of values if no header information is present.

JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang); comma-delimited list of package patterns allowed for deserialization; * means deserialize all.

По умолчанию сериализатор добавляет в заголовки информацию о типе.

см. документация по загрузке.

Similarly, you can disable the JsonSerializer default behavior of sending type information in headers:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializerspring.kafka.producer.properties.spring.json.add.type.headers=false

Или вы можете добавить сопоставление типов в конвертер входящих сообщений, чтобы сопоставить тип источника с типом назначения.

РЕДАКТИРОВАТЬ

Сказав это, какую версию вы используете?

Спасибо за Ваш ответ! Я добавил реализацию @KafkaListener. Я использую Kafka 1.1.0

alexanoid 05.08.2018 20:23

Я имел ввиду какую версию spring-kafka. Вам нужно будет выполнить одно из моих предложений - подавить заголовки типов от отправителя или добавить сопоставление с сопоставителем типов в десериализаторе. Заголовки типа имеют приоритет над целевым типом, переданным в конструктор десериализатора. new JsonDeserializer<>(Update.class). Вероятно, нам следует добавить логическое значение к десериализатору, чтобы разрешить использование предоставленного типа, даже если заголовки существуют.

Gary Russell 05.08.2018 21:05

Спасибо! Я использую Spring Kafka 2.1.8.RELEASE под управлением Spring Boot 2.0.4.RELEASE. Кроме того, я добавил spring.kafka.producer.value-serializer=org.springframework.k‌​afka.support.seriali‌​zer.JsonSerializer после того, как столкнулся с проблемой, описанной в моем вопросе .. это моя неудачная попытка решить эту проблему самостоятельно.

alexanoid 05.08.2018 21:09

Нет; вам нужен spring.kafka.producer.properties.spring.json.add.type.header‌​s=false на стороне производителя, но вам понадобится сопоставление типов на стороне потребителя, чтобы читать любые существующие сообщения, у которых уже есть заголовки (если вы не можете использовать их со своей старой версией приложения). См. setTypeMapper на десериализаторе и setIdClassMapping() на DefaultJackson2JavaTypeMapper. Вам нужно будет сопоставить имя исходного класса с целевым классом.

Gary Russell 05.08.2018 21:16

См. Ссылку, на которую я указал вам (глава Boot Kafka). Это не стандартное свойство загрузки. Это действительно есть в приложении: spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.

Gary Russell 05.08.2018 21:22

Открыл проблема для этой проблемы.

Gary Russell 05.08.2018 21:23

Следует отметить два ключевых момента.

  1. Есть два отдельных проекта для продюсера и потребителя.
  2. Тогда отправка сообщения (значения) - это довольно примитивный тип объекта.

Проблема в том, что объект создания сообщения недоступен на стороне потребителя, потому что это два отдельных проекта.

Два решения этой проблемы, пожалуйста, следуйте нижеуказанным шагам в приложениях Spring boot Producer и Consumer.

---- Приложение производителя -------------

** Класс конфигурации производителя **

import com.kafka.producer.models.Container;    
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

@Bean
public ProducerFactory<String, Container> producerFactory(){

    Map<String, Object> config = new HashMap<>();

config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory(config);
}

@Bean
public KafkaTemplate<String, Container> kafkaTemplate(){
    return new KafkaTemplate<>(producerFactory());
}
}

Примечание. Контейнер - это настраиваемый объект, который будет размещен в теме kafka.


** Класс производителя **

import com.kafka.producer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class Producer {

private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "final-topic";

@Autowired
private KafkaTemplate<String, Container> kafkaTemplate;

public void sendUserMessage(Container msg) {
    LOGGER.info(String.format("\n ===== Producing message in JSON ===== \n"+msg));
    Message<Container> message = MessageBuilder
            .withPayload(msg)
            .setHeader(KafkaHeaders.TOPIC, TOPIC)
            .build();
    this.kafkaTemplate.send(message);
}
}

** Контроллер производителя **

import com.kafka.producer.models.Container;
import com.kafka.producer.services.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/message")
public class MessageController {

@Autowired
private Producer producer;

@PostMapping(value = "/publish")
public String sendMessageToKafkaTopic(@RequestBody Container containerMsg) {
    this.producer.sendUserMessage(containerMsg);
    return "Successfully Published !!";
}
}

Примечание. Сообщение с типом Контейнер будет опубликовано в теме kafka name: final-topic как сообщение JSON.

================================================== =============================

- Потребительское приложение -

** Класс конфигурации **

import com.kafka.consumer.models.Container;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerOneConfig {

@Bean
public ConsumerFactory<String, Container> consumerFactory(){
    JsonDeserializer<Container> deserializer = new JsonDeserializer<>(Container.class);
    deserializer.setRemoveTypeHeaders(false);
    deserializer.addTrustedPackages("*");
    deserializer.setUseTypeMapperForKey(true);

    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_one");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Container> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, Container> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;

}
}

Примечание. Здесь вы можете видеть, что вместо использования JsonDeserializer () по умолчанию мы должны использовать настраиваемый JsonDeserializer для использования сообщений Json типа объекта контейнера из final-topic (название темы).


** Бытовое обслуживание **

import com.kafka.consumer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;

@Service
public class ConsumerOne {

private final Logger LOGGER = LoggerFactory.getLogger(ConsumerOne.class);

@KafkaListener(topics = "final-topic", groupId = "group_one", containerFactory = "kafkaListenerContainerFactory")
public void consumeUserMessage(@Payload Container msg, @Headers MessageHeaders headers) throws IOException {
    System.out.println("received data in Consumer One  = "+ msg.getMessageTypes());
}
}

Ошибка Invalid value org.springframework.kafka.support.serializer.JsonDeserialize‌​r@279489ac for configuration value.deserializer: Expected a Class instance or class name., когда она попадает сюда: config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer); Кажется, мне нужно использовать фабрику вместо экземпляра.

WesternGun 08.06.2020 12:00
jsonDeserializer.addTrustedPackages("*");

решил мою проблему для spring-kafka-2.2.8.


Чтобы добавить его в application.properties:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

ВАЖНАЯ ЗАМЕТКА:

They have no effect if you have provided Serializer and Deserializer instances for KafkaConsumer and KafkaProducer, respectively.

Ссылки:
[1] https://docs.spring.io/spring-kafka/reference/html/#json-serde
[2] https://github.com/spring-projects/spring-kafka/issues/535

Как добавить это в файл application.yml, ответьте.

Vijay 10.12.2020 15:03

Для этого есть два способа сделать это: в десериализаторе или в application.yml.

Либо в вашем десериализаторе

В десериализаторе, который вы используете в своем DefaultKafkaConsumerFactory (для создания своей фабрики потребителей). Допустим, вы хотите создать ConsumerFactory<String, Foo> , где Foo является моделью / POJO, которую вы хотите использовать для своих сообщений kafka.

Вам нужен addTrustedPackages из JsonDeserializer У меня есть пример в Kotlin, но в java почти такой же синтаксис:

 val deserializer = JsonDeserializer<Foo>()
 deserializer.addTrustedPackages("com.example.entity.Foo") // Adding Foo to our trusted packages

 val consumerFactory = DefaultKafkaConsumerFactory(
      consumerConfigs(),  // your consumer config
      StringDeserializer(), 
      deserializer // Using our newly created deserializer
 )

Или в вашем application.yml

В вашем файле application.yml следуйте инструкциям весна-кафка. Мы добавляем класс Foo из пакета com.example.entity.Foo в доверенное хранилище, используя:

spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "com.example.entity.Foo"

С spring.json.trusted.packages, принимающим массив пакетов. Вы можете указать пакет класса или использовать * для любых пакетов. В этом случае вам не нужно передавать свой deserializer в DefaultKafkaConsumerFactory() только в конфигурации потребителя.

Первый способ помог мне в моем тесте, где потребитель создается вручную. Спасибо.

WesternGun 08.06.2020 11:55

Я также столкнулся с этой проблемой, однако приведенные выше решения не помогли мне. Однако в чем заключалась хитрость, так это в настройке фабрики потребителей kafka следующим образом:

props.put(JsonDeserializer.TRUSTED_PACKAGES, "your.package.name");

Моя версия spring-kafka - 2.2.11, и у меня тоже была эта ошибка.

Я получил эту ошибку, потому что я настроил двух потребителей в одной теме kafta с разной конфигурацией. У одного из них был ConsumerFactory <String, OrderDTO>, а у другого ConsumerFactory <String, String>.

Я решил ошибку, изменив конфигурацию одного потребителя, потому что был неправ.

Просто проверяете потребителей темы

Другие вопросы по теме