У меня проблема с получением сообщения от слушателя издателю. я получаю ** AmqpReplyTimeoutException **. Ниже приведен код издателя, откуда я публикую в очереди.
for(CsvWrapperPojo item : items){
resultList.addAll(item.getDbResultList());
for(CSVPojo pojo :item.getQueueRequestList()){
sampleResponseMessageRabbitConverterFuture= asyncRabbitTemplate.convertSendAndReceive("spring-boot-rabbitmq-Interactive.async_Solve_InteractiveMsg", "Interactive_RequestQueue", pojo);
//CSVPojo res =(CSVPojo)rabbitTemplate.convertSendAndReceive("spring-boot-rabbitmq-Interactive.async_Solve_InteractiveMsg", "Interactive_RequestQueue", pojo);
System.out.println("heyyyyyy:" + sampleResponseMessageRabbitConverterFuture.get().getLatitute());
//resultList.add(res);
//resultList.add(sampleResponseMessageRabbitConverterFuture.get());
}
}
Используя его, я могу опубликовать в очереди, у меня есть код подписчика ниже.
@EnableRabbit
public class ListenerQueueSubscriber {
@RabbitHandler
@RabbitListener(containerFactory = "simpleMessageListenerContainerFactory", queues = "Interactive_RequestQueue")
public void subscribeToRequestQueue(@Payload CSVPojo sampleRequestMessage, Message message) throws InterruptedException {
System.out.println("inside listener");
sampleRequestMessage.setResult("Hello");
Thread.sleep(120000);
System.out.println("After sleep:" +sampleRequestMessage.getLongitude());
//return sampleRequestMessage;
}
}
Используя вышеупомянутый подписчик, способный прослушивать сообщение, я добавляю "Привет и помещаю в режим сна на 2 минуты, после чего я должен получить сообщение обратно издателю, откуда я опубликовал. Но, к сожалению, я не получаю сообщение с добавленным Привет получение ** AmqpReplyTimeoutException **. Пожалуйста, помогите добиться такого поведения.
Заранее спасибо!!!!
Спасибо Билану за ответ. Я попытался установить таймаут приема на 5 минут. asyncRabbitTemplate.setReceiveTimeout (300000); Несмотря на то, что появляется такая же ошибка. Пытался удалить Thread.sleep, но не повезло. Мне нужно вернуть объект в моем классе слушателя? получить в издатель
Да, вы действительно должны вернуться
Спасибо, Билан. Это сработало. Последний вопрос с моей стороны: есть ли способ установить ReceiveTimeout (asyncRabbitTemplate.setReceiveTimeout (300000)) до тех пор, пока он не получит ответ от подписчика, я имею в виду, вместо того, чтобы ставить статические миллисекунды, есть ли какое-либо значение, которое мы можем поставить, чтобы дождаться, пока он не получит ответ назад от абонента?
Эта функция доступна? или он ведет себя / имеет возможность, как «до тех пор, пока не будет получено ответное сообщение, или до тех пор, пока не истечет время ожидания, в зависимости от того, что больше»?
Что ж, вы можете установить его на что-то определенно большое, но это не будет практичным. Поскольку ваш производитель и потребитель распределены, нет гарантии, что что-то не произойдет между ними. Таким образом, в реальном мире ждать вечно будет напрасной тратой ресурсов, если мы каким-то образом потеряем ответ.
Спасибо за разъяснения!! просто чтобы убедиться, что моему потребителю потребуется немного времени, чтобы вернуть сообщение издателю, будет ли он ждать, пока он не получит сообщение обратно? или будет исключение тайм-аута?
Он будет ждать максимум таймаута. Если ответ придет раньше, это хорошо для вас. В противном случае это не сработает: потребитель полностью не блокирует производителя, потому что между ними есть брокер RabbitMQ. Таким образом, продюсер должен что-то делать самостоятельно, чтобы дождаться ответа. Больше ничего нельзя сделать.
Потрясающие!!! Спасибо за разъяснения, Билан.




Пожалуйста, посмотрите здесь возможное решение: rabbitmq.com/tutorials/tutorial-six-spring-amqp.html. Не уверен, однако, как вы ожидаете, что не получите
AmqpReplyTimeoutException, если поспите две минуты перед отправкой ответа ...