У меня есть AWS SQS с 5000 сообщениями, уже находящимися в очереди (пример сообщения выглядит как «Hello @ 1») Я создал приложение SpringBoot и внутри одного из классов компонентов создаю метод для чтения сообщений из SQS.
package com.example.aws.sqs.service;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class MessageReceiverService {
@SqsListener(value = { "${cloud.aws.sqs.url}" }, deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
public void readMessage(String message){
log.info("Reading Message... {}", message);
}
}
Мой основной класс SpringBoot
@SpringBootApplication
public class AwsSqsApplicationConsumer {
public static void main(String[] args) {
SpringApplication.run(AwsSqsApplicationConsumer.class, args);
}
}
Исключение, которое я получаю при запуске приложения:
s.c.a.m.l.SimpleMessageListenerContainer : An Exception occurred while polling queue '<my sqs name>'. The failing operation will be retried in 10000 milliseconds
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]] did not accept task: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:309) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$AsynchronousMessageListener.run(SimpleMessageListenerContainer.java:286) ~[spring-cloud-aws-messaging-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2 rejected from java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 2, queued tasks = 0, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_65]
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:306) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
... 6 common frames omitted
Я НЕ настраиваю какие-либо пользовательские службы Executor. Использование предварительно настроенных компонентов Spring Beans. springBootVersion = '2.0.3.RELEASE' springCloudVersion = 'Finchley.RELEASE'
org.springframework.cloud.aws.messaging.listener.SimpleMessa geListenerContainer - это класс Spring, поставляемый с spring-cloud-aws-messaging-2.0.0.RELEASE.jar
Привет как ты решил эту проблему
@ AnkitaAgrawal - Я перестал использовать «@SqsListener» и начал использовать клиент AmazonSqs с «@Scheduled» с системой fixedRate из Spring.




