AWS SQS Listener - автоматический выключатель

Я создал прослушиватель SQS, который опрашивает сообщения из SQS и отправляет сообщения в службу REST. Если служба REST не работает, я не хочу извлекать сообщения из очереди, пока она не будет готова. Я использую весеннее облако для чтения сообщений через SQS. Есть ли примеры, как этого можно достичь?

Я настроил очередь недоставленных писем для этой очереди. Я не хочу, чтобы все сообщения попадали в DLQ, если серверная служба не работает.

@Configuration
public class SqsListenerConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "aws.configuration")
    public ClientConfiguration clientConfiguration() {
        return new ClientConfiguration();
    }


    @Bean
    @Primary
    public AWSCredentialsProvider awsCredentialsProvider() {

        ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
        try {
            credentialsProvider.getCredentials();
            System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
            System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());

        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                            "Please make sure that your credentials file is at the correct " +
                            "location (~/.aws/credentials), and is in valid format.",
                    e);
        }
        return credentialsProvider;
    }


    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard().
                withCredentials(awsCredentialsProvider()).
                withClientConfiguration(clientConfiguration()).
                build();
    }


    @Bean
    @ConfigurationProperties(prefix = "aws.queue")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
        simpleMessageListenerContainer.setMaxNumberOfMessages(10);
        simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
        return simpleMessageListenerContainer;
    }


    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
        QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return queueMessageHandler;
    }


    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("oaoQueueExecutor");
        executor.initialize();
        return executor;
    }


    @Bean
    public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }


}

Слушатель

    @SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {

        System.out.println(" Data = " + serviceData + " MessageId = " + messageId);

        repository.execute(serviceData);
}

Я считаю, что контракт для вашего метода onMessage заключается в том, что если он выдает исключение, сообщение не будет удалено. Или вы имеете в виду, что просто не хотите, чтобы onMethod выполнялся до тех пор, пока REST не будет готов?

Compass 01.05.2018 16:36

Да, это правильно. Он становится доступным по истечении тайм-аута видимости. Да - я хотел бы остановить очередь опроса для сообщений, когда я знаю, что я не могу обработать сообщение из-за того, что одна из систем, от которых я зависим, недоступна.

Punter Vicky 01.05.2018 16:38

Вы думали об этом без Spring? Обычно у вас просто цикл опроса SQS, и для каждого сообщения вы должны решить удалить его. Если другая служба не работает, просто не удаляйте ее и прекращайте опрос в течение некоторого времени. Я предполагаю, что это тот же или меньший код, чем вы показываете выше.

stdunbar 01.05.2018 16:51

Спасибо @stdunbar. Да, я бы хотел использовать для этого библиотеки Spring. Однако я не могу найти способ использовать любую конфигурацию Spring, которая может быть доступна для управления опросом,

Punter Vicky 01.05.2018 16:55
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
4
577
0

Другие вопросы по теме