Мне нужно было бы добавить условие фильтра при приеме данных из Cosmos Mongo DB с использованием Databricks,
Я использую следующий запрос для получения данных Cosmos Collection:
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.load()
Как я могу добавить сюда фильтр, чтобы выбрать только выбранные данные? Например: Я хочу получать данные только там, где
{"type" : "student"}
Я был бы очень признателен, если бы кто-нибудь мог помочь в этом
Я попытался выполнить следующий запрос, но получил ошибку, как показано ниже:
query = {"type" : "student"}
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.option('pipeline', json.dumps(query)) \
.load()
Ошибка: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 16.0 завершилась неудачно 4 раза, последний сбой: потеряна задача 0.3 на этапе 16.0 (TID 34) (10.139.64.5 исполнитель 0): com.mongodb. MongoCommandException: выполнение команды завершилось ошибкой 40324 (40324): «Неопознанное имя стадии конвейера: тип» на сервере xxxxxxx-xxxxx.mongo.cosmos.azure.com:10255. Полный ответ: {"ok": 0.0, "errmsg": "Неопознанное имя этапа конвейера: тип", "code": 40324, "codeName": "40324"}
Сообщение об ошибке означает, что имя этапа в запросе конвейера агрегации не было распознано. Решение будет состоять в том, чтобы убедиться, что все имена конвейеров агрегации допустимы в вашем запросе.
В этой статье описаны распространенные ошибки и решения для развертываний с использованием Azure Cosmos DB для MongoDB.
Попробуйте это и, пожалуйста, дайте мне знать, если это сработает: -
query = {'$match': { 'type':'student' }}
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.option('pipeline', query) \
.load()
Спасибо за ответ, выражение соответствия работает как положено