Как правильно использовать коннектор приемника Spark -> Kafka -> JDBC с Avro?

У меня есть простое приложение Spark, генерирующее сообщения Kafka с помощью

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    val parquet_schema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    val df = spark.readStream
      .schema(parquet_schema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*")).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}

У меня есть схема Avro в реестре Apicurio, созданная

curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm-iot' \
--data '{
    "type": "record",
    "namespace": "com.hongbomiao",
    "name": "hm.motor",
    "fields": [
        {
            "name": "timestamp",
            "type": "long"
        },
        {
            "name": "current",
            "type": "double"
        },
        {
            "name": "voltage",
            "type": "double"
        },
        {
            "name": "temperature",
            "type": "double"
        }
    ]
}'

Я пытаюсь использовать конечную точку REST API, совместимую с Apicurio Registry. В настоящее время используется Content ID 26 для получения

curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/schemas/ids/26' \
  --header 'Content-type: application/json; artifactType=AVRO' \
  --header 'X-Registry-ArtifactId: hm-iot'

который печатает

{
    "schema": "{\n    \"type\": \"record\",\n    \"namespace\": \"com.hongbomiao\",\n    \"name\": \"hm.motor\",\n    \"fields\": [\n        {\n            \"name\": \"timestamp\",\n            \"type\": \"long\"\n        },\n        {\n            \"name\": \"current\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"voltage\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"temperature\",\n            \"type\": \"double\"\n        }\n    ]\n}",
    "references": []
}

что выглядит хорошо.

На основе документа JDBC-коннектора Айвена я написал конфигурацию коннектора приемника JDBC:

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "hm.motor",
        "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
        "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
        "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",

        "insert.mode": "upsert",

        "table.name.format": "motor",

        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6",

        "transforms": "convertTimestamp",
        "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamp.field": "timestamp",
        "transforms.convertTimestamp.target.type": "Timestamp"
    }
}

Однако я получил эту ошибку в своем журнале Kafka Connect.

2023-05-01 19:01:11,291 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] WorkerSinkTask{id=hm-motor-jdbc-sink-kafka-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-hm-motor-jdbc-sink-kafka-connector-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hm.motor to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -1330532454
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:253)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:372)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:203)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    ... 18 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: No content with id/hash 'contentId--1330532454' was found.; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:853)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:826)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:311)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:433)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:361)
    ... 21 more

Он пытается получить Content ID -1330532454, но, очевидно, у меня его нет. Мой в 26. Как JDBC ищет соответствующую схему AVRO?

Я не уверен, как это отображается сейчас. Я думал, что он будет искать схему под названием hm.motor, основанную на теме Кафки, но оказалось, что это не так.

Спасибо!


ОБНОВЛЕНИЕ 1

Спасибо @Ftisiot!

Я нашел документ о сериализаторах и десериализаторах Kafka.

Сериализаторы и десериализаторы Kafka по умолчанию используют <topicName>-key и <topicName>-value в качестве соответствующего имени субъекта при регистрации или извлечении схемы.

Также для value.converter.value.subject.name.strategy по умолчанию используется io.confluent.kafka.serializers.subject.TopicNameStrategy.

Я обновил имя схемы Avro на hm.motor-value, но все равно получил ту же ошибку.

Пожалуйста, уточните, как вы производите данные. Отрицательное значение будет означать, что ваш Avro закодирован неправильно, что означает, что проблема заключается в вашем коде производителя, а не в соединителе/конвертере/реестре. Кроме того, производитель должен автоматически регистрировать любую схему, а не регистрировать ее вручную.

OneCricketeer 02.05.2023 00:18

@OneCricketeer спасибо, я добавил, как я сгенерировал в своем вопросе, и немного почистил. Ах, я вроде как понимаю, что вы имеете в виду под автоматической регистрацией схемы, но в таком случае, кто предоставляет начальную схему с использованием AVRO? Я думал, что мне нужно обеспечить.

Hongbo Miao 02.05.2023 03:58

