Я должен сгенерировать 3000000 файлов в качестве результата задания искры.
У меня есть два входных файла:
File 1 -> Size=3.3 Compressed, No.Of Records=13979835
File 2 -> Size=1.g Compressed, No.Of Records=6170229
Spark Job делает следующее:
Запись результата разделения DataFrame-C по столбцу2.
val DF1 = sparkSession.read.json("FILE1") // |ID |isHighway|isRamp|pvId |linkIdx|ffs |length |
val DF12 = sparkSession.read.json("FILE2") // |lId |pid |
val joinExpression = DF1.col("pvId") === DF2.col("lId")
val DFA = DF.join(tpLinkDF, joinExpression, "inner").select(col("ID").as("SCAR"), col("lId"), col("length"), col("ffs"), col("ar"), col("pid")).orderBy("linkIdx")
val DFB = DFA.select(col("SCAR"),concat_ws(",", col("lId"), col("length"),col("ffs"), col("ar"), col("pid")).as("links")).groupBy("SCAR").agg(collect_list("links").as("links"))
val DFC = DFB.select(col("SCAR"), array_join(col("links"), "\n").as("links"))
DFC.write.format("com.databricks.spark.csv").option("quote", "\u0000").partitionBy("SCAR").mode(SaveMode.Append).format("csv").save("/tmp")
Я должен сгенерировать 3000000 файлов в качестве результата задания искры.
это своего рода требование, когда другой системе необходимо прочитать эти небольшие файлы (не все, но по мере поступления запроса для этого файла каждый файл содержит некоторый идентификатор в имени файла, поэтому запрос с этим идентификатором для этого полученного файла система должна прочитать этот файл) и дать результат в реальном времени в течение 45 сек.
Звучит как катастрофа, если честно.
После запуска некоторого теста у меня появилась идея запустить это задание в пакетном режиме, например:
и так.... до
Почему это необходимо? Проблема с маленькими файлами.