У меня есть простое приложение Spark, генерирующее сообщения Kafka с помощью
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
object IngestFromS3ToKafka {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("ingest-from-s3-to-kafka")
.config("spark.ui.port", "4040")
.getOrCreate()
val folderPath = "s3a://hongbomiao-bucket/iot/"
val parquet_schema = new StructType()
.add("timestamp", DoubleType)
.add("current", DoubleType, nullable = true)
.add("voltage", DoubleType, nullable = true)
.add("temperature", DoubleType, nullable = true)
val df = spark.readStream
.schema(parquet_schema)
.option("maxFilesPerTrigger", 1)
.parquet(folderPath)
.withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
.select(to_avro(struct("*")).alias("value"))
val query = df.writeStream
.format("kafka")
.option(
"kafka.bootstrap.servers",
"hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
)
.option("topic", "hm.motor")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
query.awaitTermination()
}
}
У меня есть схема Avro в реестре Apicurio, созданная
curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm-iot' \
--data '{
"type": "record",
"namespace": "com.hongbomiao",
"name": "hm.motor",
"fields": [
{
"name": "timestamp",
"type": "long"
},
{
"name": "current",
"type": "double"
},
{
"name": "voltage",
"type": "double"
},
{
"name": "temperature",
"type": "double"
}
]
}'
Я пытаюсь использовать конечную точку REST API, совместимую с Apicurio Registry. В настоящее время используется Content ID 26 для получения
curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/schemas/ids/26' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm-iot'
который печатает
{
"schema": "{\n \"type\": \"record\",\n \"namespace\": \"com.hongbomiao\",\n \"name\": \"hm.motor\",\n \"fields\": [\n {\n \"name\": \"timestamp\",\n \"type\": \"long\"\n },\n {\n \"name\": \"current\",\n \"type\": \"double\"\n },\n {\n \"name\": \"voltage\",\n \"type\": \"double\"\n },\n {\n \"name\": \"temperature\",\n \"type\": \"double\"\n }\n ]\n}",
"references": []
}
что выглядит хорошо.
На основе документа JDBC-коннектора Айвена я написал конфигурацию коннектора приемника JDBC:
{
"name": "hm-motor-jdbc-sink-kafka-connector",
"config": {
"connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
"tasks.max": 1,
"topics": "hm.motor",
"connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
"connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
"connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
"insert.mode": "upsert",
"table.name.format": "motor",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6",
"transforms": "convertTimestamp",
"transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTimestamp.field": "timestamp",
"transforms.convertTimestamp.target.type": "Timestamp"
}
}
Однако я получил эту ошибку в своем журнале Kafka Connect.
2023-05-01 19:01:11,291 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] WorkerSinkTask{id=hm-motor-jdbc-sink-kafka-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-hm-motor-jdbc-sink-kafka-connector-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hm.motor to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -1330532454
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:253)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:372)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:203)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
... 18 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: No content with id/hash 'contentId--1330532454' was found.; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:853)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:826)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:311)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:433)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:361)
... 21 more
Он пытается получить Content ID -1330532454
, но, очевидно, у меня его нет. Мой в 26
. Как JDBC ищет соответствующую схему AVRO?
Я не уверен, как это отображается сейчас. Я думал, что он будет искать схему под названием hm.motor
, основанную на теме Кафки, но оказалось, что это не так.
Спасибо!
Спасибо @Ftisiot!
Я нашел документ о сериализаторах и десериализаторах Kafka.
Сериализаторы и десериализаторы Kafka по умолчанию используют
<topicName>-key
и<topicName>-value
в качестве соответствующего имени субъекта при регистрации или извлечении схемы.
Также для value.converter.value.subject.name.strategy
по умолчанию используется io.confluent.kafka.serializers.subject.TopicNameStrategy
.
Я обновил имя схемы Avro на hm.motor-value
, но все равно получил ту же ошибку.
@OneCricketeer спасибо, я добавил, как я сгенерировал в своем вопросе, и немного почистил. Ах, я вроде как понимаю, что вы имеете в виду под автоматической регистрацией схемы, но в таком случае, кто предоставляет начальную схему с использованием AVRO? Я думал, что мне нужно обеспечить.
По моему опыту, производитель всегда определяет схему. Другими словами, если «кто-то другой» регистрирует схему, а «производитель» использует другую схему, тогда это не имеет смысла. Упомянутая ниже функция toConfluentAvro
должна регистрировать схему на основе схемы Spark Dataframe, но с тех пор, как я исследовал ее, прошло несколько лет.
Я полагаю, что имя схемы по умолчанию будет конкатенацией имени темы и либо -value
, либо -key
в зависимости от части сообщения, которое вы декодируете.
Поэтому в вашем случае я бы попробовал имя схемы hm.motor-value
.
В этом видео вы можете проверить автоматически сгенерированные имена схем при кодировании из json в avro с помощью flink.
Отказ от ответственности: я работаю на Айвен, и мы должны обновить документы, чтобы отразить это.
Спасибо @Ftisiot, я добавил больше информации в свое ОБНОВЛЕНИЕ 1, но все равно не повезло. Но думаю ближе! 😃
Это частично правильно, но не касается фактического исключения десериализатора Avro.
Забудьте о Connect на минуту. Сначала вы должны отладить свою тему с помощью kafka-avro-console-consumer
. Вы получите ту же ошибку, поскольку вашему производителю необходимо правильно закодировать данные.
to_avro
Спарка этого не делает.
Смотрите toConfluentAvro
функцию этой библиотеки - https://github.com/AbsaOSS/ABRIS
Подробнее о внутренностях https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
Что касается ваших проблем со схемой, name
относится к полному имени класса Java, как определено в спецификации Avro, и не имеет никакого отношения к теме реестра при использовании TopicNameStategy.
Как называется этот предмет
Это параметр пути в вызове API POST /subjects/:name/versions/
, используемый внутренними HTTP-клиентами Serializer и Deserializer.
Также упоминалось ранее, что Kafka Connect здесь не нужен. Spark может напрямую записывать в базы данных JDBC. Источником данных может быть Parquet или Kafka.
Спасибо @OneCricketeer, я наконец сделал это, написал в другом ответе! 😃
Спасибо всем за помощь, наконец-то я разобрался! Я постараюсь обобщить то, что я узнал.
На самом деле существует два основных типа данных Avro:
Confluent Avro не является "ванильным" Avro , что доставляет некоторые неудобства для Spark и других инструментов.
Как отметил @OneCricketeer, существует библиотека ABRIS, помогающая генерировать сообщение Kafka в формате Confluent Avro (toConfluentAvro
).
Сначала сгенерируйте схему Avro с помощью
curl --location 'http://confluent-schema-registry.svc:8081/subjects/hm.motor-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data '{
"schema": "{\"type\": \"record\", \"name\": \"motor\", \"fields\":[{ \"name\": \"timestamp\", \"type\": \"long\"},{ \"name\": \"current\", \"type\": \"double\"},{ \"name\": \"voltage\", \"type\": \"double\"},{ \"name\": \"temperature\", \"type\": \"double\"}]}"
}'
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}
object IngestFromS3ToKafka {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("ingest-from-s3-to-kafka")
.config("spark.ui.port", "4040")
.getOrCreate()
val folderPath = "s3a://hongbomiao-bucket/iot/"
val parquetSchema = new StructType()
.add("timestamp", DoubleType)
.add("current", DoubleType, nullable = true)
.add("voltage", DoubleType, nullable = true)
.add("temperature", DoubleType, nullable = true)
val toAvroConfig: ToAvroConfig =
AbrisConfig.toConfluentAvro.downloadSchemaByLatestVersion
.andTopicNameStrategy("hm.motor")
.usingSchemaRegistry(
"http://confluent-schema-registry.svc:8081"
)
val df = spark.readStream
.schema(parquetSchema)
.option("maxFilesPerTrigger", 1)
.parquet(folderPath)
.withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
.select(to_avro(struct("*"), toAvroConfig).as("value"))
val query = df.writeStream
.format("kafka")
.option(
"kafka.bootstrap.servers",
"hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
)
.option("topic", "hm.motor")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
query.awaitTermination()
}
}
build.sbt
name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
resolvers += "confluent" at "https://packages.confluent.io/maven/"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-avro" % "3.4.0" % "provided",
"org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.463" % "provided",
"za.co.absa" %% "abris" % "6.3.0"
)
ThisBuild / assemblyMergeStrategy := {
// https://stackoverflow.com/a/67937671/2000548
case PathList("module-info.class") => MergeStrategy.discard
case x if x.endsWith("/module-info.class") => MergeStrategy.discard
// https://stackoverflow.com/a/76129963/2000548
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case x =>
val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
oldStrategy(x)
}
Во-первых, я сгенерировал свою схему Varo,
curl --location 'http://apicurio-registry.svc:8080/apis/registry/v2/groups/default/artifacts' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm.motor-value' \
--data '{
"type": "record",
"namespace": "com.hongbomiao",
"name": "motor",
"fields": [
{
"name": "timestamp",
"type": "long"
},
{
"name": "current",
"type": "double"
},
{
"name": "voltage",
"type": "double"
},
{
"name": "temperature",
"type": "double"
}
]
}'
В Spark очень просто использовать его с нативным org.apache.spark.sql.avro.functions.to_avro
.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
import org.apache.spark.sql.avro.functions.to_avro
import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}
object IngestFromS3ToKafka {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("ingest-from-s3-to-kafka")
.config("spark.ui.port", "4040")
.getOrCreate()
val folderPath = "s3a://hongbomiao-bucket/iot/"
// For below `parquet_schema`, you can
// 1. hard code like current code
// 2. read from one file `val parquet_schema = spark.read.parquet("s3a://hongbomiao-bucket/iot/motor.parquet").schema`
// 3. Maybe possible also from Avro, I will try in future!
val parquetSchema = new StructType()
.add("timestamp", DoubleType)
.add("current", DoubleType, nullable = true)
.add("voltage", DoubleType, nullable = true)
.add("temperature", DoubleType, nullable = true)
val backend = HttpClientSyncBackend()
val response = basicRequest
.get(
uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts/hm.motor-value"
)
.send(backend)
val kafkaRecordValueSchema = response.body.fold(identity, identity)
val df = spark.readStream
.schema(parquetSchema)
.option("maxFilesPerTrigger", 1)
.parquet(folderPath)
.withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
.select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))
val query = df.writeStream
.format("kafka")
.option(
"kafka.bootstrap.servers",
"hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
)
.option("topic", "hm.motor")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
query.awaitTermination()
}
}
встроенный.sbt
name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
"org.apache.spark" %% "spark-avro" % "3.3.2" % "provided",
"org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.461" % "provided",
"com.softwaremill.sttp.client3" %% "core" % "3.8.15"
)
Много идей я почерпнул из этой статьи.
io.confluent.connect.avro.AvroConverter
с Confluent RegistryЗдесь мы используем Confluent Registry REST API:
{
"name": "hm-motor-jdbc-sink-kafka-connector",
"config": {
"connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
"tasks.max": 1,
"topics": "hm.motor",
"connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
"connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
"connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
"insert.mode": "upsert",
"table.name.format": "motor",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://confluent-schema-registry.svc:8081",
"transforms": "convertTimestamp",
"transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTimestamp.field": "timestamp",
"transforms.convertTimestamp.target.type": "Timestamp"
}
}
io.confluent.connect.avro.AvroConverter
с реестром схемы ApicurioЗдесь мы используем совместимый с REST API Apicurio Registry:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://apicurio-registry.svc:8080/apis/ccompat/v6",
(дальше я не тестировал это направление)
io.apicurio.registry.utils.converter.AvroConverter
с реестром схемы ApicurioЗдесь мы используем совместимый с REST API Apicurio Registry:
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/ccompat/v6",
"value.converter.apicurio.registry.as-confluent": true,
(дальше я не тестировал это направление)
Здесь мы используем io.apicurio.registry.utils.converter.AvroConverter
.
Моя конфигурация коннектора JDBC:
{
"name": "hm-motor-jdbc-sink-kafka-connector",
"config": {
"connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
"tasks.max": 1,
"topics": "hm.motor",
"connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
"connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
"connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
"insert.mode": "upsert",
"table.name.format": "motor",
"transforms": "convertTimestamp",
"transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTimestamp.field": "timestamp",
"transforms.convertTimestamp.target.type": "Timestamp",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/registry/v2"
"value.converter.apicurio.registry.fallback.group-id": "hm-group",
"value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
}
}
Возможно в будущем я придумаю, как избавиться от value.converter.apicurio.registry.fallback
связанных полей.
Более подробную информацию о io.apicurio.registry.utils.converter.AvroConverter
можно найти на здесь.
Пожалуйста, уточните, как вы производите данные. Отрицательное значение будет означать, что ваш Avro закодирован неправильно, что означает, что проблема заключается в вашем коде производителя, а не в соединителе/конвертере/реестре. Кроме того, производитель должен автоматически регистрировать любую схему, а не регистрировать ее вручную.