Spring-amqp с пустым именем очереди awaiting_declaration

Я хочу создать очередь с пустым именем, чтобы имя могло быть сгенерировано RabbitMQ -

var queue = QueueBuilder
    .durable("")
    .exclusive()
    .autoDelete().build

var binding = BindingBuilder.bind(queue).to(exchange).with(bindingKey).noargs();
Declarables d = new Declarables(queue, binding);

Но затем вызов getActualName возвращает: spring.gen-vuiRwjOmRkihAE8C72rbmw_awaiting_declaration

d.getDeclarablesByType(Queue.class).get(0).getActualName();

В то время как в rabbitMQ имя: amq.gen-wpaYnybu9vOdD5v2ej66IQ

В ядре spring-amqp конструктор Queue объявляет:

    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,
            @Nullable Map<String, Object> arguments) {
    
        super(arguments);
        Assert.notNull(name, "'name' cannot be null");
        this.name = name;
        this.actualName = StringUtils.hasText(name) ? name
                : (Base64UrlNamingStrategy.DEFAULT.generateName() 
+ "_awaiting_declaration");
        this.durable = durable;
        this.exclusive = exclusive;
        this.autoDelete = autoDelete;
    }

Почему spring Queue использует Base64UrlNamingStrategy и добавляет «awaiting_declaration», когда нам нужно имя rabbitMQ? Как мы можем получить имя rabbitMQ, а не сгенерированное весной имя?

Определение очереди: https://github.com/spring-projects/spring-amqp/blob/d4e0f5c366a7ffae073f608c3766c82064cab3d1/spring-amqp/src/main/java/org/springframework/amqp/core/Queue.java#L98

Причина этого варианта использования связана с состоянием гонки в очередях: «Когда автоматически удаляемые или эксклюзивные очереди используют общеизвестные (статические) имена, в случае отключения клиента и немедленного повторного подключения возникнет естественное состояние гонки между узлами RabbitMQ, которое удалит такие очереди и восстановит клиенты, которые попытаются повторно объявить их. Это может привести к сбою или исключениям при восстановлении соединения на стороне клиента, а также создать ненужную путаницу или повлиять на доступность приложения».

Https://www.rabbitmq.com/queues.html#properties

Spring предлагает использовать очереди на основе брокера, которые могут привести к состоянию гонки: https://docs.spring.io/spring-amqp/docs/current/reference/html/#containers-and-broker-named-queues

Обновлено: Мы не инициируем соединение сами, но компонент администрирования инициирует его после d.setAdminsThatShouldDelcare(admin)

    public Declarables someEventsDeclarables(
    @Qualifier("rabbitAdmin") RabbitAdmin admin,
    @Qualifier("AmqpExchange") Exchange exchange
) {
    final var bindingKey = somePrefix +".*." +someSuffix
    final var cfg = new OurEventsDeclarables(
        exchange,
        "", // no queue name - RabbitMq generates it
        bindingKey,
        true
    );

    final var declarables = cfg.declarables();
    for (Declarable d : declarables.getDeclarables()) {
        d.setAdminsThatShouldDeclare(admin);
        admin.declareQueue();
    }
    return declarables;
}

Запуск интеграционного теста, который использует очередь, приводит к

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[spring.gen-QUh8ffN0TimELGG_kF1wFw_awaiting_declaration]

Edit-2 - рабочее решение (в других тестах Declarables и AutoAckConsumer может потребоваться @MockBean):

public class AmqpConfig{
...
@Bean("someEventsDeclarables")
public Declarables someEventsDeclarables(
@Qualifier("rabbitAdmin") RabbitAdmin admin,
@Qualifier("AmqpExchange") Exchange exchange
) {
final var bindingKey = somePrefix +".*." +someSuffix
final var cfg = new OurEventsDeclarables(
    exchange,
    "", // no queue name - RabbitMq generates it
    bindingKey,
    true
);

final var declarables = cfg.declarables();

/** declare queue and bindings */
final List<Queue> queues = declarables.getDeclarablesByType(Queue.class);
final List<Binding> bindings = declarables.getDeclarablesByType(Binding.class);
if (queues.size() == 0) {
    throw new BeanCreationException("Queue for empty-queue-name is not found");
}
if (bindings.size() == 0) {
    throw new BeanCreationException("Binding for {} is not found of empty-queue-name", bindingKey);
}
Queue queue = queues.get(0);
Binding binding = bindings.get(0);
String declareQueue = admin.declareQueue(queue);
queue.setActualName(declareQueue);
admin.declareBinding(binding);
return declarables;
}

Как вы объявляете очередь? При объявлении через RabbitAdmin фактическое имя заполняется из результата DeclareOk: queue.setActualName(declareOk.getQueue());.

Gary Russell 10.01.2023 18:15

Я отредактировал вопрос с помощью фрагмента кода из нашего кода, который вызывает d.setAdminsThatShouldDeclare(admin); который должен заставить компонент администратора инициировать соединение.

Endre Moen 11.01.2023 16:57

Вы используете неправильный метод - смотрите мой ответ.

Gary Russell 11.01.2023 17:07

У меня нет доступа к очереди — только Declarables, созданные с помощью return new Declarables(queue, binding);

Endre Moen 11.01.2023 17:19
getDeclarablesByType(Queue.class).
Gary Russell 11.01.2023 17:51
0
5
59
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Непонятно, что и где вы делаете с этим Queue объектом. Вы должны иметь bean-компонент RabbitAdmin в контексте приложения. Как и все эти экземпляры Declarable, они также должны быть объявлены как bean-компоненты. Затем, когда запускается контекст приложения, этот RabbitAdmin берет все Declarable bean-компоненты и объявляет их брокеру. Дальнейшая логика вокруг Queue такая:

DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
                            queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
if (StringUtils.hasText(declareOk.getQueue())) {
    queue.setActualName(declareOk.getQueue());
}

Просто создание переменных для этих типов не инициирует никакого подключения к брокеру для создания объектов и привязок между ними.

Я отредактировал вопрос с помощью фрагмента кода из нашего кода, который вызывает d.setAdminsThatShouldDeclare(admin); который должен заставить компонент администратора инициировать соединение.

Endre Moen 11.01.2023 16:56
Ответ принят как подходящий

Вы используете admin.declareQueue() (и отбрасываете результат) вместо admin.declareQueue(Queue queue).

Первый метод просто объявляет брокера с именем queue; для обновления фактического имени требуется объект Queue (чтобы он мог обновить имя).

Как мне попасть в очередь? В объявлении у меня есть очередь, потому что cfg.declarables(); делает: var queue = QueueBuilder.durable(queueName).exclusive().autoDelete().bui‌​ld(); var binding = BindingBuilder.bind(queue).to(exchange).with(bindingKey).noa‌​rgs(); return new Declarables(queue, binding);

Endre Moen 11.01.2023 17:13
Declarables на самом деле не предназначен для этого варианта использования, он предназначен для автоматического объявления @Beans, найденных в контексте приложения, администратором. Тем не менее, вы можете использовать declarables.getDeclarablesByType(), чтобы получить список Queue объектов.
Gary Russell 11.01.2023 17:50

Спасибо, что указали, что мне нужно admin.declareQueue(Queue queue) и сохранить возвращенное имя очереди для вызова queue.setAcutalName(returnedQueueName). Мне также нужно было объявить привязку. Обновленный вопрос с рабочим решением.

Endre Moen 13.01.2023 10:32

Другие вопросы по теме