Как установить смещение Kafka для потребителя?

Предположим, что в моей теме уже есть 10 данных, и теперь я запускаю свой потребительский Flink, потребитель будет потреблять 11-е данные.

Поэтому у меня 3 вопроса:

  1. Как получить количество разделов в текущей теме и смещение каждого раздела соответственно?
  2. Как установить начальную позицию для каждого раздела для потребителя вручную?
  3. Если потребитель Flink выйдет из строя и через несколько минут он будет восстановлен. Как потребитель узнает, где перезапустить?

Любая помощь приветствуется. Примеры кодов (я пробовал FlinkKafkaConsumer08, FlinkKafkaConsumer10, но все исключения.):

public class kafkaConsumer {
public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.95.2:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("auto.offset.reset", "earliest");

    FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>(
            "game_event", new SimpleStringSchema(), properties);


    DataStream<String> stream = env.addSource(myConsumer);

    stream.map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            return "Stream Value: " + value;
        }
    }).print();

    env.execute();
    }
}

И pom.xml:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>

Не могли бы вы пояснить, что вы имеете в виду под Как получить номер раздела и смещение для каждого раздела? Также какое смещение вы ищете? На последнюю?

Giorgos Myrianthous 31.10.2018 10:37

@GiorgosMyrianthous `Как получить количество разделов` означает получить количество разделов в текущей теме; и последнее смещение каждого раздела.

user2894829 31.10.2018 10:47
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
2
2
6 610
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий
  1. Чтобы получать сообщения из раздела, начиная с определенного смещения, вы можете обратиться к Документация по Flinkl:

You can also specify the exact offsets the consumer should start from for each partition:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

The above example configures the consumer to start from the specified offsets for partitions 0, 1, and 2 of topic myTopic. The offset values should be the next record that the consumer should read for each partition. Note that if the consumer needs to read a partition which does not have a specified offset within the provided offsets map, it will fallback to the default group offsets behaviour (i.e. setStartFromGroupOffsets()) for that particular partition.

Note that these start position configuration methods do not affect the start position when the job is automatically restored from a failure or manually restored using a savepoint. On restore, the start position of each Kafka partition is determined by the offsets stored in the savepoint or checkpoint (please see the next section for information about checkpointing to enable fault tolerance for the consumer).

  1. В случае сбоя одного из потребителей, после восстановления Kafka обратится к теме consumer_offsets, чтобы продолжить обработку сообщений с той точки, в которой он был оставлен до сбоя. consumer_offsets - это тема, которая используется для хранения информации метаданных о подтвержденных смещениях для каждой тройки (тема, раздел, группа). Его также периодически уплотняют, поэтому доступны только последние смещения. Вы также можете обратиться к Показатели коннекторов Kafka от Flink:

Flink’s Kafka connectors provide some metrics through Flink’s metrics system to analyze the behavior of the connector. The producers export Kafka’s internal metrics through Flink’s metric system for all supported versions. The consumers export all metrics starting from Kafka version 0.9. The Kafka documentation lists all exported metrics in its documentation.

In addition to these metrics, all consumers expose the current-offsets and committed-offsets for each topic partition. The current-offsets refers to the current offset in the partition. This refers to the offset of the last element that we retrieved and emitted successfully. The committed-offsets is the last committed offset.

The Kafka Consumers in Flink commit the offsets back to Zookeeper (Kafka 0.8) or the Kafka brokers (Kafka 0.9+). If checkpointing is disabled, offsets are committed periodically. With checkpointing, the commit happens once all operators in the streaming topology have confirmed that they’ve created a checkpoint of their state. This provides users with at-least-once semantics for the offsets committed to Zookeeper or the broker. For offsets checkpointed to Flink, the system provides exactly once guarantees.

The offsets committed to ZK or the broker can also be used to track the read progress of the Kafka consumer. The difference between the committed offset and the most recent offset in each partition is called the consumer lag. If the Flink topology is consuming the data slower from the topic than new data is added, the lag will increase and the consumer will fall behind. For large production deployments we recommend monitoring that metric to avoid increasing latency.

Другие вопросы по теме