Я установил производителя и потребителя кафки весеннего облачного потока, и работают 3 брокера кафки. Я установил min.insync.replicas на 4, чтобы увидеть, как работает обработка ошибок производителя. Вызов messagechannel.send немедленно возвращается, и журналы производителя продолжают сообщать NOT_ENOUGH_REPLICAS, что нормально и ожидаемо.
server.port: 9050
spring:
cloud:
stream:
bindings:
errorChannel:
destination: error-topic
output:
destination: stream-topic
group: top-group
producer:
errorChannelEnabled: true
kafka:
bindings:
output:
producer:
retries: 3
sync: false
binder:
autoCreateTopics: true
configuration:
value:
serializer: com.example.kafkapublisher.MySerializer
producer-properties:
acks: all
spring.cloud.stream.kafka.bindings.errorChannel.consumer.enableDlq: true
Выше приведена конфигурация моего производителя. Несмотря на то, что число повторных попыток равно 3, производитель продолжает повторять большое количество попыток. Несмотря на то, что для синхронизации установлено значение true, вызов отправки выполняется немедленно. Хотя канал ошибки и место назначения определены, а для errorChannelEnabled установлено значение true, я не вижу сообщение об ошибке в теме ошибки my-error, и тема ошибки не создана. Запросить вашу помощь




Произвольные свойства производителя Kafka входят в свойство ...producer.configuration.
configuration
Map with a key/value pair containing generic Kafka producer properties. The bootstrap.servers property cannot be set here; use multi-binder support if you need to connect to multiple clusters.
Default: Empty map.
Нет; используйте KafkaTemplate в @ServiceActivator, чтобы отправить его.
Спасибо . Теперь проблема в том, что я имитирую NOT_ENOUGH_REPLICAS и, следовательно, не могу направить ошибку в тему. Есть ли какой-либо другой способ, которым мы можем имитировать ошибки производителя при работающем брокере kafka. Я пытался использовать собственный сериализатор, который вслепую выдает ошибку, однако в этом случае ServiceActivator не вызывается.
Также не могли бы вы объяснить, что происходит, когда мы устанавливаем acks = all и sync = true вместе. Если acks=all и sync=false, то вызов отправки вернется немедленно, это правильно? тогда какова цель acks = all . Если acks = 0 и sync = true, что это означает?
См. документацию, что означает acks=0. kafka.apache.org/documentation/#producerconfigs_ackssync=true означает, что исключение будет выдано при отправке, false означает, что ошибка отправляется в канал ошибок. Чтобы вызвать ошибку только по одной теме, можно было уменьшить ее max.message.bytes и отправить запись большего размера. kafka.apache.org/documentation/#topicconfigs_max.message.bytes
будет ли sync = true гарантировать, что записи будут распространяться на все реплики insync? я знаю о acks, но путаюсь с обоими этими конфигами, используемыми вместе. Я предполагаю, что sync = true предназначен только для получения исключения, если вообще запись лидера не удалась. Пожалуйста, поправьте меня, если я ошибаюсь.
sync=true просто говорит Spring дождаться результата отправки, прежде чем вернуться к отправителю. Он не зависит от аков — всем этим занимается сама Kafka. На практике да, это означает, что запись защищена в необходимом количестве реплик. Но по этой причине это будет намного медленнее, чем sync=false.
Довольно легко обрабатывать ошибки с sync=true, особенно если вы хотите объединить операции записи db и kafka send msg в одной транзакции. В противном случае нам придется полагаться на исходящие шаблоны с debezium/eventuate и т.д., которые сложнее реализовать и управлять ими. Любые мысли о том, в какой степени производительность снижается с sync = true, для моего тестирования по теме с менее чем 3 записями метод отправки возвращался через несколько миллисекунд, и это был кластер kafka с 3 узлами локально на моем ноутбуке.
Это определенно проще кодировать; производительность действительно проблема, только если вы хотите опубликовать много записей за короткое время.
Означает, что если я буду публиковать с высокой скоростью в теме A и с низкой скоростью публикации в теме B, это повлияет только на производительность темы A по сравнению с темой B с синхронизацией = true. Это правильно ?
Да; это правильно - свойство синхронизации для каждой привязки.
Спасибо. Любая идея, почему ошибка сериализации не обнаруживалась в @ServiceActivator, когда для синхронизации было установлено значение false. Хотя я не пробовал с sync=true, который, как я полагаю, будет возвращен в тот же поток вызывающего абонента, который вызвал отправку. Также было бы здорово узнать, есть ли другие способы имитации ошибок производителя.
Сериализация всегда выполняется в вызывающем потоке; sync относится только к тому, ждать ли асинхронного подтверждения или нет (в вызывающем потоке).
Хорошо, поэтому независимо от синхронизации ошибка сериализации всегда будет отправляться обратно в вызывающий поток. Это верно? Также было бы здорово узнать, есть ли другие способы имитации ошибок производителя, кроме not_enough_replicas и max.message.bytes.
Я не исследовал другие возможные способы имитации ошибок. Вероятно, вы могли бы сделать это с фиктивным продюсером, но это потребовало бы некоторого размышления. Если вам нужна помощь с этим, я предлагаю вам задать новый вопрос; админы здесь не любят длинных комментариев.
Конечно, спасибо Гэри. Большое спасибо за твою помощь .
Спасибо Гэри. С обновленной конфигурацией (вопрос отредактирован) я могу заставить работать флаг синхронизации. Также ошибки, возникшие во время производителя.send, такие как NOT_ENOUGH_REPLICAS, отправляются в errorChannel, и я получаю обратные вызовы на ServiceActivator. Однако есть способ отправить ошибки, возникающие из-за вызовов производителя. отправить, как указано выше, для отправки в dlq. У меня есть потребитель, который работает хорошо, и ошибки, выдаваемые потребителем, правильно переносятся в DLQ. Данная конфигурация предназначена для производителя, хотя я дал dlq для канала ошибки, я не вижу его в теме ошибок