Как читать из S3 на PySpark локально

Я пытаюсь прочитать CSV-файлы, хранящиеся в ведре S3. У меня установлен Apache Spark 3.5.1 с Homebrew. Я скачал коннектор Hadoop AWS и скопировал его в /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/jars.

Затем, используя следующий код, я пытаюсь прочитать CSV-файлы из S3:

import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Base Spark Template").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df2 = spark.read.csv("s3://arapbi/polygon/tickers/", header=True)

Это не удается с

Py4JJavaError: An error occurred while calling o40.csv.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
    at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)

Первая строка этой трассировки ошибок является важной; Spark не распознает S3.

Однако я понимаю, что Spark должен иметь возможность распознавать S3 на основе загруженного мной соединителя и файла jar, который я скопировал в папку Spark Jars, когда Spark устанавливается через Homebrew.

Я ошибаюсь, какой файл Jar использовать или как это вообще настроить? Я выполнил те же действия с соединителем Google Storage, и он работал правильно.

Я искал в Google и искал Stack Overflow, но безрезультатно. Я дополню вопрос ответом, если найду его, но если кому-то удалось настроить PySpark, установленный Brew, для подключения к S3, пожалуйста, сообщите остальным, как это сделать!

ОБНОВЛЕНИЕ 04.04.2024

Согласно ответу Стива, я изменил df2 = spark.read.csv("s3://arapbi/polygon/tickers/", header=True) на df2 = spark.read.csv("s3a://arapbi/polygon/tickers/", header=True). Это изменило источник ошибки в Stacktrace (см. ниже), но все равно не сработало.

Py4JJavaError: An error occurred while calling o68.csv.
: java.lang.NoClassDefFoundError: software/amazon/awssdk/core/exception/SdkException
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:467)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2625)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2590)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.core.exception.SdkException
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    ... 31 more
blog.revolve.team/2023/05/02/… это может помочь
user238607 03.04.2024 20:17

Попробовал то, что описал автор, не помогло...

Evan Volgas 04.04.2024 00:54

Также можете ли вы опубликовать шаги, которые вы предпринимаете для запуска приведенного выше сценария? Вы запускаете его прямо из PyCharm? Вы установили пакет pyspark в свою виртуальную среду или среду Python?

user238607 05.04.2024 13:18
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
3
597
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

Сначала вам необходимо обновить jar-файлы Hadoop с v3.4.0 на v3.3.4 jar-файлы Spark 3.5.1, поскольку они скомпилированы с Hadoop v3.3.4, как описано в исходном коде .

Во-вторых, обновите схему s3 URI на схему s3a URI, поскольку Hadoop поддерживает только s3a клиент. Попробуйте установить приведенную ниже конфигурацию в свой код.

spark = SparkSession.builder.appName("Base Spark Template").getOrCreate()
    
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain") # Change it according to your auth mechanism

df2 = spark.read.csv("s3a://arapbi/polygon/tickers/", header=True)

Я попробовал это, и это не сработало...

Evan Volgas 04.04.2024 00:54

Можете ли вы поделиться ошибкой?

Nishu Tayal 04.04.2024 10:20

это не работает, потому что все рекомендации неверны

stevel 04.04.2024 15:44

Та же ошибка, что и раньше @NishuTayal

Evan Volgas 04.04.2024 20:52

Ну, я использую тот же код со Spark 3.5.1. Да, вам необходимо убедиться, что все необходимые файлы jar находятся на месте и добавлены в путь к классам. (hadoop-aws-3.3.4.jar, Hadoop-client-api-3.3.4.jar, Hadoop-client-runtime-3.3.4.jar, Hadoop-shaded-guava-1.1.1.jar)

Nishu Tayal 05.04.2024 08:25

Я позаботился о том, чтобы такие же банки были в наличии. Я получаю An error occurred while calling o50.csv. : org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3". Это так сбивает с толку

Evan Volgas 05.04.2024 15:46

