Я пытаюсь понять Kafka ровно один раз, используя транзакционный производитель / потребитель.
Я наткнулся на пример ниже. Но все же мне трудно понять ровно один раз. Это правильный код?
producer.sendOffsetsToTransaction - What this code does? Should this be done to the same target topic?
Что такое сбой системы перед consumer.commitSync (); // Те же сообщения будут прочитаны снова, и будут создаваться повторяющиеся сообщения?
public class ExactlyOnceLowLevel {
public void runConsumer() throws Exception {
final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
final Producer<Long, String> producer = createProducer();
producer.initTransactions();
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
try {
final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
producer.beginTransaction();
for (final ConsumerRecord<byte[], byte[]> record : records) {
System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(),
record.offset(), record.key(), record.value());
final ProducerRecord<Long, String> producerRecord =
new ProducerRecord<>(TOPIC_1, new BigInteger(record.key()).longValue(), record.value().toString());
// send returns Future
final RecordMetadata metadata = producer.send(producerRecord).get();
currentOffsets.put(new TopicPartition(TOPIC_1, record.partition()), new OffsetAndMetadata(record.offset()));
}
producer.sendOffsetsToTransaction(currentOffsets, "my-transactional-consumer-group"); // a bit annoying here to reference group id rwice
producer.commitTransaction();
consumer.commitSync();
currentOffsets.clear();
// EXACTLY ONCE!
}
catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
e.printStackTrace();
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
}
catch (final KafkaException e) {
e.printStackTrace();
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
finally {
producer.flush();
producer.close();
}
}
}
private static KafkaConsumer<byte[], byte[]> createConsumer() {
final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED); // this has to be
return new KafkaConsumer<>(consumerConfig);
}
private static Producer<Long, String> createProducer() {
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, 3); // this is now safe !!!!
props.put(ProducerConfig.ACKS_CONFIG, "all"); // this has to be all
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // this has to be 1
return new KafkaProducer<>(props);
}
public static void main(final String... args) throws Exception {
final ExactlyOnceLowLevel example = new ExactlyOnceLowLevel();
example.runConsumer();
}
}
Вы не должны пытаться фиксировать смещения с Потребителем при использовании шаблона чтения / обработки / записи с транзакциями Kafka. Как вы намекнули, это может вызвать проблемы.
В этом случае смещения необходимо добавить в транзакцию, и для этого следует использовать только sendOffsetsToTransaction()
. Этот метод гарантирует, что эти смещения будут зафиксированы только в случае успешного завершения транзакции. См. Javadoc:
Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern. Thus, the specified consumerGroupId should be the same as config parameter group.id of the used consumer. Note, that the consumer should have enable.auto.commit=false and should also not commit offsets manually (via sync or async commits).
В вашем примере вам не нужно вызывать clear()
, поскольку вы будете воссоздавать новую карту при запуске следующей итерации. Даже если вы хотите вызвать clear()
, это произойдет после фиксации транзакции, чтобы данные не были потеряны.
Нужен ли нам здесь транзакционный потребитель. ConsumerConfig.put (ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED); Какую роль он здесь играет, поскольку он просто читает из другой темы.
Если входная тема не содержит транзакций, оставьте значение по умолчанию для этого параметра. Если он содержит транзакции, это зависит от ваших требований.
Producer.sendOffsetsToTransaction -> Где хранятся смещения этой транзакции? Это тема __stransaction_state, или в журнале фиксации транзакции, или ....
Они пишутся в тему __consumer_offset
как обычные офсеты. Но они будут «видимы» только после того, как транзакция будет совершена. Если вы хотите понять, как именно работают транзакции, рекомендую посмотреть проектный документ: cwiki.apache.org/confluence/display/KAFKA/…
Итак, sendOffsetsToTransaction должен быть в исходной теме. Что делать, если при вызове currentOffsets.clear (); возникают проблемы?