Экспоненциальная отсрочка для бизнес-исключений при использовании реактивного spring-amqp?

Я использую Spring AMQP 2.1.6 и Spring Boot 2.1.5 и ищу рекомендуемый способ настройки spring-amqp для повторения бизнес-исключений для реактивных компонентов (Mono) с экспоненциальной отсрочкой. Например:

@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
    Mono<Void> mono = myService.doSomething(myMessage);
    return mono;
}

Я бы хотел, чтобы spring-amqp автоматически повторял попытку, если doSomething возвращает ошибку. Обычно это можно настроить для блокировки RabbitListener при настройке фабрики контейнеров:

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setAdviceChain(retryInterceptor(..));

Где retryInterceptor можно определить следующим образом:

private static RetryOperationsInterceptor retryInterceptor(long backoffInitialInterval, double backoffMultiplier, long backoffMaxInterval, int maxAttempts) {
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(backoffInitialInterval);
    backOffPolicy.setMultiplier(backoffMultiplier);
    backOffPolicy.setMaxInterval(backoffMaxInterval);

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy((new SimpleRetryPolicy(maxAttempts)));
    retryTemplate.setBackOffPolicy(backOffPolicy);

    StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
    bean.setRetryOperations(retryTemplate);
    return bean.getObject();
}

Но цепочка советов, похоже, не используется для реактивных RabbitListener. Вероятно, это потому, что, если я правильно понимаю, RetryTemplate/ExponentialBackOffPolicy фактически блокирует поток.

В качестве обходного пути я, конечно, мог бы сделать что-то вроде (переключиться на Kotlin, потому что это немного проще):

@RabbitListener
fun myListener(MyMessage myMessage) : Mono<Void> {
    return myService.doSomething(myMessage)
                    .retryExponentialBackoff(10, Duration.ofMillis(100), Duration.ofSeconds(5)) { ctx ->
            log.info("Caught exception ${ctx.exception()}")
        }
}

Но я бы хотел, чтобы эта логика повтора применялась ко всем экземплярам Mono, возвращаемым из RabbitListener. Возможно ли что-то подобное или вам следует настроить это по-другому при использовании реактивных последовательностей из проектного реактора с spring-amqp?

1
0
367
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Действительно лучше применить логику повторных попыток в вашей реактивной последовательности, аналогично тому, как вы это делаете с retryExponentialBackoff(). Просто потому, что выполнение Reactive Streams не происходит в том же потоке, мы можем применить это Retrytemplate для myListener().

Сейчас внутренняя логика такая:

private static class MonoHandler {

    static boolean isMono(Object result) {
        return result instanceof Mono;
    }

    @SuppressWarnings("unchecked")
    static void subscribe(Object returnValue, Consumer<? super Object> success,
            Consumer<? super Throwable> failure) {

        ((Mono<? super Object>) returnValue).subscribe(success, failure);
    }

}

Это Consumer<? super Throwable> failure делает это:

private void asyncFailure(Message request, Channel channel, Throwable t) {
    this.logger.error("Future or Mono was completed with an exception for " + request, t);
    try {
        channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, true);
    }
    catch (IOException e) {
        this.logger.error("Failed to nack message", e);
    }
} 

Итак, у нас нет никакого способа инициировать это RetryTemplate вообще, но в то же время с явным basicNack() у нас есть естественная повторная попытка с повторной загрузкой того же сообщения из RabbitMQ обратно.

Вероятно, мы могли бы применить повторную попытку Reactor для этого Mono внутренне, но не похоже, что RetryOperationsInterceptor можно просто преобразовать в Mono.retry().

Другими словами, RetryOperationsInterceptor — неправильный способ реактивной обработки. Используйте Mono.retry() явно в своем собственном коде.

Вы можете предоставить какой-либо общий служебный метод и применить его как Mono.transform(Function<? super Mono<T>, ? extends Publisher<V>> transformer) всякий раз, когда у вас есть реактивный возврат для метода @RabbitListener.

Обсуждается добавление возможностей реактора в spring-retry, но, похоже, не было большого прогресса.

Gary Russell 28.05.2019 15:51

Другие вопросы по теме