Я работаю над приложением Beam, которое использует KafkaIO в качестве входных данных.
KafkaIO.<Long, GenericRecord>read()
.withBootstrapServers("bootstrapServers")
.withTopic("topicName")
.withConsumerConfigUpdates(confs)
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer((Deserializer.class)
.commitOffsetsInFinalize()
.withoutMetadata();
Я пытаюсь понять, как именно работает commitOffsetsInFinalize()
.
Как можно завершить потоковое задание?
Последним шагом в конвейере является настраиваемый DoFn, который записывает сообщения в DynamoDb
. Есть ли способ вручную вызвать там какой-нибудь метод finalize()
, чтобы смещения фиксировались после каждого успешного выполнения DoFn
?
Также мне трудно понять, какая связь между контрольными точками и финализацией? Если в конвейере не включена контрольная точка, смогу ли я доработать и заставить работать commitOffsetsInFinalize()
?
p.s. То, как сейчас работает конвейер, даже с commitOffsetsInFinalize()
каждым прочитанным сообщением, независимо от того, есть ли сбой в нисходящем направлении, фиксируется, что приводит к потере данных.
Спасибо!
Завершение здесь относится к завершению контрольной точки, другими словами, когда данные были надежно зафиксированы в состоянии выполнения Beam (таким образом, сбои/переназначение рабочих будут повторяться без необходимости повторного чтения этого сообщения от Kafka). Это не означает, что данные, о которых идет речь, прошли оставшуюся часть конвейера.
Правильно, это полезно только в том случае, если вы также не установили AUTO_COMMIT в конфигурации потребителя kafka.
Спасибо, робертвб. Что произойдет, если контрольная точка никогда не будет определена? Каков механизм контрольной точки по умолчанию, если приложение работает на Flink? Также я понимаю, что если у меня нет
ENABLE_AUTO_COMMIT_CONFIG, false
, сообщения все равно будут зафиксированы независимо от того, установлен лиcommitOffsetsInFinalize()
или нет, это правильно?