ServiceBusSessionReceiverAsyncClient создает исключение IllegalStateException во время закрытия

При использовании ServicebusSessionReceiverAsyncClient для получения одного сообщения из очереди служебной шины создается исключение IllegalStateException. В сообщении упоминается о попытке добавить кредиты к уже закрытому соединению.

Я использую take(1) и next() для преобразования Flux в единый результат Mono. В документации сказано, что использование take(1) в потоке закроет поток после первого результата, что я и собираюсь сделать.

Мой код получателя:

private <T extends IWocTransaction> Mono<Optional<T>> responseAsync(String transactionId, Class<T> clazz) {

        var asyncClient = sbClientBuilder.connectionString(sbConnectionString)
                .sessionReceiver()
                .queueName("my-callback-queue")
                .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                .buildAsyncClient();

        var msgStream = Flux.usingWhen(asyncClient.acceptSession(transactionId),
                receiver -> receiver.receiveMessages(),
                receiver -> Mono.fromRunnable(receiver::close)
        );

        return Mono.from(msgStream

                        .timeout(timeout)
                        .take(1)
                        .next()

                ).map(message -> {

                    var json = message.getBody().toString();

                    try {
                        var val = objectMapper.readValue(json, clazz);
                        return val != null ? Optional.of(val) : Optional.<T>empty();
                    } catch (Exception e) {
                        log.error("Error deserializing response from string {}", json, e);
                        return Optional.<T>empty();
                    }
                })
                .doOnError(t -> {
                    if (t instanceof TimeoutException) {
                        log.error("Timeout error waiting on API callback {}", kv("ApiTimeout", timeout.toString()), t);
                    } else {
                        log.error("Error waiting for async callback", t);
                    }
                }).onErrorReturn(Optional.empty());
    }

Этот код работает нормально, но я получаю это исключение при каждом запуске:

13:46:27.122 [io-executor-thread-1] INFO  c.a.m.s.ServiceBusClientBuilder - {"az.sdk.message":"Closing a dependent client.","numberOfOpenClients":1}
13:46:27.127 [receiver-0-1] INFO  c.a.m.s.ServiceBusSessionReceiver - {"az.sdk.message":"There is no lock token.","sessionId":"adfadsr","messageId":"fb70e81e4d304b8fb34092440243554a"}
13:46:27.138 [receiver-0-1] INFO  c.a.m.s.ServiceBusReceiverAsyncClient - Removing receiver links.
13:46:27.167 [receiver-0-1] ERROR c.a.c.a.i.ReactorReceiver - {"az.sdk.message":"Cannot add credits to closed link: adfadsr","exception":"Cannot add credits to closed link: adfadsr","connectionId":"MF_57a511_1680201985206","entityPath":"woc-callback-queue","linkName":"adfadsr"}
13:46:27.175 [receiver-0-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
**reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr**
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
    at com.azure.core.amqp.implementation.ReactorReceiver.addCredits(ReactorReceiver.java:227)
    at com.azure.messaging.servicebus.ServiceBusSessionReceiver.lambda$new$2(ServiceBusSessionReceiver.java:92)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:447)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr

    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    at io.micronaut.reactive.reactor.instrument.ReactorInstrumentation.lambda$init$0(ReactorInstrumentation.java:62)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
13:46:27.194 [reactor-executor-1] INFO  c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_57a511_1680201985206","errorCondition":null,"errorDescription":null,"linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.198 [reactor-executor-1] INFO  c.a.c.a.i.ReactorSession - {"az.sdk.message":"Complete. Removing receive link.","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.199 [reactor-executor-1] INFO  c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}

Как я могу предотвратить создание исключения IllegalStateException или хотя бы обработать его?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
105
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

это IllegalStateException: невозможно добавить кредиты к закрытой ссылке, «не следует бросать» в приложение, а «должно быть только зарегистрировано».

Иногда это происходит из-за одновременного выполнения нескольких потоков. Один из них — неблокирующий IO_thread (обработка кадров сообщений, отправка кредита через кадры потока), а второй — Worker_thread, который доставляет сообщение приложению. Есть и третий, handler_thread приложения, для которого вызывается responseAsync приложения.

Что происходит, когда responseAsync закрывает клиент из [любого] _thread, все еще возможно, что IO_thread в фоновом режиме выполняет некоторую работу, пока он получает запрос на закрытие. Ошибка появляется в журнале, когда IO_thread находится в середине отправки кадра потока, когда другие части клиента отключаются. Эта запись в журнале будет проигнорирована.

Похоже, дизайн приложения предназначен для создания и удаления клиентов для каждого запроса. Это означает, что приложение создает и закрывает соединения TCP (к служебной шине) при каждом запросе, что может быть тяжелым.

Спасибо за ответ! Это было информативно. Есть ли способ поймать и проигнорировать это исключение, используя реактивный стек? Мне так не кажется. Кроме того, w.r.t. Создавая соединение для каждого запроса, он использует встроенный пул соединений MS и создает транзакцию сеанса для получения определенного ответа в соответствии с шаблоном запроса/ответа, описанным командой клиента служебной шины. Создание приемника сеанса занимает около 120 мс (что очень много), но я не знаю лучшего способа, кроме публикации всех ответов всем слушателям.

LPal 07.04.2023 18:02

Другие вопросы по теме