Канал ошибок весеннего облачного потока не работает

Обновлено:

Пример использования: получите запрос, обработайте его и уведомите кого-нибудь о завершении запроса. Поместите в другую очередь, если в процессе возникнет ошибка для повторной обработки позже.

Мне нужен API, который получает полезную нагрузку и отправляет ее в очередь (задания для обработки). Затем заберите сообщения и начните использовать эту полезную нагрузку для вызова другой службы отдыха и поместите ответ этого вызова в новую очередь (задания выполнены), после чего другая служба выберет эти сообщения и уведомит другую службу о завершении.

Таким образом, API POST сохраняет в очереди, забирает сообщение, обрабатывает его, сохраняет результат этого процесса в другой очереди.


У меня есть приложение с двумя привязками, одно для незанятых заданий, а другое для обработанных заданий. Приложение работает так, как задумано, но когда я генерирую исключение, после того, как spring пытается повторно обработать сообщение 3 раза, оно не отправляет сообщение в processing.default.errors, а выдает исключение:

Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'processed.default.errors'; nested exception is org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted

@Component
public interface JobBinding {

    String INPUT = "jobs-in";
    String OUTPUT = "jobs-out";

    @Input(INPUT)
    SubscribableChannel input();

    @Output(OUTPUT)
    MessageChannel output();
}

-

@Component
public interface ProcessedBinding {

    String INPUT = "processed-in";
    String OUTPUT = "processed-out";

    @Input(INPUT)
    SubscribableChannel input();

    @Output(OUTPUT)
    MessageChannel output();
}

Я включаю их оба:

@EnableBinding({JobBinding.class, ProcessedBinding.class})
public class BindingsConfiguration {
}

Затем обрабатываю задание:

@Component
@Slf4j
public class JobProcessor {

    @StreamListener(JobBinding.INPUT)
    @SendTo(ProcessedBinding.OUTPUT)
    public Message<Job> handleJob(@Payload Job job) {
        log.info("Processing {}", job);
        job.addMetadata("status", "processed");
        return MessageBuilder.withPayload(job).build();
    }
}

И тогда я к-бум:

@Component
@Slf4j
public class DoneProcessor {

    @StreamListener(ProcessedBinding.INPUT)
    public void handleJob(@Payload Job job) {
        log.info("Notifying {}", job);
        throw new RuntimeException("puff");
    }
}

Мой application.properties выглядит так:

spring.cloud.stream.bindings.jobs-out.destination=jobs
spring.cloud.stream.bindings.jobs-in.destination=jobs
spring.cloud.stream.bindings.jobs-in.group=default

spring.cloud.stream.bindings.processed-out.destination=processed
spring.cloud.stream.bindings.processed-in.destination=processed
spring.cloud.stream.bindings.processed-in.group=default

spring.cloud.stream.default.contentType=application/json

Я пробовал читать документы Spring Stream Cloud, но не смог найти решения. Очередь должна создаваться автоматически.

В вашей конфигурации много чего происходит, поэтому давайте попробуем исключить некоторые из них для ясности. Во-первых, вам не нужен @Component ни для одного из перечисленных вами компонентов Во-вторых, использование - в именах может привести к непредвиденным последствиям, учитывая, что - является зарезервированным символом в java, и даже если вы используете его в String, имейте в виду, что он используется нами (фреймворком) для множества вещей. Вместо этого используйте camelCase.

Oleg Zhurakousky 20.09.2018 06:40

Вам также не нужен ...default.contentType=application/json, поскольку он установлен по умолчанию.

Oleg Zhurakousky 20.09.2018 07:19

Кроме того, я не уверен, что понимаю, почему ваши точки входа / выхода в один и тот же пункт назначения. Возможно, вы сможете объяснить свой вариант использования (не дизайн) относительно того, что вам нужно сделать.

Oleg Zhurakousky 20.09.2018 07:55

@OlegZhurakousky, я обновил свой вопрос, спасибо за интерес.

jisuskraist 21.09.2018 01:29

@OlegZhurakousky, я думаю, может быть, мой вариант использования не подходит для весеннего облачного потока, и я должен использовать spring-rabbitmq только для рабочих в качестве рабочего.

jisuskraist 21.09.2018 04:24

Вполне возможно, что это возможно, но, не зная вашего варианта использования, невозможно сказать

Oleg Zhurakousky 24.09.2018 11:11
0
6
430
0

Другие вопросы по теме