Я использую spring-amqp:2.1.6.RELEASE
У меня есть RabbitTemplate с обратным вызовом PublisherReturn.
После некоторого расследования я пришел к выводу, что rabbitTemplate и/или соединение заблокированы до тех пор, пока исходное сообщение не будет полностью обработано.
Если я создам второй CachingConnectionFactory и RabbitTemplate и использую их в обратном вызове PublisherReturn, то, похоже, все будет работать нормально.
Итак, вот вопрос: как лучше всего отправить сообщение в обратном вызове PublisherReturn с помощью spring-amqp?
Я искал, но не могу найти ничего, что объясняет, как вы должны это сделать.
Вот упрощенные детали того, что у меня есть:
@Configuration
public class MyConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setPublisherReturns(true);
// ... other settings left out for brevity
return connectionFactory;
}
@Bean
@Qualifier("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ReturnCallbackForAlternative returnCallbackForAlternative) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(returnCallbackForAlternative);
// ... other settings left out for brevity
return rabbitTemplate;
}
@Bean
@Qualifier("connectionFactoryForUndeliverable")
public ConnectionFactory connectionFactoryForUndeliverable() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
// ... other settings left out for brevity
return connectionFactory;
}
@Bean
@Qualifier("rabbitTemplateForUndeliverable")
public RabbitTemplate rabbitTemplateForUndeliverable() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryForUndeliverable());
// ... other settings left out for brevity
return rabbitTemplate;
}
}
Затем, чтобы отправить сообщение, которое я использую
@Autowired
@Qualifier("rabbitTemplate")
private RabbitTemplate rabbitTemplate;
public void send(Message message) {
rabbitTemplate.convertAndSend(
"exchange-name",
"primary-key",
message);
}
И код в ReturnCallback
@Component
public class ReturnCallbackForAlternative implements RabbitTemplate.ReturnCallback {
@Autowired
@Qualifier("rabbitTemplateForUndeliverable")
private RabbitTemplate rabbitTemplate;
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
rabbitTemplate.convertAndSend(
"exchange-name",
"alternative-key",
message);
}
}
РЕДАКТИРОВАТЬ
Упрощенный пример для воспроизведения проблемы. Чтобы запустить его:
Вы увидите следующий вывод:
in returnCallback before message send
но ты не увидишь:
in returnCallback after message send
Если вы закомментируете connectionFactory.setPublisherConfirms(true);, он работает нормально.
@SpringBootApplication
public class HangingApplication {
public static void main(String[] args) {
SpringApplication.run(HangingApplication.class, args);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setPublisherReturns(true);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange("foo");
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("Confirm callback for main template. Ack = " + ack);
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("in returnCallback before message send");
rabbitTemplate.send("foo", message);
System.out.println("in returnCallback after message send");
});
return rabbitTemplate;
}
@Bean
public ApplicationRunner runner(@Qualifier("rabbitTemplate") RabbitTemplate template) {
return args -> {
template.convertAndSend("BADKEY", "foo payload");
};
}
@RabbitListener(queues = "foo")
public void listen(String in) {
System.out.println("Message received on undeliverable queue : " + in);
}
}
Вот build.gradle, который я использовал:
plugins {
id 'org.springframework.boot' version '2.1.5.RELEASE'
id 'java'
}
apply plugin: 'io.spring.dependency-management'
group 'pcoates'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.11
repositories {
mavenCentral()
}
dependencies {
compile 'org.springframework.boot:spring-boot-starter-amqp'
}




Это вызывает некоторую взаимоблокировку в коде amqp-client. Самое простое решение - сделать отправку в отдельном потоке - использовать TaskExecutor в обратном вызове...
exec.execute(() -> template.send(...));
Вы можете использовать ту же фабрику шаблонов/соединений, но отправка должна выполняться в другом потоке.
Я думал, что мы недавно изменили структуру, чтобы всегда вызывать обратный вызов return в другом потоке (после того, как последний человек сообщил об этом), но похоже, что он провалился.
Я открыл проблема на этот раз.
РЕДАКТИРОВАТЬ
Вы уверены, что используете 2.1.6?
Мы исправили эту проблему в версии 2.1.0, предотвратив попытку отправки использовать тот же канал, по которому прибыл возврат. Это отлично работает для меня...
@SpringBootApplication
public class So57234770Application {
public static void main(String[] args) {
SpringApplication.run(So57234770Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
template.send("foo", message);
});
return args -> {
template.convertAndSend("BADKEY", "foo");
};
}
@RabbitListener(queues = "foo")
public void listen(String in) {
System.out.println(in);
}
}
Если вы можете предоставить пример приложения, демонстрирующего такое поведение, я посмотрю, что происходит.
Хммм - вы уверены, что используете 2.1.6? Мы исправили это по-другому в версии 2.1.0, и у нас есть тест здесь, который это подтверждает. Если вы можете собрать небольшое тестовое приложение, которое демонстрирует это, я с удовольствием посмотрю (это может быть что-то другое). Спасибо.
Я только что попробовал ваш пример, и он работает и для меня. Я попытаюсь воссоздать проблему, с которой я сталкиваюсь, в небольшом тестовом приложении. Я почти уверен, что использую 2.1.6.
Когда я добавляю publisherConfirms, отправка в ReturnCallback кажется зависшей. Я добавил образец в конец вопроса. Я взял ваш пример и добавил файл publisherConfirms. Я добавлю файл build.gradle, который я использую.
Подтвержденный; спасибо - я продолжу и позабочусь о том, чтобы в следующей версии мы всегда вызывали обратный вызов в другом потоке. До тех пор вы можете передать отправку другому потоку в обратном вызове. Еще раз спасибо, что нашли это.
Для полноты картины обратный вызов вызывается непосредственно в потоке ввода-вывода соединения клиента; чтобы выполнить отправку, мы открываем новый канал и ждем результата Channel.Open-OK (который отправляется брокером), но теперь поток ввода-вывода, который будет обрабатывать этот результат, заблокирован в ожидании этого. На самом деле это заканчивается исключением тайм-аута через 10 минут!
Теперь это имеет смысл, и приятно знать, что передача этого другому потоку является безопасным решением. Спасибо за вашу помощь в этом. Очень признателен
Кстати, вам не следует использовать
convertAndSend, так какmessageуже являетсяMessage(пост-конверсией).