Повторная попытка Quarkus Mutiny не удалась, потому что в другом потоке

Мне нужно написать код, который будет:

  • подключиться к внешней БД
  • запросите эту БД для данных
  • обрабатывать эти данные, то есть создавать несколько объектов в локальной БД

Я создал метод, как показано ниже:

  @WithSession
  public Uni<Void> retrieveDataAndSaveInLocalDB() {
    final Sample timer = Timer.start(meterRegistry);
    return Uni.createFrom()
        .deferred(() -> Uni.createFrom()
            .item(dbService::connectToDB))
        .onFailure()
        .retry()
        .withBackOff(Duration.ofSeconds(3), Duration.ofMinutes(5))
        .atMost(3)
        .map(this::getListOfEntities)
        .flatMap(entityRepository::persist)
        .call(entityRepository::flush)
        .invoke(() -> timer.stop(meterRegistry.timer("timer")))
        .invoke(() -> Log.info("logged info"));
  }

Когда dbService::connectToDB выдает исключение, я хочу повторить попытку через несколько секунд. У меня проблема в том, что когда dbService::connectToDB выдает ошибку, повторные попытки выполняются в другом потоке. Вот журналы:

2023-04-18 12:09:42,926 INFO  [DbService] (vert.x-eventloop-thread-1) CONNECTED! org.postgresql.jdbc.PgConnection@b931fa7
2023-04-18 12:09:46,142 INFO  [DbService] (executor-thread-1) CONNECTED! org.postgresql.jdbc.PgConnection@73903696
2023-04-18 12:09:51,210 INFO  [DbService] (executor-thread-1) CONNECTED! org.postgresql.jdbc.PgConnection@379aed2d
2023-04-18 12:09:51,237 ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (vert.x-eventloop-thread-1) HTTP Request to /sampleendpoint failed, error id: 8dc81df2-e526-4a64-a67d-bf04cbb9e580-1: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    [Exception 0] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [107]: 'vert.x-eventloop-thread-1' current Thread [99]: 'executor-thread-1'
    [Exception 1] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [107]: 'vert.x-eventloop-thread-1' current Thread [99]: 'executor-thread-1'
    at io.smallrye.mutiny.groups.UniOnItemOrFailure.lambda$call$1(UniOnItemOrFailure.java:75)
    at io.smallrye.context.impl.wrappers.SlowContextualBiFunction.apply(SlowContextualBiFunction.java:21)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:86)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:65)
    at io.smallrye.mutiny.operators.uni.UniOnTermination$UniOnTerminationProcessor.onFailure(UniOnTermination.java:52)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.helpers.EmptyUniSubscription.propagateFailureEvent(EmptyUniSubscription.java:40)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:26)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
    at org.hibernate.reactive.context.impl.VertxContext.execute(VertxContext.java:90)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.subscribe(UniRunSubscribeOn.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnTermination.subscribe(UniOnTermination.java:21)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap.subscribe(UniOnItemOrFailureFlatMap.java:27)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:99)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:65)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.onFailure(UniOnItemConsume.java:65)
    at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.onFailure(UniOnItemConsume.java:65)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage$CompletionStageUniSubscription.forwardResult(UniCreateFromCompletionStage.java:58)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:144)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage$CompletionStageUniSubscription.forward(UniCreateFromCompletionStage.java:51)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:35)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
    at org.hibernate.reactive.context.impl.VertxContext.execute(VertxContext.java:90)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.subscribe(UniRunSubscribeOn.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:43)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromPublisher$PublisherSubscriber.onNext(UniCreateFromPublisher.java:70)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:30)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:84)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.subscription.SerializedSubscriber.onItem(SerializedSubscriber.java:74)
    at io.smallrye.mutiny.operators.multi.MultiRetryWhenOp$RetryWhenOperator.onItem(MultiRetryWhenOp.java:111)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onItem(UniToMultiPublisher.java:92)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:29)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromDeferredSupplier.subscribe(UniCreateFromDeferredSupplier.java:36)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:73)
    at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.setOrSwitchUpstream(SwitchableSubscriptionSubscriber.java:205)
    at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.onSubscribe(SwitchableSubscriptionSubscriber.java:107)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:25)
    at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:165)
    at io.smallrye.mutiny.operators.multi.MultiRetryWhenOp$RetryWhenOperator.resubscribe(MultiRetryWhenOp.java:157)
    at io.smalliny.operators.multi.MultiRetryWhenOp$TriggerSubscriber.onNext(MultiRetryWhenOp.java:188)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:30)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:84)
    at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapMainSubscriber.tryEmit(MultiConcatMapOp.java:182)
    at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapInner.onItem(MultiConcatMapOp.java:285)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onItem(UniToMultiPublisher.java:92)
    at io.smallrye.mutiny.operators.uni.UniDelayOnItem$UniDelayOnItemProcessor.lambda$onItem$0(UniDelayOnItem.java:53)
    at org.jboss.threads.EnhancedQueueExecutor$RunnableScheduledFuture.performTask(EnhancedQueueExecutor.java:2892)
    at org.jboss.threads.EnhancedQueueExecutor$RunnableScheduledFuture.performTask(EnhancedQueueExecutor.java:2883)
    at org.jboss.threads.EnhancedQueueExecutor$AbstractScheduledFuture.run(EnhancedQueueExecutor.java:2741)
    at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)
    at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
    at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
    at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
    Suppressed: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [107]: 'vert.x-eventloop-thread-1' current Thread [99]: 'executor-thread-1'
        at org.hibernate.reactive.common.InternalStateAssertions.assertCurrentThreadMatches(InternalStateAssertions.java:46)
        at org.hibernate.reactive.session.impl.ReactiveSessionImpl.threadCheck(ReactiveSessionImpl.java:190)
        at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1786)
        at org.hibernate.internal.AbstractSharedSessionContract.checkOpenOrWaitingForAutoClose(AbstractSharedSessionContract.java:447)
        at org.hibernate.internal.SessionImpl.checkOpenOrWaitingForAutoClose(SessionImpl.java:616)
        at org.hibernate.internal.SessionImpl.closeWithoutOpenChecks(SessionImpl.java:410)
        at org.hibernate.internal.SessionImpl.close(SessionImpl.java:397)
        at org.hibernate.reactive.session.impl.ReactiveSessionImpl.reactiveClose(ReactiveSessionImpl.java:1738)
        at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:24)
        ... 76 more
