Я использую spring-kafka 2.1.10.RELEASE. У меня есть потребитель со следующими свойствами (почти все они скопированы):
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka1.local:9093, kafka2.local:9093, kafka3.local:9093]
check.crcs = true
client.id = kafkaListener-0
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = kafkaLisneterContainer
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
max.poll.interval.ms = 300000
max.poll.records = 50
metadata.max.age.ms = 300000
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
Версия Apache Kafka на моем производстве — 2.11-1.0.0-0pan4. Есть кластер с 3 нодами кафки внутри:
Столкнулся с серьезной проблемой и не может даже воспроизвести ее локально. И вот что произошло:
Я начал свое приложение с kafka Producer и Consumer внутри.
Все работало нормально, пока нода лидера для моей темы не была изменена в 2019-01-17 06:47:39:
2019-01-17/controller.2019-01-17-03.aaa-aa3.gz:2019-01-17 06:47:39,365 +0000 [controller-event-thread] [kafka.controller.KafkaController] INFO [Controller id=3] New leader and ISR for partition topic_name-0 is {"leader":1,"leader_epoch":3,"isr":[1,3]} (kafka.controller.KafkaController)

4) САМОЕ ЗАГАДОЧНОЕ: в приложении вроде все нормально работает. Spring-consumer читает новые сообщения и отправляет их в kafka. Я вижу такие логи. Похоже, что потребитель Spring сохраняет свое смещение в памяти и отправляет коммит на удаленную кафку (без ошибок и т. д.):
2019-01-23 14:03:20,975 +0000 [kafkaLisneterContainer-0-C-1] [Fetcher] DEBUG [Consumer clientId=kafkaListener-0, groupId=kafkaLisneterContainer] Fetch READ_UNCOMMITTED at offset 164871 for partition aaa-1 returned fetch data (error=NONE, highWaterMark=164871, lastStableOffset = -1, logStartOffset = 116738, abortedTransactions = null, recordsSizeInBytes=0) 2019-01-23 14:03:20,975 +0000
[externalbetting] [kafkaLisneterContainer-0-C-1] [Fetcher] DEBUG [Consumer clientId=kafkaListener-0, groupId=kafkaLisneterContainer] Added READ_UNCOMMITTED fetch request for partition eaaa-1 at offset 164871 to node aaa-aa1.local:9093 (id: 1 rack: null) 2019-01-23 14:03:20,975
5) Но в любом случае Лаг в Apache kafka растет. И если я перезапущу свое приложение, потребитель Spring bean будет воссоздан и потеряет свое сохраненное в памяти смещение. Он прочитает этот лаг из kafka и обработает эти записи во второй раз.
Пожалуйста, помогите найти ключ!

Когда вы включаете автоматическую фиксацию (по умолчанию Kafka), коммиты полностью управляются клиентами kafka, и Spring не имеет над ними контроля.
Установка его в false позволит контейнеру прослушивателя фиксировать смещения, которые он будет выполнять после каждого пакета записей (результат опроса) по умолчанию или после каждой записи, если вы установите для свойства контейнера AckMode значение RECORD.
Контейнер также будет надежно фиксировать любые ожидающие смещения, когда разделы будут отозваны из-за перебалансировки.
Обычно я рекомендую не использовать автоматическую фиксацию.
Я не могу комментировать это; Я не знаю внутренностей кафка-клиентов; возможно, он фиксируется только вовремя и не учитывает перебалансировку. Все, что я могу сказать, это то, что контейнер прослушивателя будет надежно фиксировать смещения, когда он уведомлен о том, что разделы были отозваны во время перебалансировки (если не используется автоматическая фиксация).
Нашел блок кода внутри кафка-клиентов: if (autoCommitEnabled) { if (coordinatorUnknown()) { this.nextAutoCommitDeadline = now + retryBackoffMs; } else if (now >= nextAutoCommitDeadline) { this.nextAutoCommitDeadline = now + autoCommitIntervalMs; doAutoCommitOffsetsAsync(); } } } Похоже, что в моем случае coordinatorUnknown всегда верно.. Почему это могло произойти?