Quarkus/Smallrye reactive kafka — ответ об успехе/отказе конечной точки от сообщения

Я хочу ответить на конечную точку REST ответом Success/Failure, который динамически принимает тему в качестве параметра запроса. В Quarkus с реактивным обменом сообщениями smallrye код будет выглядеть примерно так, как показано ниже, обертывая полезную нагрузку с помощью OutgoingKafkaRecordMetadata.

то есть https://myendpoint/publishToKafka?topic=myDynamicTopic

@Channel("test")
Emitter<byte []> kafkaEmitter;

@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){

    kafkaEmitter.send(Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
            .withKey("my-key")
            .withTopic("myDynamicTopic")
            .build()));
    
}

Из Quarkus документ «Если конечная точка не возвращает CompletionStage, ответ HTTP может быть записан до того, как сообщение будет отправлено в Kafka, поэтому пользователю не будет сообщено об ошибках». Пример здесь описывает этот процесс, когда вы отправляете полезную нагрузку напрямую (т. е. emitter.send(полезная нагрузка), которая возвращает CompletionStage, но emitter.send(message) возвращает void), но для этого требуется предварительная настройка темы. Можно ли указать метаданные с сообщением и по-прежнему отвечать вызывающему клиенту ответом об успехе/неуспехе? (Я не против, если это с Emitter и CompletionStage или MunityEmitter и Uni).

Любые советы или предложения будут оценены.

Действительно ли клиенту необходимо знать, было ли сообщение успешно доставлено брокеру? Обычно мы используем систему обмена сообщениями для асинхронной связи, поэтому обработка ошибок должна беспокоить не клиента, а серверную часть.

Turing85 20.03.2022 10:48

Я могу придумать пару сценариев, где это было бы полезно. Если есть проблема с конфигурацией темы (например, безопасность, неправильное имя, кафка в автономном режиме), клиент продолжит работу, не зная об ошибке, поскольку он получил 200 или 204 от сервера и будет думать, что он успешно фиксирует сообщения. Если состояние управляется из внешнего источника (например, базы данных), клиент хотел бы знать, что данные были успешно активированы, прежде чем обновлять его состояние (я знаю, что друзья не позволяют друзьям выполнять двойную запись, но это не всегда возможно, т.е. в идеале что-то как дебезиум и шаблон исходящих)

thedartfish 20.03.2022 11:24

Идея шаблон исходящих сообщений (https://microservices.io/) заключается в использовании второй транзакции. Важная часть второй транзакции заключается в том, что она не изменяет исходящий ящик (только читает его) и взаимодействует только с исходящей темой. Следовательно, «извне» мы не можем (легко) сказать, успешна ли вторая транзакция или нет. Итак, как мы можем «узнать», когда мы можем отправить ответ клиенту?

Turing85 20.03.2022 12:01

Если вы используете Emitter.send(Message), вы можете добавить обработчик ack/nack к объекту сообщения. В этом обработчике вы можете завершить CompletionStage, который вы возвращаете из метода ресурса.

Ladicek 20.03.2022 13:25

@Ladicek это правда и продемонстрировано в примере, который я привел в своем вопросе, но ответ HTTP возвращается клиенту до подтверждения подтверждения. Мне нужно подтверждение успеха/неудачи публикации сообщения, отправленного обратно клиенту.

thedartfish 20.03.2022 13:35

Хорошо, позвольте мне быть немного более конкретным. Вам нужно создать new CompletableFuture(), затем вызвать emitter.send с Message, который включает обработчик ack/nack, а затем return с CompletableFuture. В этот момент будущее еще не завершено, поэтому ответ не будет отправлен. В обработчике ack/nack вы завершаете будущее, и тогда ответ будет отправлен.

Ladicek 20.03.2022 14:55

Спасибо @Ladicek Теперь я понимаю, что вы имеете в виду! Клемент также предоставил пример фрагмента кода ниже. Ваше здоровье.

thedartfish 20.03.2022 23:40
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
7
68
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Поскольку вы используете сообщение (поскольку вам нужно указать тему), вам нужно что-то более запутанное:

@Channel("test")
Emitter<byte []> kafkaEmitter;

@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){
    CompletableFuture<Void> future = new CompletableFuture<>();
    Message<byte[]> message = Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata. 
           <String>builder()
            .withKey("my-key")
            .withTopic("myDynamicTopic")
            .build()));
    message = message.withAck(() -> {
         future.complete(null));
         return CompleteableFuture.completedFuture(null);
    }
     .withNack(t -> {
       future.completeExceptionnaly(t));
       return CompleteableFuture.completedFuture(null);
    });
    kafkaEmitter.send(message);
    return future;    
}

В этом фрагменте я также прикрепляю обработчики ack и nack, вызываемые, когда сообщение либо подтверждено (принято брокером), либо отклонено (произошло что-то не так).

Эти обратные вызовы сообщают future, CompletableFuture, созданному в методе. Это возвращаемый объект, так как он будет делать то, что вы хотите: указывать результат.

Я знаю, что обратные вызовы немного сложны. В основном это связано со спецификацией: мы должны return CompleteableFuture.completedFuture(...); признать, что nack-процесс прошел успешно. Если бы вместо этого мы использовали return future; (которое мы установили на future.completeExceptionnaly(t));), это было бы интерпретировано как сбой во время процесса nack. По сути, это будет эквивалентно throw внутри catch-блока в императивном мире.

К счастью, будет доступна более легкая версия скороспелый (не беспокойтесь, мы не сломаемся).

Спасибо @Clement и Turing85, теперь стало намного яснее, я искренне ценю ваш вклад. Я нашел управление обратными вызовами с помощью CompletionStage немного запутанным, поэтому, увидев его в коде, я определенно прояснил ситуацию. Я буду следить за «более легкой» версией Клемента, поэтому я обязательно настроюсь на отличные сеансы в блогах и на YouTube, которые я видел от вас и сообщества RedHat. Спасибо еще раз.

thedartfish 20.03.2022 23:57

Другие вопросы по теме