SQSListener с ThreadpoolExecutor

В приведенном ниже примере я устанавливаю максимальный и основной размер пула равным 1. Однако сообщения не обрабатываются. Когда я включаю журнал отладки, я могу видеть сообщения, извлекаемые из SQS, но я предполагаю, что они не обрабатываются / не удаляются. Однако, когда я увеличиваю размер ядра и максимального пула до 2, кажется, что сообщения обрабатываются.

РЕДАКТИРОВАТЬ

Я считаю, что Spring, возможно, выделяет поток для получателя, который считывает данные из очереди, и, следовательно, он не может выделить поток для слушателя, который обрабатывает сообщение. Когда я увеличил размер corepoolsize до 2, я увидел, что сообщения считываются из очереди. Когда я добавил еще одного слушателя (для очереди недоставленных сообщений), я столкнулся с той же проблемой - двух потоков было недостаточно, поскольку сообщения не обрабатывались. Когда я увеличил размер corepoolsize до 3, он начал обрабатывать сообщения. Я предполагаю, что в этом случае 1 поток был выделен для чтения сообщений из очереди, а 2 слушателя были назначены по 1 потоку каждому.

@Configuration
public class SqsListenerConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "aws.configuration")
    public ClientConfiguration clientConfiguration() {
        return new ClientConfiguration();
    }


    @Bean
    @Primary
    public AWSCredentialsProvider awsCredentialsProvider() {

        ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
        try {
            credentialsProvider.getCredentials();
            System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
            System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());

        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                            "Please make sure that your credentials file is at the correct " +
                            "location (~/.aws/credentials), and is in valid format.",
                    e);
        }
        return credentialsProvider;
    }


    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard().
                withCredentials(awsCredentialsProvider()).
                withClientConfiguration(clientConfiguration()).
                build();
    }


    @Bean
    @ConfigurationProperties(prefix = "aws.queue")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
        simpleMessageListenerContainer.setMaxNumberOfMessages(10);
        simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
        return simpleMessageListenerContainer;
    }


    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
        QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return queueMessageHandler;
    }


    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("oaoQueueExecutor");
        executor.initialize();
        return executor;
    }


    @Bean
    public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }


}

Конфигурация слушателя

    @SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {

        System.out.println(" Data = " + serviceData + " MessageId = " + messageId);

        repository.execute(serviceData);
}

Вот билет, который я создал для этого вопроса - github.com/spring-cloud/spring-cloud-aws/issues/321

Punter Vicky 04.03.2019 20:11
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
11
1
6 126
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Устанавливая одинаковые corePoolSize и maximumPoolSize, вы создаете fixed-size thread pool. Очень хорошее объяснение правил задокументировано здесь

Установка maxPoolSize неявно позволяет отбрасывать задачи. Однако емкость очереди по умолчанию - Integer.MAX_VALUE, что для практических целей бесконечно.

Следует обратить внимание на то, что ThreadPoolTaskExecutor использует внизу ThreadPoolExecutor, который имеет несколько необычный подход к организации очередей, описанный в документы:

If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

Это означает, что maxPoolSize актуален только тогда, когда очередь заполнена, иначе количество потоков никогда не превысит corePoolSize. Например, если мы отправим задачи это никогда не завершится в пул потоков:

  • каждое из первых представлений corePoolSize запускает новую цепочку;
  • после этого все заявки попадают в очередь;
  • если очередь конечна и ее емкость исчерпана, каждое представление запускает новый поток, вплоть до maxPoolSize;
  • когда и пул, и очередь заполнены, новые заявки отклоняются.

Очередь - прочитать документы

Любой BlockingQueue может использоваться для передачи и хранения отправленных задач. Использование этой очереди влияет на размер пула:

  • Если выполняется менее corePoolSize потоков, Executor всегда предпочитает добавлять новый поток, а не ставить в очередь.
  • Если corePoolSize или несколько потоков запущены, Executor всегда предпочитает ставить запрос в очередь, а не добавлять новый поток.
  • Если запрос не может быть поставлен в очередь, создается новый поток, если только это будет превышать maximumPoolSize, и в этом случае задача будет быть отклоненным.

Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to be queued in cases where all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.)

  1. Если количество потоков меньше, чем у corePoolSize, создайте новый Тема для запуска новой задачи.
  2. Если количество потоков равно (или больше) corePoolSize, поставь задачу в очередь.
  3. Если очередь заполнена, а количество потоков меньше, чем maxPoolSize, создайте новый поток для запуска задач.
  4. Если очередь заполнена, а количество потоков больше или равно maxPoolSize, отклонить задачу.

Спасибо @Vikram Palakurthi. В моем примере я не установил емкость очереди. Так что я считаю, что это бесконечность. Следовательно, maxpoolsize, как я полагаю, не имеет значения. Я пробовал поместить около 10 сообщений в очередь SQS FIFO. Поскольку для corepoolsize установлено значение 1, я предполагал, что потребитель будет обрабатывать сообщения одно за другим. Но потребитель не получает сообщения. Однако, когда я установил его на 2, сообщения читаются. Всегда ли один поток предназначен для опроса сообщений, и, следовательно, если я установлю размер очереди на 2, вторые сообщения будут выполнять обработку?

Punter Vicky 30.04.2018 17:08

Нет, не для опроса, но я думаю, что в вашем случае executor.setCorePoolSize (1); executeor.setMaxPoolSize (1); 4-е правило применяется, и задача отклоняется. Если вы видите порядок правил, я несколько предполагаю, что исполнитель обнаруживает, что потоки больше, чем maxPoolSize, который равен 1, и отклоняет задачи, что объясняет, почему обработка работает, когда вы увеличиваете его до 2.

Vikram Palakurthi 30.04.2018 18:23

Установив одинаковые значения corePoolSize и maximumPoolSize, вы создали пул потоков фиксированного размера, не оставляя ему возможности динамически выделять растущие потоки. Попробуйте удалить эту часть executor.setMaxPoolSize (1); и посмотрим, как получится.

Vikram Palakurthi 30.04.2018 18:27

Я предполагаю, что maxpoolsize срабатывает только в том случае, если достигается емкость очереди. Об этом даже говорится в вашем ответе - «Это означает, что maxPoolSize актуален только тогда, когда очередь заполнена». В моем случае я просто отправляю 4-5 сообщений в очередь для тестирования. Я также попытался увеличить maxpoolsize до 1. Но из-за этой конфигурации эффекта нет. Только если я увеличу размер основного пула до большего, чем 1, это сработает.

Punter Vicky 30.04.2018 19:06

Правильно, в вашем случае corePoolSize равен 1, так как у вас около 4-5 сообщений, maxPoolSize сработает. А поскольку вы указали как corePoolSize, так и maxPoolSize как 1, что вызвало проблему. Вы упомянули, что пытались увеличить maxPoolSize до 1, что не решит вашу проблему, пытаясь увеличить его до значения больше 1, например 2-5.

Vikram Palakurthi 30.04.2018 20:19

Извини, Викрам, я увеличил его до 10. Это была опечатка.

Punter Vicky 30.04.2018 23:10

Я думаю, что максимальный размер пула сработает только тогда, когда будет достигнута емкость очереди. В этом случае емкость очереди не ограничена, поэтому я предполагаю, что максимальный размер пула не вступит в силу.

Punter Vicky 30.04.2018 23:11

Другие вопросы по теме