У меня есть список .json files
, которые содержат информацию о человеке. Один файл содержит информацию об одном человеке. Я хочу загрузить эти данные в таблицу с помощью pyspark
в записной книжке Azure Databricks.
Допустим, файлы построены следующим образом:
{
"id": 1,
"name": "Homer",
"address": {
"street": "742 Evergreen Terrace"
"city": "Springfield"
}
}
Здесь довольно простой json, который я могу прочитать в данных с помощью этого кода:
from pyspark.sql.functions import *
sourcejson = spark.read.json("path/to/json")
df = (
sourcejson.select(
col('id'),
col('name'),
col('address.street').alias('street'),
col('address.city').alias('city')
)
)
что дает ожидаемый результат:
id | name | street | city
1 | Homer | 742 Evergreen Terrace | Springfield
Однако. Проблемы начинаются, когда адрес неизвестен. В этом случае вся структура адреса в json будет просто null
:
{
"id": 2,
"name": "Ned",
"address": null
}
В приведенном выше примере файла мы не знаем адрес Неда, поэтому у нас есть ноль. Используя предыдущий код, я ожидал бы такого результата:
id | name | street | city
2 | Ned | null | null
однако запуск кода приводит к ошибке:
[INVALID_EXTRACT_BASE_FIELD_TYPE] Can't extract a value from "address". Need a complex type [STRUCT, ARRAY, MAP] but got "STRING"
Я понимаю причину ошибки, но не могу найти решения. Есть идеи, как мы можем с этим справиться?
Вы создаете проблему (которую можно избежать), читая по одному файлу за раз. Читать все файлы сразу spark.read.json('folder/with/all/json/files')
вместо:
spark.read.json('folder/with/all/json/files/file1')
а потомspark.read.json('folder/with/all/json/files/file2')
Здесь есть небольшая ошибка. В OP вы читаете один файл за раз. Практически вы будете читать все файлы одновременно.
address
как STRING
для значений null
. Если вы не укажете схему при чтении файла.>>> spark.read.json(sc.parallelize(['{"id": 2, "name": "Marge", "address": null}'])).printSchema()
root
|-- address: string (nullable = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)
>>>
address
как StructType([StructField('city', StringType(), True), StructField('street', StringType(), True)]), False)]
для значений null
. И ваш исходный код будет работать как есть.>>>
>>> df = spark.read.json(sc.parallelize([
... '{"id": 2, "name": "Marge", "address": null}',
... '{"id": 1, "name": "Homer", "address": {"street": "742 Evergreen Terrace", "city": "Springfield"} }'
... ]))
>>> df.printSchema()
root
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- street: string (nullable = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)
>>> df.select(
... F.col('id'),
... F.col('name'),
... F.col('address.street').alias('street'),
... F.col('address.city').alias('city')
... ).show(truncate=False)
+---+-----+---------------------+-----------+
|id |name |street |city |
+---+-----+---------------------+-----------+
|2 |Marge|null |null |
|1 |Homer|742 Evergreen Terrace|Springfield|
+---+-----+---------------------+-----------+
>>>
Используйте Coalesce(), если вы действительно хотите использовать какое-то конкретное значение по умолчанию для нулевых значений. Например. код ниже преобразует address=null
в файле json в {city='', street=null}
в фрейме данных вместо {city=null, street=null}
, который Spark делает по умолчанию, когда вы читаете все файлы одновременно.
>>> json_strings = [
... '{"id": 1, "name": "Homer", "address": {"street": "742 Evergreen Terrace", "city": "Springfield"} }',
... '{"id": 2, "name": "Marge", "address": null}',
... ]
>>> df = spark.read.json(sc.parallelize(json_strings))
>>> df.show(truncate=False)
+------------------------------------+---+-----+
|address |id |name |
+------------------------------------+---+-----+
|{Springfield, 742 Evergreen Terrace}|1 |Homer|
|null |2 |Marge|
+------------------------------------+---+-----+
>>>
>>> default_value = F.struct(F.lit('').alias('city'), F.lit(None).alias('street'))
>>> df2 = df.select('id', 'name', F.coalesce('address', default_value).alias('address'))
>>> df2.show(truncate=False)
+---+-----+------------------------------------+
|id |name |address |
+---+-----+------------------------------------+
|1 |Homer|{Springfield, 742 Evergreen Terrace}|
|2 |Marge|{, null} |
+---+-----+------------------------------------+
>>>
>>> df2.select(
... F.col('id'),
... F.col('name'),
... F.col('address.street').alias('street'),
... F.col('address.city').alias('city')
... ).show(truncate=False)
+---+-----+---------------------+-----------+
|id |name |street |city |
+---+-----+---------------------+-----------+
|1 |Homer|742 Evergreen Terrace|Springfield|
|2 |Marge|null | |
+---+-----+---------------------+-----------+
>>>
Если вы не предоставите схему для spark.read.json
, она будет выведена из данных. Поэтому, когда address
отсутствует во всех объектах, Spark предполагает, что это StringType
, и поэтому вы получаете ошибку. Одним из возможных решений является чтение данных по схеме:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType
raw_data = spark.sparkContext.parallelize(
['{"id": 2, "name": "Marge", "address": null}']
)
address_struct = StructType([
StructField('street', StringType(), True),
StructField('city', StringType(), True),
])
schema = StructType([
StructField('id', LongType(), True),
StructField('name', StringType(), True),
StructField('address', address_struct, True),
])
sourcejson = spark.read.json(raw_data, schema=schema)
res = (
sourcejson.select(
F.col('id'),
F.col('name'),
F.col('address.street').alias('street'),
F.col('address.city').alias('city')
)
)
res.show(10, False)
# +---+-----+------+----+
# |id |name |street|city|
# +---+-----+------+----+
# |2 |Marge|NULL |NULL|
# +---+-----+------+----+