Фабрика контейнеров прослушивателя Kafka должна быть настроена на использование сообщений, значения которых представляют собой список объектов. Я попытался настроить фабрику с помощью ListDeserializer следующим образом:
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value(value = "${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, List<Item>>
kafkaListenerContainerFactoryItem() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConcurrentKafkaListenerContainerFactory<String, List<Item>> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(
new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new ListDeserializer<>(ArrayList.class, new JsonDeserializer<>(Item.class, false))));
return factory;
}
}
Но появляется исключение ниже:
org.apache.kafka.common.config.ConfigException: List deserializer was already initialized using a non-default constructor
at org.apache.kafka.common.serialization.ListDeserializer.configure(ListDeserializer.java:78) ~[kafka-clients-3.1.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.lambda$valueDeserializerSupplier$9(DefaultKafkaConsumerFactory.java:199) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:776) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:352) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:461) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:461) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:347) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:292) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.22.jar:5.3.22]
... 14 common frames omitted
Как правильно настроить фабрику контейнеров с помощью ListDeserializer?
"@param configureDeserializers false to not configure the deserializers"
@ xerx593 Это работает, и я еще не сталкивался с побочными эффектами. Можете ли вы опубликовать это как ответ, чтобы я мог принять его?




Поскольку, если бы мы не использовали какую-либо соответствующую конфигурацию (десериализатора), самым быстрым решением было бы использование:
// ...
new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new ListDeserializer<>(
ArrayList.class, new JsonDeserializer<>(Item.class, false)
),
false /* !!! */
)//...
Где false относится к configureDesirializers параметру (перегруженного) конструктора. (ссылка)
В этом случае он не имеет побочных/негативных эффектов, так как вы:
Используйте StringDeserializer в качестве десериализатора ключей.
У него есть единственная «опция конфигурации»: [[key|value].]deserializer.encoding (которую мы не устанавливаем и которая по умолчанию равна UTF-8). (ссылка)
Для десириализатора значения (a ListDeserializer) вы уже "настраиваете" все полезное (через параметры конструктора). (ссылка)
Мы делаем:
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
.../хотим настроить наш JsonDeserializer, то "обходной путь" ломает эту настройку!
Мы должны настроить props и инициализировать фабрику с пустыми (десериализаторами) конструкторами:
// props.put(...);
// when this is *really* needed (otherwise, quickfix!):
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
// extra config for our "value deserializer" (analogous to constructor):
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, ArrayList.class /*alternatively: "java.util.ArrayList" (string!) */);
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
/* this looks little hacky, i am open to better proposals (we need Serde here): */
Serdes.serdeFrom(
new SomeItemSerializer(), /* <- we'll need this (not null!) due to api, but it can be "no-op" (is never used with given code) */
new JsonDeserializer<>(Item.class, false)
)
);
// and then:
// ...
factory.setConsumerFactory(
new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new ListDeserializer<>(/* just empty */)
/* now with configureDeserializers = true/default */
)
); // ...
Чтобы обойти (уродливый) no-op-serializer, мы могли бы (попробовать):
new Serdes.WrapperSerde(/*!!*/null, new JsonDeserializer<>(Item.class, false))
..вместо того:
Serdes.serdeFrom(..)
..и скрещенные пальцы, что никто/вещь не вызывает ничего, кроме deserializer() на WrapperSerde (..иначе: NPE! ..consumer не сериализуется!;). (ссылка)
хм, как насчет JsonDeserializer.TRUSTED_PACKAGES, "*" ?
... это не будет распространено на JsonDeserializer с помощью «быстрого исправления»!
Верно, но в моем конкретном случае я использую new JsonDeserializer<>(Item.class, false), где false соответствует атрибуту useHeadersIfPresent, что делает JsonDeserializer.TRUSTED_PACKAGES избыточным, если я не ошибаюсь, потому что он даже не ищет заголовок типа и, следовательно, не проверяет доверенные пакеты.
Выполнение
new ListDeserializer<>(ArrayList.class, new JsonDeserializer<>(Item.class, false), false)поможет обойти это исключение/строку, но в отношении «(побочных) эффектов»: не уверен, пожалуйста, протестируйте/посмотрите... (есть перегруженный конструктор с параметромboolean configureDeserializers...github.com/spring-projects /spring-kafka/blob/main/spring-kafka/…)