В PyMongo 3.7.2 я пытаюсь прочитать коллекцию порциями, используя batch_size для курсора MongoDB, как описано здесь. Основная идея заключается в использовании метода find() для объекта коллекции с параметром batch_size. Но что бы я ни пытался, курсор всегда возвращает все документы в моей коллекции.
Базовый фрагмент моего кода выглядит так (в коллекции более 10 тысяч документов):
import pymongo as pm
client = pm.MongoClient()
coll = client.get_database('db').get_collection('coll')
cur = coll.find({}, batch_size=500)
Однако курсор всегда сразу же возвращает полный размер коллекции. Я использую его, как описано в документах.
Кто-нибудь знает, как правильно перебирать коллекцию партиями? Есть способы зациклиться на выводе метода find(), но при этом сначала будет получена полная коллекция, и будут зацикливаться только уже извлеченные документы в памяти. Предполагается, что параметр batch_size получает пакет и каждый раз совершает обход на сервер, чтобы сэкономить место в памяти.






У Pymongo есть помощники для класса Cursor, поэтому он автоматически выполнит пакетную обработку для вас и вернет вам результат в виде документов.
Параметр batch_size установлен, но идея в том, что вам нужно установить его только в методе find(), и вам не нужно выполнять ручные низкоуровневые вызовы или повторять пакеты.
Например, если у меня в коллекции 100 документов:
> db.test.count()
100
Затем я установил уровень профилирования для регистрации всех запросов:
> db.setProfilingLevel(0,-1)
{
"was": 0,
"slowms": 100,
"sampleRate": 1,
"ok": 1,
...
Затем я использую pymongo, чтобы указать batch_size из 10:
import pymongo
import bson
conn = pymongo.MongoClient()
cur = conn.test.test.find({}, {'txt':0}, batch_size=10)
print(list(cur))
Запустив этот запрос, я вижу в журнале MongoDB:
2019-02-22T15:03:54.522+1100 I COMMAND [conn702] command test.test command: find { find: "test", filter: {} ....
2019-02-22T15:03:54.523+1100 I COMMAND [conn702] command test.test command: getMore { getMore: 266777378048, collection: "test", batchSize: 10, ....
(getMore repeated 9 more times)
Таким образом, запрос был получен с сервера в указанных пакетах. Это просто скрыто от вас через класс Cursor.
Редактировать
Если вам В самом деле нужно получить документы пакетами, в разделе Сбор (ссылка на документ) есть функция find_raw_batches(). Этот метод работает аналогично find() и принимает те же параметры. Однако имейте в виду, что он вернет необработанный BSON, который необходимо будет декодировать приложением на отдельном шаге. Примечательно, что этот метод не поддерживает сессии.
Сказав это, если цель состоит в том, чтобы снизить использование памяти приложением, стоит подумать об изменении запроса, чтобы вместо этого он использовал диапазоны. Например:
find({'$gte': <some criteria>, '$lte': <some other criteria>})
Запросы диапазона легче оптимизировать, они могут использовать индексы и (на мой взгляд) их легче отлаживать и легче перезапускать, если запрос прерывается. Это менее гибко при использовании пакетов, когда вам нужно перезапустить запрос с нуля и снова просмотреть все пакеты, если он будет прерван.
Я обновил ответ. Надеюсь, это ответит на ваши опасения.
Вот как я это делаю, это помогает разбить данные на части, но я подумал, что будет более простой способ сделать это. Я создал функцию yield_rows, которая получает генерируемые и выдающие чанки, она обеспечивает удаление использованных чанков.
import pymongo as pm
CHUNK_SIZE = 500
client = pm.MongoClient()
coll = client.get_database('db').get_collection('coll')
cursor = coll.find({}, batch_size=CHUNK_SIZE)
def yield_rows(cursor, chunk_size):
"""
Generator to yield chunks from cursor
:param cursor:
:param chunk_size:
:return:
"""
chunk = []
for i, row in enumerate(cursor):
if i % chunk_size == 0 and i > 0:
yield chunk
del chunk[:]
chunk.append(row)
yield chunk
chunks = yield_rows(cursor, CHUNK_SIZE)
for chunk in chunks:
# do processing here
pass
Если я найду более чистый и эффективный способ сделать это, я обновлю свой ответ.
Спасибо, это приятно знать. Но это означает, что, несмотря на то, что документы извлекаются пакетами, он запускает все пакеты, затем возвращает вывод, а затем у меня есть объект со всеми документами вместе? Я пытался получить партию из монго, обработать эту партию в коде, например записать ее в файл, и только после этого получить следующую партию из монго. Таким образом, на каждой итерации в памяти будет только один размер пакета.