У меня есть приложение, в котором у меня есть несколько потоков, читающих сообщения из пункта назначения jms. Поток слушателя читает сообщение, вносит в него некоторые изменения и вызывает несколько других методов разных классов. Эти методы снабжены аннотацией @Async
, что все методы выполняются параллельно с использованием настраиваемого ThreadPoolTaskExecutor
.
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setTaskDecorator(new LoggingTaskDecorator());
executor.initialize();
return executor;
}
До сих пор все сообщения считались одинаковыми по приоритету, все было в порядке, так как все сообщения попадали в LinkedBlockingQueue
, если ни один из потоков Executor
не оставался доступным.
Теперь возникает требование, согласно которому определенному типу сообщения, прочитанному из очереди, должен быть предоставлен более высокий приоритет, чем любому другому сообщению, прочитанному из очереди.
В настоящее время я использую org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor, который не предоставляет никакого метода, позволяющего установить приоритетную очередь в качестве реализации моей очереди блокировки.
Не могли бы вы помочь мне решить этот сценарий? Или существующий дизайн системы не может принять это изменение? Или что может быть лучшим решением для таких сценариев?
Спасибо !
Просто переопределив метод createQueue
. Также вы должны использовать метод @Bean
для создания экземпляра bean-компонента, чтобы Spring мог правильно управлять жизненным циклом, что является небольшой, но важной вещью (иначе завершение работы не будет работать должным образом).
@Override
public Executor getAsyncExecutor() {
return taskExecutor();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
};
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setTaskDecorator(new LoggingTaskDecorator());
return executor;
}
Что-то вроде этого должно работать. Метод createQueue
теперь создает PriorityBlockingQueue
вместо LinkedBlockingQueue
по умолчанию.