У меня есть небольшой CSV-файл со словарем округов, который я хочу использовать в большом искровом задании. Я загружаю этот файл с помощью опции --files искровой отправки:
spark-submit --master yarn <some config stuff> --files /var/lib/data/airflow/dags/other/path/log4j.properties,/var/lib/data/airflow/dags/other/path/countries.csv,/var/lib/data/airflow/dags/other/path/gdpr.csv
Далее я вижу в логах следующее:
[2024-06-14 19:27:45.980+0300] INFO org.apache.spark.deploy.yarn.Client: Uploading resource file:/var/lib/data/airflow/dags/other/path/countries.csv -> hdfs://hdfs/user/user1/.sparkStaging/application_1718353091108_5460/countries.csv
[2024-06-14 19:27:46,054] {spark_submit.py:502} INFO - Identified spark driver id: application_1718353091108_5460
[2024-06-14 19:27:46,055] {spark_submit.py:526} INFO - [2024-06-14 19:27:46.054+0300] INFO org.apache.spark.deploy.yarn.Client: Uploading resource file:/var/lib/data/airflow/dags/other/path/gdpr.csv -> hdfs://hdfs/user/user1/.sparkStaging/application_1718353091108_5460/gdpr.csv
[2024-06-14 19:27:46,249] {spark_submit.py:502} INFO - Identified spark driver id: application_1718353091108_5460
Но когда я пытаюсь прочитать этот файл в искровом задании (file = 'countries.csv' или 'gdpr.csv'):
val localPath = SparkFiles.get(file)
val localFile = new File(localPath)
spark.read.format(format).load(s"file://$localPath").createTempView(name)
Я получаю ошибку:
[2024-06-14 19:37:30,411] {spark_submit.py:526} INFO - diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: Path does not exist: file:/var/lib/hadoop/data4/nodemanager/cache/nm-local-dir/usercache/user1/appcache/application_1718353091108_5460/spark-2f967a42-74b3-421f-9139-43b3e999d5db/userFiles-6319505f-af66-402f-85b5-86c92518817e/countries.csv;
Как мне найти и загрузить эти файлы?
Я бы сказал передать localPath
в load()
как есть, без интерполяции строк, без каких-либо переносов/расчетов.
Отвечает ли это на ваш вопрос? Читать файлы, отправленные драйвером с помощью искры
Сделал это через загрузку в HDFS. Вот рабочий пример:
import org.apache.hadoop.fs.Path
//tmp: org.apache.hadoop.fs.Path - temporary folder
val from = new File(file).getAbsoluteFile.toPath
val to = tmp + file
fc.upload(from, to)
spark.read.format(format).load(to.asStr)
Также возможно, загрузив локальный файл и spark.createDataFrame
почему ты это делаешь
val localFile = new File(localPath)
?