Понимание весеннего облачного обмена сообщениями с rabbitmq

Я думаю, что у меня проблема с пониманием весенних облачных сообщений, и я не могу найти ответ на «проблему», с которой я столкнулся.

У меня следующая настройка (с использованием spring -boot 2.0.3.RELEASE).

application.yml

spring:
    rabbitmq:
      host: localhost
      port: 5672
      username: guest
      password: guest
      virtual-host: /
    cloud:
      stream:
        bindings:
          input:
            destination: foo
            group: fooGroup
          fooChannel:
            destination: foo

Класс обслуживания

@Autowired
FoodOrderController foodOrderController;

@Bean
public CommandLineRunner runner() {
    return (String[] args) -> {
       IntStream.range(0,50).forEach(e -> foodOrderController.orderFood());
    };
}

@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals(String meal){
    System.out.println("This was a great meal!: "+ meal);
}

@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals1(String meal){
    System.out.println("This was a great meal!: "+ meal);
}

FoodOrderController

public class FoodOrderController {

    @Autowired
    FoodOrderSource foodOrderSource;

    public String orderFood(){
        var foodOrder = new FoodOrder();
        foodOrder.setCustomerAddress(UUID.randomUUID().toString());
        foodOrder.setOrderDescription(UUID.randomUUID().toString());
        foodOrder.setRestaurant("foo");
        foodOrderSource.foodOrders().send(MessageBuilder.withPayload(foodOrder).build());
       // System.out.println(foodOrder.toString());
        return "food ordered!";
    }
}

FoodOrderSource

public interface FoodOrderSource {
    String INPUT = "foo";
    String OUTPUT = "fooChannel";

    @Input("foo")
    SubscribableChannel foo();
    @Output("fooChannel")
    MessageChannel foodOrders();
}

FoodOrderPublisher

@EnableBinding(FoodOrderSource.class)
public class FoodOrderPublisher {
}

Настройка работает, за исключением того, что оба StreamListener получают одинаковые сообщения. Так что все регистрируется дважды. Читая документацию, говорится, что указав group внутри привязок очередей, оба слушателя будут зарегистрированы внутри группы, и только один слушатель получит одно сообщение. Я знаю, что приведенный выше пример не имеет смысла, но я хочу имитировать многоузловую среду с настройкой нескольких слушателей.

Почему сообщение получают оба слушателя? И как я могу убедиться, что сообщение получено только один раз в группе настройки?

Согласно документации, сообщения также должны автоматически подтверждаться по умолчанию, но я не могу найти ничего, что указывало бы на то, что сообщения действительно подтверждаются. Я что-то упустил?

Вот несколько скриншотов кролика админа

Понимание весеннего облачного обмена сообщениями с rabbitmqПонимание весеннего облачного обмена сообщениями с rabbitmqПонимание весеннего облачного обмена сообщениями с rabbitmqПонимание весеннего облачного обмена сообщениями с rabbitmqПонимание весеннего облачного обмена сообщениями с rabbitmq

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
185
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Reading the documentation, it says specifying a group inside the queues bindings, both the listeners will be registered inside the group and only one listener will receive a single message.

Это верно, когда слушатели находятся в разных экземплярах приложения. Когда в одном экземпляре есть несколько слушателей, все они получают одно и то же сообщение. Обычно это используется с condition, где каждый слушатель может выразить интерес к тому, какие блюда ему интересны. Документировано здесь.

По сути, конкурирующий потребитель - это сама привязка, которая отправляет сообщение фактическим @StreamListener в приложении.

Таким образом, вы не можете «имитировать многоузловую среду с настройкой нескольких слушателей».

but I can't find anything that indicates that the messages actually get acknowledged

Что ты имеешь в виду? Если сообщение обработано успешно, контейнер подтверждает сообщение, и оно удаляется из очереди.

Спасибо! Я согрешил, создал еще одно приложение для весенней загрузки, настроил его как первый экземпляр, запустил их оба и снова, все сообщения регистрируются в обоих экземплярах. Можете ли вы подтвердить, что моя конфигурация, сделанная в application.yml, верна?

baao 18.08.2018 17:15

Выглядит нормально; вы тоже добавляли ApplicationRunner в оба экземпляра? Это удвоило бы количество сообщений. Вы можете посмотреть на пользовательский интерфейс администратора RabbitMQ (http://localhost:15672), чтобы увидеть очередь foo.fooGroup и ее потребителей.

Gary Russell 18.08.2018 17:22

Я добавил несколько картинок, я не вижу соединения между группой и очередями, а вы? У обмена foo есть обе очереди, но я не получаю соединение с группой. Я не добавлял ApplicationRunner во второе приложение, оно работает только в первом.

baao 18.08.2018 17:32

Странно - у вас 2 анонимные очереди, значит, свойство group по какой-то причине не применяется. Очереди правильно привязаны к обмену foo; обмена fooGroup быть не должно. Должна быть одна очередь foo.fooGroup, к которой присоединены оба потребителя, с очередью, привязанной к foo. Если вы не можете понять это, опубликуйте свой проект где-нибудь, например, на github; Я могу посмотреть, что не так. Но сегодня я буду здесь ненадолго, так что это может быть сегодня вечером или завтра (EDT).

Gary Russell 18.08.2018 17:44

Я только что изменил StreamListener с использования target = FoodOrderSource.INPUT на Sink.INPUT. Это однозначно распределяет сообщения по обоим и создает очереди, как вы описали, но вызывает ошибки, связанные с тем, что у application-1.foo нет подписчиков. Попробую разобраться; если не могу, то закачу на гитхаб и пингую. Спасибо за это предложение!

baao 18.08.2018 17:51

Ах - это из-за этого @Input("foo") - вам нужен bindings: foo: вместо bindings: input:. Извините, я не заметил несоответствия в названии привязки. Если для foo нет явной привязки, он использует значения по умолчанию (пункт назначения = foo, анонимный потребитель).

Gary Russell 18.08.2018 17:56

На правильный ответ уже дан ответ в сообщении, но вы все равно можете изучить это:

https://github.com/jinternals/spring-cloud-stream

Спасибо, это кажется полезным и решит мою следующую задачу!

baao 18.08.2018 20:45

пожалуйста, любите репо, если вы сочтете это полезным, счастливого кодирования :)

Mradul Pandey 18.08.2018 20:47

Другие вопросы по теме