Пример использования: получите запрос, обработайте его и уведомите кого-нибудь о завершении запроса. Поместите в другую очередь, если в процессе возникнет ошибка для повторной обработки позже.
Мне нужен API, который получает полезную нагрузку и отправляет ее в очередь (задания для обработки). Затем заберите сообщения и начните использовать эту полезную нагрузку для вызова другой службы отдыха и поместите ответ этого вызова в новую очередь (задания выполнены), после чего другая служба выберет эти сообщения и уведомит другую службу о завершении.
Таким образом, API POST сохраняет в очереди, забирает сообщение, обрабатывает его, сохраняет результат этого процесса в другой очереди.
У меня есть приложение с двумя привязками, одно для незанятых заданий, а другое для обработанных заданий. Приложение работает так, как задумано, но когда я генерирую исключение, после того, как spring пытается повторно обработать сообщение 3 раза, оно не отправляет сообщение в processing.default.errors, а выдает исключение:
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'processed.default.errors'; nested exception is org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
@Component
public interface JobBinding {
String INPUT = "jobs-in";
String OUTPUT = "jobs-out";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
-
@Component
public interface ProcessedBinding {
String INPUT = "processed-in";
String OUTPUT = "processed-out";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
Я включаю их оба:
@EnableBinding({JobBinding.class, ProcessedBinding.class})
public class BindingsConfiguration {
}
Затем обрабатываю задание:
@Component
@Slf4j
public class JobProcessor {
@StreamListener(JobBinding.INPUT)
@SendTo(ProcessedBinding.OUTPUT)
public Message<Job> handleJob(@Payload Job job) {
log.info("Processing {}", job);
job.addMetadata("status", "processed");
return MessageBuilder.withPayload(job).build();
}
}
И тогда я к-бум:
@Component
@Slf4j
public class DoneProcessor {
@StreamListener(ProcessedBinding.INPUT)
public void handleJob(@Payload Job job) {
log.info("Notifying {}", job);
throw new RuntimeException("puff");
}
}
Мой application.properties выглядит так:
spring.cloud.stream.bindings.jobs-out.destination=jobs
spring.cloud.stream.bindings.jobs-in.destination=jobs
spring.cloud.stream.bindings.jobs-in.group=default
spring.cloud.stream.bindings.processed-out.destination=processed
spring.cloud.stream.bindings.processed-in.destination=processed
spring.cloud.stream.bindings.processed-in.group=default
spring.cloud.stream.default.contentType=application/json
Я пробовал читать документы Spring Stream Cloud, но не смог найти решения. Очередь должна создаваться автоматически.
Вам также не нужен ...default.contentType=application/json, поскольку он установлен по умолчанию.
Кроме того, я не уверен, что понимаю, почему ваши точки входа / выхода в один и тот же пункт назначения. Возможно, вы сможете объяснить свой вариант использования (не дизайн) относительно того, что вам нужно сделать.
@OlegZhurakousky, я обновил свой вопрос, спасибо за интерес.
@OlegZhurakousky, я думаю, может быть, мой вариант использования не подходит для весеннего облачного потока, и я должен использовать spring-rabbitmq только для рабочих в качестве рабочего.
Вполне возможно, что это возможно, но, не зная вашего варианта использования, невозможно сказать
В вашей конфигурации много чего происходит, поэтому давайте попробуем исключить некоторые из них для ясности. Во-первых, вам не нужен
@Componentни для одного из перечисленных вами компонентов Во-вторых, использование-в именах может привести к непредвиденным последствиям, учитывая, что-является зарезервированным символом в java, и даже если вы используете его в String, имейте в виду, что он используется нами (фреймворком) для множества вещей. Вместо этого используйтеcamelCase.