Я читаю Кафку через структурированную потоковую передачу Spark. Входное сообщение Kafka имеет следующий формат JSON:
[
{
"customer": "Jim",
"sex": "male",
"country": "US"
},
{
"customer": "Pam",
"sex": "female",
"country": "US"
}
]
У меня есть определение схемы, как показано ниже, для ее анализа:
val schemaAsJson = ArrayType(StructType(Seq(
StructField("customer",StringType,true),
StructField("sex",StringType,true),
StructField("country",StringType,true))),true)
Мой код выглядит так,
df.select(from_json($"col", schemaAsJson) as "json")
.select("json.customer","json.sex","json.country")
Текущий вывод выглядит так,
+--------------+----------------+----------------+
| customer| sex|country |
+--------------+----------------+----------------+
| [Jim, Pam]| [male, female]| [US, US]|
+--------------+----------------+----------------+
Ожидаемый результат:
+--------------+----------------+----------------+
| customer| sex| country|
+--------------+----------------+----------------+
| Jim| male| US|
| Pam| female| US|
+--------------+----------------+----------------+
Как разделить массив структур на отдельные строки, как указано выше? Может кто-нибудь помочь?
Вам нужно взорвать столбец перед выбором.
df.select(explode_outer(from_json($"value", schemaAsJson)) as "json")
.select("json.customer","json.sex","json.country").show()
Обновил ответ, проверьте сейчас
Извините, это опечатка
Большое спасибо. Он работает так, как ожидалось! Мои фактические данные немного сложнее по вложенной структуре. Как и в родительской структуре, у меня будет другой массив структуры, такой как «предыдущая работа»: [ {"emp1Details":""}, {"emp2Details":""}]. Мне еще предстоит попробовать ваше решение. Но будет ли для них еще работать Explosion_outer?
Для всех столбцов массива вы можете использовать функцию разнесения.
Я пробовал. Я получаю эту ошибку: Исключение в потоке «основной» org.apache.spark.sql.AnalysisException: невозможно разрешить «
json.customer
» заданные входные столбцы: [col];