В приведенном ниже примере я устанавливаю максимальный и основной размер пула равным 1. Однако сообщения не обрабатываются. Когда я включаю журнал отладки, я могу видеть сообщения, извлекаемые из SQS, но я предполагаю, что они не обрабатываются / не удаляются. Однако, когда я увеличиваю размер ядра и максимального пула до 2, кажется, что сообщения обрабатываются.
РЕДАКТИРОВАТЬ
Я считаю, что Spring, возможно, выделяет поток для получателя, который считывает данные из очереди, и, следовательно, он не может выделить поток для слушателя, который обрабатывает сообщение. Когда я увеличил размер corepoolsize до 2, я увидел, что сообщения считываются из очереди. Когда я добавил еще одного слушателя (для очереди недоставленных сообщений), я столкнулся с той же проблемой - двух потоков было недостаточно, поскольку сообщения не обрабатывались. Когда я увеличил размер corepoolsize до 3, он начал обрабатывать сообщения. Я предполагаю, что в этом случае 1 поток был выделен для чтения сообщений из очереди, а 2 слушателя были назначены по 1 потоку каждому.
@Configuration
public class SqsListenerConfiguration {
@Bean
@ConfigurationProperties(prefix = "aws.configuration")
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}
@Bean
@Primary
public AWSCredentialsProvider awsCredentialsProvider() {
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
try {
credentialsProvider.getCredentials();
System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e);
}
return credentialsProvider;
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard().
withCredentials(awsCredentialsProvider()).
withClientConfiguration(clientConfiguration()).
build();
}
@Bean
@ConfigurationProperties(prefix = "aws.queue")
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
simpleMessageListenerContainer.setMaxNumberOfMessages(10);
simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
return simpleMessageListenerContainer;
}
@Bean
public QueueMessageHandler queueMessageHandler() {
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
return queueMessageHandler;
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setThreadNamePrefix("oaoQueueExecutor");
executor.initialize();
return executor;
}
@Bean
public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonSQSAsync);
}
}
Конфигурация слушателя
@SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
System.out.println(" Data = " + serviceData + " MessageId = " + messageId);
repository.execute(serviceData);
}




Устанавливая одинаковые corePoolSize и maximumPoolSize, вы создаете fixed-size thread pool. Очень хорошее объяснение правил задокументировано здесь
Установка maxPoolSize неявно позволяет отбрасывать задачи.
Однако емкость очереди по умолчанию - Integer.MAX_VALUE, что для практических целей бесконечно.
Следует обратить внимание на то, что ThreadPoolTaskExecutor использует внизу ThreadPoolExecutor, который имеет несколько необычный подход к организации очередей, описанный в документы:
If
corePoolSizeor more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
Это означает, что maxPoolSize актуален только тогда, когда очередь заполнена, иначе количество потоков никогда не превысит corePoolSize.
Например, если мы отправим задачи это никогда не завершится в пул потоков:
corePoolSize запускает новую цепочку;maxPoolSize;Очередь - прочитать документы
Любой BlockingQueue может использоваться для передачи и хранения отправленных задач. Использование этой очереди влияет на размер пула:
Unbounded queues. Using an unbounded queue (for example aLinkedBlockingQueuewithout a predefined capacity) will cause new tasks to be queued in cases where all corePoolSize threads are busy. Thus, no more thancorePoolSizethreads will ever be created. (And the value of themaximumPoolSizetherefore doesn't have any effect.)
corePoolSize, создайте новый
Тема для запуска новой задачи.corePoolSize, поставь задачу в очередь.maxPoolSize, создайте новый поток для запуска задач.maxPoolSize, отклонить задачу.Спасибо @Vikram Palakurthi. В моем примере я не установил емкость очереди. Так что я считаю, что это бесконечность. Следовательно, maxpoolsize, как я полагаю, не имеет значения. Я пробовал поместить около 10 сообщений в очередь SQS FIFO. Поскольку для corepoolsize установлено значение 1, я предполагал, что потребитель будет обрабатывать сообщения одно за другим. Но потребитель не получает сообщения. Однако, когда я установил его на 2, сообщения читаются. Всегда ли один поток предназначен для опроса сообщений, и, следовательно, если я установлю размер очереди на 2, вторые сообщения будут выполнять обработку?
Нет, не для опроса, но я думаю, что в вашем случае executor.setCorePoolSize (1); executeor.setMaxPoolSize (1); 4-е правило применяется, и задача отклоняется. Если вы видите порядок правил, я несколько предполагаю, что исполнитель обнаруживает, что потоки больше, чем maxPoolSize, который равен 1, и отклоняет задачи, что объясняет, почему обработка работает, когда вы увеличиваете его до 2.
Установив одинаковые значения corePoolSize и maximumPoolSize, вы создали пул потоков фиксированного размера, не оставляя ему возможности динамически выделять растущие потоки. Попробуйте удалить эту часть executor.setMaxPoolSize (1); и посмотрим, как получится.
Я предполагаю, что maxpoolsize срабатывает только в том случае, если достигается емкость очереди. Об этом даже говорится в вашем ответе - «Это означает, что maxPoolSize актуален только тогда, когда очередь заполнена». В моем случае я просто отправляю 4-5 сообщений в очередь для тестирования. Я также попытался увеличить maxpoolsize до 1. Но из-за этой конфигурации эффекта нет. Только если я увеличу размер основного пула до большего, чем 1, это сработает.
Правильно, в вашем случае corePoolSize равен 1, так как у вас около 4-5 сообщений, maxPoolSize сработает. А поскольку вы указали как corePoolSize, так и maxPoolSize как 1, что вызвало проблему. Вы упомянули, что пытались увеличить maxPoolSize до 1, что не решит вашу проблему, пытаясь увеличить его до значения больше 1, например 2-5.
Извини, Викрам, я увеличил его до 10. Это была опечатка.
Я думаю, что максимальный размер пула сработает только тогда, когда будет достигнута емкость очереди. В этом случае емкость очереди не ограничена, поэтому я предполагаю, что максимальный размер пула не вступит в силу.
Вот билет, который я создал для этого вопроса - github.com/spring-cloud/spring-cloud-aws/issues/321