У меня есть сервер MongoDB
, с которого я загружаю данные в PySpark
Dataframe. Однако проблема связана с некоторыми различиями между системами (я также пробовал устаревшие типы данных). Я не могу загрузить данные напрямую из MongoDB
. Итак, мне нужно предоставить внешнюю схему. Проблема в том, что один атрибут (столбец steps
) имеет вложенные атрибуты, поэтому я не могу предоставить для него точную схему.
Поэтому во время загрузки я просто помечаю этот столбец как StringType()
, а затем пытаюсь правильно определить схему этого столбца и его вложенную структуру.
Данные моих шагов выглядят следующим образом:
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|steps |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"id": "selfie", "status": 200, "startedAt": "2024-08-01T11:24:43.698Z", "completedAt": "2024-08-01T11:24:43.702Z", "startCount": 0, "cacheHit": false, "data": {"selfiePhotoUrl": ""}, "inner": {"isSelfieFraudError": false}}] |
|[{"id": "ip-validation", "status": 200, "startedAt": "2024-08-01T11:03:01.233Z", "completedAt": "2024-08-01T11:03:01.296Z", "startCount": 0, "cacheHit": false, "data": {"country": "Botswana", "countryCode": "BW", "region": "Gaborone", "regionCode": "GA", "city": "Gaborone", "zip": "", "latitude": -24.6437, "longitude": 25.9112, "safe": true, "ipRestrictionEnabled": false, "vpnDetectionEnabled": false, "platform": "web_mobile"}}, {"id": "liveness", "status": 200, "startedAt": "2024-08-01T11:22:29.787Z", "completedAt": "2024-08-01T11:22:30.609Z", "startCount": 1, "cacheHit": false, "data": {"videoUrl": "", "spriteUrl": "", "selfieUrl": {"media": "", "isUrl": true}}, "inner": {}}]|
|[{"id": "ip-validation", "status": 200, "startedAt": "2024-08-01T11:24:40.251Z", "completedAt": "2024-08-01T11:24:40.285Z", "startCount": 0, "cacheHit": false, "data": {"country": "Mexico", "countryCode": "MX", "region": "Mexico City", "regionCode": "CMX", "city": "Mexico City", "zip": "03020", "latitude": 19.4203, "longitude": -99.1193, "safe": true, "ipRestrictionEnabled": false, "vpnDetectionEnabled": false, "platform": ""}}] |
|[{"id": "liveness", "status": 200, "startedAt": "2024-07-31T20:57:54.206Z", "completedAt": "2024-07-31T20:57:55.762Z", "startCount": 1, "cacheHit": false, "data": {"videoUrl": "", "spriteUrl": "", "selfieUrl": {"media": "", "isUrl": true}}, "inner": {}}] |
|[] |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Как видите, атрибут data
имеет вложенный тип и атрибуты даже не фиксированы.
Я попробовал приведенный ниже код, но он работает только для первой записи, поскольку я использую head()
schema = F.schema_of_json(df.select('steps').head()[0])
df1 = df.select("_id","steps").withColumn("steps",F.from_json("steps", schema))
Теперь это работает для первой записи, но соответствует этой схеме для других записей, и даже если есть дополнительные атрибуты, эти атрибуты усекаются и не создаются никакие атрибуты. Посмотрите на атрибут data
в выводе.
Как, например, я получаю вывод, как показано ниже:
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:24:43.702Z","data":{"selfiePhotoUrl":""},"id":"selfie","inner":{"isSelfieFraudError":false},"startCount":0,"startedAt":"2024-08-01T11:24:43.698Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:03:01.296Z","data":{},"id":"ip-validation","startCount":0,"startedAt":"2024-08-01T11:03:01.233Z","status":200},{"cacheHit":false,"completedAt":"2024-08-01T11:22:30.609Z","data":{},"id":"liveness","inner":{},"startCount":1,"startedAt":"2024-08-01T11:22:29.787Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:24:40.285Z","data":{},"id":"ip-validation","startCount":0,"startedAt":"2024-08-01T11:24:40.251Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-07-31T20:57:55.762Z","data":{},"id":"liveness","inner":{},"startCount":1,"startedAt":"2024-07-31T20:57:54.206Z","status":200}]}
{"_id":"","steps":[]}
_id
— это дополнительный столбец, который я написал в выводе. Это следует игнорировать.
Я пытался выполнить выборку записей, но метод, который я использовал, работает только для одной записи. Итак, как я могу вывести схему динамически? Есть ли какой-то оптимальный способ?
Чтобы динамически вывести схему столбца JSON в PySpark DataFrame, особенно если структура вложена и варьируется между записями, вам понадобится более надежный подход, чем просто использование метода head(), который проверяет только первую запись. Вместо использования только первой записи вам необходимо выбрать подмножество записей из DataFrame. Это даст вам более широкое представление о структуре данных.
Попробуйте в PySpark что-то вроде ниже, отрегулируйте параметры там, где это необходимо.
from pyspark.sql import functions as F
from pyspark.sql.types import StructType
import json
# Load the DataFrame
df = spark.read.format("mongo").load()
# Sample a few records
sample_df = df.select("steps").sample(fraction=0.1)
sample_data = sample_df.rdd.map(lambda row: row[0]).collect()
# Combine the sample data into a single JSON array string
sample_json = "[" + ",".join([json.dumps(record) for record in sample_data if
record]) + "]"
# Infer schema from the combined JSON
schema = F.schema_of_json(F.lit(sample_json))
# Apply the inferred schema to the DataFrame
df_with_schema = df.withColumn("steps", F.from_json("steps", schema))
# Show the resulting DataFrame
df_with_schema.show(truncate=False)
Вы можете сделать что-то вроде ниже:
schema = spark.read.json(df.rdd.map(lambda row: row['steps'])).schema
array_of_steps_schema = ArrayType(schema, True)
df1 = df.withColumn('steps', from_json(col('steps'), array_of_steps_schema))
и это дает мне правильную схему, и я тоже не вижу потери данных.
Причина такого подхода в том, что я не могу доверять sampling
в данном случае, потому что не знаю, как он работает и будет ли указанная выборка объективной или нет.
Затем код использует функцию отображения, чтобы отобразить на выходе исходный фрейм данных df и сведенный фрейм данных Flatten_df.
О чем ты говоришь?
Большое спасибо за вашу помощь @Mich, но я не хочу продолжать выборку, поэтому сделал что-то вроде того, что написал другой пользователь.