Как объединить две таблицы с помощью агрегации

У меня есть два фрейма данных pyspark:

один:

name start end
bob    1   3
john   5   8

и второе:

day outcome
  1   a
  2   c
  3   d
  4   a
  5   e
  6   c
  7   u
  8   l

И мне нужны результаты за несколько дней для каждого человека, например

bob  acd

john  ecul

Можно ли это сделать в pyspark?

ReactJs | Supabase | Добавление данных в базу данных
ReactJs | Supabase | Добавление данных в базу данных
Это и есть ваш редактор таблиц в supabase.👇
Понимание Python и переход к SQL
Понимание Python и переход к SQL
Перед нами лабораторная работа по BloodOath:
5
0
96
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Используйте искровой SQL. Я использовал Scala, но SQL в pyspark точно такой же, и я считаю, что вы можете легко преобразовать его, если есть какие-либо различия для pyspark.

Объедините два кадра данных, используйте Collect_list(), чтобы получить массив результатов, затем используйте concat_ws(), чтобы объединить массив в строку:

val dF1 = Seq(
("bob", 1, 3),
("john",  5, 8)
).toDF("name","start","end")

dF1.createOrReplaceTempView("dF1")

val dF2 = Seq(
(1, "a"),
(2, "c"),
(3, "d"),
(4, "a"),
(5, "e"),
(6, "c"),
(7, "u"),
(8, "l")
).toDF("day","outcome")

dF2.createOrReplaceTempView("dF2")


spark.sql(""" 
select d1.name, concat_ws('',collect_list(d2.outcome)) outcome
from
(select d1.name, e.day 
  from dF1 d1 
       lateral view explode(sequence(d1.start, d1.end)) e as day
)d1
left join dF2 d2 on d1.day=d2.day
group by d1.name
""").show(100, false)

Результат:

+----+-------+
|name|outcome|
+----+-------+
|bob |acd    |
|john|ecul   |
+----+-------+

Исправление ООМ:

spark.sql(""" 
select d1.name, concat_ws('',collect_list(d2.outcome)) outcome
from dF1 d1 
left join dF2 d2 on d1.start<=d2.day and  d1.end>=d2.day
group by d1.name
""").show(100, false)

Спасибо! Это работает, но у меня заканчивается память, поэтому я сделаю все это по-другому. Мой второй файл размером около 200 МБ.

user453575457 26.09.2023 17:13

@user453575457 user453575457 Если он слишком большой, попробуйте второй запрос, он может работать напрямую, если выполнить последовательность + развернуть

leftjoin 26.09.2023 17:23

Ваш ответ совершенно правильный, но у искры здесь очень плохая производительность, я сделаю это по-другому. 100 записей - 10Кб в первом df, 7Мб во втором df - длится 40 секунд. При 1000 энтей - я отказался ждать через 10 мин. И у меня 900 тысяч записей в первом кадре, так что нет никаких шансов, что он когда-нибудь будет завершен.

user453575457 27.09.2023 10:43

@user453575457 user453575457 Дайте мне подумать... Этот вид соединения, когда один DF слишком велик, а второй мал (вы сказали 200 МБ) и используется неэкви-соединение, тогда лучшей стратегией соединения является широковещательное соединение с вложенным циклом. попробуйте принудительно присоединиться к трансляции

leftjoin 27.09.2023 13:14

@ user453575457 Используйте подсказку: select /*+ BROADCASTJOIN (d2) */ d1.name, concat_ws( ... А теперь убедитесь, что размер исполнителя соответствует транслируемому d2.

leftjoin 27.09.2023 13:53

Извините, я не очень хорошо разбираюсь в искре. Что вы имеете в виду под словом «исполнитель большого размера»? Где мне это установить?

user453575457 27.09.2023 14:05

Ответы читайте здесь: stackoverflow.com/a/55964221/2700344 Вам необходимо установить spark.executor.memory, spark.driver.memory. Начните с 2G и увеличивайте при необходимости. Также вы можете установить ядра драйверов и исполнителей, а также количество исполнителей.

leftjoin 27.09.2023 14:30

на данный момент просто локальный скрипт Python, позже я хочу распространить его на многие виртуальные машины, но до сих пор не знаю, как это сделать

user453575457 27.09.2023 14:36

хорошо. попробуйте принудительно подключиться к широковещательному соединению, используя подсказку в sql/check EXPLAIN EXTENDED SELECT, должно показать, что будет использоваться широковещательное соединение. При необходимости увеличьте размер исполнителя.

leftjoin 27.09.2023 15:14

Сработало как по волшебству! теперь 1к завершился за 5 минут, и даже лучше - теперь я понимаю, как распределить этот материал на большее количество узлов. Большое спасибо!

user453575457 27.09.2023 17:24

Интересно, как работает первая версия после увеличения размера исполнителя?

leftjoin 27.09.2023 18:47

Я протестирую и сообщу вам

user453575457 27.09.2023 20:07

Первый вариант работает неправильно, пытаюсь понять почему, поэтому если будут обновления - напишу здесь

user453575457 28.09.2023 12:36

Код

df = df1.join(df2, on=(df1.start <= df2.day) & (df1.end >= df2.day))

df = df.groupBy('name').agg(F.collect_list(F.struct('day', 'outcome')).alias('outcome'))
df = df.withColumn('outcome', F.transform(F.array_sort('outcome'), lambda x: x.outcome))
df = df.withColumn('outcome', F.array_join('outcome', ''))

Как это работает?

Используйте метод неравноправного соединения, чтобы соединить фрейм данных df1 с df2

+----+-----+---+---+-------+
|name|start|end|day|outcome|
+----+-----+---+---+-------+
|bob |1    |3  |1  |a      |
|bob |1    |3  |2  |c      |
|bob |1    |3  |3  |d      |
|john|5    |8  |5  |e      |
|john|5    |8  |6  |c      |
|john|5    |8  |7  |u      |
|john|5    |8  |8  |l      |
+----+-----+---+---+-------+

Сгруппируйте фрейм данных по name и collect парам дня и результата. Этот шаг имеет решающее значение, поскольку столбец day необходим для поддержания порядка конкатенации.

+----+--------------------------------+
|name|outcome                         |
+----+--------------------------------+
|john|[{5, e}, {6, c}, {7, u}, {8, l}]|
|bob |[{1, a}, {2, c}, {3, d}]        |
+----+--------------------------------+

Отсортируйте массив пар и преобразуйте его, чтобы извлечь значения outcome.

+----+------------+
|name|outcome     |
+----+------------+
|john|[e, c, u, l]|
|bob |[a, c, d]   |
+----+------------+

Присоединитесь к массиву, чтобы получить результат

+----+-------+
|name|outcome|
+----+-------+
|john|ecul   |
|bob |acd    |
+----+-------+

Я получил очень похожий на этот ответ :) за исключением того, что вам не нужно создавать sequence, если вы выполняете условное соединение. df1.join(df2, on=(df1.start <= df2.day) & (df1.end >= df2.day))

Emma 26.09.2023 16:20

@Эмма Согласна, хорошая мысль :-)

Shubham Sharma 26.09.2023 16:24

Большое спасибо, ваш ответ совершенно правильный и верный, хотя я могу проверить только один ответ, поэтому проверяю самый ранний

user453575457 27.09.2023 09:47

Другие вопросы по теме