Я надеюсь сопоставить сообщение объекту с schema
и payload
внутри во время структурированной потоковой передачи Spark.
Это мой исходный код
val input_schema = new StructType()
.add("timestamp", DoubleType)
.add("current", DoubleType)
.add("voltage", DoubleType)
.add("temperature", DoubleType)
val df = spark.readStream
.schema(input_schema)
.option("maxFilesPerTrigger", 1)
.parquet("s3a://my-bucket/my-folder/")
.select(to_json(struct("*")).alias("value"))
val query = df.writeStream
.format("kafka")
.option(
"kafka.bootstrap.servers",
"hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
)
.option("topic", "my-topic")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
Это выведет сообщение в этом формате при записи в Kafka:
{
"timestamp": 1682556571.14622,
"current": 2.0172032595808242,
"voltage": 19.34080877806074,
"temperature": 37.461518565900434
}
Тем не менее, я надеюсь добавить поле schema
и переместить его в payload
, чтобы позже я мог перейти к Postgres с помощью соединителя приемника JDBC, такого как соединители приемника JDBC и источника Aiven.
Основываясь на этого документа, я думаю, что мне следует использовать "decimal"
в качестве каждого типа поля.
Итак, это формат сообщения Kafka, который я надеюсь сгенерировать:
{
"schema":{
"type": "struct",
"fields":[
{
"type": "decimal",
"optional": false,
"field": "timestamp"
},
{
"type": "decimal",
"optional": true,
"field": "current"
},
{
"type": "decimal",
"optional": true,
"field": "voltage"
},
{
"type": "decimal",
"optional": true,
"field": "temperature"
}
]
},
"payload":{
"timestamp": 1682556571.14622,
"current": 2.0172032595808242,
"voltage": 19.34080877806074,
"temperature": 37.461518565900434
}
}
Я попытался обновить свой код Spark до
val input_schema = new StructType()
.add("timestamp", DoubleType)
.add("current", DoubleType, nullable = true)
.add("voltage", DoubleType, nullable = true)
.add("temperature", DoubleType, nullable = true)
val output_schema = new StructType()
.add("timestamp", "decimal")
.add("current", "decimal", nullable = true)
.add("voltage", "decimal", nullable = true)
.add("temperature", "decimal", nullable = true)
val df = spark.readStream
.schema(input_schema)
.option("maxFilesPerTrigger", 1)
.parquet("s3a://my-bucket/my-folder/")
.select(
to_json(struct("*")).alias("payload")
)
.withColumn(
"schema",
to_json(struct(
lit("struct").alias("type"),
lit(output_schema.fields.map(field => struct(
lit(field.dataType).alias("type"),
lit(field.nullable).alias("optional"),
lit(field.name).alias("field")
))).alias("fields")
))
)
.select(
to_json(struct(
col("schema"),
col("payload")
)).alias("value")
)
val query = df.writeStream
.format("kafka")
.option(
"kafka.bootstrap.servers",
"hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
)
.option("topic", "my-topic")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
Но когда я spark-submit
, я получил ошибку
Exception in thread "main" org.apache.spark.SparkRuntimeException: The feature is not supported: literal for 'DecimalType(10,0)' of class org.apache.spark.sql.types.DecimalType.
at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:296)
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
at org.apache.spark.sql.functions$.lit(functions.scala:125)
at com.hongbomiao.IngestFromS3ToKafka$.$anonfun$main$1(IngestFromS3ToKafka.scala:46)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:45)
at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Я как бы чувствую StructType
, что оно возвращается DecimalType(10,0)
. Может в этом случае не стоит использовать StructType
?
Я не уверен, как точно сгенерировать "decimal"
в выходном сообщении. Любой гид будет признателен, спасибо!
Спасибо @OneCricketeer! На этот раз мне нужна Kafka, чтобы не взорвать базу данных, поскольку в пиковое время могут обрабатываться сотни миллионов данных в секунду. Реальное использование было бы другой базой данных, но все же похожей.
Я хотел сказать, что вам не нужны разъемы Aiven. Вы можете просто использовать Spark для всего, включая чтение/запись в Kafka, а также базы данных JDBC
Я думаю, что моя первоначальная догадка верна. StructType
является особенным и не должен использоваться здесь. Использование "decimal"
аналогично использованию DecimalType(10,0)
, поэтому он показывает эту ошибку.
В моем случае я должен использовать простой struct
.
Кроме того, я обнаружил, что коннектор приемника JDBC от Aiven документ может иметь проблему:
Пробовал decimal
, Decimal
, DECIMAL
, все неудачно при запуске коннектора.
По умолчанию value.converter
равно org.apache.kafka.connect.json.JsonConverter
, поэтому в моем случае я должен использовать double
(или float
) на основе исходного кода JsonSchema .
Итак, окончательный рабочий вариант:
val input_schema = new StructType()
.add("timestamp", DoubleType)
.add("current", DoubleType, nullable = true)
.add("voltage", DoubleType, nullable = true)
.add("temperature", DoubleType, nullable = true)
val df = spark.readStream
.schema(input_schema)
.option("maxFilesPerTrigger", 1)
.parquet("s3a://my-bucket/my-folder/")
.select(
struct(
lit("struct").alias("type"),
array(
struct(
lit("double").alias("type"),
lit(false).alias("optional"),
lit("timestamp").alias("field")
),
struct(
lit("double").alias("type"),
lit(true).alias("optional"),
lit("current").alias("field")
),
struct(
lit("double").alias("type"),
lit(true).alias("optional"),
lit("voltage").alias("field")
),
struct(
lit("double").alias("type"),
lit(true).alias("optional"),
lit("temperature").alias("field")
)
).alias("fields")
).alias("schema"),
struct("*").alias("payload")
)
.select(to_json(struct("*")).alias("value"))
val query = df.writeStream
.format("kafka")
.option(
"kafka.bootstrap.servers",
"hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
)
.option("topic", "my-topic")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
Это будет формат печати
{
"schema":{
"type": "struct",
"fields":[
{
"type": "double",
"optional": false,
"field": "timestamp"
},
{
"type": "double",
"optional": true,
"field": "current"
},
{
"type": "double",
"optional": true,
"field": "voltage"
},
{
"type": "double",
"optional": true,
"field": "temperature"
}
]
},
"payload":{
"timestamp": 1682556571.14622,
"current": 2.0172032595808242,
"voltage": 19.34080877806074,
"temperature": 37.461518565900434
}
}
Вы можете просто полностью удалить объект схемы из записи, используя функцию get_json_object("$.payload")
. Я не вижу смысла в разборе схемы из Spark. В противном случае, используя вместо этого Avro в Kafka Connect, Spark также может вывести схему из реестра схем.
Спасибо, да Авро мой следующий шаг 😃
Привет @OneCricketeer, не могли бы вы уточнить get_json_object("$.payload")
? Извините, я не понял. Спасибо!
Это позволяет вам анализировать как строку без какой-либо схемы. Например. spark.readStream.format("kafka")....select(cast("value").as("string")).select(get_json_object("$.payload"))
. Он использует JSONPath для извлечения полей. Оттуда вы можете дополнительно использовать select(to_json(col("value"), payloadSchema)
для применения схемы SparkSQL... Я имел в виду, что поле "schema"
в метаданных Connect ничего не значит для SparkSQL, поэтому вы должны удалить его и пропустить его синтаксический анализ.
Я вижу, однако, я читаю паркетные файлы, как показывает .parquet("s3a://my-bucket/my-folder/")
вместо Кафки. 🙂
Виноват. Та же концепция применяется для любого кадра данных Spark с данными String/JSON. Альтернативное решение — установить value.converter.schemas.enable=false
в коннектор S3 Sink.
Spark может читать и записывать таблицы JDBC. Вам не нужны схемы и обертки полезной нагрузки, если вам не нужно использовать Kafka Connect.