У нас есть вариант использования для подготовки искрового задания, которое будет считывать данные от нескольких поставщиков, содержащие информацию о пользователях, присутствующих в произвольном порядке, и записывать их обратно в файлы в S3. Теперь условие: все данные пользователя должны быть представлены в одном файле. Существует примерно около 1 миллиона уникальных пользователей, и каждый из них имеет около 10 КБ данных, максимум. Мы думали создать не более 1000 файлов, и пусть каждый файл будет содержать около 1000 записей пользователей.
Мы используем API-интерфейс java dataframe для создания задания на искру 2.4.0. Я не могу обернуть голову вокруг, что было бы наиболее логичным способом сделать это? Должен ли я выполнять группу по операции с идентификатором пользователя, а затем каким-то образом собирать строки, если я не достигну 1000 пользователей, а затем перевернуться (если это вообще возможно) или есть какой-то лучший способ. Любая помощь или подсказка в правильном направлении очень ценятся.
Обновлять:
Следуя предложению из ответа, я продолжил работу со следующим фрагментом кода, но все же увидел, что записывается 200 файлов вместо 1000.
Properties props = PropLoader.getProps("PrepareData.properties");
SparkSession spark = SparkSession.builder().appName("prepareData").master("local[*]")
.config("fs.s3n.awsAccessKeyId", props.getProperty(Constants.S3_KEY_ID_KEY))
.config("fs.s3n.awsSecretAccessKey", props.getProperty(Constants.S3_SECERET_ACCESS_KEY)).getOrCreate();
Dataset<Row> dataSet = spark.read().option("header", true).csv(pathToRead);
dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);
spark.close();
Но вместо 1000 если я использую 100, то я вижу 100 файлов. Затем я перешел по ссылке, которой поделился @Alexandros, и следующий фрагмент кода сгенерировал более 20000 файлов в отдельных каталогах, а также время выполнения увеличилось как сумасшедшее.
dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);
Вы можете использовать перераспределение, а затем функцию объединения.
Df.repartion(user_id).coalese(1000)
Df.repartion(user_id,1000)
Первый гарантирует, что не будет пустых разделов, а во втором решении некоторые разделы могут быть пустыми.
См.: Spark SQL — разница между df.repartition и DataFrameWriter partitionBy?
https://spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/DataFrame.html#coalesce(int)
Обновлять:
Чтобы сделать эту работу
dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);
spark.sql.shuffle.partitions (по умолчанию: 200). Из-за этого он не дает 1000 файлов, но работает для 100 файлов. Чтобы заставить его работать, вам нужно будет сначала репатриировать на 1000 разделов, что будет таким же, как в подходе 2.
dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);
Я думаю, что приведенный выше код создаст один миллион файлов или более вместо 1000.
dataSet.repartition(1000, dataSet.col("idvalue")).write().parquet(pathToWrite);
Он создаст 1000 файлов, но вам нужно будет создать сопоставление между идентификаторами и файлами, прочитав каждый файл после завершения записи файлов.
Я предполагаю, что по умолчанию у нас есть 200 разделов. Используя первый упомянутый вами подход, когда мы выполняем объединение до 1000, то есть увеличиваем количество разделов, есть ли вероятность того, что данные некоторых пользователей будут разделены по разделам?
Мы увеличиваем количество разделов до 1 миллиона, а затем уменьшаем его до 1000. Я думаю, что объединение с shuffle=false по умолчанию просто объединяет существующие разделы, чтобы получить желаемое количество разделов. В вашем случае объединение создаст новый раздел путем объединения 1000 существующих разделов. Обратитесь: linkedin.com/pulse/… Также, как только вы запишете фрейм данных с помощью df.write(path), вы потеряете, какие значения находятся в каком файле, вам нужно будет создать еще одну структуру данных, чтобы отслеживать ее после завершения записи файла.
Привет, не могли бы вы взглянуть на обновление в вопросе, потому что предложение не работает. Я что-то упустил здесь?
1-й вариант обновленного раздела сработал, но с опцией --conf spark.sql.shuffle.partitions=1000
Здравствуйте @Bitswazsky, у вас есть варианты, упомянутые здесь: stackoverflow.com/questions/50775870/…