Какой лучший способ отправить сообщение в rabbitmq с помощью spring-amqp из обратного вызова PublisherReturn?

Я использую spring-amqp:2.1.6.RELEASE

У меня есть RabbitTemplate с обратным вызовом PublisherReturn.

  • Если я отправлю сообщение на routingKey, у которого нет очередей, привязанных к это, то обратный вызов обратного вызова вызывается правильно. Когда это происходит, я хотите отправить сообщение на альтернативный routingKey. Однако, если Я использую RabbitTemplate в ReturnCallback, он просто зависает. я не вижу ничего о том, что сообщение может/не может быть отправлено, RabbitTemplate не возвращает управление моему ReturnCallback, и я также не вижу никакого PublisherConfirm.
  • Если я создам новый RabbitTemplate (с тем же CachingConnectionFactory) потом ведет себя так же. Мой звонок просто зависает.
  • Если я отправлю сообщение на routingKey, к которому привязана очередь, то сообщение корректно поступает в очередь. ReturnCallback не вызывается в этом сценарии.

После некоторого расследования я пришел к выводу, что rabbitTemplate и/или соединение заблокированы до тех пор, пока исходное сообщение не будет полностью обработано.

Если я создам второй CachingConnectionFactory и RabbitTemplate и использую их в обратном вызове PublisherReturn, то, похоже, все будет работать нормально.

Итак, вот вопрос: как лучше всего отправить сообщение в обратном вызове PublisherReturn с помощью spring-amqp?

Я искал, но не могу найти ничего, что объясняет, как вы должны это сделать.

Вот упрощенные детали того, что у меня есть:

@Configuration
public class MyConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setPublisherReturns(true);
        // ... other settings left out for brevity
        return connectionFactory;
    }

    @Bean
    @Qualifier("rabbitTemplate")
    public RabbitTemplate rabbitTemplate(ReturnCallbackForAlternative returnCallbackForAlternative) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(returnCallbackForAlternative);
        // ... other settings left out for brevity
        return rabbitTemplate;
    }

    @Bean
    @Qualifier("connectionFactoryForUndeliverable")
    public ConnectionFactory connectionFactoryForUndeliverable() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        // ... other settings left out for brevity
        return connectionFactory;
    }

    @Bean
    @Qualifier("rabbitTemplateForUndeliverable")
    public RabbitTemplate rabbitTemplateForUndeliverable() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryForUndeliverable());
        // ... other settings left out for brevity
        return rabbitTemplate;
    }

}

Затем, чтобы отправить сообщение, которое я использую

    @Autowired
    @Qualifier("rabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    public void send(Message message) {
        rabbitTemplate.convertAndSend(
            "exchange-name",
            "primary-key",
            message);
    }

И код в ReturnCallback

@Component
public class ReturnCallbackForAlternative implements RabbitTemplate.ReturnCallback {

    @Autowired
    @Qualifier("rabbitTemplateForUndeliverable")
    private RabbitTemplate rabbitTemplate;

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        rabbitTemplate.convertAndSend(
            "exchange-name",
            "alternative-key",
            message);
    }

}

РЕДАКТИРОВАТЬ

Упрощенный пример для воспроизведения проблемы. Чтобы запустить его:

  1. Запустите RabbitMq
  2. Связать обмен с именем foo с очередью с именем foo
  3. Запустить как весеннее загрузочное приложение

Вы увидите следующий вывод:

in returnCallback before message send

но ты не увидишь:

in returnCallback after message send

Если вы закомментируете connectionFactory.setPublisherConfirms(true);, он работает нормально.

@SpringBootApplication
public class HangingApplication {

