Я думаю, что у меня проблема с пониманием весенних облачных сообщений, и я не могу найти ответ на «проблему», с которой я столкнулся.
У меня следующая настройка (с использованием spring -boot 2.0.3.RELEASE).
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
cloud:
stream:
bindings:
input:
destination: foo
group: fooGroup
fooChannel:
destination: foo
Класс обслуживания
@Autowired
FoodOrderController foodOrderController;
@Bean
public CommandLineRunner runner() {
return (String[] args) -> {
IntStream.range(0,50).forEach(e -> foodOrderController.orderFood());
};
}
@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals(String meal){
System.out.println("This was a great meal!: "+ meal);
}
@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals1(String meal){
System.out.println("This was a great meal!: "+ meal);
}
FoodOrderController
public class FoodOrderController {
@Autowired
FoodOrderSource foodOrderSource;
public String orderFood(){
var foodOrder = new FoodOrder();
foodOrder.setCustomerAddress(UUID.randomUUID().toString());
foodOrder.setOrderDescription(UUID.randomUUID().toString());
foodOrder.setRestaurant("foo");
foodOrderSource.foodOrders().send(MessageBuilder.withPayload(foodOrder).build());
// System.out.println(foodOrder.toString());
return "food ordered!";
}
}
FoodOrderSource
public interface FoodOrderSource {
String INPUT = "foo";
String OUTPUT = "fooChannel";
@Input("foo")
SubscribableChannel foo();
@Output("fooChannel")
MessageChannel foodOrders();
}
FoodOrderPublisher
@EnableBinding(FoodOrderSource.class)
public class FoodOrderPublisher {
}
Настройка работает, за исключением того, что оба StreamListener получают одинаковые сообщения. Так что все регистрируется дважды. Читая документацию, говорится, что указав group внутри привязок очередей, оба слушателя будут зарегистрированы внутри группы, и только один слушатель получит одно сообщение. Я знаю, что приведенный выше пример не имеет смысла, но я хочу имитировать многоузловую среду с настройкой нескольких слушателей.
Почему сообщение получают оба слушателя? И как я могу убедиться, что сообщение получено только один раз в группе настройки?
Согласно документации, сообщения также должны автоматически подтверждаться по умолчанию, но я не могу найти ничего, что указывало бы на то, что сообщения действительно подтверждаются. Я что-то упустил?
Вот несколько скриншотов кролика админа




Reading the documentation, it says specifying a group inside the queues bindings, both the listeners will be registered inside the group and only one listener will receive a single message.
Это верно, когда слушатели находятся в разных экземплярах приложения. Когда в одном экземпляре есть несколько слушателей, все они получают одно и то же сообщение. Обычно это используется с condition, где каждый слушатель может выразить интерес к тому, какие блюда ему интересны. Документировано здесь.
По сути, конкурирующий потребитель - это сама привязка, которая отправляет сообщение фактическим @StreamListener в приложении.
Таким образом, вы не можете «имитировать многоузловую среду с настройкой нескольких слушателей».
but I can't find anything that indicates that the messages actually get acknowledged
Что ты имеешь в виду? Если сообщение обработано успешно, контейнер подтверждает сообщение, и оно удаляется из очереди.
Выглядит нормально; вы тоже добавляли ApplicationRunner в оба экземпляра? Это удвоило бы количество сообщений. Вы можете посмотреть на пользовательский интерфейс администратора RabbitMQ (http://localhost:15672), чтобы увидеть очередь foo.fooGroup и ее потребителей.
Я добавил несколько картинок, я не вижу соединения между группой и очередями, а вы? У обмена foo есть обе очереди, но я не получаю соединение с группой. Я не добавлял ApplicationRunner во второе приложение, оно работает только в первом.
Странно - у вас 2 анонимные очереди, значит, свойство group по какой-то причине не применяется. Очереди правильно привязаны к обмену foo; обмена fooGroup быть не должно. Должна быть одна очередь foo.fooGroup, к которой присоединены оба потребителя, с очередью, привязанной к foo. Если вы не можете понять это, опубликуйте свой проект где-нибудь, например, на github; Я могу посмотреть, что не так. Но сегодня я буду здесь ненадолго, так что это может быть сегодня вечером или завтра (EDT).
Я только что изменил StreamListener с использования target = FoodOrderSource.INPUT на Sink.INPUT. Это однозначно распределяет сообщения по обоим и создает очереди, как вы описали, но вызывает ошибки, связанные с тем, что у application-1.foo нет подписчиков. Попробую разобраться; если не могу, то закачу на гитхаб и пингую. Спасибо за это предложение!
Ах - это из-за этого @Input("foo") - вам нужен bindings: foo: вместо bindings: input:. Извините, я не заметил несоответствия в названии привязки. Если для foo нет явной привязки, он использует значения по умолчанию (пункт назначения = foo, анонимный потребитель).
На правильный ответ уже дан ответ в сообщении, но вы все равно можете изучить это:
https://github.com/jinternals/spring-cloud-stream
Спасибо, это кажется полезным и решит мою следующую задачу!
пожалуйста, любите репо, если вы сочтете это полезным, счастливого кодирования :)
Спасибо! Я согрешил, создал еще одно приложение для весенней загрузки, настроил его как первый экземпляр, запустил их оба и снова, все сообщения регистрируются в обоих экземплярах. Можете ли вы подтвердить, что моя конфигурация, сделанная в application.yml, верна?