Задержка начала потребления сообщений

У меня очень простой потребитель Reactor Kafka.

Однако для того, чтобы сообщение обрабатывалось должным образом, необходимо предварительно выполнить пару @Postconstruct.

@Postconstruct инициализирует некоторые объекты в памяти, подготавливает данные и т. д.

Если сообщения начинают потребляться, но @Postconstruct не завершена, обработка сообщения обязательно завершится ошибкой.

Для завершения @Postconstruct требуется некоторое время, примерно 3 секунды. Что я пробовал: Поэтому в настоящее время в моем коде я бы просто обрабатывал сообщения, и в течение первых 3-х секунд вся обработка сообщений завершалась сбоем, и я бы помещал неудавшиеся сообщения в другую тему.

В какой-то момент, через 3 секунды, @Postconstruct завершается, сообщения обрабатываются нормально, доволен.

Тем не менее, я нашел это очень избыточным, у меня есть другая тема и другая работа, которая существует только здесь, чтобы повторно обработать сообщения, которые не удалось выполнить в течение этих 3 секунд.

Есть ли более простой способ сказать Reactor Kafka только начать потребление, только после завершения @Postconstruct (возможно, отправив какой-то сигнал в приложение), или, может быть, даже просто подождать 3 секунды?

До сих пор я пробовал это:

  public Flux<String> myConsumer() {
        return KafkaReceiver
                .create(receiverOptions)
                .receive()
                .delayUntil(a -> Mono.delay(Duration.ofSeconds(4)))
                .flatMap((oneMessage) ->
                        Mono.deferContextual(contextView -> {
                            var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
                            try (scope) {
                                return consume(oneMessage);
                            }
                        }), 500)
                .name("greeting.call")      //1
                .tag("latency", "low")  //2
                .tap(Micrometer.observation(observationRegistry));

Однако он не делает того, что я ожидаю, он просто откладывает.

Вы пробовали delaySubscription()?

kerbermeister 08.02.2023 07:38

Вы можете попытаться вызвать пользовательское событие, когда @PostConstruct будет выполнено; аннотировать myConsumer с @EventListener. Таким образом, потребление начнется только в конце PostConstruct.

artiomi 08.02.2023 08:14
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
2
52
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы не должны subscribe() изменяться в методе пост-конструкции. Вместо этого внедрите SmartLifecycle и подпишитесь на start().

Или используйте прослушиватель событий и подписывайтесь только после обновления контекста.

Таким образом, все bean-компоненты будут полностью подключены до того, как вы начнете получать записи.

Другие вопросы по теме

Я пытаюсь построить коннектор снежинки для Кафки. Есть ли способ, которым коннектор обрабатывает такие события, как обновление, удаление и создание?
Приложение Kafka и Python в Kubernetes в отдельных модулях — NoBrokersAvailable()
Confluent Schema Registry — максимальное количество схем
NotEnoughReplicasException: сообщения отклоняются, так как синхронизированных реплик меньше, чем требуется
В указанном файле конфигурации JAAS не найден раздел конфигурации JAAS с именем «Клиент»
Разница между 3 приложениями setConcurrency(1) и 1 приложением setConcurrency(3)
Настройка Nifi для использования с Kafa в Kubernetes с использованием Helm в VirtualBox
Невозможно запустить PySpark (от Kafka до Delta) локально и получить исключение SparkException: не удается найти класс подключаемого модуля каталога для каталога «spark_catalog»
Понимание варианта использования свойства max.in.flight.request в Kafka
Кластер Apache Kafka 3.X: требуется уже запущенный zookeeper: Исключение: невозможно проанализировать PLAINTEXT://127.0.0.1: на конечную точку брокера —