Потребитель Spring kafka не связывается с сервером kafka после смены лидера

Я использую spring-kafka 2.1.10.RELEASE. У меня есть потребитель со следующими свойствами (почти все они скопированы):

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka1.local:9093, kafka2.local:9093, kafka3.local:9093]
    check.crcs = true
    client.id = kafkaListener-0
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = kafkaLisneterContainer
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    max.poll.interval.ms = 300000
    max.poll.records = 50
    metadata.max.age.ms = 300000
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

Версия Apache Kafka на моем производстве — 2.11-1.0.0-0pan4. Есть кластер с 3 нодами кафки внутри:

Потребитель Spring kafka не связывается с сервером kafka после смены лидера

Столкнулся с серьезной проблемой и не может даже воспроизвести ее локально. И вот что произошло:

  1. Я начал свое приложение с kafka Producer и Consumer внутри.

  2. Все работало нормально, пока нода лидера для моей темы не была изменена в 2019-01-17 06:47:39:

2019-01-17/controller.2019-01-17-03.aaa-aa3.gz:2019-01-17 06:47:39,365 +0000 [controller-event-thread] [kafka.controller.KafkaController] INFO [Controller id=3] New leader and ISR for partition topic_name-0 is {"leader":1,"leader_epoch":3,"isr":[1,3]} (kafka.controller.KafkaController)

  1. После этого мой потребитель перестал совершать зачеты в Kafka. Последняя фиксация произошла в тот же час и в ту же минуту, когда сменился лидер — 17 января 2019 г., 06:47. Потребитель Spring kafka не связывается с сервером kafka после смены лидера

4) САМОЕ ЗАГАДОЧНОЕ: в приложении вроде все нормально работает. Spring-consumer читает новые сообщения и отправляет их в kafka. Я вижу такие логи. Похоже, что потребитель Spring сохраняет свое смещение в памяти и отправляет коммит на удаленную кафку (без ошибок и т. д.):

2019-01-23 14:03:20,975 +0000 [kafkaLisneterContainer-0-C-1] [Fetcher] DEBUG [Consumer clientId=kafkaListener-0, groupId=kafkaLisneterContainer] Fetch READ_UNCOMMITTED at offset 164871 for partition aaa-1 returned fetch data (error=NONE, highWaterMark=164871, lastStableOffset = -1, logStartOffset = 116738, abortedTransactions = null, recordsSizeInBytes=0) 2019-01-23 14:03:20,975 +0000
[externalbetting] [kafkaLisneterContainer-0-C-1] [Fetcher] DEBUG [Consumer clientId=kafkaListener-0, groupId=kafkaLisneterContainer] Added READ_UNCOMMITTED fetch request for partition eaaa-1 at offset 164871 to node aaa-aa1.local:9093 (id: 1 rack: null) 2019-01-23 14:03:20,975

5) Но в любом случае Лаг в Apache kafka растет. И если я перезапущу свое приложение, потребитель Spring bean будет воссоздан и потеряет свое сохраненное в памяти смещение. Он прочитает этот лаг из kafka и обработает эти записи во второй раз.

Пожалуйста, помогите найти ключ!

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
4
0
2 994
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Когда вы включаете автоматическую фиксацию (по умолчанию Kafka), коммиты полностью управляются клиентами kafka, и Spring не имеет над ними контроля.

Установка его в false позволит контейнеру прослушивателя фиксировать смещения, которые он будет выполнять после каждого пакета записей (результат опроса) по умолчанию или после каждой записи, если вы установите для свойства контейнера AckMode значение RECORD.

Контейнер также будет надежно фиксировать любые ожидающие смещения, когда разделы будут отозваны из-за перебалансировки.

Обычно я рекомендую не использовать автоматическую фиксацию.

Нашел блок кода внутри кафка-клиентов: if (autoCommitEnabled) { if (coordinatorUnknown()) { this.nextAutoCommitDeadline = now + retryBackoffMs; } else if (now >= nextAutoCommitDeadline) { this.nextAutoCommitDeadline = now + autoCommitIntervalMs; doAutoCommitOffsetsAsync(); } } } Похоже, что в моем случае coordinatorUnknown всегда верно.. Почему это могло произойти?

Sviatlana 25.01.2019 09:33

Я не могу комментировать это; Я не знаю внутренностей кафка-клиентов; возможно, он фиксируется только вовремя и не учитывает перебалансировку. Все, что я могу сказать, это то, что контейнер прослушивателя будет надежно фиксировать смещения, когда он уведомлен о том, что разделы были отозваны во время перебалансировки (если не используется автоматическая фиксация).

Gary Russell 25.01.2019 15:33

Другие вопросы по теме