При использовании ServicebusSessionReceiverAsyncClient для получения одного сообщения из очереди служебной шины создается исключение IllegalStateException. В сообщении упоминается о попытке добавить кредиты к уже закрытому соединению.
Я использую take(1) и next() для преобразования Flux в единый результат Mono. В документации сказано, что использование take(1) в потоке закроет поток после первого результата, что я и собираюсь сделать.
Мой код получателя:
private <T extends IWocTransaction> Mono<Optional<T>> responseAsync(String transactionId, Class<T> clazz) {
var asyncClient = sbClientBuilder.connectionString(sbConnectionString)
.sessionReceiver()
.queueName("my-callback-queue")
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.buildAsyncClient();
var msgStream = Flux.usingWhen(asyncClient.acceptSession(transactionId),
receiver -> receiver.receiveMessages(),
receiver -> Mono.fromRunnable(receiver::close)
);
return Mono.from(msgStream
.timeout(timeout)
.take(1)
.next()
).map(message -> {
var json = message.getBody().toString();
try {
var val = objectMapper.readValue(json, clazz);
return val != null ? Optional.of(val) : Optional.<T>empty();
} catch (Exception e) {
log.error("Error deserializing response from string {}", json, e);
return Optional.<T>empty();
}
})
.doOnError(t -> {
if (t instanceof TimeoutException) {
log.error("Timeout error waiting on API callback {}", kv("ApiTimeout", timeout.toString()), t);
} else {
log.error("Error waiting for async callback", t);
}
}).onErrorReturn(Optional.empty());
}
Этот код работает нормально, но я получаю это исключение при каждом запуске:
13:46:27.122 [io-executor-thread-1] INFO c.a.m.s.ServiceBusClientBuilder - {"az.sdk.message":"Closing a dependent client.","numberOfOpenClients":1}
13:46:27.127 [receiver-0-1] INFO c.a.m.s.ServiceBusSessionReceiver - {"az.sdk.message":"There is no lock token.","sessionId":"adfadsr","messageId":"fb70e81e4d304b8fb34092440243554a"}
13:46:27.138 [receiver-0-1] INFO c.a.m.s.ServiceBusReceiverAsyncClient - Removing receiver links.
13:46:27.167 [receiver-0-1] ERROR c.a.c.a.i.ReactorReceiver - {"az.sdk.message":"Cannot add credits to closed link: adfadsr","exception":"Cannot add credits to closed link: adfadsr","connectionId":"MF_57a511_1680201985206","entityPath":"woc-callback-queue","linkName":"adfadsr"}
13:46:27.175 [receiver-0-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
**reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr**
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
at com.azure.core.amqp.implementation.ReactorReceiver.addCredits(ReactorReceiver.java:227)
at com.azure.messaging.servicebus.ServiceBusSessionReceiver.lambda$new$2(ServiceBusSessionReceiver.java:92)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:447)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at io.micronaut.reactive.reactor.instrument.ReactorInstrumentation.lambda$init$0(ReactorInstrumentation.java:62)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
13:46:27.194 [reactor-executor-1] INFO c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_57a511_1680201985206","errorCondition":null,"errorDescription":null,"linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.198 [reactor-executor-1] INFO c.a.c.a.i.ReactorSession - {"az.sdk.message":"Complete. Removing receive link.","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.199 [reactor-executor-1] INFO c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}
Как я могу предотвратить создание исключения IllegalStateException или хотя бы обработать его?
это IllegalStateException: невозможно добавить кредиты к закрытой ссылке, «не следует бросать» в приложение, а «должно быть только зарегистрировано».
Иногда это происходит из-за одновременного выполнения нескольких потоков. Один из них — неблокирующий IO_thread (обработка кадров сообщений, отправка кредита через кадры потока), а второй — Worker_thread, который доставляет сообщение приложению. Есть и третий, handler_thread приложения, для которого вызывается responseAsync приложения.
Что происходит, когда responseAsync закрывает клиент из [любого] _thread, все еще возможно, что IO_thread в фоновом режиме выполняет некоторую работу, пока он получает запрос на закрытие. Ошибка появляется в журнале, когда IO_thread находится в середине отправки кадра потока, когда другие части клиента отключаются. Эта запись в журнале будет проигнорирована.
Похоже, дизайн приложения предназначен для создания и удаления клиентов для каждого запроса. Это означает, что приложение создает и закрывает соединения TCP (к служебной шине) при каждом запросе, что может быть тяжелым.
Спасибо за ответ! Это было информативно. Есть ли способ поймать и проигнорировать это исключение, используя реактивный стек? Мне так не кажется. Кроме того, w.r.t. Создавая соединение для каждого запроса, он использует встроенный пул соединений MS и создает транзакцию сеанса для получения определенного ответа в соответствии с шаблоном запроса/ответа, описанным командой клиента служебной шины. Создание приемника сеанса занимает около 120 мс (что очень много), но я не знаю лучшего способа, кроме публикации всех ответов всем слушателям.