Динамически выводить схему данных JSON с помощью Pyspark

У меня есть сервер MongoDB, с которого я загружаю данные в PySpark Dataframe. Однако проблема связана с некоторыми различиями между системами (я также пробовал устаревшие типы данных). Я не могу загрузить данные напрямую из MongoDB. Итак, мне нужно предоставить внешнюю схему. Проблема в том, что один атрибут (столбец steps) имеет вложенные атрибуты, поэтому я не могу предоставить для него точную схему.

Поэтому во время загрузки я просто помечаю этот столбец как StringType(), а затем пытаюсь правильно определить схему этого столбца и его вложенную структуру.

Данные моих шагов выглядят следующим образом:

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|steps                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"id": "selfie", "status": 200, "startedAt": "2024-08-01T11:24:43.698Z", "completedAt": "2024-08-01T11:24:43.702Z", "startCount": 0, "cacheHit": false, "data": {"selfiePhotoUrl": ""}, "inner": {"isSelfieFraudError": false}}]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|[{"id": "ip-validation", "status": 200, "startedAt": "2024-08-01T11:03:01.233Z", "completedAt": "2024-08-01T11:03:01.296Z", "startCount": 0, "cacheHit": false, "data": {"country": "Botswana", "countryCode": "BW", "region": "Gaborone", "regionCode": "GA", "city": "Gaborone", "zip": "", "latitude": -24.6437, "longitude": 25.9112, "safe": true, "ipRestrictionEnabled": false, "vpnDetectionEnabled": false, "platform": "web_mobile"}}, {"id": "liveness", "status": 200, "startedAt": "2024-08-01T11:22:29.787Z", "completedAt": "2024-08-01T11:22:30.609Z", "startCount": 1, "cacheHit": false, "data": {"videoUrl": "", "spriteUrl": "", "selfieUrl": {"media": "", "isUrl": true}}, "inner": {}}]|
|[{"id": "ip-validation", "status": 200, "startedAt": "2024-08-01T11:24:40.251Z", "completedAt": "2024-08-01T11:24:40.285Z", "startCount": 0, "cacheHit": false, "data": {"country": "Mexico", "countryCode": "MX", "region": "Mexico City", "regionCode": "CMX", "city": "Mexico City", "zip": "03020", "latitude": 19.4203, "longitude": -99.1193, "safe": true, "ipRestrictionEnabled": false, "vpnDetectionEnabled": false, "platform": ""}}]                                                                                                                                                                                                                                                                                                                                                                                                                              |
|[{"id": "liveness", "status": 200, "startedAt": "2024-07-31T20:57:54.206Z", "completedAt": "2024-07-31T20:57:55.762Z", "startCount": 1, "cacheHit": false, "data": {"videoUrl": "", "spriteUrl": "", "selfieUrl": {"media": "", "isUrl": true}}, "inner": {}}]                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Как видите, атрибут data имеет вложенный тип и атрибуты даже не фиксированы.

Я попробовал приведенный ниже код, но он работает только для первой записи, поскольку я использую head()

schema = F.schema_of_json(df.select('steps').head()[0])
df1 = df.select("_id","steps").withColumn("steps",F.from_json("steps", schema))

Теперь это работает для первой записи, но соответствует этой схеме для других записей, и даже если есть дополнительные атрибуты, эти атрибуты усекаются и не создаются никакие атрибуты. Посмотрите на атрибут data в выводе.

Как, например, я получаю вывод, как показано ниже:

{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:24:43.702Z","data":{"selfiePhotoUrl":""},"id":"selfie","inner":{"isSelfieFraudError":false},"startCount":0,"startedAt":"2024-08-01T11:24:43.698Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:03:01.296Z","data":{},"id":"ip-validation","startCount":0,"startedAt":"2024-08-01T11:03:01.233Z","status":200},{"cacheHit":false,"completedAt":"2024-08-01T11:22:30.609Z","data":{},"id":"liveness","inner":{},"startCount":1,"startedAt":"2024-08-01T11:22:29.787Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:24:40.285Z","data":{},"id":"ip-validation","startCount":0,"startedAt":"2024-08-01T11:24:40.251Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-07-31T20:57:55.762Z","data":{},"id":"liveness","inner":{},"startCount":1,"startedAt":"2024-07-31T20:57:54.206Z","status":200}]}
{"_id":"","steps":[]}

_id — это дополнительный столбец, который я написал в выводе. Это следует игнорировать.

Я пытался выполнить выборку записей, но метод, который я использовал, работает только для одной записи. Итак, как я могу вывести схему динамически? Есть ли какой-то оптимальный способ?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
76
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Чтобы динамически вывести схему столбца JSON в PySpark DataFrame, особенно если структура вложена и варьируется между записями, вам понадобится более надежный подход, чем просто использование метода head(), который проверяет только первую запись. Вместо использования только первой записи вам необходимо выбрать подмножество записей из DataFrame. Это даст вам более широкое представление о структуре данных.

Попробуйте в PySpark что-то вроде ниже, отрегулируйте параметры там, где это необходимо.

from pyspark.sql import functions as F
from pyspark.sql.types import StructType
import json
# Load the DataFrame
df = spark.read.format("mongo").load()
# Sample a few records
sample_df = df.select("steps").sample(fraction=0.1)
sample_data = sample_df.rdd.map(lambda row: row[0]).collect()
# Combine the sample data into a single JSON array string
sample_json = "[" + ",".join([json.dumps(record) for record in sample_data if 
 record]) + "]"
# Infer schema from the combined JSON
schema = F.schema_of_json(F.lit(sample_json))
# Apply the inferred schema to the DataFrame
df_with_schema = df.withColumn("steps", F.from_json("steps", schema))
# Show the resulting DataFrame
df_with_schema.show(truncate=False)

Большое спасибо за вашу помощь @Mich, но я не хочу продолжать выборку, поэтому сделал что-то вроде того, что написал другой пользователь.

RushHour 06.08.2024 09:41
Ответ принят как подходящий

Вы можете сделать что-то вроде ниже:

schema = spark.read.json(df.rdd.map(lambda row: row['steps'])).schema
array_of_steps_schema = ArrayType(schema, True)
df1 = df.withColumn('steps', from_json(col('steps'), array_of_steps_schema))  

и это дает мне правильную схему, и я тоже не вижу потери данных.

Причина такого подхода в том, что я не могу доверять sampling в данном случае, потому что не знаю, как он работает и будет ли указанная выборка объективной или нет.

Затем код использует функцию отображения, чтобы отобразить на выходе исходный фрейм данных df и сведенный фрейм данных Flatten_df.

О чем ты говоришь?

RushHour 06.08.2024 10:04

Другие вопросы по теме