    public static void main(String[] args) {
      SpringApplication.run(HangingApplication.class, args);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
      connectionFactory.setPublisherReturns(true);
      connectionFactory.setPublisherConfirms(true);
      return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setExchange("foo");
      rabbitTemplate.setMandatory(true);

      rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        System.out.println("Confirm callback for main template. Ack = " + ack);
      });

      rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        System.out.println("in returnCallback before message send");
        rabbitTemplate.send("foo", message);
        System.out.println("in returnCallback after message send");
      });

      return rabbitTemplate;
    }

    @Bean
    public ApplicationRunner runner(@Qualifier("rabbitTemplate") RabbitTemplate template) {

      return args -> {
        template.convertAndSend("BADKEY", "foo payload");
      };
    }

    @RabbitListener(queues = "foo")
    public void listen(String in) {
      System.out.println("Message received on undeliverable queue : " + in);
    }

}

Вот build.gradle, который я использовал:

plugins {
    id 'org.springframework.boot' version '2.1.5.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group 'pcoates'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.11

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-amqp'
}
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
2 558
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Это вызывает некоторую взаимоблокировку в коде amqp-client. Самое простое решение - сделать отправку в отдельном потоке - использовать TaskExecutor в обратном вызове...

exec.execute(() -> template.send(...));

Вы можете использовать ту же фабрику шаблонов/соединений, но отправка должна выполняться в другом потоке.

Я думал, что мы недавно изменили структуру, чтобы всегда вызывать обратный вызов return в другом потоке (после того, как последний человек сообщил об этом), но похоже, что он провалился.

Я открыл проблема на этот раз.

РЕДАКТИРОВАТЬ

Вы уверены, что используете 2.1.6?

Мы исправили эту проблему в версии 2.1.0, предотвратив попытку отправки использовать тот же канал, по которому прибыл возврат. Это отлично работает для меня...

@SpringBootApplication
public class So57234770Application {

    public static void main(String[] args) {
        SpringApplication.run(So57234770Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            template.send("foo", message);
        });
        return args -> {
            template.convertAndSend("BADKEY", "foo");
        };
    }

    @RabbitListener(queues = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}

Если вы можете предоставить пример приложения, демонстрирующего такое поведение, я посмотрю, что происходит.

Кстати, вам не следует использовать convertAndSend, так как message уже является Message (пост-конверсией).

Gary Russell 29.07.2019 16:34

Хммм - вы уверены, что используете 2.1.6? Мы исправили это по-другому в версии 2.1.0, и у нас есть тест здесь, который это подтверждает. Если вы можете собрать небольшое тестовое приложение, которое демонстрирует это, я с удовольствием посмотрю (это может быть что-то другое). Спасибо.

Gary Russell 29.07.2019 17:17

Я только что попробовал ваш пример, и он работает и для меня. Я попытаюсь воссоздать проблему, с которой я сталкиваюсь, в небольшом тестовом приложении. Я почти уверен, что использую 2.1.6.

pcoates 30.07.2019 12:24

Когда я добавляю publisherConfirms, отправка в ReturnCallback кажется зависшей. Я добавил образец в конец вопроса. Я взял ваш пример и добавил файл publisherConfirms. Я добавлю файл build.gradle, который я использую.

pcoates 30.07.2019 15:20

Подтвержденный; спасибо - я продолжу и позабочусь о том, чтобы в следующей версии мы всегда вызывали обратный вызов в другом потоке. До тех пор вы можете передать отправку другому потоку в обратном вызове. Еще раз спасибо, что нашли это.

Gary Russell 30.07.2019 16:31

Для полноты картины обратный вызов вызывается непосредственно в потоке ввода-вывода соединения клиента; чтобы выполнить отправку, мы открываем новый канал и ждем результата Channel.Open-OK (который отправляется брокером), но теперь поток ввода-вывода, который будет обрабатывать этот результат, заблокирован в ожидании этого. На самом деле это заканчивается исключением тайм-аута через 10 минут!

Gary Russell 30.07.2019 16:55

Теперь это имеет смысл, и приятно знать, что передача этого другому потоку является безопасным решением. Спасибо за вашу помощь в этом. Очень признателен

pcoates 30.07.2019 17:06

Другие вопросы по теме