Кафка Ровно один раз с Transactional Producer

Я пытаюсь понять Kafka ровно один раз, используя транзакционный производитель / потребитель.

Я наткнулся на пример ниже. Но все же мне трудно понять ровно один раз. Это правильный код?

producer.sendOffsetsToTransaction - What this code does? Should this be done to the same target topic?

Что такое сбой системы перед consumer.commitSync (); // Те же сообщения будут прочитаны снова, и будут создаваться повторяющиеся сообщения?

public class ExactlyOnceLowLevel {

    public void runConsumer() throws Exception {
        final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
        final Producer<Long, String> producer = createProducer();

        producer.initTransactions();

        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

            try {
                final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
                producer.beginTransaction();
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(),
                                record.offset(), record.key(), record.value());

                    final ProducerRecord<Long, String> producerRecord =
                                new ProducerRecord<>(TOPIC_1, new BigInteger(record.key()).longValue(), record.value().toString());
                    // send returns Future
                    final RecordMetadata metadata = producer.send(producerRecord).get();
                    currentOffsets.put(new TopicPartition(TOPIC_1, record.partition()), new OffsetAndMetadata(record.offset()));
                }
                producer.sendOffsetsToTransaction(currentOffsets, "my-transactional-consumer-group"); // a bit annoying here to reference group id rwice
                producer.commitTransaction();
                consumer.commitSync();
                currentOffsets.clear();
                // EXACTLY ONCE!
            }
            catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                e.printStackTrace();
                // We can't recover from these exceptions, so our only option is to close the producer and exit.
                producer.close();
            }
            catch (final KafkaException e) {
                e.printStackTrace();
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }
            finally {
                producer.flush();
                producer.close();
            }
        }
    }

    private static KafkaConsumer<byte[], byte[]> createConsumer() {
        final Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED); // this has to be

        return new KafkaConsumer<>(consumerConfig);
    }

    private static Producer<Long, String> createProducer() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // this is now safe !!!!
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // this has to be all
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // this has to be 1

        return new KafkaProducer<>(props);
    }

    public static void main(final String... args) throws Exception {

        final ExactlyOnceLowLevel example = new ExactlyOnceLowLevel();
        example.runConsumer();

    }
}
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
2
0
1 048
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы не должны пытаться фиксировать смещения с Потребителем при использовании шаблона чтения / обработки / записи с транзакциями Kafka. Как вы намекнули, это может вызвать проблемы.

В этом случае смещения необходимо добавить в транзакцию, и для этого следует использовать только sendOffsetsToTransaction(). Этот метод гарантирует, что эти смещения будут зафиксированы только в случае успешного завершения транзакции. См. Javadoc:

Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.

This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern. Thus, the specified consumerGroupId should be the same as config parameter group.id of the used consumer. Note, that the consumer should have enable.auto.commit=false and should also not commit offsets manually (via sync or async commits).

Итак, sendOffsetsToTransaction должен быть в исходной теме. Что делать, если при вызове currentOffsets.clear (); возникают проблемы?

user1578872 08.08.2018 14:23

В вашем примере вам не нужно вызывать clear(), поскольку вы будете воссоздавать новую карту при запуске следующей итерации. Даже если вы хотите вызвать clear(), это произойдет после фиксации транзакции, чтобы данные не были потеряны.

Mickael Maison 08.08.2018 14:55

Нужен ли нам здесь транзакционный потребитель. ConsumerConfig.put (ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED); Какую роль он здесь играет, поскольку он просто читает из другой темы.

user1578872 08.08.2018 15:01

Если входная тема не содержит транзакций, оставьте значение по умолчанию для этого параметра. Если он содержит транзакции, это зависит от ваших требований.

Mickael Maison 08.08.2018 15:23

Producer.sendOffsetsToTransaction -> Где хранятся смещения этой транзакции? Это тема __stransaction_state, или в журнале фиксации транзакции, или ....

user1578872 08.08.2018 16:54

Они пишутся в тему __consumer_offset как обычные офсеты. Но они будут «видимы» только после того, как транзакция будет совершена. Если вы хотите понять, как именно работают транзакции, рекомендую посмотреть проектный документ: cwiki.apache.org/confluence/display/KAFKA/…

Mickael Maison 08.08.2018 17:01

Другие вопросы по теме