Spring Kafka, ручная фиксация в разных потоках

Добрый день коллеги. Я использую Spring Kafka 2.2.5. У меня есть слушатель:

@KafkaListener(topics = "${kafka.execution-task-topic}", containerFactory = "executionTaskObjectContainerFactory")
public void protocolEventsHandle(ExecutionTask executionTask,
    Acknowledgment acknowledgment,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    @Header(KafkaHeaders.OFFSET) long offset) {

    ResponseEntity < String > stringResponseEntity = airflowRestRunner.startDag(executionTask);
    JSONObject body = new JSONObject(stringResponseEntity.getBody());
    String message = body.getString("message");
    String runId = messageParser.getRunId(message);
    ExecutionTaskMessageInfo messageInfo = new ExecutionTaskMessageInfo(offset, partition, false, acknowledgment);
    kafkaAcknowledgeObject.putMessageInfo(messageInfo, partition);

    this.executorService.submit(kafkaAlertProducer.produceMessageAfterTaskSuccess(runId, executionTask, messageInfo));

}

Я делаю некоторые операции, и если они успешны, я использую интерфейс подтверждения для фиксации смещения.

У меня проблема. Пока в созданном потоке происходят вычисления, слушатель снова читает сообщение с того же смещения. Из-за этого, когда я пытаюсь подтвердить смещение, приложение вылетает.

Как лучше всего работать с Kafka в параллелизме? Я мог получить до 10 сообщений параллельно, и мне нужно их зафиксировать только после вычислений.

ОБНОВЛЕНИЕ1

Я храню все свои сообщения от kafka в: ключ - номер раздела значение - специальный класс модели, который содержит ссылку на необходимое подтверждение

@Data
@NoArgsConstructor
@AllArgsConstructor
public abstract class KafkaAcknowledgeObject < T extends Comparable > {

    protected ConcurrentHashMap < Integer,
    TreeSet < T >> hashMap = new ConcurrentHashMap < > ();

    public abstract void doAck();

    public void putMessageInfo(T message, int partition) {
        if (hashMap.containsKey(partition)) {
            hashMap.get(partition).add(message);
        } else {
            TreeSet < T > messageInfos = new TreeSet < > ();
            messageInfos.add(message);
            hashMap.put(partition, messageInfos);
        }
    }

}

После расчетов вызываю doAck(), например

@Override
public void doAck() {
    for (TreeSet < ExecutionTaskMessageInfo > messageInfoTreeSet: super.hashMap.values()) {
        checkHandledOffsets(messageInfoTreeSet);
    }
}

private void checkHandledOffsets(TreeSet < ExecutionTaskMessageInfo > messageInfoTreeSet) {
    ExecutionTaskMessageInfo first = getFirstMessageInfo(messageInfoTreeSet);
    if (first.isCompleted()) {
        first.getAcknowledgment().acknowledge();
        messageInfoTreeSet.remove(first);
        checkHandledOffsets(messageInfoTreeSet);
    }

    return;
}

private ExecutionTaskMessageInfo getFirstMessageInfo(TreeSet < ExecutionTaskMessageInfo > messageInfoTreeSet) {
    Iterator < ExecutionTaskMessageInfo > iterator = messageInfoTreeSet.iterator();

    return iterator.next();
}

«Сбои» — недостаточно информации; добавьте фактическую ошибку, трассировку стека и т. д. Также покажите код, который на самом деле вызывает acknowledge(). Записи не должны быть повторно доставлены, если не происходит повторная балансировка,

Gary Russell 17.05.2019 18:35

@GaryRussell, спасибо за ответ. Я добавил некоторую информацию. Я не мог поймать исключение, но я пытаюсь сделать это правильно. Основная проблема в том, что после запуска расчета в собственном методе потока с KafkaListener снова появляется похожее сообщение.

Александр Шаповалов 17.05.2019 20:08
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
2
566
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

То, что вы делаете, должно быть в порядке; Я только что проверил подобную схему, и она отлично работает для меня...

@SpringBootApplication
public class So56190029Application {

    public static void main(String[] args) {
        SpringApplication.run(So56190029Application.class, args);
    }

    private final ExecutorService exec = Executors.newSingleThreadExecutor();

    private final AtomicInteger count = new AtomicInteger();

    @KafkaListener(id = "so56190029", topics = "so56190029")
    public void listen(String in, Acknowledgment ack) {
        this.exec.execute(runner(in, ack));
    }

    private Runnable runner(String payload, Acknowledgment ack) {
        return () -> {
            System.out.println(payload);
            if (this.count.incrementAndGet() % 3 == 0) {
                System.out.println("acking");
                ack.acknowledge();
            }
        };
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<?, String> template) {
        return args -> IntStream.range(0, 6).forEach(i -> template.send("so56190029", "foo" + i));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setCommitLogLevel(Level.INFO);
        return factory;
    }

}

а также

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.records=3
spring.kafka.listener.ack-mode=MANUAL

а также

foo0
foo1
foo2
acking
foo3
foo4
foo5
acking
2019-05-17 14:46:28.790  INFO 62429 --- [o56190029-0-C-1] essageListenerContainer$ListenerConsumer 
    : Committing: {so56190029-0=OffsetAndMetadata{offset=36, leaderEpoch=null, metadata=''}}

Глупый вопрос, но может ли это быть из-за того, что я слишком долго держу точки останова отладки?

Александр Шаповалов 17.05.2019 21:04

Это может привести к перебалансировке, если вы задержите слушателя на 5 минут (max.poll.interval.ms). Я вижу, вы используете версию 2.2.5, поэтому у вас нет клиента до KIP-62 (который бы перебалансировался намного быстрее).

Gary Russell 17.05.2019 21:17

Итак, если я правильно понял, то мой расчет должен быть завершен быстрее, чем 5 минут? Например, если у меня есть только одно сообщение в разделе /

Александр Шаповалов 17.05.2019 21:21

Нет; поскольку вы переходите к другому потоку, вам не нужно об этом беспокоиться. Метод @KafkaListener должен занимать не более max.poll.interval.ms / max.poll.records. т.е. вы должны обработать все записи из опроса в течение max.poll.interval.ms. Включение ведения журнала DEBUG должно помочь вам понять, что происходит.

Gary Russell 17.05.2019 21:28

Другие вопросы по теме