Как распаковать список списков в строковом формате?

У меня есть фрейм данных PySpark со столбцом, содержащим StructField строкового типа, который имеет список списков динамической длины.

df.schema: StructType(List(StructField(id,StringType,true),StructField(recs,StringType,true)))

|id     | recs |

|ABC|[66, [["AB", 10]]]
|XYZ|[66, [["XY", 10], ["YZ", 20]]]
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]  

Я пытаюсь сгладить списки до чего-то вроде этого

|id | like_id
|ABC|AB|
|XYZ|XY|
|XYZ|YZ|
|DEF|DE|
|DEF|EF|
|DEF|FG|

Что я пробовал:

Я попытался использовать выражения массива, это выдало мне ошибку, так как записи имеют тип StringType, как и ожидалось.

Я смог обработать это с помощью загрузки json и itertools в pandas, но мне нужно, чтобы эта обработка происходила в искре, поскольку размер кадра данных составляет ~ 30 миллионов, а результат будет 10 раз.

df["recs"].apply(
        lambda x: [rec_id[0] for rec_id in json.loads(x)[1:][0]]
    )
    for i, row in df.iterrows():
        ....
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
0
890
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

IIUC, вы можете разделить строку в столбце recs, используя шаблон , (?=\[\[)|\]$, найти второй элемент, а затем использовать from_json для получения массива массивов:

from pyspark.sql import functions as F

df1 = df.withColumn('recs1', F.split('recs', ', (?=\[\[)|\]$')[1]) \
    .withColumn('recs2', F.from_json('recs1', 'array<array<string>>'))

Где: шаблон разделения , (?=\[\[)|\]$ содержит два подшаблона:

  • запятая, за которой следует ПРОБЕЛ, за которым должны следовать две открывающие скобки , (?=\[\[)
  • закрывающая скобка в конце строки \]$

Результат:

df1.show(truncate=False)
+---+------------------------------------------+------------------------------------+------------------------------+
|id |recs                                      |recs1                               |recs2                         |
+---+------------------------------------------+------------------------------------+------------------------------+
|ABC|[66, [["AB", 10]]]                        |[["AB", 10]]                        |[[AB, 10]]                    |
|XYZ|[66, [["XY", 10], ["YZ", 20]]]            |[["XY", 10], ["YZ", 20]]            |[[XY, 10], [YZ, 20]]          |
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]|[["DE", 10], ["EF", 20], ["FG", 30]]|[[DE, 10], [EF, 20], [FG, 30]]|
+---+------------------------------------------+------------------------------------+------------------------------+

Затем вы можете использовать explode, чтобы получить желаемый результат:

df1.selectExpr("id", "explode_outer(recs2) as recs") \
    .selectExpr("id", "recs[0] as like_id") \
    .show()
+---+-------+
| id|like_id|
+---+-------+
|ABC|     AB|
|XYZ|     XY|
|XYZ|     YZ|
|DEF|     DE|
|DEF|     EF|
|DEF|     FG|
+---+-------+

Короче говоря, мы можем записать приведенный выше код в следующее:

df_new = df.selectExpr("id", r"explode_outer(from_json(split(recs, ', (?=\\[\\[)|\\]$')[1], 'array<array<string>>')) as recs") \
    .selectExpr("id", "recs[0] as like_id")

В качестве примечания: from_json with schema=array<array<string>> работает только со Spark 2.4+. ниже этой версии вы можете использовать split + regexp_replace + взорвать.

jxc 26.12.2020 03:36

Способ решить эту проблему - очистить содержимое столбца и разделить его:

Не забудьте импортировать:

import pyspark.sql.functions as f
# Remove any non string character
df = df.withColumn('only_ids', f.trim(f.regexp_replace('recs', r'[^a-zA-Z\s:]', '')))

# Change blank spaces to commas
df = df.withColumn('clear_blank_spaces', f.regexp_replace('only_ids', r'\W+', ','))

# Split the values by comma
df = df.withColumn('like_ids', f.split('clear_blank_spaces', ','))

# Just for DEBUG
df.show(truncate=False)

# Explode like_ids to transform array to rows
df = df.select('id', f.explode('like_ids').alias('like_ids'))

# Final result
df.show(truncate=False)

Первый вывод:

+---+------------------------------------------+----------+------------------+------------+
|id |recs                                      |only_ids  |clear_blank_spaces|like_ids    |
+---+------------------------------------------+----------+------------------+------------+
|ABC|[66, [["AB", 10]]]                        |AB        |AB                |[AB]        |
|XYZ|[66, [["XY", 10], ["YZ", 20]]]            |XY  YZ    |XY,YZ             |[XY, YZ]    |
|DEF|[66, [["DE", 10], ["EF", 20], ["FG", 30]]]|DE  EF  FG|DE,EF,FG          |[DE, EF, FG]|
+---+------------------------------------------+----------+------------------+------------+

Второй вывод:

+---+--------+
|id |like_ids|
+---+--------+
|ABC|AB      |
|XYZ|XY      |
|XYZ|YZ      |
|DEF|DE      |
|DEF|EF      |
|DEF|FG      |
+---+--------+

Если ваши данные действительно выглядят такими чистыми, вы можете просто разделить их на " и получить записи с нечетным индексом в результирующем массиве.

import pyspark.sql.functions as F

df2 = df.select(
    'id',
    F.posexplode(F.split('recs', '"')).alias('pos', 'like_id')
).filter('cast(pos % 2 as boolean)').drop('pos')

df2.show()
+---+-------+
| id|like_id|
+---+-------+
|ABC|     AB|
|XYZ|     XY|
|XYZ|     YZ|
|DEF|     DE|
|DEF|     EF|
|DEF|     FG|
+---+-------+

Другие вопросы по теме