Я создал прослушиватель SQS, который опрашивает сообщения из SQS и отправляет сообщения в службу REST. Если служба REST не работает, я не хочу извлекать сообщения из очереди, пока она не будет готова. Я использую весеннее облако для чтения сообщений через SQS. Есть ли примеры, как этого можно достичь?
Я настроил очередь недоставленных писем для этой очереди. Я не хочу, чтобы все сообщения попадали в DLQ, если серверная служба не работает.
@Configuration
public class SqsListenerConfiguration {
@Bean
@ConfigurationProperties(prefix = "aws.configuration")
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}
@Bean
@Primary
public AWSCredentialsProvider awsCredentialsProvider() {
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
try {
credentialsProvider.getCredentials();
System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e);
}
return credentialsProvider;
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard().
withCredentials(awsCredentialsProvider()).
withClientConfiguration(clientConfiguration()).
build();
}
@Bean
@ConfigurationProperties(prefix = "aws.queue")
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
simpleMessageListenerContainer.setMaxNumberOfMessages(10);
simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
return simpleMessageListenerContainer;
}
@Bean
public QueueMessageHandler queueMessageHandler() {
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
return queueMessageHandler;
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setThreadNamePrefix("oaoQueueExecutor");
executor.initialize();
return executor;
}
@Bean
public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonSQSAsync);
}
}
Слушатель
@SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
System.out.println(" Data = " + serviceData + " MessageId = " + messageId);
repository.execute(serviceData);
}
Да, это правильно. Он становится доступным по истечении тайм-аута видимости. Да - я хотел бы остановить очередь опроса для сообщений, когда я знаю, что я не могу обработать сообщение из-за того, что одна из систем, от которых я зависим, недоступна.
Вы думали об этом без Spring? Обычно у вас просто цикл опроса SQS, и для каждого сообщения вы должны решить удалить его. Если другая служба не работает, просто не удаляйте ее и прекращайте опрос в течение некоторого времени. Я предполагаю, что это тот же или меньший код, чем вы показываете выше.
Спасибо @stdunbar. Да, я бы хотел использовать для этого библиотеки Spring. Однако я не могу найти способ использовать любую конфигурацию Spring, которая может быть доступна для управления опросом,
Я считаю, что контракт для вашего метода
onMessage
заключается в том, что если он выдает исключение, сообщение не будет удалено. Или вы имеете в виду, что просто не хотите, чтобыonMethod
выполнялся до тех пор, пока REST не будет готов?