Присоединяйтесь к двум столам по 100 тысяч, занимая более получаса

Я использую pyspark для объединения двух таблиц по 100 тыс. строк в каждой (чтобы не было искаженного соединения). Это занимает больше 30 минут, даже час, и я думаю, что здесь что-то не так. Код - это просто обычное соединение

a = b.join(c, b.id == c.id, "inner").drop(c.id)

Я много искал и пробовал, в том числе:

  • использовать больше ядер в кластере
  • заменить более крупные экземпляры в каждом узле
  • набор spark.sql.adaptive.enabled=true
  • передел и т. д.

Ни то, ни другое не работает.

Мой вопрос: если обе таблицы (объект pyspark.sql.dataframe) получены с использованием udf, имеет ли это значение? Это единственное отличие от обычного использования.

Для подготовки таблиц я использовал следующую логику udf:

def func(row):
    id = row.split(",")[0]
    f1, f2 = ", ".join(row.split(",")[1:-1]), int(row.split(",")[-1])
    return (int(id), f1, f2)

func_udf = udf(func, 
              StructType([
                StructField("id", IntegerType(), True),
                StructField("f1", StringType(), True),
                StructField("f2", IntegerType(), True)
               ]))
df = df.withColumn("Result", func_udf(col("data")))
df = df.drop(df.data).select("Result.*")

df — таблица, используемая для соединения.

Любая идея по устранению неполадок приветствуется. Спасибо.

P.S. таблица b имеет 3 столбца, а таблица c — 6 столбцов. Так что они не широкий. Кроме того, если я уменьшу размер до 10 КБ, соединение будет работать так, как ожидалось.

Можете ли вы показать всю логику перед объединением, а также показать действие, которое запускает выполнение? Да, с точки зрения производительности это совершенно другое сравнение UDF и Spark API.

Jonathan 16.08.2024 07:21
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
1
67
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

a.explain()

проверьте логический план о соединении b c

Если все те изменения, которые вы пробовали ранее, не сработали, вы можете попытаться улучшить ситуацию, используя Spark API, но не используя UDF, поскольку он будет выполнять сериализацию и десериализацию данных между средой выполнения Python и JVM, и это повлияет на производительность. Фактически, вашу логику UDF можно изменить следующим образом:

df = df.withColumn(
    "data_split", 
    func.split(func.col("data"), ",")
).withColumn(
    "result",
    func.struct(
        func.col("data_split").getItem(0).cast(IntegerType()).alias("id"),
        func.concat_ws(", ", func.slice(func.col("data_split"), 2, func.size(func.col("data_split"))-2)).cast(StringType()).alias("f1"),
        func.col("data_split").getItem(-1).cast(IntegerType()).alias("id").alias("f2"),
    )
).select(
    "result.*"
)

Спасибо. Я попробую. Что, если функция сложнее, чем просто анализ строки? это все еще работает (из вашего опыта)?

TripleH 17.08.2024 01:34

@TripleH Это зависит от настроенной функции, например, если вам нужно использовать внешнюю библиотеку, использование UDF неизбежно. Эмпирическое правило — сначала попытаться использовать Spark API.

Jonathan 19.08.2024 08:34
Ответ принят как подходящий

Я понял. Я хотел бы поделиться своим опытом ниже:

  1. В моем случае я сначала объединяю таблицы, а затем анализирую столбцы с помощью UDF. Конечно, таблица с проанализированным UDF показывает плохую производительность для присоединиться (как отметил Джонатан ниже, использование искрового API может быть лучшим вариантом).
  2. Несмотря на то, что в таблицах не так много столбцов, некоторые данные в столбцах очень большие (длинные строки). Это также влияет на производительность соединения.
  3. Кажется df.cache() полезно.
  4. Проверка логического плана также является хорошей идеей для проверки.

Можете ли вы пометить ответ как ответ, если вы это поняли?

Powers 18.08.2024 06:02

можно подробнее, как отметить ответ? Спасибо.

TripleH 19.08.2024 03:46

Другие вопросы по теме