@KafkaListener не использует сообщения — проблема с десериализацией

Message Producer, использующий привязки Kafka к облачным потокам Spring

@Component
public static class PageViewEventSource implements ApplicationRunner {

private final MessageChannel pageViewsOut;
private final Log log = LogFactory.getLog(getClass());

public PageViewEventSource(AnalyticsBinding binding) {
    this.pageViewsOut = binding.pageViewsOut();
}

@Override
public void run(ApplicationArguments args) throws Exception {
    List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
    List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
    Runnable runnable = () -> {
        String rPage = pages.get(new Random().nextInt(pages.size()));
        String rName = pages.get(new Random().nextInt(names.size()));
        PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);


        Serializer<PageViewEvent> serializer = new JsonSerde<>(PageViewEvent.class).serializer();
        byte[] m = serializer.serialize(null, pageViewEvent);

        Message<byte[]> message =  MessageBuilder
                .withPayload(m).build();

        try {
            this.pageViewsOut.send(message);
            log.info("sent " + message);
        } catch (Exception e) {
            log.error(e);
        }
    };
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
}

Это использование ниже сериализации

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$BytesSerde

Я пытаюсь использовать эти сообщения в отдельном потребительском приложении через Spring Kafka – KafkaListener

@Service
public class PriceEventConsumer {


private static final Logger LOG = LoggerFactory.getLogger(PriceEventConsumer.class);

@KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")
public void receive(Bytes data){
//public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
    LOG.info("Message received");
    LOG.info("received data='{}'", data);

    }

Конфигурация контейнерного завода

 @Bean
 public ConsumerFactory<String, Bytes> consumerFactory() {

  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  StringDeserializer.class);
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
  BytesDeserializer.class);
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

  return new DefaultKafkaConsumerFactory<>(props);

 }

 @Bean
 public ConcurrentKafkaListenerContainerFactory<String, Bytes> 

 kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  return factory;
 }

С этой конфигурацией потребитель не получает сообщения (байты). Если я изменю прослушиватель Kafka, чтобы принять строку, это даст мне следующее исключение:

   @KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")

 public void receive(String data){

    LOG.info("Message received");
    LOG.info("received data='{}'", data);

    }

Вызванный:

org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.kafka.common.utils.Bytes] to [java.lang.String] for GenericMessage [payload = {"userId":"facebook","page":"about","duration":10}, headers = {kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}], failedMessage=GenericMessage [payload = {"userId":"facebook","page":"about","duration":10}, headers = {kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}] ... 23 more Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.kafka.common.utils.Bytes] to [java.lang.String] for GenericMessage [payload = {"userId":"facebook","page":"about","duration":10}, headers = {kafka_offset=4213, kafka_consumer=brave.kafka.clients.TracingConsumer@9a75f94, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test1, kafka_receivedTimestamp=1553007593670}] at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE] ... 22 more

Любые указатели будут очень полезны.

Обновление части POJO

Партия Пожо ——

  @KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")

  public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
    LOG.info("Message received");
    LOG.info("received data='{}'", data);

   }

Конфигурация контейнерного завода

 @Bean
 public ConsumerFactory<String,PageViewEvent > priceEventConsumerFactory() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class));



}

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(priceEventConsumerFactory());
     return factory;
     }

Режиссер -

  @Override
  public void run(ApplicationArguments args) throws Exception {
  List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
   List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
   Runnable runnable = () -> {
    String rPage = pages.get(new Random().nextInt(pages.size()));
    String rName = pages.get(new Random().nextInt(names.size()));
    PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);

    Message<PageViewEvent> message =  MessageBuilder
            .withPayload(pageViewEvent).build();
                    try {
        this.pageViewsOut.send(message);
        log.info("sent " + message);
    } catch (Exception e) {
        log.error(e);
    }
};

вы хотите десериализовать в байты или pojo?

Deadpool 19.03.2019 16:31

Изначально я пытался десериализовать его с помощью POJO. Но мой потребитель не получал сообщения. Любой способ в порядке, но кроме String он не собирает данные

Priya Tanwar 19.03.2019 17:01

Покажи ту часть, с которой я могу тебе помочь @PriyaTanwar

Deadpool 19.03.2019 17:02

@Bean public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(priceEventConsumerFactory()); возврат на заводе; }

Priya Tanwar 19.03.2019 17:04

Это была моя фабрика контейнеров, и я использовал String и Json Deserializer для ключа и значения соответственно. В конце прослушивателя kafka у меня было следующее изменение: @KafkaListener(topics = "test1", groupId = "json", containerFactory = "priceEventsKafkaListenerContainerFactory") public void receive(@Payload PageViewEvent data, @Headers MessageHeaders headers) {}

Priya Tanwar 19.03.2019 17:06

Пожалуйста, обновите это в посте, а не в комментариях @PriyaTanwar

Deadpool 19.03.2019 17:07

@Deadpool — обновлено в посте.

Priya Tanwar 19.03.2019 17:40
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
3
7
5 346
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Вы можете десериализовать запись из kfka в POJO, для версий <2.2.x используйте MessageConverter

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean

@Bean
public ConsumerFactory<String,PageViewEvent > priceEventConsumerFactory() {

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class,false));

}

Или с помощью MessageConverter

 @Bean
 public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {

 ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
 factory.setConsumerFactory(consumerFactory());
 factory.setMessageConverter(new StringJsonMessageConverter());
 return factory;
}

Используйте JsonDeserializer упаковки org.springframework.kafka.support.serializer

Также доверяйте пакету десериализатора json props.put(JsonDeserializer.TRUSTED_PACKAGES, <package-name>)

Другие вопросы по теме