У меня есть два фрейма данных pyspark:
один:
name start end
bob 1 3
john 5 8
и второе:
day outcome
1 a
2 c
3 d
4 a
5 e
6 c
7 u
8 l
И мне нужны результаты за несколько дней для каждого человека, например
bob acd
john ecul
Можно ли это сделать в pyspark?


Используйте искровой SQL. Я использовал Scala, но SQL в pyspark точно такой же, и я считаю, что вы можете легко преобразовать его, если есть какие-либо различия для pyspark.
Объедините два кадра данных, используйте Collect_list(), чтобы получить массив результатов, затем используйте concat_ws(), чтобы объединить массив в строку:
val dF1 = Seq(
("bob", 1, 3),
("john", 5, 8)
).toDF("name","start","end")
dF1.createOrReplaceTempView("dF1")
val dF2 = Seq(
(1, "a"),
(2, "c"),
(3, "d"),
(4, "a"),
(5, "e"),
(6, "c"),
(7, "u"),
(8, "l")
).toDF("day","outcome")
dF2.createOrReplaceTempView("dF2")
spark.sql("""
select d1.name, concat_ws('',collect_list(d2.outcome)) outcome
from
(select d1.name, e.day
from dF1 d1
lateral view explode(sequence(d1.start, d1.end)) e as day
)d1
left join dF2 d2 on d1.day=d2.day
group by d1.name
""").show(100, false)
Результат:
+----+-------+
|name|outcome|
+----+-------+
|bob |acd |
|john|ecul |
+----+-------+
Исправление ООМ:
spark.sql("""
select d1.name, concat_ws('',collect_list(d2.outcome)) outcome
from dF1 d1
left join dF2 d2 on d1.start<=d2.day and d1.end>=d2.day
group by d1.name
""").show(100, false)
@user453575457 user453575457 Если он слишком большой, попробуйте второй запрос, он может работать напрямую, если выполнить последовательность + развернуть
Ваш ответ совершенно правильный, но у искры здесь очень плохая производительность, я сделаю это по-другому. 100 записей - 10Кб в первом df, 7Мб во втором df - длится 40 секунд. При 1000 энтей - я отказался ждать через 10 мин. И у меня 900 тысяч записей в первом кадре, так что нет никаких шансов, что он когда-нибудь будет завершен.
@user453575457 user453575457 Дайте мне подумать... Этот вид соединения, когда один DF слишком велик, а второй мал (вы сказали 200 МБ) и используется неэкви-соединение, тогда лучшей стратегией соединения является широковещательное соединение с вложенным циклом. попробуйте принудительно присоединиться к трансляции
@ user453575457 Используйте подсказку: select /*+ BROADCASTJOIN (d2) */ d1.name, concat_ws( ... А теперь убедитесь, что размер исполнителя соответствует транслируемому d2.
Извините, я не очень хорошо разбираюсь в искре. Что вы имеете в виду под словом «исполнитель большого размера»? Где мне это установить?
Ответы читайте здесь: stackoverflow.com/a/55964221/2700344 Вам необходимо установить spark.executor.memory, spark.driver.memory. Начните с 2G и увеличивайте при необходимости. Также вы можете установить ядра драйверов и исполнителей, а также количество исполнителей.
на данный момент просто локальный скрипт Python, позже я хочу распространить его на многие виртуальные машины, но до сих пор не знаю, как это сделать
хорошо. попробуйте принудительно подключиться к широковещательному соединению, используя подсказку в sql/check EXPLAIN EXTENDED SELECT, должно показать, что будет использоваться широковещательное соединение. При необходимости увеличьте размер исполнителя.
Сработало как по волшебству! теперь 1к завершился за 5 минут, и даже лучше - теперь я понимаю, как распределить этот материал на большее количество узлов. Большое спасибо!
Интересно, как работает первая версия после увеличения размера исполнителя?
Я протестирую и сообщу вам
Первый вариант работает неправильно, пытаюсь понять почему, поэтому если будут обновления - напишу здесь
df = df1.join(df2, on=(df1.start <= df2.day) & (df1.end >= df2.day))
df = df.groupBy('name').agg(F.collect_list(F.struct('day', 'outcome')).alias('outcome'))
df = df.withColumn('outcome', F.transform(F.array_sort('outcome'), lambda x: x.outcome))
df = df.withColumn('outcome', F.array_join('outcome', ''))
Используйте метод неравноправного соединения, чтобы соединить фрейм данных df1 с df2
+----+-----+---+---+-------+
|name|start|end|day|outcome|
+----+-----+---+---+-------+
|bob |1 |3 |1 |a |
|bob |1 |3 |2 |c |
|bob |1 |3 |3 |d |
|john|5 |8 |5 |e |
|john|5 |8 |6 |c |
|john|5 |8 |7 |u |
|john|5 |8 |8 |l |
+----+-----+---+---+-------+
Сгруппируйте фрейм данных по name и collect парам дня и результата. Этот шаг имеет решающее значение, поскольку столбец day необходим для поддержания порядка конкатенации.
+----+--------------------------------+
|name|outcome |
+----+--------------------------------+
|john|[{5, e}, {6, c}, {7, u}, {8, l}]|
|bob |[{1, a}, {2, c}, {3, d}] |
+----+--------------------------------+
Отсортируйте массив пар и преобразуйте его, чтобы извлечь значения outcome.
+----+------------+
|name|outcome |
+----+------------+
|john|[e, c, u, l]|
|bob |[a, c, d] |
+----+------------+
Присоединитесь к массиву, чтобы получить результат
+----+-------+
|name|outcome|
+----+-------+
|john|ecul |
|bob |acd |
+----+-------+
Я получил очень похожий на этот ответ :) за исключением того, что вам не нужно создавать sequence, если вы выполняете условное соединение. df1.join(df2, on=(df1.start <= df2.day) & (df1.end >= df2.day))
@Эмма Согласна, хорошая мысль :-)
Большое спасибо, ваш ответ совершенно правильный и верный, хотя я могу проверить только один ответ, поэтому проверяю самый ранний
Спасибо! Это работает, но у меня заканчивается память, поэтому я сделаю все это по-другому. Мой второй файл размером около 200 МБ.