Проблема в конфигурации потока слушателя. См. Следующие
...
ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]]
...
Размер пула потоков по умолчанию меньше желаемого.
Добавьте следующую конфигурацию в свое приложение Spring
@Configuration
public class TasksConfiguration implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(5); // TODO: Load this from configuration
taskScheduler.initialize();
taskRegistrar.setTaskScheduler(taskScheduler);
}
}
Теперь у вас должна быть возможность обрабатывать эти задачи.
P.S. Какие бы задачи не были отклонены ранее, они будут подняты позже, по прошествии определенного периода.
Обновлено: я думаю, что люди пугаются числа в строке .setPoolSize(5000). Это настраиваемый номер, который вы можете выбрать в соответствии с вашими требованиями. Для ответа я сокращаю его до меньшего числа.
так что согласно вашему решению мне нужно столько потоков, сколько сообщений в SQS? 5000 - большое число, которое нужно настроить как размер и при условии, что вы учли ту часть, где это могло бы работать на низкоядерном процессоре.
Думаю, довольно ясно, что это настраиваемое число. Вы должны выбрать номер в соответствии с требованиями вашего приложения (очевидно, я не могу не выбрать этот номер для вас). Другое дело, вам не нужно устанавливать его равным количеству сообщений в SQS. Это зависит от того, сколько сообщений вы хотите обрабатывать одновременно. Если вы хотите, чтобы они обрабатывались одновременно, вы можете установить его на 5000. Если вы не возражаете обрабатывать их по одному, вы можете установить его на один. Они будут обработаны позже, как указано в ответе. Но количество сообщений будет расти. Это твое решение.
установка максимального количества сообщений, похоже, решает проблему:
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS){
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSQS);
factory.setMaxNumberOfMessages(10);
return factory;
}
Добавление некоторых пояснений о том, как это работает: Spring-cloud-aws установит количество потоков равным количеству сообщений, если количество сообщений было явно установлено, в противном случае он установит его на 2, даже если количество сообщений по умолчанию равно 10.
Действительно заинтересованы в этом ответе, можете ли вы опубликовать полную строку исходного кода для реализации аналогичного механизма
Привет, я решил эту проблему с помощью Spring Listener. Ниже приведен код, надеюсь, он поможет.
В следующем решении после завершения инициализации всех bean-компонентов выделяется новый исполнитель задачи с большим размером пула.
@Component
public class PostBeansConstructionListener{
@EventListener
public void handleContextRefreshedEvent(ContextRefreshedEvent event){
final ApplicationContext applicationContext = event.getApplicationContext();
final SimpleMessageListenerContainer simpleMessageListenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class);
setAsyncTaskExecutor(simpleMessageListenerContainer);
}
private void setAsyncTaskExecutor(SimpleMessageListenerContainer simpleMessageListenerContainer) {
try{
simpleMessageListenerContainer.setTaskExecutor(getAsyncExecutor());
}catch(Exception ex){
throw new RuntimeException("Not able to create Async Task Executor for SimpleMessageListenerContainer.", ex);
}
}
public AsyncTaskExecutor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(7);
executor.setMaxPoolSize(42);
executor.setQueueCapacity(11);
executor.setThreadNamePrefix("threadPoolExecutor-SimpleMessageListenerContainer-");
executor.initialize();
return executor;
}
}
Невозможно добавить комментарий к предыдущим ответам, чтобы дополнительно объяснить, почему возникает проблема и почему работает параметр решения MaxNumberOfMessages сообщений. Надеюсь, следующее поможет все прояснить.
SimpleMessageListenerContainerThreadPoolTaskExecutor настроен на размер основного пула 2 потока, максимальный размер пула 3 потока и емкость очереди 0. Однако по умолчанию максимальное количество сообщений, возвращаемых при опросе в Amazon SQS, равно 10. Значение что если в одном опросе будет доступно 10 сообщений, потоков для их обработки не хватит. Таким образом выбрасывается RejectedExecutionException.
Настройка setMaxNumberOfMessages на 10 на SimpleMessageListenerContainerFactory устанавливает максимальный размер пула потоков равным 11, что должно обеспечить доступность достаточного количества потоков. Он не устанавливает емкость очереди.
Чтобы установить емкость очереди, можно инициализировать отдельный TaskExecutor и установить его в компоненте SimpleMessageListenerContainerFactory следующим образом:
@Bean(name = "sqsAsyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor(@Value("${external.aws.sqs.core-thread-count}") int coreThreadCount,
@Value("${external.aws.sqs.max-thread-count}") int maxThreadCount,
@Value("${external.aws.sqs.queue-capacity}") int queueCapacity) {
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
asyncTaskExecutor.setCorePoolSize(coreThreadCount);
asyncTaskExecutor.setMaxPoolSize(maxThreadCount);
asyncTaskExecutor.setQueueCapacity(queueCapacity);
asyncTaskExecutor.setThreadNamePrefix("threadPoolExecutor-SimpleMessageListenerContainer-");
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS, @Qualifier("sqsAsyncTaskExecutor") AsyncTaskExecutor asyncTaskExecutor) {
SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
simpleMessageListenerContainerFactory.setTaskExecutor(asyncTaskExecutor);
return simpleMessageListenerContainerFactory;
}
Я использовал следующие значения: coreThreadCount = 5, maxThreadCount = 20, queueCapacity = 10.
Как я уже сказал, я думаю, что настройки setMaxNumberOfMessages на 10 на SimpleMessageListenerContainerFactory должно быть достаточно для обработки всех пакетных сообщений, полученных из одного запроса. Однако, если вы чувствуете, что вам нужен более точный контроль над TaskExecutor, эта конфигурация также подойдет.
Я считаю, что весной это ошибка или недосмотр. Проблема связана со значениями по умолчанию:
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
private static final int DEFAULT_WORKER_THREADS = 2;
а также
abstract class AbstractMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware {
private static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;
Если maxNumberOfMessages не задан, то он использует 10 как количество сообщений, извлекаемых из SQS, и 2 как количество рабочих в исполнителе задачи. Это означает, что если он извлекает 3 или более сообщения одновременно, вы получаете это исключение. Если вы вручную установите maxNumberOfMessages на значение (любое значение), он будет использовать его в обоих местах, синхронизируя значения, как я считаю, ожидается:
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(
SimpleMessageListenerContainerFactory factory, QueueMessageHandler messageHandler)
{
SimpleMessageListenerContainer container = factory.createSimpleMessageListenerContainer();
container.setMaxNumberOfMessages(5);
container.setMessageHandler(messageHandler);
return container;
}
Ошибка вроде
SimpleMessageListenerContainer- где этот код?