Я оптимизирую ETL Spark SQL, чтобы часто выбирать 0,1% данных из 100 миллиардов строк и таблицы размером 100 ТБ в формате паркета «event_100B» на S3.
Таблица event_100B содержит уникальный ключевой столбец EventId (32 шестнадцатеричных uuid). Запрос выбора, который необходимо оптимизировать, заключается в присоединении к набору predicate_set, предоставляющему 100 миллионов ключей EventId в качестве предиката, которые сопоставляются со случайно распределенными строками в таблице событий. Никакой шаблон кластеризации не может быть использован.
select t1.* from event_100B t1
inner join predicate_set p1 on t1.EventId = p1.EventId
Поскольку предикат состоит из набора ключей высокой мощности, охватывающих большой минимальный-максимальный диапазон, сокращение ни на уровне файла, ни на уровне группы строк не происходит. ETL тратит значительную часть времени на загрузку файлов для выполнения полного сканирования таблицы.
Ищу совет, какие форматы БД или файлов могут поддерживать эффективный пропуск данных с помощью этого типа массового запроса с произвольным доступом, а также, при необходимости, тип оборудования хранения AWS/Azure.
Предварительная идея: Назначьте все EventId в сегменты по 100 миллионов: BucketId = EventId % 1 000 000 так, чтобы каждый сегмент содержал в среднем 1000 значений EventId. Затем событие event_100B кластеризуется/сортируется по BucketId.
Столбец BucketId требует гораздо меньше операций ввода-вывода и пропускной способности сети для загрузки и присоединения к predicate_set:
select t1.* from event_100B t1
inner join predicate_set p1 on t1.BucketId = (p1.EventId % 1,000,000)
Мое ожидание: при увеличении количества сегментов вероятность попадания любого предиката EventId в сегмент уменьшается, и можно пропустить более высокий процент сегментов (и связанных с ними строк).
Вы также можете создать боковую таблицу со строкой для каждого файла паркета и массивом eventid, содержащимся внутри. Тогда первый запрос сообщит вам, какое подмножество файлов паркета нужно прочитать, и может сэкономить много операций ввода-вывода для второго запроса в основной таблице.
Если любой из подходов представляет интерес, просто скажите мне, что я напишу подробный ответ.
Столик-хорошая идея! Я думаю, что это правильное направление — простой паркет не имеет достаточных метаданных для поддержки «100 миллионов иголок в 100 миллиардах стогов сена». Мне нужно поэкспериментировать с гигантской структурой индекса, чтобы реализовать поиск случайных 1% строк, поддерживаемых мощной инфраструктурой хранилища. Если все пойдет хорошо, конечная задержка должна показать заметное улучшение по сравнению с последовательным сканированием 100% строк.
Также ищу рекомендации по распределенной столбчатой БД, подходящей для данного варианта использования в качестве замены паркета.
Похоже, вы говорите о кластерном индексе, который существует десятилетиями, но просто не существует в озерах данных.
Вы говорите как эксперт, поэтому у меня вопрос: разве разбиение уже не делает это за вас? Если вы разделяете EventId и каждый раздел (файл) имеет известное минимальное/максимальное значение eventid, разве это не поддерживает автоматическое сокращение файлов?
100 ТБ паркетных файлов легко представляют собой миллион файлов. В результате механизм запросов будет тратить много времени на получение нижних колонтитулов и, что хуже всего в вашем контексте, на сканирование данных большинства файлов.
Один из подходов к ограничению количества считываемых файлов паркета состоит в том, чтобы поддерживать боковой индекс, сообщающий для каждого файла паркета список содержащихся в них идентификаторов событий. Конечно, вы можете сохранить паркет для построения этого индекса, потому что в худшем случае это будет вопрос нескольких миллионов строк. Тогда знание того, какие файлы содержат ваши 100 тыс. событий, должно быть довольно быстрым, и, надеюсь, в конечном итоге вы в конечном итоге уменьшите количество файлов паркета для чтения.
Вы можете создать более простой дизайн, опробовав другие технологии, такие как Starrocks/Apache Doris. Они являются отличным кандидатом для вашего варианта использования, поскольку они предоставляют собственный формат, совместимый с объектным хранилищем/hdfs, со статистикой, индексацией и оптимизаторами на основе затрат, а движок C++ сильно векторизован. Кстати, я бы не стал рассматривать Trino (паркет), clickhouse (плохая поддержка соединений) или elasticsearch (плохо для получения большого количества записей).
Наконец, вы также можете рассмотреть формат столов на основе паркета. С помощью apache hudi вы можете, например, создать индекс фильтра Блума на eventid и сохранить его в таблице метаданных. Хотя индекс предназначен для более быстрого обновления, его также можно использовать для запросов на чтение с помощью rdd API. Кроме того, в Apache Iceberg ведется постоянная работа по обеспечению встроенных возможностей индексирования, но пока она еще недостаточно развита, AFAIK.
Дайте нам знать ваши окончательные выводы!
Не троллинг или что-то в этом роде, но меня просто расстраивает архитектура озера данных, потому что мы тратим так много времени на повторное изобретение технологий, которые существуют уже несколько десятилетий. В этом примере это похоже на некластеризованный индекс.
спасибо за ваше замечание, добавил некоторые мысли о формате таблиц со встроенными индексами
Делимся обновленной информацией о текущих усилиях по оптимизации. Я полностью согласен с приведенными выше ответами, в которых упоминается использование индексов/файлов для уменьшения объема сканирования.
В контексте формата файла/БД столбчатого хранилища мы определили наиболее важные факторы:
Цель состоит в том, чтобы сформировать многоуровневую структуру, подобную индексу, с кластеризованными и сжатыми значениями столбцов. Текущее направление — это гибридное решение с использованием Vertica DB для выполнения дорогостоящего сканирования с вышеупомянутыми функциями и экспорта полученных данных в виде паркета, чтобы Spark взял на себя остальную аналитику.
А как насчет создания индексов цветения паркета по eventId и превращения соединения в оператор in? Возможно, предикат 100 тыс. можно было бы превратить в небольшие партии