Как группировать строки и создавать новые столбцы в pyspark

исходный фрейм данных

я бы Эл. адрес название 1 [email protected] Джон 2 [email protected] Майке 2 id2 @ секунда Майке 1 [email protected] Джон

Я хочу преобразовать в это

я бы Эл. адрес электронная почта1 название 1 [email protected] [email protected] Джон 2 [email protected] id2 @ секунда Майке

это только пример, у меня очень большой файл и более 60 столбцов

я использую

df = spark.read.option("header",True) \
        .csv("contatcs.csv", sep =',')

но работает с pyspark.pandas API

import pyspark.pandas as ps    

df = ps.read_csv('contacts.csv', sep=',')
df.head()

но я предпочитаю spark.read, потому что это ленивая оценка и API панд не

Каковы ваши критерии для создания столбцов электронной почты? Это основано на домене, таком как «first.com», или на имени?

Azhar Khan 18.11.2022 05:18

То, что вы хотите, это pivot

mozway 18.11.2022 05:25
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
2
59
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Чтобы сделать это детерминировано в Spark, у вас должно быть какое-то правило, чтобы определить, какое электронное письмо является первым, а какое вторым. Порядок строк в CSV-файле (отсутствие указанного столбца для номера строки) — плохое правило при работе со Spark, потому что каждая строка может идти к другому узлу, и тогда вы не сможете увидеть, какая из строк была первой или второй. .

В следующем примере я предполагаю, что правилом является алфавитный порядок, поэтому я собираю все электронные письма в один массив, используя collect_set, а затем сортирую их, используя array_sort.

Вход:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('1', '[email protected]', 'john'),
     ('2', '[email protected]', 'Maike'),
     ('2', 'id2@second', 'Maike'),
     ('1', '[email protected]', 'john')],
    ['id', 'email', 'name'])

Скрипт:

emails = F.array_sort(F.collect_set('email'))
df = df.groupBy('id', 'name').agg(
    emails[0].alias('email0'),
    emails[1].alias('email1'),
)
df.show()
# +---+-----+-------------+--------------+
# | id| name|       email0|        email1|
# +---+-----+-------------+--------------+
# |  2|Maike|[email protected]|    id2@second|
# |  1| john|[email protected]|[email protected]|
# +---+-----+-------------+--------------+

Если бы у вас был номер строки, что-то вроде...

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('1', '1', '[email protected]', 'john'),
     ('2', '2', '[email protected]', 'Maike'),
     ('3', '2', 'id2@second', 'Maike'),
     ('4', '1', '[email protected]', 'john')],
    ['row_number', 'id', 'email', 'name'])

Вы можете использовать что-то вроде следующих опций:

emails = F.array_sort(F.collect_set(F.struct(F.col('row_number').cast('long'), 'email')))
df = df.groupBy('id', 'name').agg(
    emails[0]['email'].alias('email0'),
    emails[1]['email'].alias('email1'),
)
df.show()
# +---+-----+-------------+--------------+
# | id| name|       email0|        email1|
# +---+-----+-------------+--------------+
# |  2|Maike|[email protected]|    id2@second|
# |  1| john|[email protected]|[email protected]|
# +---+-----+-------------+--------------+
from pyspark.sql import Window as W

w = W.partitionBy('id', 'name').orderBy('row_number')
df = (df
    .withColumn('_rn', F.row_number().over(w))
    .filter('_rn <= 2')
    .withColumn('_rn', F.concat(F.lit('email'), '_rn'))
    .groupBy('id', 'name')
    .pivot('_rn')
    .agg(F.first('email'))
)
df.show()
# +---+-----+-------------+--------------+
# | id| name|       email1|        email2|
# +---+-----+-------------+--------------+
# |  1| john|[email protected]|[email protected]|
# |  2|Maike|[email protected]|    id2@second|
# +---+-----+-------------+--------------+
Ответ принят как подходящий

