Как обеспечить, чтобы все данные, принадлежащие пользователю, попадали в один и тот же файл при использовании spark?

У нас есть вариант использования для подготовки искрового задания, которое будет считывать данные от нескольких поставщиков, содержащие информацию о пользователях, присутствующих в произвольном порядке, и записывать их обратно в файлы в S3. Теперь условие: все данные пользователя должны быть представлены в одном файле. Существует примерно около 1 миллиона уникальных пользователей, и каждый из них имеет около 10 КБ данных, максимум. Мы думали создать не более 1000 файлов, и пусть каждый файл будет содержать около 1000 записей пользователей.

Мы используем API-интерфейс java dataframe для создания задания на искру 2.4.0. Я не могу обернуть голову вокруг, что было бы наиболее логичным способом сделать это? Должен ли я выполнять группу по операции с идентификатором пользователя, а затем каким-то образом собирать строки, если я не достигну 1000 пользователей, а затем перевернуться (если это вообще возможно) или есть какой-то лучший способ. Любая помощь или подсказка в правильном направлении очень ценятся.

Обновлять:

Следуя предложению из ответа, я продолжил работу со следующим фрагментом кода, но все же увидел, что записывается 200 файлов вместо 1000.

Properties props = PropLoader.getProps("PrepareData.properties");
SparkSession spark = SparkSession.builder().appName("prepareData").master("local[*]")
    .config("fs.s3n.awsAccessKeyId", props.getProperty(Constants.S3_KEY_ID_KEY))
    .config("fs.s3n.awsSecretAccessKey", props.getProperty(Constants.S3_SECERET_ACCESS_KEY)).getOrCreate();

Dataset<Row> dataSet = spark.read().option("header", true).csv(pathToRead);
dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);

spark.close();

Но вместо 1000 если я использую 100, то я вижу 100 файлов. Затем я перешел по ссылке, которой поделился @Alexandros, и следующий фрагмент кода сгенерировал более 20000 файлов в отдельных каталогах, а также время выполнения увеличилось как сумасшедшее.

dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);

Здравствуйте @Bitswazsky, у вас есть варианты, упомянутые здесь: stackoverflow.com/questions/50775870/…

abiratsis 08.04.2019 02:49
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
1
132
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы можете использовать перераспределение, а затем функцию объединения.

 Df.repartion(user_id).coalese(1000)

 Df.repartion(user_id,1000)

Первый гарантирует, что не будет пустых разделов, а во втором решении некоторые разделы могут быть пустыми.

См.: Spark SQL — разница между df.repartition и DataFrameWriter partitionBy?

https://spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/DataFrame.html#coalesce(int)

Обновлять:

Чтобы сделать эту работу

dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);

spark.sql.shuffle.partitions (по умолчанию: 200). Из-за этого он не дает 1000 файлов, но работает для 100 файлов. Чтобы заставить его работать, вам нужно будет сначала репатриировать на 1000 разделов, что будет таким же, как в подходе 2.

dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);

Я думаю, что приведенный выше код создаст один миллион файлов или более вместо 1000.

dataSet.repartition(1000, dataSet.col("idvalue")).write().parquet(pathToWrite);

Он создаст 1000 файлов, но вам нужно будет создать сопоставление между идентификаторами и файлами, прочитав каждый файл после завершения записи файлов.

Я предполагаю, что по умолчанию у нас есть 200 разделов. Используя первый упомянутый вами подход, когда мы выполняем объединение до 1000, то есть увеличиваем количество разделов, есть ли вероятность того, что данные некоторых пользователей будут разделены по разделам?

Bitswazsky 08.04.2019 06:20

Мы увеличиваем количество разделов до 1 миллиона, а затем уменьшаем его до 1000. Я думаю, что объединение с shuffle=false по умолчанию просто объединяет существующие разделы, чтобы получить желаемое количество разделов. В вашем случае объединение создаст новый раздел путем объединения 1000 существующих разделов. Обратитесь: linkedin.com/pulse/… Также, как только вы запишете фрейм данных с помощью df.write(path), вы потеряете, какие значения находятся в каком файле, вам нужно будет создать еще одну структуру данных, чтобы отслеживать ее после завершения записи файла.

jaimin03 08.04.2019 09:51

Привет, не могли бы вы взглянуть на обновление в вопросе, потому что предложение не работает. Я что-то упустил здесь?

Bitswazsky 09.04.2019 13:00

1-й вариант обновленного раздела сработал, но с опцией --conf spark.sql.shuffle.partitions=1000

Bitswazsky 10.04.2019 09:33

Другие вопросы по теме

Как можно преобразовать линейный список PySpark RDD в DataFrame?
Как добавить совершенно нерелевантный столбец во фрейм данных при использовании pyspark, spark + databricks
Как отправить slurm job, используя много воркеров, а не просто работая в локальном режиме?
Создать уникальный идентификатор для комбинации пары значений из двух столбцов в фрейме данных искры
Как использовать aggregateBykey для получения списка значений для каждого ключа?
Включить метрику Spark в LucidWorks Fusion
Как разобрать JSON, содержащий строковое свойство, представляющее JSON
Фильтрация кадров данных, обусловленных несколькими столбцами, с различными условиями в зависимости от значений столбца
Динамически зацикливать набор данных для всех имен столбцов
Как получить данные второго фрейма данных для всех значений определенных значений столбцов, совпадающих в первом фрейме данных?