В чем причина получения ProducerFencedException во время ProducerFencedException?

Пытаюсь загрузить около 50К сообщений в тему KAFKA. В начале нескольких прогонов становится ниже исключения, но не всегда.

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state  
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?]  
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?]  
at  org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?]  
at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?]  
...  
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
the same transactionalId, or the producer's transaction has been expired by the
broker.  

Блок кода ниже:

public void persistUpdatesPostAction(List<Message> messageList ) {
    if ((messageList == null) || (messageList.isEmpty())) {
        return;
    }
    logger.createDebug("Messages in batch(postAction) : "+ messageList.size());
    Producer<String,String> producer = KafkaUtils.getProducer(Thread.currentThread().getName());
    try {
        producer.beginTransaction();
        createKafkaBulkInsert1(producer, messageList, "Topic1");
        createKafkaBulkInsert2(producer, messageList, "Topic2");
        createKafkaBulkInsert3(producer, messageList, "Topic3");
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        producer.close();
        KafkaUtils.removeProducer(Thread.currentThread().getName());
    }
}

-----------

static Properties setPropertiesProducer() {
    Properties temp = new Properties();
    temp.put("bootstrap.servers", "localhost:9092");
    temp.put("acks", "all");
    temp.put("retries", 1);
    temp.put("batch.size", 16384);
    temp.put("linger.ms", 5);
    temp.put("buffer.memory", 33554432);
    temp.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
    temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return temp;
}

public static Producer<String, String> getProducer(String aThreadId) {
    if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null)) {
        Properties temp = producerProps;
        temp.put("transactional.id", aThreadId);
        Producer<String, String> producer = new KafkaProducer<String, String>(temp);
        producerMap.put(aThreadId, producer);
        producer.initTransactions();
        return producer;
    }
    return producerMap.get(aThreadId);
}

public static void removeProducer(String aThreadId) {
    logger.createDebug("Removing Thread ID :" + aThreadId);
    if (producerMap.get(aThreadId) == null)
        return;
    producerMap.remove(aThreadId);
}
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
8
0
9 906
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

В моем коде инициализации производителя было состояние гонки. Я исправил это, изменив карту Producer на тип ConcurrentHashMap, чтобы обеспечить потокобезопасность.

Ответ принят как подходящий

Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Это сообщение об исключении не очень полезно. Я считаю, что пытающийся означает, что у брокера больше нет записи идентификатора транзакции, отправляемой клиентом. Это может быть потому, что:

  • Кто-то другой использовал тот же идентификатор транзакции и уже зафиксировал его. По моему опыту, это менее вероятно, если вы не разделяете идентификаторы транзакций между клиентами. Мы гарантируем, что наши идентификаторы уникальны, используя UUID.randomUUID().
  • Время ожидания транзакции истекло, и она была удалена автоматизацией брокера.

В нашем случае мы время от времени превышали таймауты транзакций, что приводило к возникновению этого исключения. Есть 2 свойства, которые определяют, как долго брокер будет помнить транзакцию, прежде чем прервать ее и забыть о ней.

  • transaction.max.timeout.ms - свойство маклер, указывающее максимальное количество миллисекунд, пока транзакция не будет прервана и забыта. По умолчанию во многих версиях Kafka кажется, что это 900000 (15 минут). Документация от Кафки говорит:

    The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.

  • transaction.timeout.ms - свойство производитель-клиент, которое устанавливает время ожидания в миллисекундах при создании транзакции. По умолчанию во многих версиях Kafka кажется, что это 60000 (1 минута). В документации от Кафки говорится:

    The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

Если свойство transaction.timeout.ms, установленное в клиенте, превышает свойство transaction.max.timeout.ms в брокере, производитель немедленно выдаст что-то вроде следующего исключения:

org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse
The transaction timeout is larger than the maximum value allowed by the broker 
(as configured by transaction.max.timeout.ms).

Спасибо, это объяснение очень полезно, чем официальный документ!

Eynzhang 08.06.2020 15:11

Я пишу модульный тест, чтобы воспроизвести это, из этого фрагмента кода Java вы можете легко понять, как это происходит, с помощью двух одинаковых tansactional id.

  @Test
  public void SendOffset_TwoProducerDuplicateTrxId_ThrowException() {
    // create two producer with same transactional id
    Producer producer1 = KafkaBuilder.buildProducer(trxId, servers);
    Producer producer2 = KafkaBuilder.buildProducer(trxId, servers);

    offsetMap.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1000));

    // initial and start two transactions
    sendOffsetBegin(producer1);
    sendOffsetBegin(producer2);

    try {
      // when commit first transaction it expected to throw exception
      sendOffsetEnd(producer1);

      // it expects not run here
      Assert.assertTrue(false);
    } catch (Throwable t) {
      // it expects to catch the exception
      Assert.assertTrue(t instanceof ProducerFencedException);
    }
  }

  private void sendOffsetBegin(Producer producer) {
    producer.initTransactions();
    producer.beginTransaction();
    producer.sendOffsetsToTransaction(offsetMap, consumerGroup);
  }

  private void sendOffsetEnd(Producer producer) {
    producer.commitTransaction();
  }

When running multiple instances of the application, transactional.id must be the same on all instances to satisfy fencing zombies when producing records on a listener container thread. However, when producing records using transactions that are not started by a listener container, the prefix has to be different on each instance.

https://docs.spring.io/spring-kafka/reference/html/#transaction-id-prefix

Другие вопросы по теме