Нишу, ты был абсолютно прав, и проблема была в том, что я использовал hadoop-aws-3.4.0.jar вместо hadoop-aws-3.3.4.jar. Большое спасибо! (Также обратите внимание, что префикс s3 должен быть s3a, а не s3, как указывали другие.

Evan Volgas 06.04.2024 21:28

@Эван: Приятно это слышать. Также хочу добавить: Spark 3.5.1 скомпилирован с версией Hadoop 3.3.4, как описано в зависимостях pom.xml. Вот почему вам не подошли банки Hadoop v3.4.0.

Nishu Tayal 07.04.2024 00:13

Нишу, я думаю, ты упускаешь суть. Все эти параметры HadoopConfiguration либо необходимы (fs.s3a.impl;, либо в корне неверны. «com.amazonaws.services.s3.enableV4» было системным свойством, необходимым Hadoop 2.6 для взаимодействия с регионами aws, которые поддерживают только подпись v4. s3a коннектору это не нужно, а если и нужно, то конфигурация Hadoop не подходит для его размещения. Пожалуйста, прочитайте документацию Hadoop s3a, на написание которой я приложил много усилий, а не просто верьте комментариям других людей о переполнении стека.

stevel 09.04.2024 23:19

Apache Hadoop использует разъем s3a, а не EMR.

  • Все URL-адреса s3 должны использовать s3a: в качестве префикса.
  • есть много сообщений SO о правильном выборе путей к классам. Все ваши jar-файлы Hadoop-* должны быть синхронизированы, и вам нужна та же версия aws sdk, с помощью которой он был создан. Если это не работает: переполнение стека поиска.

Я обновил свой вопрос выше, указав разницу в трассировках стека при использовании s3 и s3a. Я не уверен, что понимаю: «Вам нужна та же версия aws sdk, с которой он был создан». Что именно означает «оно»? Кроме того, является ли aws sdk дополнительным jar-файлом, который необходимо добавить в папку Spark jars в дополнение к hadoop-aws-3.4.0.jar? Может быть, проблема в том, что я не включаю все нужные банки?

Evan Volgas 05.04.2024 02:25

Причина в версии Hadoop. Spark 3.5.1 скомпилирован с Hadoop v3.3.4, а OP использовал Hadoop v3.4.0. Следовательно, это дает ошибку.

Nishu Tayal 07.04.2024 00:15

да, я ведь сказал именно это, не так ли?

stevel 09.04.2024 23:20

Согласно обновленному вопросу, недавний выпуск AWS emr emr-7.0.0

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-7.x.html

Он использует последнюю версию Spark: Spark 3.5.0. Для этой версии Spark используется версия AWS Java SDK: 1.12.569

Эти данные приведены на вышеуказанной веб-странице.

Поэтому ОЧЕНЬ ВАЖНО использовать версию Spark 3.5.0, потому что jar-файлы AWS созданы с использованием этой версии Spark.

Чтобы получить все зависимости и т. д., загрузите полный пакет jar снизу.

https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bundle/1.12.569

Вы можете скачать пакет вручную в разделе Files..

Files   pom (5 KB)  jar (331.7 MB)  View All

После загрузки пакета поместите банку в

/python3.X/site-packages/pyspark/jars

Прочтите этот ответ для более подробной информации: https://stackoverflow.com/a/51537182/3238085

Дополнительные шаги по устранению неполадок здесь:

https://codelovingyogi.medium.com/pyspark-connect-to-aws-s3a-filesystem-82bee54e0812

Я сделал, как вы сказали, и до сих пор получаю An error occurred while calling o40.csv. : java.lang.NoClassDefFoundError: software/amazon/awssdk/core/exception/SdkException. Судя по ответу @Nishu, я также убедился, что у меня есть все hadoop-aws-3.3.4.jar, hadoop-client-api-3.3.4.jar, hadoop-client-runtime-3.3.4.jar, hadoop-shaded-guava-1.1.1.jar

Evan Volgas 05.04.2024 15:44

как вы запускаете скрипт pyspark? из Пичарма?

user238607 05.04.2024 15:52

iПитон. Имеет ли это значение?

Evan Volgas 05.04.2024 15:54

ты сделал pip install pyspark?

user238607 05.04.2024 15:55

Я установил его через Homebrew, как я уже отмечал в своем вопросе, и запустил его через pyspark в командной строке. Мой zsh настроен с... 135 export SPARK_HOME=/opt/homebrew/Cellar/apache-spark/3.5.1/libexec 136 export PATH=/opt/homebrew/Cellar/apache-spark/3.5.1/bin:$PATH 137 export PYSPARK_PYTHON=/usr/local/bin/python3 138 export PYSPARK_DRIVER_PYTHON=ipython

Evan Volgas 05.04.2024 16:08

Можете ли вы найти каталог jars pyspark в вашей среде Python? Поскольку pyspark является самодостаточной установкой, установка Apache-Spark не имеет значения.

user238607 05.04.2024 16:25

Я никогда не устанавливал pyspark. Я использовал Homebrew. Баночки находятся по адресу /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/jars. Я успешно разместил в этом месте банки BigQuery и Google Storage и смог запросить оба. Кажется, мне где-то не хватает jar-файла для AWS, но я не могу понять, где он находится.

Evan Volgas 05.04.2024 16:34

ок, давай попробуем pyspark --packages "com.amazonaws:aws-java-sdk-bundle:1.12.569" app.py . Прежде чем сделать это, установите pyspark 3.5.0, поскольку SDK работает только с 3.5.0. И у тебя есть apache-spark/3.5.1/

user238607 05.04.2024 16:44

все мои банки pyspark расположены здесь /conda_envs/pyspark-example/lib/python3.11/site-packages/pys‌​park/jars на машине с Linux. MacOS pyspark может иметь аналогичную структуру.

user238607 05.04.2024 17:44

Проблема в том, что Spark 3.5.0 больше недоступен для скачивания spark.apache.org/downloads.html

Evan Volgas 06.04.2024 17:17

Кажется, вы используете pyspark из каталога apache-spark/bin или его эквивалент на Mac. Но вы можете напрямую установить pyspark в свою среду Python. pip install pyspark==3.5.0 . Это установит эту конкретную версию. Версия по-прежнему доступна в архиве. archive.apache.org/dist/spark

user238607 06.04.2024 17:27

Я попробовал использовать Spark 3.5.0 (дважды проверил через spark.version в Ipython), но все равно получаю ту же ошибку software/amazon/awssdk/core/exception/SdkException, что и раньше. Я скопировал все соответствующие jar-файлы в копию Spark в виртуальной среде, а также запустил brew uninstall apache-spark, просто чтобы быть уверенным, что работаю с Spark 3.5.0. Как и раньше, банки Google работают нормально. Банки AWS не позволяют мне подключаться к s3.

Evan Volgas 06.04.2024 20:50

Я только что исправил это. (Последняя) проблема заключалась в том, что я использовал неправильный файл jar. Мне нужно было использовать hadoop-aws-3.3.4.jar вместо hadoop-aws-3.4.0.jar. Я продолжаю тестировать это, чтобы увидеть, работает ли оно со Spark 3.5.1.

Evan Volgas 06.04.2024 21:13

Все работает, даже Spark 3.5.1, как результат использования hadoop-aws-3.3.4.jar.

Evan Volgas 06.04.2024 21:28

Другие вопросы по теме