Курсор монго s3 putObject

Я пытаюсь запросить документы mongodb с помощью Pymongo и пытаюсь записать их в s3. Я хочу сохранить файлы на s3 в формате массива json. Я могу легко восстановить его обратно в монго. Запросы Mongodb создаются динамически, и у меня нет возможности сохранять документы в каком-то временном месте, а затем загружать.

Нижеприведенный фрагмент работает, но потребляет много памяти. Для копирования 1,5 ГБ данных требуется ~ 12 ГБ физической памяти.

mongo_query = {"$and": [{"ABC": school_year_id.upper()}, {"XYZ": clientid}]
        plans = mongo_conn[self.database][self.collection]
        plans_archivable_docs = plans.find(mongo_query)
        s3_key = school_year + '/' + clientid + '/' + self.database + self.collection + '.json'
        s3_client = mngarchs3.get_s3_client()
        response = s3_client.put_object(ACL='private',
                                        Bucket='xxxx-mongo-archives',
                                        Key=s3_key,
                                        Body=dumps(plans_archivable_docs)
                                        )
        print (response)

Есть ли какой-нибудь другой вариант с более эффективным использованием памяти?

Рассматривали ли вы возможность написания отдельного объекта JSON для каждой строки, а затем чтения документа за раз / поток строки для S3 за раз?

Llama 18.04.2018 03:33

да .. Это вроде как последний вариант. Я могу получать документы партиями и записывать их в несколько файлов.

Sanket Shah 20.04.2018 06:09

Если вы можете передавать документы с помощью курсора, вы можете направить этот поток на S3. Попробую написать POC позже, когда у меня будет время. Есть библиотека ijson, позволяющая транслировать json

Oluwafemi Sule 20.04.2018 08:02
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
3
502
1

Ответы 1

Стратегия применения для больших файлов в вашем сценарии заключается в том, чтобы загружать их порциями в корзину Amazon S3. Вы можете использовать MultipartUpload API, доступный в библиотеке boto3. Я предлагаю этот способ, если вы хотите объединить фрагменты в один файл после загрузки.

Это объясняет, что вы можете реализовать наивного писателя, если считаете, что объединение фрагментов здесь не имеет значения. Наивный писатель, который загружает документы в вашу коллекцию кусками, может быть написан следующим образом.

Note that you can improve on this to handle errors and perform retries. You can also run the uploads in the parallel.

import json
import io
import boto3
from pymongo import MongoClient

class ObjectChunkIO:

    def __init__(self, bucket, prefix):
        self.bucket = bucket
        self.prefix = prefix
        self.iterator = self.chunk_iterator()

    def next_chunk(self):
        return next(self.iterator)

    def chunk_iterator(self):
        raise NotImplementedError 

Выгрузка коллекции MongoDb в виде фрагментов в корзину Amazon S3

class ObjectChunkWriter(ObjectChunkIO):

    def __init__(self, bucket, prefix, chunk_size=5e7):
        super().__init__(bucket, prefix)
        self.chunk_num = 0
        # the object size in bytes. defaults to 50MB
        self.chunk_size = chunk_size

    def _upload_buf(self, object_data):
        self.bucket.put_object(Body=object_data, Key=self.path())

    def chunk_iterator(self):
        while True:
            buffer = io.BytesIO()

            yield buffer

            buffer.close()

            self.chunk_num += 1

    def path(self):
        return '{prefix}.{0:03d}.json'.format(
            self.chunk_num, prefix=self.prefix)

    def write(self, cur):
        with self.next_chunk() as buf:

            for doc in cur:
                b_doc = json.dumps(doc).encode('utf-8')

                if buf.tell() + len(b_doc) > self.chunk_size:
                    self._upload_buf(buf.getvalue())
                    buf = self.next_chunk()

                buf.writelines([b_doc, b'\n'])

            self._upload_buf(buf.getvalue())

Это можно использовать так:

def get_collection():
    db_client = MongoClient()
    db = db_client.<database>

    return db.<collection>


def get_bucket():
    session = boto3.Session(profile_name='<profile>')
    s3_resource = session.resource('s3')

    return s3_resource.Bucket('<bucket>')

def upload_in_chunks(cur, bucket):
    ''' Uploads documents returned in a MongoDb cur in chunks to S3 bucket
        Args:
            - cur: pymongo.cursor.Cursor
            - collection: s3.Bucket
    '''

    object_chunk_writer = ObjectChunkWriter(bucket,
                                            prefix='streaming/sample-output/chunk')

    object_chunk_writer.write(cur)


collection = get_collection()
cur = collection.find()
bucket = get_bucket()

upload_in_chunks(cur, bucket)

Чтение загруженных фрагментов в коллекцию MongoDb

Чтение загруженных фрагментов - довольно простая часть.

class ObjectChunkReader(ObjectChunkIO):

    def chunk_iterator(self):
        object_chunks = self.bucket.objects.filter(Prefix=self.prefix, MaxKeys=3)

        for object_summary in object_chunks:
            response = object_summary.get()
            body = response['Body']
            chunk_object = [json.loads(line) for line in body.iter_lines()]

            yield chunk_object

    def __iter__(self):
        for object_chunk in self.iterator:
            yield object_chunk

def write_objects_to_collection(bucket, collection):
    ''' Writes s3 objects to a MongoDb collection
        Args:
            - bucket: s3.Bucket
            - collection: pymongo.collection.Collection
    '''

    object_chunk_reader = ObjectChunkReader(bucket,
                                            prefix='streaming/sample-output/chunk')

    for object_chunk in object_chunk_reader:
        collection.insert_many(object_chunk)


collection = get_collection()
bucket = get_bucket()
write_objects_to_collection(bucket, collection)

Другие вопросы по теме