Процессор GetKafka
— устарел. Вместо этого используйте процессор ConsumeKafka
. Этот процессор имеет параметр Сброс смещения, который принимает самый ранний, самый последний и никто в качестве параметров. Самый последний должен делать то, что вам нужно.
Примечание:, если вам все еще нужно GetKafka
для работы с экземпляром Kafka 0.8, есть вариант для Автоматический сброс смещения с возможными значениями самый маленький и самый большой.
Чтобы добавить к комментарию Энди ... даже с ConsumeKafka, сброс смещения используется, когда для текущей группы потребителей нет смещения, поэтому, если вы раньше запускали процессор, останавливали его и запускали снова, это не будет используется, потому что он будет начинаться со смещения раньше. Вам нужно изменить идентификатор группы потребителей, чтобы начать сначала.