Я не профи со искрой, поэтому прошу помощи.
Я сделал миграцию из таблицы DynamoDB в S3 со встроенным сервисом. Он сохраняет файлы в формате *.json. Допустим, ниже у нас есть пример строки (данные каждой строки представляют собой словарь, вложенный в ключ «Элемент»).
{
"Item": {
"accept_languages": {
"M": {
"en": {"N": "0.9"},
"en-US": {"N": "1"}
}
},
"accept_mimetypes": {
"M": {
"*/*": {"N": "0.8"},
"image/*": {"N": "1"},
"image/apng": {"N": "1"},
"image/webp": {"N": "1"}
}
},
"id": {"S": "5cddbd53b870c2619f1083ed"},
"ip": {"S": "11.11.111.11"},
"landing_page__type": {"S": "PageMain"},
"location__city": {"S": "Scituate"},
"location__country": {"S": "United States"},
"location__country_code": {"S": "US"},
"location__region": {"S": "MA"},
"location__zip": {"S": "02066"},
"origin_url": {"S": "https://www.bing.com/"},
"session": {"S": "b4d58fd18"},
"source": {"S": "bing"},
"user_agent__browser": {"S": "Chrome"},
"user_device": {"S": "t"}
}
}
Как мы видим, данные каждой строки вложены друг в друга. В результате я хочу создать файл *.csv. Любые рекомендации, как я могу его разобрать? В настоящее время у меня есть UDF (пользовательская функция) для преобразования самого словаря из DynamoDB в обычный вид. Как я могу извлечь данные из каждой строки и применить к ней эту функцию, например.
Спасибо
Идея (взятая из этого ответа) состоит в том, чтобы рекурсивно собрать все имена столбцов в списке, а затем использовать этот список в операторе select
:
from pyspark.sql import functions as F
from pyspark.sql import types as T
df = spark.read.option("multiLine", "true").json(<filename>)
def flatten(schema, prefix=None):
for field in schema.fields:
if prefix is None:
colName = field.name
else:
colName = prefix + "." + field.name
if isinstance(field.dataType,T.StructType):
yield from flatten(field.dataType, colName)
else:
yield F.col(colName).alias(colName.replace(".", "_"))
df.select(list(flatten(df.schema))).show()
Выход:
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
|Item_accept_languages_M_en_N|Item_accept_languages_M_en-US_N|Item_accept_mimetypes_M_*/*_N|Item_accept_mimetypes_M_image/*_N|Item_accept_mimetypes_M_image/apng_N|Item_accept_mimetypes_M_image/webp_N| Item_id_S| Item_ip_S|Item_landing_page__type_S|Item_location__city_S|Item_location__country_S|Item_location__country_code_S|Item_location__region_S|Item_location__zip_S| Item_origin_url_S|Item_session_S|Item_source_S|Item_user_agent__browser_S|Item_user_device_S|
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
| 0.9| 1| 0.8| 1| 1| 1|5cddbd53b870c2619...|11.11.111.11| PageMain| Scituate| United States| US| MA| 02066|https://www.bing....| b4d58fd18| bing| Chrome| t|
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
Затем этот фрейм данных можно сохранить как плоский CSV.
Я считаю, что
.option("multiLine", "true")
неверен и будет читать только первую строку в каждом из файлов, созданных AWS.