Элементы Flink Kafka Producer вышли из строя

Я пытаюсь создать элементы с помощью FlinkKafkaProducer010, однако, когда я открываю окно консоли потребителя, кажется, что элементы поступают не по порядку.

Я создал тему, используя: kafka-themes.bat --create --topic mytopic --zookeeper localhost: 2181 --partitions 1 --replication-factor 1

Потребитель создается с использованием: kafka-console-consumer.bat --zookeeper localhost: 2181 --topic mytopic

Я использую следующий код Kafka Producer:

public static void main(String[] args) throws Exception {
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    if (parameterTool.getNumberOfParameters() < 2) {
        System.out.println("Missing parameters!");
        System.out.println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>");
        return;
    }

    StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));

    DataStream<String> messageStream = env.addSource(getSourceFunction());

    FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
    messageStream.addSink(producer);
    env.execute("Kafka Producer");
}

public static SourceFunction<String> getSourceFunction() {
    return new SourceFunction<String>() {
        private static final long serialVersionUID = 6369260225318862378L;
        public boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) {
            int counter = 0;
            while (this.running && counter < 500) {
                String data = "item " + Integer.toString(counter);
                ctx.collect(data);

                counter++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    };
}

Когда я смотрю файлы журнала Kafka, я вижу файл .log, в котором элементы также не в порядке. Порядок элементов дает скачки примерно на 10 значений. В моем случае очень важно иметь правильный порядок. Я искал, как обеспечить доставку элементов по порядку, но пока безуспешно. Есть ли что-то, что я пропустил, что исправляет порядок?

Заранее благодарю за любую помощь!

У вас гарантированный заказ в перегородке. Можете ли вы программно создать потребителя и проверить.

Indraneel Bende 29.05.2018 06:03

Я уже потреблял элементы в Flink, используя FlinkKafkaConsumer010 с TimeCharacteristic, установленным на EventTime. Тем не менее, это также было не по порядку, поэтому я вернулся к основам проверки только с помощью Kafka и простого потребителя консоли Kafka.

Hans 29.05.2018 06:53
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
2
419
1

Ответы 1

Я предполагаю, что вы используете параллелизм> 1 для стока. Порядок элементов гарантируется только в одном экземпляре оператора. Если вы пишете из нескольких параллельных экземпляров приемника в один раздел kafka, то порядок не дает никаких гарантий.

Другие вопросы по теме