писпарк

Я включил угловой случай, когда количество идентификаторов электронной почты нечетное. Для этого найдите максимальную длину и выполните итерацию для получения электронной почты по каждому индексу:

from pyspark.sql import functions as F
df = spark.createDataFrame([(1, '[email protected]', 'john'),(2, '[email protected]', 'Maike'),(2, 'id2@second', 'Maike'),(1, '[email protected]', 'john'),(3, '[email protected]', 'amy'),], ['id', 'email', 'name'])

df = df.groupby("id", "name").agg(F.collect_list("email").alias("email"))
max_len = df.select(F.size("email").alias("size")).collect()[0]["size"]
for i in range(1, max_len + 1):
  df = df.withColumn(f"email{i}", F.when(F.size("email") >= i, F.element_at("email", i)).otherwise(F.lit("")))
df = df.drop("email")

Вывод:

+---+-----+-------------+--------------+
|id |name |email1       |email2        |
+---+-----+-------------+--------------+
|2  |Maike|[email protected]|id2@second    |
|3  |amy  |[email protected]|              |
|1  |john |[email protected]|[email protected]|
+---+-----+-------------+--------------+

панды

Поскольку вы упомянули панд в тегах, решение в пандах следующее:

df = pd.DataFrame(data=[(1, '[email protected]', 'john'),(2, '[email protected]', 'Maike'),(2, 'id2@second', 'Maike'),(1, '[email protected]', 'john'),(3, '[email protected]', 'amy'),], columns=["id","email","name"])

df = df.groupby("id").agg(email=("email",list), name=("name",pd.unique))
df2 = df.apply(lambda row: pd.Series(data = {f"email{i+1}":v for i,v in enumerate(row["email"])}, dtype = "object"), axis=1)
df = df.drop("email", axis=1).merge(df2, on = "id")

Вывод:

     name         email1          email2
id                                      
1    john  [email protected]  [email protected]
2   Maike  [email protected]      id2@second
3     amy  [email protected]             NaN

Оба решения работают, но ваше решение лучше, потому что это большой файл, и я не знал, сколько столбцов электронной почты и повторяющихся идентификаторов! Однако диапазон (1, max_len + 1): требует много времени! Спасибо

Emerson Pedroso 18.11.2022 14:22

Если вы хотите сделать его динамическим, чтобы он создавал новые счетчики электронной почты на основе максимального количества электронных писем, вы можете попробовать логику и код ниже.

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('1', '[email protected]', 'john'),
     ('2', '[email protected]', 'Maike'),
     ('2', '[email protected]', 'Maike'),
     ('2', 'id2@second', 'Maike'),
     ('1', '[email protected]', 'john')],
    ['id', 'email', 'name'])

df.show()



+---+---------------+-----+
| id|          email| name|
+---+---------------+-----+
|  1|  [email protected]| john|
|  2|  [email protected]|Maike|
|  2|[email protected]|Maike|
|  2|     id2@second|Maike|
|  1| [email protected]| john|

Решение

new = (   df.groupBy('id','name').agg(collect_set('email').alias('email') )#Collect unique emails
        .withColumn('x',max(size('email')).over(Window.partitionBy()))#Find the group with maximum emails, for use in email column count
    )
     
new = (new.withColumn('email',F.struct(*[ F.col("email")[i].alias(f"email{i+1}") for i in range(new.select('x').collect()[0][0])]))#Convert email column to struct type
      .selectExpr('x','id','name','email.*') #Select all columns
     )
new.show(truncate=False)

Исход

+---+---+-----+-------------+--------------+---------------+
|x  |id |name |email1       |email2        |email3         |
+---+---+-----+-------------+--------------+---------------+
|3  |1  |john |[email protected]|[email protected]|null           |
|3  |2  |Maike|id2@second   |[email protected] |[email protected]|
+---+---+-----+-------------+--------------+---------------+

Другие вопросы по теме