Caused by: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [107]: 'vert.x-eventloop-thread-1' current Thread [99]: 'executor-thread-1'
    at org.hibernate.reactive.common.InternalStateAssertions.assertCurrentThreadMatches(InternalStateAssertions.java:46)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.threadCheck(ReactiveSessionImpl.java:190)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1786)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.reactivePersist(ReactiveSessionImpl.java:833)
    at org.hibernate.reactive.util.impl.CompletionStages.lambda$loop$2(CompletionStages.java:178)
    at org.hibernate.reactive.util.impl.CompletionStages.lambda$loop$7(CompletionStages.java:409)
    at org.hibernate.reactive.util.impl.CompletionStages$ArrayLoop.next(CompletionStages.java:483)
    at org.hibernate.reactive.util.async.impl.AsyncTrampoline.lambda$asyncWhile$1(AsyncTrampoline.java:215)
    at org.hibernate.reactive.util.async.impl.AsyncTrampoline$TrampolineInternal.unroll(AsyncTrampoline.java:121)
    at org.hibernate.reactive.util.async.impl.AsyncTrampoline$TrampolineInternal.trampoline(AsyncTrampoline.java:102)
    at org.hibernate.reactive.util.async.impl.AsyncTrampoline.asyncWhile(AsyncTrampoline.java:197)
    at org.hibernate.reactive.util.async.impl.AsyncTrampoline.asyncWhile(AsyncTrampoline.java:215)
    at org.hibernate.reactive.util.impl.CompletionStages.loop(CompletionStages.java:410)
    at org.hibernate.reactive.util.impl.CompletionStages.loop(CompletionStages.java:381)
    at org.hibernate.reactive.util.impl.CompletionStages.loop(CompletionStages.java:178)
    at org.hibernate.reactive.util.impl.CompletionStages.applyToAll(CompletionStages.java:505)
    at org.hibernate.reactive.mutiny.impl.MutinySessionImpl.lambda$persistAll$10(MutinySessionImpl.java:229)
    at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:24)
    ... 51 more

