Вычисление нового столбца в spark df на основе другого spark df без явного столбца соединения

У меня есть df1 и df2 без общего перекрестного столбца. Теперь мне нужно добавить новый столбец в df1 из df2, если выполняется условие, основанное на столбцах df2. Я попытаюсь объяснить себя лучше на примере:

Df1:

+--------+----------+
|label   |    raw   |
+--------+----------+
|0.0     |-1.1088619|
|0.0     |-1.3188809|
|0.0     |-1.3051535|
+--------+----------+

Df2:

+--------------------+----------+----------+
|    probs           |    minRaw|    maxRaw|
+--------------------+----------+----------+
|                 0.1|-1.3195256|-1.6195256|
|                 0.2|-1.6195257|-1.7195256|
|                 0.3|-1.7195257|-1.8195256|
|                 0.4|-1.8195257|-1.9188809|

Ожидаемым результатом будет новый столбец в df1, который получит df2.probs, если значение df1.raw находится между df2.minRaw и df2.maxRaw.

Мой первый подход заключался в том, чтобы попытаться взорвать диапазон minRaw и maxRaw, а затем объединить кадры данных, но эти столбцы непрерывны. Вторая идея udf вот такая:

def get_probabilities(raw):
    df= isotonic_prob_table.filter((F.col("min_raw")>=raw)& \
                                    (F.col("max_raw")<=raw))\
                           .select("probs")
    df.show()
    #return df.select("probabilidad_bin").value()
    #return df.first()["probabilidad_bin"]

Но в моем большом фрейме данных это занимает много времени, и я получаю следующие предупреждения:

23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 82:>                 (0 + 1) / 1][Stage 83:====>            (4 + 3) / 15]23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

Если значение не находится между minRaw и maxRaw, ожидаемый результат равен null, а df1 может иметь дубликаты.

У меня есть искра версии 2.4.7, и я не эксперт по pyspark. Заранее спасибо за прочтение!

Каков ожидаемый результат для строк в df1, где значение df1.raw не находится между строками minRaw и maxRaw в df2?

Derek O 14.02.2023 00:12

Привет, Дерек, если значение не находится между «minRaw» и «maxRaw», ожидаемый результат равен нулю.

Jels 14.02.2023 00:23

О, я вижу - тогда это изменит мой ответ. есть ли повторяющиеся значения label + raw в df1?

Derek O 14.02.2023 00:24

Да, df1 может иметь дубликаты. Я буду обновлять вопрос с этой информацией. Спасибо!

Jels 14.02.2023 00:29

Спасибо, что нашли время, чтобы обновить свой вопрос - это всегда ценится!

Derek O 14.02.2023 00:31
Ускорьте разработку веб-приложений Laravel с помощью этих бесплатных стартовых наборов
Ускорьте разработку веб-приложений Laravel с помощью этих бесплатных стартовых наборов
Laravel - это мощный PHP-фреймворк, используемый для создания масштабируемых и надежных веб-приложений. Одним из преимуществ Laravel является его...
Что такое двойные вопросительные знаки (??) в JavaScript?
Что такое двойные вопросительные знаки (??) в JavaScript?
Как безопасно обрабатывать неопределенные и нулевые значения в коде с помощью Nullish Coalescing
Создание ресурсов API Laravel: Советы по производительности и масштабируемости
Создание ресурсов API Laravel: Советы по производительности и масштабируемости
Создание API-ресурса Laravel может быть непростой задачей. Она требует глубокого понимания возможностей Laravel и лучших практик, чтобы обеспечить...
Как сделать компонент справочного центра с помощью TailwindCSS
Как сделать компонент справочного центра с помощью TailwindCSS
Справочный центр - это веб-сайт, где клиенты могут найти ответы на свои вопросы и решения своих проблем. Созданный для решения многих распространенных...
Асинхронная передача данных с помощью sendBeacon в JavaScript
Асинхронная передача данных с помощью sendBeacon в JavaScript
В современных веб-приложениях отправка данных из JavaScript на стороне клиента на сервер является распространенной задачей. Одним из популярных...
Как подобрать выигрышные акции с помощью анализа и визуализации на Python
Как подобрать выигрышные акции с помощью анализа и визуализации на Python
Отказ от ответственности: Эта статья предназначена только для демонстрации и не должна использоваться в качестве инвестиционного совета.
2
5
76
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Вы можете выполнить перекрестное соединение между df1 и df2 и применить фильтр, чтобы вы выбирали только строки, где df1.raw находится между df2.minRaw и df2.maxRaw — это должно быть более эффективно, чем udf.

