Я пытаюсь создать сообщения о надгробии в сжатой теме Kafka со схемой Avro, используя Scala (v2.13.10) и библиотеку FS2 Kafka (v3.0.0-M8) с модулем Vulcan.
Приложение использует тему A и создает надгробную плиту для той же темы A для значений, которые соответствуют некоторому условию.
Пример фрагмента
val producerSettings =
ProducerSettings(
keySerializer = keySerializer,
valueSerializer = Serializer.unit[IO]
).withBootstrapServers("localhost:9092")
def processRecord(committableRecord: CommittableConsumerRecord[IO, KeySchema, ValueSchema]
, producer: KafkaProducer.Metrics[IO, KeySchema, Unit]
): IO[CommittableOffset[IO]] = {
val key = committableRecord.record.key
val value = committableRecord.record.value
if (value.filterColumn.field1 == "<removable>") {
val tombStone = ProducerRecord(committableRecord.record.topic, key, ())
val producerRecord: ProducerRecords[CommittableOffset[IO], KeySchema, Unit] = ProducerRecords.one(tombStone, committableRecord.offset)
producer.produce(producerRecord).flatten.map(_.flatMap(a => {
IO(a.passthrough)
}))
}
else
IO(committableRecord.offset)
}
Приведенный выше фрагмент отлично работает, если я создаю действительное сообщение о значении ключа. Однако я получаю следующую ошибку, когда пытаюсь создать нулевые/пустые сообщения:
java.lang.IllegalArgumentException: Invalid Avro record: bytes is null or empty
at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$4(AvroDeserializer.scala:32)
at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
at mapN @ fs2.kafka.KafkaProducerConnection$$anon$1.withSerializersFrom(KafkaProducerConnection.scala:141)
at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:184)
at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:265)
at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$26(KafkaConsumer.scala:267)
at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
at mapN @ fs2.kafka.KafkaProducerConnection$$anon$1.withSerializersFrom(KafkaProducerConnection.scala:141)
Пример схемы Avro:
{
"type": "record",
"name": "SampleOrder",
"namespace": "com.myschema.global",
"fields": [
{
"name": "cust_id",
"type": "int"
},
{
"name": "month",
"type": "int"
},
{
"name": "expenses",
"type": "double"
},
{
"name": "filterColumn",
"type": {
"type": "record",
"name": "filterColumn",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "field1",
"type": "string"
}
]
}
}
]
}
Заранее спасибо.
Я пробовал разные сериализаторы для производителя, но все они приводят к одному и тому же исключению.
Во-первых, производитель будет использовать сериализатор, но в трассировке стека указано, что десериализатор. Если ваши ключи не являются Avro, вам не нужна схема Avro для отправки нулевых значений в тему. Используйте ByteArraySerializer и просто отправьте нулевое значение...
Но это похоже на ошибку. Если входящий ключ/значение записи имеет значение null, он должен return null
, а не явно вызывать ошибку
Сравните с реализацией Confluent
Привет @OneCricketeer, это было на высоте! проблема в том, что я потребляю из той же темы, для которой я создаю надгробие, поэтому, как только мой потребитель достигает созданного мной надгробия, он не может десериализовать нулевое значение. И полностью согласен с предложенным решением. Это значительно упростит обработку потребления нулевых записей. Спасибо вам за помощь :)