По моему опыту, производитель всегда определяет схему. Другими словами, если «кто-то другой» регистрирует схему, а «производитель» использует другую схему, тогда это не имеет смысла. Упомянутая ниже функция toConfluentAvro должна регистрировать схему на основе схемы Spark Dataframe, но с тех пор, как я исследовал ее, прошло несколько лет.

OneCricketeer 03.05.2023 00:12
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
3
194
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Я полагаю, что имя схемы по умолчанию будет конкатенацией имени темы и либо -value, либо -key в зависимости от части сообщения, которое вы декодируете.

Поэтому в вашем случае я бы попробовал имя схемы hm.motor-value.

В этом видео вы можете проверить автоматически сгенерированные имена схем при кодировании из json в avro с помощью flink.

Отказ от ответственности: я работаю на Айвен, и мы должны обновить документы, чтобы отразить это.

Спасибо @Ftisiot, я добавил больше информации в свое ОБНОВЛЕНИЕ 1, но все равно не повезло. Но думаю ближе! 😃

Hongbo Miao 02.05.2023 03:33

Это частично правильно, но не касается фактического исключения десериализатора Avro.

OneCricketeer 02.05.2023 04:22

Забудьте о Connect на минуту. Сначала вы должны отладить свою тему с помощью kafka-avro-console-consumer. Вы получите ту же ошибку, поскольку вашему производителю необходимо правильно закодировать данные.

to_avro Спарка этого не делает.

Смотрите toConfluentAvro функцию этой библиотеки - https://github.com/AbsaOSS/ABRIS

Подробнее о внутренностях https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format

Что касается ваших проблем со схемой, name относится к полному имени класса Java, как определено в спецификации Avro, и не имеет никакого отношения к теме реестра при использовании TopicNameStategy.

Как называется этот предмет

Это параметр пути в вызове API POST /subjects/:name/versions/, используемый внутренними HTTP-клиентами Serializer и Deserializer.

Также упоминалось ранее, что Kafka Connect здесь не нужен. Spark может напрямую записывать в базы данных JDBC. Источником данных может быть Parquet или Kafka.

Спасибо @OneCricketeer, я наконец сделал это, написал в другом ответе! 😃

Hongbo Miao 05.05.2023 23:57
Ответ принят как подходящий

Спасибо всем за помощь, наконец-то я разобрался! Я постараюсь обобщить то, что я узнал.

1. Создание сообщения Kafka в формате Avro

На самом деле существует два основных типа данных Avro:

  • Сливающийся Авро
  • "Ваниль" Авро

1.1 [Успешно] Генерация данных Confluent Avro в Spark

Confluent Avro не является "ванильным" Avro , что доставляет некоторые неудобства для Spark и других инструментов.

Как отметил @OneCricketeer, существует библиотека ABRIS, помогающая генерировать сообщение Kafka в формате Confluent Avro (toConfluentAvro).

Сначала сгенерируйте схему Avro с помощью

curl --location 'http://confluent-schema-registry.svc:8081/subjects/hm.motor-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data '{
    "schema": "{\"type\": \"record\", \"name\": \"motor\", \"fields\":[{ \"name\": \"timestamp\", \"type\": \"long\"},{ \"name\": \"current\", \"type\": \"double\"},{ \"name\": \"voltage\", \"type\": \"double\"},{ \"name\": \"temperature\", \"type\": \"double\"}]}"
}'
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    val parquetSchema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    val toAvroConfig: ToAvroConfig =
      AbrisConfig.toConfluentAvro.downloadSchemaByLatestVersion
        .andTopicNameStrategy("hm.motor")
        .usingSchemaRegistry(
          "http://confluent-schema-registry.svc:8081"
        )

    val df = spark.readStream
      .schema(parquetSchema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*"), toAvroConfig).as("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}

