У меня есть фрейм данных PySpark со столбцом, содержащим StructField строкового типа, который имеет список списков динамической длины.
df.schema: StructType(List(StructField(id,StringType,true),StructField(recs,StringType,true)))
|id | recs |
|ABC|[66, [["AB", 10]]]
|XYZ|[66, [["XY", 10], ["YZ", 20]]]
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]
Я пытаюсь сгладить списки до чего-то вроде этого
|id | like_id
|ABC|AB|
|XYZ|XY|
|XYZ|YZ|
|DEF|DE|
|DEF|EF|
|DEF|FG|
Что я пробовал:
Я попытался использовать выражения массива, это выдало мне ошибку, так как записи имеют тип StringType, как и ожидалось.
Я смог обработать это с помощью загрузки json и itertools в pandas, но мне нужно, чтобы эта обработка происходила в искре, поскольку размер кадра данных составляет ~ 30 миллионов, а результат будет 10 раз.
df["recs"].apply(
lambda x: [rec_id[0] for rec_id in json.loads(x)[1:][0]]
)
for i, row in df.iterrows():
....
IIUC, вы можете разделить строку в столбце recs
, используя шаблон , (?=\[\[)|\]$
, найти второй элемент, а затем использовать from_json для получения массива массивов:
from pyspark.sql import functions as F
df1 = df.withColumn('recs1', F.split('recs', ', (?=\[\[)|\]$')[1]) \
.withColumn('recs2', F.from_json('recs1', 'array<array<string>>'))
Где: шаблон разделения , (?=\[\[)|\]$
содержит два подшаблона:
, (?=\[\[)
\]$
Результат:
df1.show(truncate=False)
+---+------------------------------------------+------------------------------------+------------------------------+
|id |recs |recs1 |recs2 |
+---+------------------------------------------+------------------------------------+------------------------------+
|ABC|[66, [["AB", 10]]] |[["AB", 10]] |[[AB, 10]] |
|XYZ|[66, [["XY", 10], ["YZ", 20]]] |[["XY", 10], ["YZ", 20]] |[[XY, 10], [YZ, 20]] |
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]|[["DE", 10], ["EF", 20], ["FG", 30]]|[[DE, 10], [EF, 20], [FG, 30]]|
+---+------------------------------------------+------------------------------------+------------------------------+
Затем вы можете использовать explode
, чтобы получить желаемый результат:
df1.selectExpr("id", "explode_outer(recs2) as recs") \
.selectExpr("id", "recs[0] as like_id") \
.show()
+---+-------+
| id|like_id|
+---+-------+
|ABC| AB|
|XYZ| XY|
|XYZ| YZ|
|DEF| DE|
|DEF| EF|
|DEF| FG|
+---+-------+
Короче говоря, мы можем записать приведенный выше код в следующее:
df_new = df.selectExpr("id", r"explode_outer(from_json(split(recs, ', (?=\\[\\[)|\\]$')[1], 'array<array<string>>')) as recs") \
.selectExpr("id", "recs[0] as like_id")
Способ решить эту проблему - очистить содержимое столбца и разделить его:
Не забудьте импортировать:
import pyspark.sql.functions as f
# Remove any non string character
df = df.withColumn('only_ids', f.trim(f.regexp_replace('recs', r'[^a-zA-Z\s:]', '')))
# Change blank spaces to commas
df = df.withColumn('clear_blank_spaces', f.regexp_replace('only_ids', r'\W+', ','))
# Split the values by comma
df = df.withColumn('like_ids', f.split('clear_blank_spaces', ','))
# Just for DEBUG
df.show(truncate=False)
# Explode like_ids to transform array to rows
df = df.select('id', f.explode('like_ids').alias('like_ids'))
# Final result
df.show(truncate=False)
Первый вывод:
+---+------------------------------------------+----------+------------------+------------+
|id |recs |only_ids |clear_blank_spaces|like_ids |
+---+------------------------------------------+----------+------------------+------------+
|ABC|[66, [["AB", 10]]] |AB |AB |[AB] |
|XYZ|[66, [["XY", 10], ["YZ", 20]]] |XY YZ |XY,YZ |[XY, YZ] |
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]|DE EF FG|DE,EF,FG |[DE, EF, FG]|
+---+------------------------------------------+----------+------------------+------------+
Второй вывод:
+---+--------+
|id |like_ids|
+---+--------+
|ABC|AB |
|XYZ|XY |
|XYZ|YZ |
|DEF|DE |
|DEF|EF |
|DEF|FG |
+---+--------+
Если ваши данные действительно выглядят такими чистыми, вы можете просто разделить их на "
и получить записи с нечетным индексом в результирующем массиве.
import pyspark.sql.functions as F
df2 = df.select(
'id',
F.posexplode(F.split('recs', '"')).alias('pos', 'like_id')
).filter('cast(pos % 2 as boolean)').drop('pos')
df2.show()
+---+-------+
| id|like_id|
+---+-------+
|ABC| AB|
|XYZ| XY|
|XYZ| YZ|
|DEF| DE|
|DEF| EF|
|DEF| FG|
+---+-------+
В качестве примечания:
from_json
with schema=array<array<string>>
работает только со Spark 2.4+. ниже этой версии вы можете использовать split + regexp_replace + взорвать.