Я использую разъем mongo spark 10.1.1 (spark v2.13) и пытаюсь прочитать содержимое коллекции в набор данных для обработки. Сеанс искры настроен следующим образом:
//Build Spark session
SparkSession spark = SparkSession.builder()
.master("local")
.appName("ExampleApp")
.config("spark.mongodb.input.uri", "mongodb://user:password@localhost:27017/test_db")
.config("spark.mongodb.output.uri", "mongodb://user:password@localhost:27017/test_db")
.config("spark.mongodb.input.collection", "ExampleCollection")
.getOrCreate();
Затем я пытаюсь загрузить содержимое в объект набора данных:
//Load data and infer schema
Dataset<Row> dataset = spark.read().format("mongodb").load();
Это запускает трассировку стека ниже:
com.mongodb.spark.sql.connector.exceptions.ConfigException: Missing configuration for: database
at com.mongodb.spark.sql.connector.assertions.Assertions.validateConfig(Assertions.java:69) ~[mongo-spark-connector_2.13-10.1.1.jar:na]
at com.mongodb.spark.sql.connector.config.AbstractMongoConfig.getDatabaseName(AbstractMongoConfig.java:111) ~[mongo-spark-connector_2.13-10.1.1.jar:na]
at com.mongodb.spark.sql.connector.config.ReadConfig.getDatabaseName(ReadConfig.java:45) ~[mongo-spark-connector_2.13-10.1.1.jar:na]
at com.mongodb.spark.sql.connector.config.AbstractMongoConfig.withCollection(AbstractMongoConfig.java:175) ~[mongo-spark-connector_2.13-10.1.1.jar:na]
at com.mongodb.spark.sql.connector.config.ReadConfig.withCollection(ReadConfig.java:45) ~[mongo-spark-connector_2.13-10.1.1.jar:na]
at com.mongodb.spark.sql.connector.schema.InferSchema.inferSchema(InferSchema.java:82) ~[mongo-spark-connector_2.13-10.1.1.jar:na]
at com.mongodb.spark.sql.connector.MongoTableProvider.inferSchema(MongoTableProvider.java:62) ~[mongo-spark-connector_2.13-10.1.1.jar:na]
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:90) ~[spark-sql_2.13-3.3.2.jar:3.3.2]
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:140) ~[spark-sql_2.13-3.3.2.jar:3.3.2]
at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:209) ~[spark-sql_2.13-3.3.2.jar:3.3.2]
at scala.Option.flatMap(Option.scala:283) ~[scala-library-2.13.8.jar:na]
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:207) ~[spark-sql_2.13-3.3.2.jar:3.3.2]
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171) ~[spark-sql_2.13-3.3.2.jar:3.3.2]
предполагая, что существует проблема с конфигурацией сеанса искры. Я попытался добавить имя базы данных в качестве отдельного свойства: «spark.mongodb.input.database» и удалить из uris, но выдается точно такая же ошибка.
Все другие темы в этой теме относятся к загрузке с использованием класса MongoSpark
, но это не рекомендуется для этой версии коннектора.
Вы должны изменить имена параметров.
Для чтения:
sparkSession.format("mongodb")
.option("spark.mongodb.read.database", databaseName)
.option("spark.mongodb.read.collection", collectionName)
.option("spark.mongodb.read.connection.uri", s"mongodb://$userName:$password@$host:$port")
и для записи:
sparkSession.format("mongodb")
.option("spark.mongodb.write.database", databaseName)
.option("spark.mongodb.write.collection", collectionName)
.option("spark.mongodb.write.connection.uri", s"mongodb://$userName:$password@$host:$port")
Вы правы - я все еще просматривал документацию из более старой версии.