build.sbt

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
resolvers += "confluent" at "https://packages.confluent.io/maven/"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-avro" % "3.4.0" % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.463" % "provided",
  "za.co.absa" %% "abris" % "6.3.0"
)
ThisBuild / assemblyMergeStrategy := {
  // https://stackoverflow.com/a/67937671/2000548
  case PathList("module-info.class") => MergeStrategy.discard
  case x if x.endsWith("/module-info.class") => MergeStrategy.discard
  // https://stackoverflow.com/a/76129963/2000548
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}

1.2 [Успешно] Создание «стандартных»/«ванильных» данных Apache Avro в Spark

Во-первых, я сгенерировал свою схему Varo,

curl --location 'http://apicurio-registry.svc:8080/apis/registry/v2/groups/default/artifacts' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm.motor-value' \
--data '{
    "type": "record",
    "namespace": "com.hongbomiao",
    "name": "motor",
    "fields": [
        {
            "name": "timestamp",
            "type": "long"
        },
        {
            "name": "current",
            "type": "double"
        },
        {
            "name": "voltage",
            "type": "double"
        },
        {
            "name": "temperature",
            "type": "double"
        }
    ]
}'

В Spark очень просто использовать его с нативным org.apache.spark.sql.avro.functions.to_avro.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import org.apache.spark.sql.avro.functions.to_avro
import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    // For below `parquet_schema`, you can
    //  1. hard code like current code
    //  2. read from one file `val parquet_schema = spark.read.parquet("s3a://hongbomiao-bucket/iot/motor.parquet").schema`
    //  3. Maybe possible also from Avro, I will try in future!
    val parquetSchema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    val backend = HttpClientSyncBackend()
    val response = basicRequest
      .get(
        uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts/hm.motor-value"
      )
      .send(backend)
    val kafkaRecordValueSchema = response.body.fold(identity, identity)

    val df = spark.readStream
      .schema(parquetSchema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}

встроенный.sbt

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-avro" % "3.3.2" % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.461" % "provided",
  "com.softwaremill.sttp.client3" %% "core" % "3.8.15"
)

Много идей я почерпнул из этой статьи.

2. Чтение сообщения Kafka в формате Avro в JDBC Kafka Connector и переход в базу данных

2.1 Сообщение Кафки в Confluent Avro

[Успешно] 2.1.1 Использование io.confluent.connect.avro.AvroConverter с Confluent Registry

Здесь мы используем Confluent Registry REST API:

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "hm.motor",
        "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
        "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
        "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
        "insert.mode": "upsert",
        "table.name.format": "motor",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://confluent-schema-registry.svc:8081",
        "transforms": "convertTimestamp",
        "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamp.field": "timestamp",
        "transforms.convertTimestamp.target.type": "Timestamp"
    }
}

2.1.2 Использование io.confluent.connect.avro.AvroConverter с реестром схемы Apicurio

Здесь мы используем совместимый с REST API Apicurio Registry:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://apicurio-registry.svc:8080/apis/ccompat/v6",

(дальше я не тестировал это направление)

2.1.3 Использование io.apicurio.registry.utils.converter.AvroConverter с реестром схемы Apicurio

Здесь мы используем совместимый с REST API Apicurio Registry:

"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/ccompat/v6",
"value.converter.apicurio.registry.as-confluent": true,

(дальше я не тестировал это направление)

2.3 [Успех] Сообщение Kafka в «ванильном» Apache Avro

Здесь мы используем io.apicurio.registry.utils.converter.AvroConverter.

Моя конфигурация коннектора JDBC:

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "hm.motor",
        "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
        "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
        "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
        "insert.mode": "upsert",
        "table.name.format": "motor",
        "transforms": "convertTimestamp",
        "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamp.field": "timestamp",
        "transforms.convertTimestamp.target.type": "Timestamp",

        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/registry/v2"
        "value.converter.apicurio.registry.fallback.group-id": "hm-group",
        "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
    }
}

Возможно в будущем я придумаю, как избавиться от value.converter.apicurio.registry.fallback связанных полей.

Более подробную информацию о io.apicurio.registry.utils.converter.AvroConverter можно найти на здесь.

Другие вопросы по теме