У меня есть df1 и df2 без общего перекрестного столбца. Теперь мне нужно добавить новый столбец в df1 из df2, если выполняется условие, основанное на столбцах df2. Я попытаюсь объяснить себя лучше на примере:
Df1:
+--------+----------+
|label | raw |
+--------+----------+
|0.0 |-1.1088619|
|0.0 |-1.3188809|
|0.0 |-1.3051535|
+--------+----------+
Df2:
+--------------------+----------+----------+
| probs | minRaw| maxRaw|
+--------------------+----------+----------+
| 0.1|-1.3195256|-1.6195256|
| 0.2|-1.6195257|-1.7195256|
| 0.3|-1.7195257|-1.8195256|
| 0.4|-1.8195257|-1.9188809|
Ожидаемым результатом будет новый столбец в df1, который получит df2.probs, если значение df1.raw находится между df2.minRaw и df2.maxRaw.
Мой первый подход заключался в том, чтобы попытаться взорвать диапазон minRaw и maxRaw, а затем объединить кадры данных, но эти столбцы непрерывны. Вторая идея udf вот такая:
def get_probabilities(raw):
df= isotonic_prob_table.filter((F.col("min_raw")>=raw)& \
(F.col("max_raw")<=raw))\
.select("probs")
df.show()
#return df.select("probabilidad_bin").value()
#return df.first()["probabilidad_bin"]
Но в моем большом фрейме данных это занимает много времени, и я получаю следующие предупреждения:
23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/13 22:02:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 82:> (0 + 1) / 1][Stage 83:====> (4 + 3) / 15]23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/02/13 22:04:36 WARN org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
Если значение не находится между minRaw и maxRaw, ожидаемый результат равен null, а df1 может иметь дубликаты.
У меня есть искра версии 2.4.7, и я не эксперт по pyspark. Заранее спасибо за прочтение!
Привет, Дерек, если значение не находится между «minRaw» и «maxRaw», ожидаемый результат равен нулю.
О, я вижу - тогда это изменит мой ответ. есть ли повторяющиеся значения label + raw в df1?
Да, df1 может иметь дубликаты. Я буду обновлять вопрос с этой информацией. Спасибо!
Спасибо, что нашли время, чтобы обновить свой вопрос - это всегда ценится!
Вы можете выполнить перекрестное соединение между df1 и df2 и применить фильтр, чтобы вы выбирали только строки, где df1.raw находится между df2.minRaw и df2.maxRaw — это должно быть более эффективно, чем udf.
Примечание. Поскольку df1 может иметь дубликаты, мы хотим выполнить дедупликацию df1 перед перекрестным соединением с df2, чтобы после применения фильтра у нас не было повторяющихся строк, но при этом оставался минимум необходимой нам информации. Затем мы можем присоединиться к df1, чтобы убедиться, что у нас есть все исходные строки в df1.
Я также немного изменил ваш df1, чтобы включить дубликаты для демонстрации результата:
df1 = spark.createDataFrame(
[
(0.0,-1.10),
(0.0,-1.10),
(0.0,-1.32),
(0.0,-1.32),
(0.0,-1.73),
(0.0,-1.88)
],
['label','raw']
)
df2 = spark.createDataFrame(
[
(0.1, -1.3195256, -1.6195256),
(0.2, -1.6195257, -1.7195256),
(0.3, -1.7195257, -1.8195256),
(0.4, -1.8195257, -1.9188809)
],
['probs','minRaw','maxRaw']
)
Это результат, когда вы crossjoin df1 и df2 и удаляете дубликаты:
df1.drop_duplicates().crossJoin(df2).show()
+-----+-----+-----+----------+----------+
|label| raw|probs| minRaw| maxRaw|
+-----+-----+-----+----------+----------+
| 0.0| -1.1| 0.1|-1.3195256|-1.6195256|
| 0.0|-1.32| 0.1|-1.3195256|-1.6195256|
| 0.0|-1.73| 0.1|-1.3195256|-1.6195256|
| 0.0|-1.88| 0.1|-1.3195256|-1.6195256|
...
| 0.0| -1.1| 0.4|-1.8195257|-1.9188809|
| 0.0|-1.32| 0.4|-1.8195257|-1.9188809|
| 0.0|-1.73| 0.4|-1.8195257|-1.9188809|
| 0.0|-1.88| 0.4|-1.8195257|-1.9188809|
+-----+-----+-----+----------+----------+
Затем мы можем применить фильтр и соединиться справа с помощью df1, чтобы убедиться, что все исходные строки существуют:
df1.crossJoin(df2).filter(
(F.col('raw') > F.col('maxRaw')) & (F.col('raw') < F.col('minRaw'))
).select(
'label','raw','probs'
).join(
df1, on=['label','raw'], how='right'
)
+-----+-----+-----+
|label| raw|probs|
+-----+-----+-----+
| 0.0| -1.1| null|
| 0.0| -1.1| null|
| 0.0|-1.32| 0.1|
| 0.0|-1.32| 0.1|
| 0.0|-1.73| 0.3|
| 0.0|-1.88| 0.4|
+-----+-----+-----+
Использовать диапазон между в выражении sql
df2.createOrReplaceTempView('df2')
df1.createOrReplaceTempView('df1')
%sql
SELECT minRaw,maxRaw,raw
FROM df1 JOIN df2 ON df1.raw BETWEEN df2.minRaw and df2.maxRaw
Я думаю, вы можете просто присоединиться к этим кадрам данных с условием between.
df1.join(df2, f.col('raw').between(f.col('maxRaw'), f.col('minRaw')), 'left').show(truncate=False)
+-----+-----+-----+----------+----------+
|label|raw |probs|minRaw |maxRaw |
+-----+-----+-----+----------+----------+
|0.0 |-1.1 |null |null |null |
|0.0 |-1.1 |null |null |null |
|0.0 |-1.32|0.1 |-1.3195256|-1.6195256|
|0.0 |-1.32|0.1 |-1.3195256|-1.6195256|
|0.0 |-1.73|0.3 |-1.7195257|-1.8195256|
|0.0 |-1.88|0.4 |-1.8195257|-1.9188809|
+-----+-----+-----+----------+----------+
Я не знал, что вы можете присоединиться с условием - это определенно лучший ответ
Каков ожидаемый результат для строк в df1, где значение df1.raw не находится между строками minRaw и maxRaw в df2?