Пытаюсь загрузить около 50К сообщений в тему KAFKA. В начале нескольких прогонов становится ниже исключения, но не всегда.
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?]
at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?]
...
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
the same transactionalId, or the producer's transaction has been expired by the
broker.
Блок кода ниже:
public void persistUpdatesPostAction(List<Message> messageList ) {
if ((messageList == null) || (messageList.isEmpty())) {
return;
}
logger.createDebug("Messages in batch(postAction) : "+ messageList.size());
Producer<String,String> producer = KafkaUtils.getProducer(Thread.currentThread().getName());
try {
producer.beginTransaction();
createKafkaBulkInsert1(producer, messageList, "Topic1");
createKafkaBulkInsert2(producer, messageList, "Topic2");
createKafkaBulkInsert3(producer, messageList, "Topic3");
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
producer.close();
KafkaUtils.removeProducer(Thread.currentThread().getName());
}
}
-----------
static Properties setPropertiesProducer() {
Properties temp = new Properties();
temp.put("bootstrap.servers", "localhost:9092");
temp.put("acks", "all");
temp.put("retries", 1);
temp.put("batch.size", 16384);
temp.put("linger.ms", 5);
temp.put("buffer.memory", 33554432);
temp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return temp;
}
public static Producer<String, String> getProducer(String aThreadId) {
if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null)) {
Properties temp = producerProps;
temp.put("transactional.id", aThreadId);
Producer<String, String> producer = new KafkaProducer<String, String>(temp);
producerMap.put(aThreadId, producer);
producer.initTransactions();
return producer;
}
return producerMap.get(aThreadId);
}
public static void removeProducer(String aThreadId) {
logger.createDebug("Removing Thread ID :" + aThreadId);
if (producerMap.get(aThreadId) == null)
return;
producerMap.remove(aThreadId);
}




В моем коде инициализации производителя было состояние гонки. Я исправил это, изменив карту Producer на тип ConcurrentHashMap, чтобы обеспечить потокобезопасность.
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Это сообщение об исключении не очень полезно. Я считаю, что пытающийся означает, что у брокера больше нет записи идентификатора транзакции, отправляемой клиентом. Это может быть потому, что:
UUID.randomUUID().В нашем случае мы время от времени превышали таймауты транзакций, что приводило к возникновению этого исключения. Есть 2 свойства, которые определяют, как долго брокер будет помнить транзакцию, прежде чем прервать ее и забыть о ней.
transaction.max.timeout.ms - свойство маклер, указывающее максимальное количество миллисекунд, пока транзакция не будет прервана и забыта. По умолчанию во многих версиях Kafka кажется, что это 900000 (15 минут). Документация от Кафки говорит:
The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.
transaction.timeout.ms - свойство производитель-клиент, которое устанавливает время ожидания в миллисекундах при создании транзакции. По умолчанию во многих версиях Kafka кажется, что это 60000 (1 минута). В документации от Кафки говорится:
The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
Если свойство transaction.timeout.ms, установленное в клиенте, превышает свойство transaction.max.timeout.ms в брокере, производитель немедленно выдаст что-то вроде следующего исключения:
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse
The transaction timeout is larger than the maximum value allowed by the broker
(as configured by transaction.max.timeout.ms).
Я пишу модульный тест, чтобы воспроизвести это, из этого фрагмента кода Java вы можете легко понять, как это происходит, с помощью двух одинаковых tansactional id.
@Test
public void SendOffset_TwoProducerDuplicateTrxId_ThrowException() {
// create two producer with same transactional id
Producer producer1 = KafkaBuilder.buildProducer(trxId, servers);
Producer producer2 = KafkaBuilder.buildProducer(trxId, servers);
offsetMap.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1000));
// initial and start two transactions
sendOffsetBegin(producer1);
sendOffsetBegin(producer2);
try {
// when commit first transaction it expected to throw exception
sendOffsetEnd(producer1);
// it expects not run here
Assert.assertTrue(false);
} catch (Throwable t) {
// it expects to catch the exception
Assert.assertTrue(t instanceof ProducerFencedException);
}
}
private void sendOffsetBegin(Producer producer) {
producer.initTransactions();
producer.beginTransaction();
producer.sendOffsetsToTransaction(offsetMap, consumerGroup);
}
private void sendOffsetEnd(Producer producer) {
producer.commitTransaction();
}
When running multiple instances of the application,
transactional.idmust be the same on all instances to satisfy fencing zombies when producing records on a listener container thread. However, when producing records using transactions that are not started by a listener container, the prefix has to be different on each instance.
https://docs.spring.io/spring-kafka/reference/html/#transaction-id-prefix
Спасибо, это объяснение очень полезно, чем официальный документ!