Как игнорировать некоторые виды сообщений в приложении Kafka Streams, которое читает и записывает разные типы событий из одной темы

Предположим, приложение Spring Cloud Stream создает KStream из order topic. Интересуется OrderCreated {"id":x, "productId": y, "customerId": z} событиями. Как только один приходит, он обрабатывает его и генерирует выходное событие OrderShipped {"id":x, "productId": y, "customerName": <, "customerAddress": z} для того же order topic.

Проблема, с которой я столкнулся, заключается в том, что, поскольку приложение Kafka Stream читает и записывает из/в одну и ту же тему, оно пытается обрабатывать свои собственные записи, что не имеет смысла.

Как я могу запретить этому приложению обрабатывать генерируемые им события?

Обновлено: Как отмечают Артем Билан и собычако, я думал об использовании KStream.filter(), но есть некоторые детали, которые заставляют меня сомневаться, как с этим бороться:

Прямо сейчас приложение KStream выглядит так:

interface ShippingKStreamProcessor {
    ...
    @Input("order")
    fun order(): KStream<String, OrderCreated>

    @Output("output")
    fun output(): KStream<String, OrderShipped>

Конфигурация KStream

    @StreamListener
    @SendTo("output")
    fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {

Привязки порядка и вывода указывают на тему заказа как на место назначения.

Класс OrderCreated:

data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
    constructor() : this(null, null, null)
}

Класс OrderShiped

data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
    constructor() : this(null, null, null, null)
}

Я использую JSON в качестве формата сообщения, поэтому сообщения выглядят так:

  • INPUT - Заказ Создан: {"id":1, "productId": 7,"customerId": 20}
  • ВЫВОД - Заказ отправлен: {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}

Я ищу лучший способ отфильтровать нежелательные сообщения, учитывая это:

Если я просто использую KStream.filter() прямо сейчас, когда я получу {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}, мой KStream<Int, OrderCreated> распараллелит событие OrderShipped как объект OrderCreated с некоторыми нулевыми полями: OrderCreated(id:1, productId: 7, customerId: null). Проверка пустых полей не кажется надежной.

возможное решение может состоять в том, чтобы добавить еще одно поле, eventType = OrderCreated|OrderShipped, ко всем типам сообщений/классов, которые используют эту тему. Даже в этом случае я бы получил класс OrderCreated (помните KStream< Int,OrderCreated >) с атрибутом eventType=OrderShipped. Это выглядит как уродливый обходной путь. Любая идея улучшить его?

Есть ли другой, более автоматический способ справиться с этим? Например, будет ли другой вид сериализации (АВРО?) препятствовать обработке сообщений, если они не соответствуют ожидаемой схеме (OrderCreated)? Этот способ поддержки нескольких схем (типов событий) в одной теме кажется хорошей практикой в ​​соответствии с этой статьей: https://www.confluent.io/blog/put-several-event-types-kafka-topic/ Однако неясно, как распаковать/десериализовать разные типы.

Почему KStream.filter() у вас не работает? Поскольку все находится в теме Кафки, эти OrderShipped по-прежнему будут доступны для других потребителей в этой теме.

Artem Bilan 17.04.2019 17:51

Как упомянул @ArtemBilan, это должно быть чем-то, что можно контролировать с помощью filter. Если вы можете поделиться еще немного кода, мы можем посмотреть.

sobychacko 17.04.2019 18:03

Я обновил вопрос с более подробной информацией

codependent 17.04.2019 18:27
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
2
3
3 121
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Вы можете использовать заголовки записей Kafka для хранения типа записи. См. КИП-82. Вы можете установить заголовки в ProducerRecord.

Обработка будет следующей:

  1. Прочтите stream типа KStream<Integer, Bytes> со значением serde Serdes.BytesSerde из темы.
  2. Используйте KStream#transformValues() для фильтрации и создания объектов. В частности, в transformValues() вы можете получить доступ к ProcessorContext, который дает вам доступ к заголовкам записей, которые содержат информацию о типе записи. Потом:

    • Если тип OrderShipped, вернуть null.
    • В противном случае создайте объект OrderCreated из объекта Bytes и верните его.

Для решения с AVRO вы можете взглянуть на следующие документы

Я думаю, что ваше решение может работать нормально, но у меня есть сомнения по этому поводу, transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier, java.lang.String... stateStoreNames) - это операция с отслеживанием состояния, и она ожидает хранилище состояний имена в методе преобразования, нужно ли передавать имя магазина или его можно не указывать...

codependent 20.04.2019 14:08

Хранилище состояний может быть опущено.

Bruno Cadonna 20.04.2019 18:28

Я принял ответ Бруно как правильный способ решить эту проблему. Однако я думаю, что придумал более простой/логичный способ, используя иерархию событий, аннотированных JsonTypeInfo.

Сначала вам нужен базовый класс для событий Order и укажите все подклассы. Обратите внимание, что в документ JSON будет добавлено свойство типа, которое поможет Джексону маршалировать/демаршалировать DTO:

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
    JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
    JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent

data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
    constructor() : this(null, null, null)
}

data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
    constructor() : this(null, null, null, null)
}

При этом производитель объектов OrderCreatedEvent будет генерировать следующее сообщение:

key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1}

Теперь очередь KStream. Я изменил подпись на KStream<Int, OrderEvent>, так как она может получать события OrderCreatedEvent или OrderShippedEvent. В следующих двух строчках...

orderEvent.filter { _, value -> value is OrderCreatedEvent }
                .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }

... Я фильтрую, чтобы сохранить только сообщения класса OrderCreatedEvent, и сопоставляю их, чтобы преобразовать KStream<Int, OrderEvent> в KStream<Int, OrderCreatedEvent>

Полная логика KStream:

@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = JsonSerde<Customer>(Customer::class.java)
        val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)

        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce({ _, y -> y }, stateStore)


        return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
                .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
                .selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
                .join(customerTable, { orderIt, customer ->
                    OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
                }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
                .selectKey { _, value -> value.id }
                //.to("order", Produced.with(intSerde, orderShippedSerde))
    }

После этого процесса я генерирую новое сообщение key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"} в теме заказа, но оно будет отфильтровано потоком.

Это отлично работает для меня, если известны все типы событий. Однако есть предложения, как обрабатывать/игнорировать события неизвестного типа? Например, может быть OrderDeletedEvent, который служба поддержки клиентов должна игнорировать, поскольку он не имеет значения. В этом случае я не хочу включать дополнительный, ненужный класс OrderDeletedEvent. Одним из решений было бы зарегистрировать ошибку и продолжить обработку, настроив org.apache.kafka.streams.errors.LogAndContinueExceptionHandl‌​er, но это означает, что все исключения десериализации игнорируются, что может быть нежелательно.

puhlerblet 03.07.2020 13:53

Другие вопросы по теме