Как правильно использовать ListDeserializer?

Фабрика контейнеров прослушивателя Kafka должна быть настроена на использование сообщений, значения которых представляют собой список объектов. Я попытался настроить фабрику с помощью ListDeserializer следующим образом:

@Configuration
public class KafkaConsumerConfig {
  @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
  private String bootstrapServers;

  @Value(value = "${spring.kafka.consumer.group-id}")
  private String groupId;

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, List<Item>>
      kafkaListenerContainerFactoryItem() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    ConcurrentKafkaListenerContainerFactory<String, List<Item>> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(
        new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            new ListDeserializer<>(ArrayList.class, new JsonDeserializer<>(Item.class, false))));
    return factory;
  }
}

Но появляется исключение ниже:

org.apache.kafka.common.config.ConfigException: List deserializer was already initialized using a non-default constructor 
    at org.apache.kafka.common.serialization.ListDeserializer.configure(ListDeserializer.java:78) ~[kafka-clients-3.1.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.lambda$valueDeserializerSupplier$9(DefaultKafkaConsumerFactory.java:199) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:776) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:352) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:461) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:461) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:347) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:292) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.22.jar:5.3.22]
    ... 14 common frames omitted

Как правильно настроить фабрику контейнеров с помощью ListDeserializer?

Выполнение new ListDeserializer<>(ArrayList.class, new JsonDeserializer<>(Item.class, false), false) поможет обойти это исключение/строку, но в отношении «(побочных) эффектов»: не уверен, пожалуйста, протестируйте/посмотрите... (есть перегруженный конструктор с параметром boolean configureDeserializers...github.com/spring-projects /spring-kafka/blob/mai‌​n/spring-kafka/…)

xerx593 05.12.2022 10:57

"@param configureDeserializers false to not configure the deserializers"

xerx593 05.12.2022 11:03

@ xerx593 Это работает, и я еще не сталкивался с побочными эффектами. Можете ли вы опубликовать это как ответ, чтобы я мог принять его?

birca123 05.12.2022 11:30
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
3
65
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Поскольку, если бы мы не использовали какую-либо соответствующую конфигурацию (десериализатора), самым быстрым решением было бы использование:

// ...
new DefaultKafkaConsumerFactory<>(
  props,
  new StringDeserializer(),
  new ListDeserializer<>(
    ArrayList.class, new JsonDeserializer<>(Item.class, false)
  ),
  false /* !!! */
)//...

Где false относится к configureDesirializers параметру (перегруженного) конструктора. (ссылка)

В этом случае он не имеет побочных/негативных эффектов, так как вы:

  • Используйте StringDeserializer в качестве десериализатора ключей.

    У него есть единственная «опция конфигурации»: [[key|value].]deserializer.encoding (которую мы не устанавливаем и которая по умолчанию равна UTF-8). (ссылка)

  • Для десириализатора значения (a ListDeserializer) вы уже "настраиваете" все полезное (через параметры конструктора). (ссылка)

Мы делаем:

props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

.../хотим настроить наш JsonDeserializer, то "обходной путь" ломает эту настройку!

Мы должны настроить props и инициализировать фабрику с пустыми (десериализаторами) конструкторами:

// props.put(...);
// when this is *really* needed (otherwise, quickfix!):
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
// extra config for our "value deserializer" (analogous to constructor):
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, ArrayList.class /*alternatively: "java.util.ArrayList" (string!) */);
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
 /* this looks little hacky, i am open to better proposals (we need Serde here): */
 Serdes.serdeFrom(
   new SomeItemSerializer(), /* <- we'll need this (not null!) due to api, but it can be "no-op" (is never used with given code) */
   new JsonDeserializer<>(Item.class, false)
 )
);
// and then:
// ...
factory.setConsumerFactory(
  new DefaultKafkaConsumerFactory<>(
    props,
    new StringDeserializer(),
    new ListDeserializer<>(/* just empty */)
    /* now with configureDeserializers = true/default */
  )
); // ...

Чтобы обойти (уродливый) no-op-serializer, мы могли бы (попробовать):

new Serdes.WrapperSerde(/*!!*/null, new JsonDeserializer<>(Item.class, false))

..вместо того:

Serdes.serdeFrom(..)

..и скрещенные пальцы, что никто/вещь не вызывает ничего, кроме deserializer() на WrapperSerde (..иначе: NPE! ..consumer не сериализуется!;). (ссылка)

хм, как насчет JsonDeserializer.TRUSTED_PACKAGES, "*" ?

xerx593 05.12.2022 14:06

... это не будет распространено на JsonDeserializer с помощью «быстрого исправления»!

xerx593 05.12.2022 14:14

Верно, но в моем конкретном случае я использую new JsonDeserializer<>(Item.class, false), где false соответствует атрибуту useHeadersIfPresent, что делает JsonDeserializer.TRUSTED_PACKAGES избыточным, если я не ошибаюсь, потому что он даже не ищет заголовок типа и, следовательно, не проверяет доверенные пакеты.

birca123 05.12.2022 15:12

Другие вопросы по теме