исходный фрейм данных
Я хочу преобразовать в это
это только пример, у меня очень большой файл и более 60 столбцов
я использую
df = spark.read.option("header",True) \
.csv("contatcs.csv", sep =',')
но работает с pyspark.pandas API
import pyspark.pandas as ps
df = ps.read_csv('contacts.csv', sep=',')
df.head()
но я предпочитаю spark.read, потому что это ленивая оценка и API панд не
То, что вы хотите, это pivot
Чтобы сделать это детерминировано в Spark, у вас должно быть какое-то правило, чтобы определить, какое электронное письмо является первым, а какое вторым. Порядок строк в CSV-файле (отсутствие указанного столбца для номера строки) — плохое правило при работе со Spark, потому что каждая строка может идти к другому узлу, и тогда вы не сможете увидеть, какая из строк была первой или второй. .
В следующем примере я предполагаю, что правилом является алфавитный порядок, поэтому я собираю все электронные письма в один массив, используя collect_set
, а затем сортирую их, используя array_sort
.
Вход:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('1', '[email protected]', 'john'),
('2', '[email protected]', 'Maike'),
('2', 'id2@second', 'Maike'),
('1', '[email protected]', 'john')],
['id', 'email', 'name'])
Скрипт:
emails = F.array_sort(F.collect_set('email'))
df = df.groupBy('id', 'name').agg(
emails[0].alias('email0'),
emails[1].alias('email1'),
)
df.show()
# +---+-----+-------------+--------------+
# | id| name| email0| email1|
# +---+-----+-------------+--------------+
# | 2|Maike|[email protected]| id2@second|
# | 1| john|[email protected]|[email protected]|
# +---+-----+-------------+--------------+
Если бы у вас был номер строки, что-то вроде...
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('1', '1', '[email protected]', 'john'),
('2', '2', '[email protected]', 'Maike'),
('3', '2', 'id2@second', 'Maike'),
('4', '1', '[email protected]', 'john')],
['row_number', 'id', 'email', 'name'])
Вы можете использовать что-то вроде следующих опций:
emails = F.array_sort(F.collect_set(F.struct(F.col('row_number').cast('long'), 'email')))
df = df.groupBy('id', 'name').agg(
emails[0]['email'].alias('email0'),
emails[1]['email'].alias('email1'),
)
df.show()
# +---+-----+-------------+--------------+
# | id| name| email0| email1|
# +---+-----+-------------+--------------+
# | 2|Maike|[email protected]| id2@second|
# | 1| john|[email protected]|[email protected]|
# +---+-----+-------------+--------------+
from pyspark.sql import Window as W
w = W.partitionBy('id', 'name').orderBy('row_number')
df = (df
.withColumn('_rn', F.row_number().over(w))
.filter('_rn <= 2')
.withColumn('_rn', F.concat(F.lit('email'), '_rn'))
.groupBy('id', 'name')
.pivot('_rn')
.agg(F.first('email'))
)
df.show()
# +---+-----+-------------+--------------+
# | id| name| email1| email2|
# +---+-----+-------------+--------------+
# | 1| john|[email protected]|[email protected]|
# | 2|Maike|[email protected]| id2@second|
# +---+-----+-------------+--------------+
писпарк
Я включил угловой случай, когда количество идентификаторов электронной почты нечетное. Для этого найдите максимальную длину и выполните итерацию для получения электронной почты по каждому индексу:
from pyspark.sql import functions as F
df = spark.createDataFrame([(1, '[email protected]', 'john'),(2, '[email protected]', 'Maike'),(2, 'id2@second', 'Maike'),(1, '[email protected]', 'john'),(3, '[email protected]', 'amy'),], ['id', 'email', 'name'])
df = df.groupby("id", "name").agg(F.collect_list("email").alias("email"))
max_len = df.select(F.size("email").alias("size")).collect()[0]["size"]
for i in range(1, max_len + 1):
df = df.withColumn(f"email{i}", F.when(F.size("email") >= i, F.element_at("email", i)).otherwise(F.lit("")))
df = df.drop("email")
Вывод:
+---+-----+-------------+--------------+
|id |name |email1 |email2 |
+---+-----+-------------+--------------+
|2 |Maike|[email protected]|id2@second |
|3 |amy |[email protected]| |
|1 |john |[email protected]|[email protected]|
+---+-----+-------------+--------------+
панды
Поскольку вы упомянули панд в тегах, решение в пандах следующее:
df = pd.DataFrame(data=[(1, '[email protected]', 'john'),(2, '[email protected]', 'Maike'),(2, 'id2@second', 'Maike'),(1, '[email protected]', 'john'),(3, '[email protected]', 'amy'),], columns=["id","email","name"])
df = df.groupby("id").agg(email=("email",list), name=("name",pd.unique))
df2 = df.apply(lambda row: pd.Series(data = {f"email{i+1}":v for i,v in enumerate(row["email"])}, dtype = "object"), axis=1)
df = df.drop("email", axis=1).merge(df2, on = "id")
Вывод:
name email1 email2
id
1 john [email protected] [email protected]
2 Maike [email protected] id2@second
3 amy [email protected] NaN
Оба решения работают, но ваше решение лучше, потому что это большой файл, и я не знал, сколько столбцов электронной почты и повторяющихся идентификаторов! Однако диапазон (1, max_len + 1): требует много времени! Спасибо
Если вы хотите сделать его динамическим, чтобы он создавал новые счетчики электронной почты на основе максимального количества электронных писем, вы можете попробовать логику и код ниже.
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('1', '[email protected]', 'john'),
('2', '[email protected]', 'Maike'),
('2', '[email protected]', 'Maike'),
('2', 'id2@second', 'Maike'),
('1', '[email protected]', 'john')],
['id', 'email', 'name'])
df.show()
+---+---------------+-----+
| id| email| name|
+---+---------------+-----+
| 1| [email protected]| john|
| 2| [email protected]|Maike|
| 2|[email protected]|Maike|
| 2| id2@second|Maike|
| 1| [email protected]| john|
Решение
new = ( df.groupBy('id','name').agg(collect_set('email').alias('email') )#Collect unique emails
.withColumn('x',max(size('email')).over(Window.partitionBy()))#Find the group with maximum emails, for use in email column count
)
new = (new.withColumn('email',F.struct(*[ F.col("email")[i].alias(f"email{i+1}") for i in range(new.select('x').collect()[0][0])]))#Convert email column to struct type
.selectExpr('x','id','name','email.*') #Select all columns
)
new.show(truncate=False)
Исход
+---+---+-----+-------------+--------------+---------------+
|x |id |name |email1 |email2 |email3 |
+---+---+-----+-------------+--------------+---------------+
|3 |1 |john |[email protected]|[email protected]|null |
|3 |2 |Maike|id2@second |[email protected] |[email protected]|
+---+---+-----+-------------+--------------+---------------+
Каковы ваши критерии для создания столбцов электронной почты? Это основано на домене, таком как «first.com», или на имени?