Примечание. Поскольку df1 может иметь дубликаты, мы хотим выполнить дедупликацию df1 перед перекрестным соединением с df2, чтобы после применения фильтра у нас не было повторяющихся строк, но при этом оставался минимум необходимой нам информации. Затем мы можем присоединиться к df1, чтобы убедиться, что у нас есть все исходные строки в df1.

Я также немного изменил ваш df1, чтобы включить дубликаты для демонстрации результата:

df1 = spark.createDataFrame(
    [
        (0.0,-1.10),
        (0.0,-1.10),
        (0.0,-1.32),
        (0.0,-1.32),
        (0.0,-1.73),
        (0.0,-1.88)
    ],
    ['label','raw']
)

df2 = spark.createDataFrame(
    [
        (0.1, -1.3195256, -1.6195256),
        (0.2, -1.6195257, -1.7195256),
        (0.3, -1.7195257, -1.8195256),
        (0.4, -1.8195257, -1.9188809)
    ],
    ['probs','minRaw','maxRaw']
)

Это результат, когда вы crossjoin df1 и df2 и удаляете дубликаты:

df1.drop_duplicates().crossJoin(df2).show()

+-----+-----+-----+----------+----------+
|label|  raw|probs|    minRaw|    maxRaw|
+-----+-----+-----+----------+----------+
|  0.0| -1.1|  0.1|-1.3195256|-1.6195256|
|  0.0|-1.32|  0.1|-1.3195256|-1.6195256|
|  0.0|-1.73|  0.1|-1.3195256|-1.6195256|
|  0.0|-1.88|  0.1|-1.3195256|-1.6195256|
...
|  0.0| -1.1|  0.4|-1.8195257|-1.9188809|
|  0.0|-1.32|  0.4|-1.8195257|-1.9188809|
|  0.0|-1.73|  0.4|-1.8195257|-1.9188809|
|  0.0|-1.88|  0.4|-1.8195257|-1.9188809|
+-----+-----+-----+----------+----------+

Затем мы можем применить фильтр и соединиться справа с помощью df1, чтобы убедиться, что все исходные строки существуют:

df1.crossJoin(df2).filter(
    (F.col('raw') > F.col('maxRaw')) & (F.col('raw') < F.col('minRaw'))
).select(
    'label','raw','probs'
).join(
    df1, on=['label','raw'], how='right'
)

+-----+-----+-----+
|label|  raw|probs|
+-----+-----+-----+
|  0.0| -1.1| null|
|  0.0| -1.1| null|
|  0.0|-1.32|  0.1|
|  0.0|-1.32|  0.1|
|  0.0|-1.73|  0.3|
|  0.0|-1.88|  0.4|
+-----+-----+-----+

Использовать диапазон между в выражении sql

df2.createOrReplaceTempView('df2')

df1.createOrReplaceTempView('df1')

%sql
SELECT minRaw,maxRaw,raw
FROM df1 JOIN df2 ON df1.raw BETWEEN df2.minRaw and df2.maxRaw
Ответ принят как подходящий

Я думаю, вы можете просто присоединиться к этим кадрам данных с условием between.

df1.join(df2, f.col('raw').between(f.col('maxRaw'), f.col('minRaw')), 'left').show(truncate=False)

+-----+-----+-----+----------+----------+
|label|raw  |probs|minRaw    |maxRaw    |
+-----+-----+-----+----------+----------+
|0.0  |-1.1 |null |null      |null      |
|0.0  |-1.1 |null |null      |null      |
|0.0  |-1.32|0.1  |-1.3195256|-1.6195256|
|0.0  |-1.32|0.1  |-1.3195256|-1.6195256|
|0.0  |-1.73|0.3  |-1.7195257|-1.8195256|
|0.0  |-1.88|0.4  |-1.8195257|-1.9188809|
+-----+-----+-----+----------+----------+

Я не знал, что вы можете присоединиться с условием - это определенно лучший ответ

Derek O 14.02.2023 18:49

Другие вопросы по теме