Как правильно сопоставить сообщение с объектом со «схемой» и «полезной нагрузкой» в структурированной потоковой передаче Spark?

Я надеюсь сопоставить сообщение объекту с schema и payload внутри во время структурированной потоковой передачи Spark.

Это мой исходный код

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType)
  .add("voltage", DoubleType)
  .add("temperature", DoubleType)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(to_json(struct("*")).alias("value"))

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

Это выведет сообщение в этом формате при записи в Kafka:

{
  "timestamp": 1682556571.14622,
  "current": 2.0172032595808242,
  "voltage": 19.34080877806074,
  "temperature": 37.461518565900434
}

Тем не менее, я надеюсь добавить поле schema и переместить его в payload, чтобы позже я мог перейти к Postgres с помощью соединителя приемника JDBC, такого как соединители приемника JDBC и источника Aiven.

Основываясь на этого документа, я думаю, что мне следует использовать "decimal" в качестве каждого типа поля.

Итак, это формат сообщения Kafka, который я надеюсь сгенерировать:

{
  "schema":{
    "type": "struct",
    "fields":[
      {
        "type": "decimal",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "current"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload":{
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}

Я попытался обновить свой код Spark до

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val output_schema = new StructType()
  .add("timestamp", "decimal")
  .add("current", "decimal", nullable = true)
  .add("voltage", "decimal", nullable = true)
  .add("temperature", "decimal", nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    to_json(struct("*")).alias("payload")
  )
  .withColumn(
    "schema",
    to_json(struct(
      lit("struct").alias("type"),
      lit(output_schema.fields.map(field => struct(
        lit(field.dataType).alias("type"),
        lit(field.nullable).alias("optional"),
        lit(field.name).alias("field")
      ))).alias("fields")
    ))
  )
  .select(
    to_json(struct(
      col("schema"),
      col("payload")
    )).alias("value")
  )

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

Но когда я spark-submit, я получил ошибку

Exception in thread "main" org.apache.spark.SparkRuntimeException: The feature is not supported: literal for 'DecimalType(10,0)' of class org.apache.spark.sql.types.DecimalType.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:296)
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
    at org.apache.spark.sql.functions$.lit(functions.scala:125)
    at com.hongbomiao.IngestFromS3ToKafka$.$anonfun$main$1(IngestFromS3ToKafka.scala:46)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:45)
    at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Я как бы чувствую StructType, что оно возвращается DecimalType(10,0). Может в этом случае не стоит использовать StructType?

Я не уверен, как точно сгенерировать "decimal" в выходном сообщении. Любой гид будет признателен, спасибо!

Spark может читать и записывать таблицы JDBC. Вам не нужны схемы и обертки полезной нагрузки, если вам не нужно использовать Kafka Connect.

OneCricketeer 28.04.2023 15:07

Спасибо @OneCricketeer! На этот раз мне нужна Kafka, чтобы не взорвать базу данных, поскольку в пиковое время могут обрабатываться сотни миллионов данных в секунду. Реальное использование было бы другой базой данных, но все же похожей.

Hongbo Miao 28.04.2023 19:05

Я хотел сказать, что вам не нужны разъемы Aiven. Вы можете просто использовать Spark для всего, включая чтение/запись в Kafka, а также базы данных JDBC

OneCricketeer 29.04.2023 07:03
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
3
63
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я думаю, что моя первоначальная догадка верна. StructType является особенным и не должен использоваться здесь. Использование "decimal" аналогично использованию DecimalType(10,0), поэтому он показывает эту ошибку.

В моем случае я должен использовать простой struct.

Кроме того, я обнаружил, что коннектор приемника JDBC от Aiven документ может иметь проблему:

Пробовал decimal, Decimal, DECIMAL, все неудачно при запуске коннектора.

По умолчанию value.converter равно org.apache.kafka.connect.json.JsonConverter, поэтому в моем случае я должен использовать double (или float) на основе исходного кода JsonSchema .

Итак, окончательный рабочий вариант:

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    struct(
      lit("struct").alias("type"),
      array(
        struct(
          lit("double").alias("type"),
          lit(false).alias("optional"),
          lit("timestamp").alias("field")
        ),
        struct(
          lit("double").alias("type"),
          lit(true).alias("optional"),
          lit("current").alias("field")
        ),
        struct(
          lit("double").alias("type"),
          lit(true).alias("optional"),
          lit("voltage").alias("field")
        ),
        struct(
          lit("double").alias("type"),
          lit(true).alias("optional"),
          lit("temperature").alias("field")
        )
      ).alias("fields")
    ).alias("schema"),
    struct("*").alias("payload")
  )
  .select(to_json(struct("*")).alias("value"))

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

Это будет формат печати

{
  "schema":{
    "type": "struct",
    "fields":[
      {
        "type": "double",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "double",
        "optional": true,
        "field": "current"
      },
      {
        "type": "double",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "double",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload":{
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}

Вы можете просто полностью удалить объект схемы из записи, используя функцию get_json_object("$.payload"). Я не вижу смысла в разборе схемы из Spark. В противном случае, используя вместо этого Avro в Kafka Connect, Spark также может вывести схему из реестра схем.

OneCricketeer 28.04.2023 15:03

Спасибо, да Авро мой следующий шаг 😃

Hongbo Miao 28.04.2023 19:06

Привет @OneCricketeer, не могли бы вы уточнить get_json_object("$.payload")? Извините, я не понял. Спасибо!

Hongbo Miao 28.04.2023 22:50

Это позволяет вам анализировать как строку без какой-либо схемы. Например. spark.readStream.format("kafka")....select(cast("value").as(‌​"string")).select(ge‌​t_json_object("$.pay‌​load")). Он использует JSONPath для извлечения полей. Оттуда вы можете дополнительно использовать select(to_json(col("value"), payloadSchema) для применения схемы SparkSQL... Я имел в виду, что поле "schema" в метаданных Connect ничего не значит для SparkSQL, поэтому вы должны удалить его и пропустить его синтаксический анализ.

OneCricketeer 29.04.2023 00:50

Я вижу, однако, я читаю паркетные файлы, как показывает .parquet("s3a://my-bucket/my-folder/") вместо Кафки. 🙂

Hongbo Miao 29.04.2023 03:17

Виноват. Та же концепция применяется для любого кадра данных Spark с данными String/JSON. Альтернативное решение — установить value.converter.schemas.enable=false в коннектор S3 Sink.

OneCricketeer 29.04.2023 06:55

Другие вопросы по теме