Я пытаюсь использовать cassandra с pyspark. Я могу правильно установить удаленное подключение к Spark Server. Но на этапе чтения таблицы Кассандры у меня проблемы. Я пробовал все коннекторы datastax, я менял конфигурации Spark (ядро, память и т. д.), но не смог этого сделать. (Строки комментариев в приведенном ниже коде — это мои попытки.)
Вот мои коды Python;
import os
os.environ['JAVA_HOME'] = "C:\Program Files\Java\jdk1.8.0_271"
os.environ['HADOOP_HOME'] = "E:\etc\spark-3.0.1-bin-hadoop2.7"
os.environ['PYSPARK_DRIVER_PYTHON'] = "/usr/local/bin/python3.7"
os.environ['PYSPARK_PYTHON'] = "/usr/local/bin/python3.7"
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=XX.XX.XX.XX spark.cassandra.auth.username=username spark.cassandra.auth.password=passwd pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars .ivy2\jars\spark-cassandra-connector-driver_2.12-3.0.0-alpha2.jar pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-alpha2 pyspark-shell'
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
conf = SparkConf()
conf.setMaster("spark://YY.YY.YY:7077").setAppName("My app")
conf.set("spark.shuffle.service.enabled", "false")
conf.set("spark.dynamicAllocation.enabled","false")
conf.set("spark.executor.cores", "2")
conf.set("spark.executor.memory", "5g")
conf.set("spark.executor.instances", "1")
conf.set("spark.jars", "C:\\Users\\verianalizi\\.ivy2\\jars\\spark-cassandra-connector_2.12-3.0.0-beta.jar")
conf.set("spark.cassandra.connection.host","XX.XX.XX.XX")
conf.set("spark.cassandra.auth.username","username")
conf.set("spark.cassandra.auth.password","passwd")
conf.set("spark.cassandra.connection.port", "9042")
# conf.set("spark.sql.catalog.myCatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
sc = SparkContext(conf=conf)
# sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)
# It works well until now
def load_and_get_table_df(keys_space_name, table_name):
table_df = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.option("keyspace",keys_space_name)\
.option("table",table_name)\
.load()
return table_df
movies = load_and_get_table_df("weather", "currentweatherconditions")
Ошибка, которую я получаю;
У кого-нибудь есть идеи с этим?
Это происходит потому, что вы указываете только свойство spark.jars
и указываете на одну банку. Но разъем spark cassandra зависит от количества дополнительных банок, не включенных в этот список. Вместо этого я рекомендую либо использовать spark.jars.packages
с координатой com.datastax.spark:spark-cassandra-connector_2.12:3.0.0
, либо указать в spark.jars
путь к банке сборки, в которой есть все необходимые зависимости.
Кстати, версия 3.0 была выпущена несколько месяцев назад — почему вы до сих пор используете бета-версию?
Я не мог сказать о Юпитере
Я нашел решение. Конечно, используя вашу информацию выше. conf.set("spark.executor.jars", "C:\\Users\\verianalizi\\.ivy2\\jars\\spark-cassandra-connector-assembly_2.12-3.0.0.jar") conf.set("spark.driver.extraClassPath", "C:\\Users\\verianalizi\\.ivy2\\jars\\spark-cassandra-connector-assembly_2.12-3.0.0.jar")
Спасибо, что ответили на мой вопрос. Я попробовал код ниже и аналогичные коды ниже
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 pyspark-shell'
Но результаты такие же.