Spring AMQP RabbitMQ RPC Priority Queue

Я пытаюсь создать очередь RPC, в которой сообщения от производителя / клиента имеют приоритет для потребителя. Я хочу, чтобы все сообщения с приоритетом 2 обрабатывались перед любыми сообщениями с приоритетом 1. Я также хочу, чтобы каждый потребитель мог обрабатывать 10 сообщений за раз. Я могу заставить каждого потребителя обрабатывать 10 сообщений за раз. Я НЕ могу заставить работать приоритизацию сообщений. Ниже моя установка:

Файлы конфигурации:

@Configuration
public class QueueConfig  { 
    public static final String QUEUE_NAME = "requests";

    private int maxPriority = 2;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public Queue requests() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-priority", maxPriority);
        return new Queue(QUEUE_NAME,true,false,false, args);
    }

    @Bean
    public Queue replies() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-priority", maxPriority);
        return new Queue("replies",true,false,false, args);
    }  

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(replies().getName());
        return container;
    }

    @Bean
    public RabbitTemplate template() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setRoutingKey(requests().getName());
        return rabbitTemplate;
    }

    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer container) {
        AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate, container);
        asyncRabbitTemplate.setReceiveTimeout(90000);
        return asyncRabbitTemplate;
    }        
}

Клиент / Производитель:

@Component
public class Client {
    @Autowired
    private AsyncRabbitTemplate template;

    public void sendHigh(String name) {

        MessagePostProcessor messageProcessor = new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(2);
                return message;
            }
        };

        ListenableFuture<String> response = template.convertSendAndReceive(QueueConfig.QUEUE_NAME, (Object) name,(MessagePostProcessor) messageProcessor);
        try {
            response.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public void sendLow(String name) {

        MessagePostProcessor messageProcessor = new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(1);
                return message;
            }
        };

        ListenableFuture<String> response = template.convertSendAndReceive(QueueConfig.QUEUE_NAME, (Object) name,(MessagePostProcessor) messageProcessor);
        try {
            response.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

конфигурационный файл 2:

@Configuration
@EnableAsync
public class ServiceConfig implements AsyncConfigurer {

    @Override
    @Bean
    public Executor getAsyncExecutor() {
        return new SimpleAsyncTaskExecutor();
    }


}

Потребитель:

@Component
public class Consumer  {

    @RabbitListener(queues = QueueConfig.QUEUE_NAME)
    public String consume(@Payload String name) {
        System.out.println("Request Consumer " + name);
        String result = name;
        if (result.equals("john") || result.equals("john1")) {
            try {
                Thread.sleep(22000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return result;
    }

Application.properties:

spring.rabbitmq.dynamic=true
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.host=localhost
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.direct.acknowledge-mode=MANUAL

Юнит:

Я ожидал, что все john и john1 будут получены потребителем раньше jeff, но я не наблюдаю такого поведения. В основном я смотрю на этот println и ожидаю, что все john и john1 будут напечатаны до jeff System.out.println ("Request Consumer" + name);

@RunWith(SpringRunner.class)
@ComponentScan(basePackages = "com.test.test")
@EnableAutoConfiguration
@SpringBootTest
public class ApplicationTests {

    @Autowired AsyncClass asyncClass;

    @Test
    public void contextLoads() throws InterruptedException, ExecutionException {
        List<Future<String>> futures = new ArrayList<>();
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        futures.add(asyncClass.runAsyncHigh("john"));
        Thread.sleep(1000);
        futures.add(asyncClass.runAsyncLow("jeff"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));
        futures.add(asyncClass.runAsyncHigh("john1"));

        for(Future<String> future : futures) {
            future.get();
        }


    }

@Компонент public class AsyncClass {

@Autowired Client client;

@Async
public Future<String> runAsyncHigh(String name){
    client.sendHigh(name);
    return new AsyncResult<String>(name);
}

@Async
public Future<String> runAsyncLow(String name){
    client.sendLow(name);
    return new AsyncResult<String>(name);
}

}

Спасибо, Брайан

Здравствуйте, я думаю, что разобрался. Кажется, что если я установлю spring.rabbitmq.listener.simple.prefetch = 1, я получу ожидаемое поведение.

Brian 10.12.2018 14:55

Я просто собирался предложить это - после совета команды RabbitMQ мы увеличили предварительную выборку по умолчанию в версии 2.0, чтобы повысить производительность сразу после установки.

Gary Russell 10.12.2018 15:20
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
2
234
0

Другие вопросы по теме