Как использовать приоритетную очередь с Spring boot AsyncConfigurer

У меня есть приложение, в котором у меня есть несколько потоков, читающих сообщения из пункта назначения jms. Поток слушателя читает сообщение, вносит в него некоторые изменения и вызывает несколько других методов разных классов. Эти методы снабжены аннотацией @Async, что все методы выполняются параллельно с использованием настраиваемого ThreadPoolTaskExecutor.

@Override
public Executor getAsyncExecutor() {        
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(corePoolSize);
    executor.setMaxPoolSize(maxPoolSize);
    executor.setQueueCapacity(queueCapacity);
    executor.setKeepAliveSeconds(keepAliveSeconds);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setTaskDecorator(new LoggingTaskDecorator());
    executor.initialize();
    return executor;
}

До сих пор все сообщения считались одинаковыми по приоритету, все было в порядке, так как все сообщения попадали в LinkedBlockingQueue, если ни один из потоков Executor не оставался доступным.

Теперь возникает требование, согласно которому определенному типу сообщения, прочитанному из очереди, должен быть предоставлен более высокий приоритет, чем любому другому сообщению, прочитанному из очереди.

В настоящее время я использую org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor, который не предоставляет никакого метода, позволяющего установить приоритетную очередь в качестве реализации моей очереди блокировки.

Не могли бы вы помочь мне решить этот сценарий? Или существующий дизайн системы не может принять это изменение? Или что может быть лучшим решением для таких сценариев?

Спасибо !

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
931
1

Ответы 1

Просто переопределив метод createQueue. Также вы должны использовать метод @Bean для создания экземпляра bean-компонента, чтобы Spring мог правильно управлять жизненным циклом, что является небольшой, но важной вещью (иначе завершение работы не будет работать должным образом).

@Override
public Executor getAsyncExecutor() {
  return taskExecutor();
}        

@Bean    
public ThreadPoolTaskExecutor taskExecutor() {        
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
      return new PriorityBlockingQueue<>(queueCapacity);
    } 
  };
  executor.setCorePoolSize(corePoolSize);
  executor.setMaxPoolSize(maxPoolSize);
  executor.setQueueCapacity(queueCapacity);
  executor.setKeepAliveSeconds(keepAliveSeconds);
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  executor.setTaskDecorator(new LoggingTaskDecorator());  
  return executor;
}

Что-то вроде этого должно работать. Метод createQueue теперь создает PriorityBlockingQueue вместо LinkedBlockingQueue по умолчанию.

Другие вопросы по теме