Повторные попытки производителя облачного потока Spring и обработка ошибок

Я установил производителя и потребителя кафки весеннего облачного потока, и работают 3 брокера кафки. Я установил min.insync.replicas на 4, чтобы увидеть, как работает обработка ошибок производителя. Вызов messagechannel.send немедленно возвращается, и журналы производителя продолжают сообщать NOT_ENOUGH_REPLICAS, что нормально и ожидаемо.

server.port: 9050
spring:
  cloud:
    stream:
      bindings:
        errorChannel:
          destination: error-topic
        output:
          destination: stream-topic
          group: top-group
          producer:
            errorChannelEnabled: true
      kafka:
        bindings:
          output:
            producer:
              retries: 3
              sync: false
        binder:
          autoCreateTopics: true
          configuration:
            value:
              serializer: com.example.kafkapublisher.MySerializer
          producer-properties:
            acks: all
spring.cloud.stream.kafka.bindings.errorChannel.consumer.enableDlq: true
  

Выше приведена конфигурация моего производителя. Несмотря на то, что число повторных попыток равно 3, производитель продолжает повторять большое количество попыток. Несмотря на то, что для синхронизации установлено значение true, вызов отправки выполняется немедленно. Хотя канал ошибки и место назначения определены, а для errorChannelEnabled установлено значение true, я не вижу сообщение об ошибке в теме ошибки my-error, и тема ошибки не создана. Запросить вашу помощь

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
0
0
37
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Произвольные свойства производителя Kafka входят в свойство ...producer.configuration.

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties

configuration

Map with a key/value pair containing generic Kafka producer properties. The bootstrap.servers property cannot be set here; use multi-binder support if you need to connect to multiple clusters.

Default: Empty map.

Спасибо Гэри. С обновленной конфигурацией (вопрос отредактирован) я могу заставить работать флаг синхронизации. Также ошибки, возникшие во время производителя.send, такие как NOT_ENOUGH_REPLICAS, отправляются в errorChannel, и я получаю обратные вызовы на ServiceActivator. Однако есть способ отправить ошибки, возникающие из-за вызовов производителя. отправить, как указано выше, для отправки в dlq. У меня есть потребитель, который работает хорошо, и ошибки, выдаваемые потребителем, правильно переносятся в DLQ. Данная конфигурация предназначена для производителя, хотя я дал dlq для канала ошибки, я не вижу его в теме ошибок

redeemed 10.05.2022 16:21

Нет; используйте KafkaTemplate в @ServiceActivator, чтобы отправить его.

Gary Russell 10.05.2022 17:16

Спасибо . Теперь проблема в том, что я имитирую NOT_ENOUGH_REPLICAS и, следовательно, не могу направить ошибку в тему. Есть ли какой-либо другой способ, которым мы можем имитировать ошибки производителя при работающем брокере kafka. Я пытался использовать собственный сериализатор, который вслепую выдает ошибку, однако в этом случае ServiceActivator не вызывается.

redeemed 11.05.2022 11:44

Также не могли бы вы объяснить, что происходит, когда мы устанавливаем acks = all и sync = true вместе. Если acks=all и sync=false, то вызов отправки вернется немедленно, это правильно? тогда какова цель acks = all . Если acks = 0 и sync = true, что это означает?

redeemed 11.05.2022 12:25

См. документацию, что означает acks=0. kafka.apache.org/documentation/#producerconfigs_ackssync=true означает, что исключение будет выдано при отправке, false означает, что ошибка отправляется в канал ошибок. Чтобы вызвать ошибку только по одной теме, можно было уменьшить ее max.message.bytes и отправить запись большего размера. kafka.apache.org/documentation/#topicconfigs_max.message.byt‌​es

Gary Russell 11.05.2022 14:58

будет ли sync = true гарантировать, что записи будут распространяться на все реплики insync? я знаю о acks, но путаюсь с обоими этими конфигами, используемыми вместе. Я предполагаю, что sync = true предназначен только для получения исключения, если вообще запись лидера не удалась. Пожалуйста, поправьте меня, если я ошибаюсь.

redeemed 11.05.2022 14:59
sync=true просто говорит Spring дождаться результата отправки, прежде чем вернуться к отправителю. Он не зависит от аков — всем этим занимается сама Kafka. На практике да, это означает, что запись защищена в необходимом количестве реплик. Но по этой причине это будет намного медленнее, чем sync=false.
Gary Russell 11.05.2022 15:37

Довольно легко обрабатывать ошибки с sync=true, особенно если вы хотите объединить операции записи db и kafka send msg в одной транзакции. В противном случае нам придется полагаться на исходящие шаблоны с debezium/eventuate и т.д., которые сложнее реализовать и управлять ими. Любые мысли о том, в какой степени производительность снижается с sync = true, для моего тестирования по теме с менее чем 3 записями метод отправки возвращался через несколько миллисекунд, и это был кластер kafka с 3 узлами локально на моем ноутбуке.

redeemed 11.05.2022 19:03

Это определенно проще кодировать; производительность действительно проблема, только если вы хотите опубликовать много записей за короткое время.

Gary Russell 11.05.2022 19:06

Означает, что если я буду публиковать с высокой скоростью в теме A и с низкой скоростью публикации в теме B, это повлияет только на производительность темы A по сравнению с темой B с синхронизацией = true. Это правильно ?

redeemed 11.05.2022 19:10

Да; это правильно - свойство синхронизации для каждой привязки.

Gary Russell 11.05.2022 19:25

Спасибо. Любая идея, почему ошибка сериализации не обнаруживалась в @ServiceActivator, когда для синхронизации было установлено значение false. Хотя я не пробовал с sync=true, который, как я полагаю, будет возвращен в тот же поток вызывающего абонента, который вызвал отправку. Также было бы здорово узнать, есть ли другие способы имитации ошибок производителя.

redeemed 11.05.2022 19:30

Сериализация всегда выполняется в вызывающем потоке; sync относится только к тому, ждать ли асинхронного подтверждения или нет (в вызывающем потоке).

Gary Russell 11.05.2022 19:34

Хорошо, поэтому независимо от синхронизации ошибка сериализации всегда будет отправляться обратно в вызывающий поток. Это верно? Также было бы здорово узнать, есть ли другие способы имитации ошибок производителя, кроме not_enough_replicas и max.message.bytes.

redeemed 11.05.2022 19:46

Я не исследовал другие возможные способы имитации ошибок. Вероятно, вы могли бы сделать это с фиктивным продюсером, но это потребовало бы некоторого размышления. Если вам нужна помощь с этим, я предлагаю вам задать новый вопрос; админы здесь не любят длинных комментариев.

Gary Russell 11.05.2022 20:15

Конечно, спасибо Гэри. Большое спасибо за твою помощь .

redeemed 11.05.2022 20:21

Другие вопросы по теме