У меня есть две таблицы; продаж и клиентов. Основная запрашиваемая таблица — продажи, но иногда нам нужно будет получить данные о конкретном покупателе и получить подробную информацию о нем, поэтому нам придется присоединиться к клиентам в таблице продаж. Поэтому для таблицы с пользователями основным столбцом фильтра будет USER_ID. Я хотел разделить таблицу на основе user_id, но Databricks создает файл раздела для каждого из идентификаторов пользователей. Что я хочу сделать, так это разделить таблицу на файлы, в которых есть несколько идентификаторов пользователей, сохраненных последовательно, поэтому, например, в разделе 1 будут пользователи с идентификатором 1-1000, в разделе 2 будут пользователи с идентификатором 1001-2000 и так далее.
У меня такая же проблема с разделением по дате, так как он создает файл раздела на каждый день, но я бы хотел, чтобы он сохранялся, например, в 5-дневном диапазоне.
Есть ли способ хранить разделы в диапазоне внутри столбца? И как повлиять на то, сколько таких разделов будет создано?
До сих пор я использовал df.write.partitionBy(‘column_name’).parquet(‘location’), и это создало проблему, описанную выше.
Я сделал что-то подобное, сгенерировав значение для разделения из рассматриваемых идентификаторов. Если это числовые идентификаторы, вы можете МОДУЛИРОВАТЬ их по сравнению с количеством желаемых разделов, которые вы хотите собрать, и если они являются достаточно случайными числами или даже последовательными числами, они должны довольно хорошо уменьшить перекос...
Здравствуйте, большое спасибо за ответ, и мой вопрос такой же, как и в комментарии ниже :) Просто чтобы уточнить, будут ли приведенные выше запросы по-прежнему эффективны для соединений в исходном столбце? Допустим, я хочу сделать sales.user_id = users.user_id > как databrics узнает, какие файлы пропускать, если мы создали раздел на основе производного столбца (модуль над user_id), а не сам user_id?
Вы можете настроить способ создания разделов, указав схему разделения. Вместо использования секционирования по умолчанию на основе различных значений в столбце вы можете определить собственную функцию секционирования, которая группирует данные на основе диапазона значений в столбце.
Ниже приведен пример того, как вы можете разделить таблицу продаж на основе диапазонов USER_ID
:
from pyspark.sql.functions import *
def user_id_range_partition(user_id):
return floor(user_id / 1000)
sales.write.partitionBy(user_id_range_partition('USER_ID')).parquet('location')
Здесь функция user_id_range_partition
принимает значение USER_ID
и возвращает floor
деление значения на 1000, что группирует идентификаторы USER_ID в диапазоны из 1000. Например, идентификаторы USER_ID 1–1000 будут в разделе 0, идентификаторы USER_ID 1001–2000 будут в разделе 1. , и так далее.
Аналогично, то же самое можно сделать и для дат -
from pyspark.sql.functions import *
# Define the partitioning function that groups dates into ranges of 5 days
def date_range_partition(date_col):
start_date = to_date(lit('2022-01-01')) # define your own start date
days_since_start = floor((date_col - start_date).cast('int') / 5) * 5
return date_add(start_date, days_since_start)
# Partition the sales table based on date_range_partition function
sales.withColumn('sale_date_range', date_range_partition('SALE_DATE')).write.partitionBy('sale_date_range').parquet('location')
Кроме того, вы также можете использовать bucketBy
. Принцип его работы заключается в том, что он распределяет данные по фиксированному количеству сегментов на основе хеш-значения указанного столбца. Это может быть полезно для равномерного распределения данных между фиксированным числом файлов, при этом обеспечивая эффективную фильтрацию на основе столбца, используемого для группировки. Например, вы можете использовать bucketBy
для распределения sales
данных между 10 сегментами на основе столбца USER_ID
—
from pyspark.sql.functions import floor
# Define the number of buckets and the bucketing column
num_buckets = 10
bucket_column = 'USER_ID'
# Define the bucketing function that hashes USER_ID into one of 10 buckets
def bucket_user_id(user_id):
return user_id % num_buckets
# Bucket the sales table based on the bucket_user_id function and the bucket_column
sales.write.bucketBy(num_buckets, bucket_column, {'numBuckets': num_buckets}).parquet('location')
Здравствуйте, спасибо за ответ! Просто чтобы уточнить, будут ли приведенные выше запросы эффективны для соединений в исходном столбце? Допустим, я хочу сделать sales.user_id = users.user_id > как databrics узнает, какие файлы пропускать, если мы создали раздел на основе производного столбца (например, floor(user_id / 1000), а не сам user_id ?
Да, это было бы. Когда вы присоединяетесь к таблицам sales
и users
в USER_ID
, Databricks будет использовать секционирование таблицы sales
для эффективного пропуска ненужных секций. Подумайте об этом так: если вы присоедините sales
и users
к USER_ID
, а таблица users
содержит значения 1200 и 2200, блоки данных будут сканировать только разделы, содержащие диапазон идентификаторов USER_ID 1001-2000 и 2001-3000, и будут использовать раздел сокращение, чтобы пропустить разделы, которые не содержат совпадающих данных, при выполнении операции соединения.
думаю, вы можете использовать что-то вроде repartitionByRange. Но это может не соответствовать вашим наборам переменных данных df.repartitionByRange(2, «column») здесь, 2 — это количество разделов, на которые вы хотите разделить dataFrame, а «column» — это имя столбца, который вы хотите используйте, чтобы разделить раздел