Я пытаюсь запросить документы mongodb с помощью Pymongo и пытаюсь записать их в s3. Я хочу сохранить файлы на s3 в формате массива json. Я могу легко восстановить его обратно в монго. Запросы Mongodb создаются динамически, и у меня нет возможности сохранять документы в каком-то временном месте, а затем загружать.
Нижеприведенный фрагмент работает, но потребляет много памяти. Для копирования 1,5 ГБ данных требуется ~ 12 ГБ физической памяти.
mongo_query = {"$and": [{"ABC": school_year_id.upper()}, {"XYZ": clientid}]
plans = mongo_conn[self.database][self.collection]
plans_archivable_docs = plans.find(mongo_query)
s3_key = school_year + '/' + clientid + '/' + self.database + self.collection + '.json'
s3_client = mngarchs3.get_s3_client()
response = s3_client.put_object(ACL='private',
Bucket='xxxx-mongo-archives',
Key=s3_key,
Body=dumps(plans_archivable_docs)
)
print (response)
Есть ли какой-нибудь другой вариант с более эффективным использованием памяти?
да .. Это вроде как последний вариант. Я могу получать документы партиями и записывать их в несколько файлов.
Если вы можете передавать документы с помощью курсора, вы можете направить этот поток на S3. Попробую написать POC позже, когда у меня будет время. Есть библиотека ijson, позволяющая транслировать json






Стратегия применения для больших файлов в вашем сценарии заключается в том, чтобы загружать их порциями в корзину Amazon S3.
Вы можете использовать MultipartUpload API, доступный в библиотеке boto3. Я предлагаю этот способ, если вы хотите объединить фрагменты в один файл после загрузки.
Это объясняет, что вы можете реализовать наивного писателя, если считаете, что объединение фрагментов здесь не имеет значения. Наивный писатель, который загружает документы в вашу коллекцию кусками, может быть написан следующим образом.
Note that you can improve on this to handle errors and perform retries. You can also run the uploads in the parallel.
import json
import io
import boto3
from pymongo import MongoClient
class ObjectChunkIO:
def __init__(self, bucket, prefix):
self.bucket = bucket
self.prefix = prefix
self.iterator = self.chunk_iterator()
def next_chunk(self):
return next(self.iterator)
def chunk_iterator(self):
raise NotImplementedError
class ObjectChunkWriter(ObjectChunkIO):
def __init__(self, bucket, prefix, chunk_size=5e7):
super().__init__(bucket, prefix)
self.chunk_num = 0
# the object size in bytes. defaults to 50MB
self.chunk_size = chunk_size
def _upload_buf(self, object_data):
self.bucket.put_object(Body=object_data, Key=self.path())
def chunk_iterator(self):
while True:
buffer = io.BytesIO()
yield buffer
buffer.close()
self.chunk_num += 1
def path(self):
return '{prefix}.{0:03d}.json'.format(
self.chunk_num, prefix=self.prefix)
def write(self, cur):
with self.next_chunk() as buf:
for doc in cur:
b_doc = json.dumps(doc).encode('utf-8')
if buf.tell() + len(b_doc) > self.chunk_size:
self._upload_buf(buf.getvalue())
buf = self.next_chunk()
buf.writelines([b_doc, b'\n'])
self._upload_buf(buf.getvalue())
Это можно использовать так:
def get_collection():
db_client = MongoClient()
db = db_client.<database>
return db.<collection>
def get_bucket():
session = boto3.Session(profile_name='<profile>')
s3_resource = session.resource('s3')
return s3_resource.Bucket('<bucket>')
def upload_in_chunks(cur, bucket):
''' Uploads documents returned in a MongoDb cur in chunks to S3 bucket
Args:
- cur: pymongo.cursor.Cursor
- collection: s3.Bucket
'''
object_chunk_writer = ObjectChunkWriter(bucket,
prefix='streaming/sample-output/chunk')
object_chunk_writer.write(cur)
collection = get_collection()
cur = collection.find()
bucket = get_bucket()
upload_in_chunks(cur, bucket)
Чтение загруженных фрагментов - довольно простая часть.
class ObjectChunkReader(ObjectChunkIO):
def chunk_iterator(self):
object_chunks = self.bucket.objects.filter(Prefix=self.prefix, MaxKeys=3)
for object_summary in object_chunks:
response = object_summary.get()
body = response['Body']
chunk_object = [json.loads(line) for line in body.iter_lines()]
yield chunk_object
def __iter__(self):
for object_chunk in self.iterator:
yield object_chunk
def write_objects_to_collection(bucket, collection):
''' Writes s3 objects to a MongoDb collection
Args:
- bucket: s3.Bucket
- collection: pymongo.collection.Collection
'''
object_chunk_reader = ObjectChunkReader(bucket,
prefix='streaming/sample-output/chunk')
for object_chunk in object_chunk_reader:
collection.insert_many(object_chunk)
collection = get_collection()
bucket = get_bucket()
write_objects_to_collection(bucket, collection)
Рассматривали ли вы возможность написания отдельного объекта JSON для каждой строки, а затем чтения документа за раз / поток строки для S3 за раз?