Я использую pyspark для объединения двух таблиц по 100 тыс. строк в каждой (чтобы не было искаженного соединения). Это занимает больше 30 минут, даже час, и я думаю, что здесь что-то не так. Код - это просто обычное соединение
a = b.join(c, b.id == c.id, "inner").drop(c.id)
Я много искал и пробовал, в том числе:
spark.sql.adaptive.enabled=true
Ни то, ни другое не работает.
Мой вопрос: если обе таблицы (объект pyspark.sql.dataframe
) получены с использованием udf, имеет ли это значение? Это единственное отличие от обычного использования.
Для подготовки таблиц я использовал следующую логику udf:
def func(row):
id = row.split(",")[0]
f1, f2 = ", ".join(row.split(",")[1:-1]), int(row.split(",")[-1])
return (int(id), f1, f2)
func_udf = udf(func,
StructType([
StructField("id", IntegerType(), True),
StructField("f1", StringType(), True),
StructField("f2", IntegerType(), True)
]))
df = df.withColumn("Result", func_udf(col("data")))
df = df.drop(df.data).select("Result.*")
df
— таблица, используемая для соединения.
Любая идея по устранению неполадок приветствуется. Спасибо.
P.S. таблица b имеет 3 столбца, а таблица c — 6 столбцов. Так что они не широкий. Кроме того, если я уменьшу размер до 10 КБ, соединение будет работать так, как ожидалось.
a.explain()
проверьте логический план о соединении b c
Это не дает ответа на вопрос. Как только у вас будет достаточная репутация , вы сможете комментировать любую публикацию ; вместо этого дайте ответы, не требующие разъяснений от спрашивающего . - Из отзыва
Если все те изменения, которые вы пробовали ранее, не сработали, вы можете попытаться улучшить ситуацию, используя Spark API, но не используя UDF, поскольку он будет выполнять сериализацию и десериализацию данных между средой выполнения Python и JVM, и это повлияет на производительность. Фактически, вашу логику UDF можно изменить следующим образом:
df = df.withColumn(
"data_split",
func.split(func.col("data"), ",")
).withColumn(
"result",
func.struct(
func.col("data_split").getItem(0).cast(IntegerType()).alias("id"),
func.concat_ws(", ", func.slice(func.col("data_split"), 2, func.size(func.col("data_split"))-2)).cast(StringType()).alias("f1"),
func.col("data_split").getItem(-1).cast(IntegerType()).alias("id").alias("f2"),
)
).select(
"result.*"
)
Спасибо. Я попробую. Что, если функция сложнее, чем просто анализ строки? это все еще работает (из вашего опыта)?
@TripleH Это зависит от настроенной функции, например, если вам нужно использовать внешнюю библиотеку, использование UDF неизбежно. Эмпирическое правило — сначала попытаться использовать Spark API.
Я понял. Я хотел бы поделиться своим опытом ниже:
df.cache()
полезно.Можете ли вы пометить ответ как ответ, если вы это поняли?
можно подробнее, как отметить ответ? Спасибо.
Можете ли вы показать всю логику перед объединением, а также показать действие, которое запускает выполнение? Да, с точки зрения производительности это совершенно другое сравнение UDF и Spark API.