Json & PySpark – прочитать значение из структуры, которая может быть нулевой

У меня есть список .json files, которые содержат информацию о человеке. Один файл содержит информацию об одном человеке. Я хочу загрузить эти данные в таблицу с помощью pyspark в записной книжке Azure Databricks.

Допустим, файлы построены следующим образом:

{
    "id": 1,
    "name": "Homer",
    "address": {
        "street": "742 Evergreen Terrace"
        "city": "Springfield"
    }
}

Здесь довольно простой json, который я могу прочитать в данных с помощью этого кода:

from pyspark.sql.functions import *

sourcejson = spark.read.json("path/to/json")

df = (
    sourcejson.select(
        col('id'),
        col('name'),
        col('address.street').alias('street'),
        col('address.city').alias('city')
    )
)

что дает ожидаемый результат:

id | name  | street                | city
1  | Homer | 742 Evergreen Terrace | Springfield

Однако. Проблемы начинаются, когда адрес неизвестен. В этом случае вся структура адреса в json будет просто null:

{
    "id": 2,
    "name": "Ned",
    "address": null
}

В приведенном выше примере файла мы не знаем адрес Неда, поэтому у нас есть ноль. Используя предыдущий код, я ожидал бы такого результата:

id | name | street | city
2  | Ned  | null   | null

однако запуск кода приводит к ошибке:

[INVALID_EXTRACT_BASE_FIELD_TYPE] Can't extract a value from "address". Need a complex type [STRUCT, ARRAY, MAP] but got "STRING"

Я понимаю причину ошибки, но не могу найти решения. Есть идеи, как мы можем с этим справиться?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
74
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы создаете проблему (которую можно избежать), читая по одному файлу за раз. Читать все файлы сразу spark.read.json('folder/with/all/json/files') вместо:

  • spark.read.json('folder/with/all/json/files/file1') а потом
  • spark.read.json('folder/with/all/json/files/file2')

Здесь есть небольшая ошибка. В OP вы читаете один файл за раз. Практически вы будете читать все файлы одновременно.

  • Когда вы читаете по одному файлу за раз (что не имеет смысла, поскольку вам нужен один фрейм данных, содержащий данные из всех файлов JSON в виде строк), Spark будет определять тип address как STRING для значений null. Если вы не укажете схему при чтении файла.
>>> spark.read.json(sc.parallelize(['{"id": 2, "name": "Marge", "address": null}'])).printSchema()
root
 |-- address: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

>>> 
  • Если вы прочитаете все файлы одновременно, он выведет тип address как StructType([StructField('city', StringType(), True), StructField('street', StringType(), True)]), False)] для значений null. И ваш исходный код будет работать как есть.
>>>
>>> df = spark.read.json(sc.parallelize([
...   '{"id": 2, "name": "Marge", "address": null}',
...   '{"id": 1, "name": "Homer", "address": {"street": "742 Evergreen Terrace", "city": "Springfield"} }'
... ]))
>>> df.printSchema()
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

>>> df.select(
...     F.col('id'),
...     F.col('name'),
...     F.col('address.street').alias('street'),
...     F.col('address.city').alias('city')
... ).show(truncate=False)
+---+-----+---------------------+-----------+
|id |name |street               |city       |
+---+-----+---------------------+-----------+
|2  |Marge|null                 |null       |
|1  |Homer|742 Evergreen Terrace|Springfield|
+---+-----+---------------------+-----------+

>>>

Используйте Coalesce(), если вы действительно хотите использовать какое-то конкретное значение по умолчанию для нулевых значений. Например. код ниже преобразует address=null в файле json в {city='', street=null} в фрейме данных вместо {city=null, street=null}, который Spark делает по умолчанию, когда вы читаете все файлы одновременно.

>>> json_strings = [
...   '{"id": 1, "name": "Homer", "address": {"street": "742 Evergreen Terrace", "city": "Springfield"} }',
...   '{"id": 2, "name": "Marge", "address": null}',
... ]
>>> df = spark.read.json(sc.parallelize(json_strings))
>>> df.show(truncate=False)
+------------------------------------+---+-----+
|address                             |id |name |
+------------------------------------+---+-----+
|{Springfield, 742 Evergreen Terrace}|1  |Homer|
|null                                |2  |Marge|
+------------------------------------+---+-----+

>>>
>>> default_value = F.struct(F.lit('').alias('city'), F.lit(None).alias('street'))
>>> df2 = df.select('id', 'name', F.coalesce('address', default_value).alias('address'))
>>> df2.show(truncate=False)
+---+-----+------------------------------------+
|id |name |address                             |
+---+-----+------------------------------------+
|1  |Homer|{Springfield, 742 Evergreen Terrace}|
|2  |Marge|{, null}                            |
+---+-----+------------------------------------+

>>>
>>> df2.select(
...     F.col('id'),
...     F.col('name'),
...     F.col('address.street').alias('street'),
...     F.col('address.city').alias('city')
... ).show(truncate=False)
+---+-----+---------------------+-----------+
|id |name |street               |city       |
+---+-----+---------------------+-----------+
|1  |Homer|742 Evergreen Terrace|Springfield|
|2  |Marge|null                 |           |
+---+-----+---------------------+-----------+

>>>
Ответ принят как подходящий

Если вы не предоставите схему для spark.read.json, она будет выведена из данных. Поэтому, когда address отсутствует во всех объектах, Spark предполагает, что это StringType, и поэтому вы получаете ошибку. Одним из возможных решений является чтение данных по схеме:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

raw_data = spark.sparkContext.parallelize(
    ['{"id": 2, "name": "Marge", "address": null}']
)
address_struct = StructType([
    StructField('street', StringType(), True),
    StructField('city', StringType(), True),
])
schema = StructType([
    StructField('id', LongType(), True),
    StructField('name', StringType(), True),
    StructField('address', address_struct, True),
])
sourcejson = spark.read.json(raw_data, schema=schema)

res = (
    sourcejson.select(
        F.col('id'),
        F.col('name'),
        F.col('address.street').alias('street'),
        F.col('address.city').alias('city')
    )
)
res.show(10, False)

# +---+-----+------+----+
# |id |name |street|city|
# +---+-----+------+----+
# |2  |Marge|NULL  |NULL|
# +---+-----+------+----+

Другие вопросы по теме