Разделить массив структур из JSON на строки Dataframe в SPARK

Я читаю Кафку через структурированную потоковую передачу Spark. Входное сообщение Kafka имеет следующий формат JSON:

[
  {
    "customer": "Jim",
    "sex": "male",
    "country": "US"  
  },
  {
    "customer": "Pam",
    "sex": "female",
    "country": "US"
  } 
] 

У меня есть определение схемы, как показано ниже, для ее анализа:

val schemaAsJson = ArrayType(StructType(Seq(
      StructField("customer",StringType,true),
      StructField("sex",StringType,true),
      StructField("country",StringType,true))),true) 

Мой код выглядит так,

df.select(from_json($"col", schemaAsJson) as "json")
  .select("json.customer","json.sex","json.country")

Текущий вывод выглядит так,

+--------------+----------------+----------------+
|      customer|             sex|country         |
+--------------+----------------+----------------+
|    [Jim, Pam]|  [male, female]|        [US, US]|
+--------------+----------------+----------------+

Ожидаемый результат:

+--------------+----------------+----------------+
|      customer|             sex|         country|
+--------------+----------------+----------------+
|           Jim|            male|              US|
|           Pam|          female|              US|
+--------------+----------------+----------------+

Как разделить массив структур на отдельные строки, как указано выше? Может кто-нибудь помочь?

Как сделать HTTP-запрос в Javascript?
Как сделать HTTP-запрос в Javascript?
В JavaScript вы можете сделать HTTP-запрос, используя объект XMLHttpRequest или более новый API fetch. Вот пример для обоих методов:
1
0
98
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вам нужно взорвать столбец перед выбором.

df.select(explode_outer(from_json($"value", schemaAsJson)) as "json")
.select("json.customer","json.sex","json.country").show()

Я пробовал. Я получаю эту ошибку: Исключение в потоке «основной» org.apache.spark.sql.AnalysisException: невозможно разрешить «json.customer» заданные входные столбцы: [col];

Harikrishnan Balachandran 22.11.2022 15:40

Обновил ответ, проверьте сейчас

Mohana B C 22.11.2022 15:54

Извините, это опечатка

Mohana B C 22.11.2022 15:58

Большое спасибо. Он работает так, как ожидалось! Мои фактические данные немного сложнее по вложенной структуре. Как и в родительской структуре, у меня будет другой массив структуры, такой как «предыдущая работа»: [ {"emp1Details":""}, {"emp2Details":""}]. Мне еще предстоит попробовать ваше решение. Но будет ли для них еще работать Explosion_outer?

Harikrishnan Balachandran 22.11.2022 16:26

Для всех столбцов массива вы можете использовать функцию разнесения.

Mohana B C 22.11.2022 16:29

Другие вопросы по теме