Предположим, приложение Spring Cloud Stream создает KStream из order topic. Интересуется OrderCreated {"id":x, "productId": y, "customerId": z} событиями. Как только один приходит, он обрабатывает его и генерирует выходное событие OrderShipped {"id":x, "productId": y, "customerName": <, "customerAddress": z} для того же order topic.
Проблема, с которой я столкнулся, заключается в том, что, поскольку приложение Kafka Stream читает и записывает из/в одну и ту же тему, оно пытается обрабатывать свои собственные записи, что не имеет смысла.
Как я могу запретить этому приложению обрабатывать генерируемые им события?
Обновлено: Как отмечают Артем Билан и собычако, я думал об использовании KStream.filter(), но есть некоторые детали, которые заставляют меня сомневаться, как с этим бороться:
Прямо сейчас приложение KStream выглядит так:
interface ShippingKStreamProcessor {
...
@Input("order")
fun order(): KStream<String, OrderCreated>
@Output("output")
fun output(): KStream<String, OrderShipped>
Конфигурация KStream
@StreamListener
@SendTo("output")
fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {
Привязки порядка и вывода указывают на тему заказа как на место назначения.
Класс OrderCreated:
data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
constructor() : this(null, null, null)
}
Класс OrderShiped
data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
constructor() : this(null, null, null, null)
}
Я использую JSON в качестве формата сообщения, поэтому сообщения выглядят так:
{"id":1, "productId": 7,"customerId": 20}{"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}Я ищу лучший способ отфильтровать нежелательные сообщения, учитывая это:
Если я просто использую KStream.filter() прямо сейчас, когда я получу {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}, мой KStream<Int, OrderCreated> распараллелит событие OrderShipped как объект OrderCreated с некоторыми нулевыми полями: OrderCreated(id:1, productId: 7, customerId: null). Проверка пустых полей не кажется надежной.
возможное решение может состоять в том, чтобы добавить еще одно поле, eventType = OrderCreated|OrderShipped, ко всем типам сообщений/классов, которые используют эту тему. Даже в этом случае я бы получил класс OrderCreated (помните KStream< Int,OrderCreated >) с атрибутом eventType=OrderShipped. Это выглядит как уродливый обходной путь. Любая идея улучшить его?
Есть ли другой, более автоматический способ справиться с этим? Например, будет ли другой вид сериализации (АВРО?) препятствовать обработке сообщений, если они не соответствуют ожидаемой схеме (OrderCreated)? Этот способ поддержки нескольких схем (типов событий) в одной теме кажется хорошей практикой в соответствии с этой статьей: https://www.confluent.io/blog/put-several-event-types-kafka-topic/ Однако неясно, как распаковать/десериализовать разные типы.
Как упомянул @ArtemBilan, это должно быть чем-то, что можно контролировать с помощью filter. Если вы можете поделиться еще немного кода, мы можем посмотреть.
Я обновил вопрос с более подробной информацией

Вы можете использовать заголовки записей Kafka для хранения типа записи. См. КИП-82. Вы можете установить заголовки в ProducerRecord.
Обработка будет следующей:
stream типа KStream<Integer, Bytes> со значением serde Serdes.BytesSerde из темы.Используйте KStream#transformValues() для фильтрации и создания объектов. В частности, в transformValues() вы можете получить доступ к ProcessorContext, который дает вам доступ к заголовкам записей, которые содержат информацию о типе записи. Потом:
OrderShipped, вернуть null.OrderCreated из объекта Bytes и верните его.Для решения с AVRO вы можете взглянуть на следующие документы
Я думаю, что ваше решение может работать нормально, но у меня есть сомнения по этому поводу, transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier, java.lang.String... stateStoreNames) - это операция с отслеживанием состояния, и она ожидает хранилище состояний имена в методе преобразования, нужно ли передавать имя магазина или его можно не указывать...
Хранилище состояний может быть опущено.
Я принял ответ Бруно как правильный способ решить эту проблему. Однако я думаю, что придумал более простой/логичный способ, используя иерархию событий, аннотированных JsonTypeInfo.
Сначала вам нужен базовый класс для событий Order и укажите все подклассы. Обратите внимание, что в документ JSON будет добавлено свойство типа, которое поможет Джексону маршалировать/демаршалировать DTO:
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent
data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
constructor() : this(null, null, null)
}
data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
constructor() : this(null, null, null, null)
}
При этом производитель объектов OrderCreatedEvent будет генерировать следующее сообщение:
key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1}
Теперь очередь KStream. Я изменил подпись на KStream<Int, OrderEvent>, так как она может получать события OrderCreatedEvent или OrderShippedEvent. В следующих двух строчках...
orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
... Я фильтрую, чтобы сохранить только сообщения класса OrderCreatedEvent, и сопоставляю их, чтобы преобразовать KStream<Int, OrderEvent> в KStream<Int, OrderCreatedEvent>
Полная логика KStream:
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
.selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
.join(customerTable, { orderIt, customer ->
OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
}, Joined.with(intSerde, orderCreatedSerde, customerSerde))
.selectKey { _, value -> value.id }
//.to("order", Produced.with(intSerde, orderShippedSerde))
}
После этого процесса я генерирую новое сообщение key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"} в теме заказа, но оно будет отфильтровано потоком.
Это отлично работает для меня, если известны все типы событий. Однако есть предложения, как обрабатывать/игнорировать события неизвестного типа? Например, может быть OrderDeletedEvent, который служба поддержки клиентов должна игнорировать, поскольку он не имеет значения. В этом случае я не хочу включать дополнительный, ненужный класс OrderDeletedEvent. Одним из решений было бы зарегистрировать ошибку и продолжить обработку, настроив org.apache.kafka.streams.errors.LogAndContinueExceptionHandler, но это означает, что все исключения десериализации игнорируются, что может быть нежелательно.
Почему
KStream.filter()у вас не работает? Поскольку все находится в теме Кафки, этиOrderShippedпо-прежнему будут доступны для других потребителей в этой теме.