Строка «Подключено» в логах исходит из тела метода dbService::connectToDB.

Журналы показывают, что первое выполнение выполняется в правильном потоке, в моем случае это «vert.x-eventloop-thread-4», но затем повторные попытки выполняются в «executor-thread-1». Если при первом выполнении не возникает исключение, вся обработка проходит успешно. Если первое выполнение завершается ошибкой, но одна из попыток завершается успешно, выдается исключение, как указано выше.

Что не так с моим кодом? Запуск нового сеанса для каждой повторной попытки невозможен; Quarkus жалуется, что новый сеанс находится не в потоке vert.x-eventloop-thread.

Возможно, также стоит отметить, что при вызове serviceImpactRepository::persist я действительно хочу сохранить список сущностей, а не одну сущность.

Я использую новую версию Quarkus (3.0.0.CR2) с Reactive Hibernate.

Не могли бы вы создать где-нибудь тестовый проект, чтобы я мог лучше его рассмотреть?

Davide D'Alto 26.04.2023 10:39

Я попытался воссоздать эту проблему в новом тестовом проекте, но не смог. Повторные попытки теперь работают, как и ожидалось. Вижу 3.0.1 вышла, но даже понижение до 3.0.0.CR2 ничего не изменило. Может быть, я закодировал что-то действительно глупое, что теперь я не могу воспроизвести, так как лучше разбираюсь в фреймворке.

Slawomir 28.04.2023 18:13

Привет @DavideD'Alto, мне удалось воспроизвести это на новой версии 3.0.3.Final. Вот тестовый проект: github.com/sfeliks/retry-test. Когда SchedulerService выдает ошибку, это: CompositeException: поймано несколько исключений: [Exception 0] java.lang.RuntimeException: Test Runtime [Exception 1] java.lang.IllegalStateException: HR000069: обнаружено использование реактивного сеанса из другого потока, чем тот, который использовался для открытия реактивного сеанса - это предполагает недопустимую интеграцию; исходный поток [116]: 'vert.x-eventloop-thread-3' текущий поток [118]: 'executor-thread-2'

Slawomir 17.05.2023 13:01
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
3
193
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

В конце концов повторные попытки начали работать для меня, но неясно, что это исправило. Основные изменения, которые я сделал, которые могут помочь:

  1. Я понизил Quarkus до 2.16 - предыдущая версия (3.0.0.CR2) для меня главный виновник
  2. Я полностью отказался от использования @WithSession - оставил это Quarkus для обработки
  3. И продолжение к номеру 2. - я лучше разбираюсь в работе с потоками Mutiny, поэтому теперь я просто возвращаю Uni исходному вызывающему абоненту (конечная точка отдыха и планировщик).
Ответ принят как подходящий

Проблема вызвана .withBackoff.

Hibernate Reactive проверяет две вещи при использовании сеанса:

  1. Сеанс должен вызываться в потоке цикла событий
  2. Сеанс должен использоваться в том же потоке, в котором он был создан.

В этом случае первая проверка не проходит из-за ограничения (или ошибки) в Mutiny: при повторной попытке с использованием опции .withBackoff uni запускается в рабочем потоке (а не в потоке событийного цикла, в котором была создана сессия). Обратите внимание, что даже если вы создадите новый сеанс, проверка завершится ошибкой, потому что код выполняется не в рабочем потоке (а не в потоке цикла событий).

Это работает с Quarkus 2.16, потому что есть проверка, которая гарантирует, что все выполняется в потоке цикла событий. В Quarkus 3 эту проверку убрали, возможно это баг.

На данный момент у меня нет обходного пути, но он должен работать, если вы избегаете опции .withBackoff.

Спасибо. Избегание .withBackoff действительно устраняет эту проблему,

Slawomir 18.05.2023 08:31

Другие вопросы по теме