Скопируйте данные из Postgres DB в AWS S3 за один шаг

У меня есть вариант использования для переноса выбранных данных из Postgres в Amazon S3. Это должно произойти за один шаг. Я пишу программу на Java, чтобы это сделать.

Я нашел способ скопировать данные за 2 шага. Я использую библиотеку CopyManager и метод copyOut для получения данных в моем локальном хранилище. После этого я перемещаю тот же файл в S3 с помощью Java.

код postgres для получения данных в моем локальном

CopyManager copyManager = new CopyManager((BaseConnection) con);
FileWriter fileWriter = new FileWriter("file.csv");
copyManager.copyOut("COPY (SELECT stmt) TO STDOUT WITH DELIMITER '\t' CSV HEADER", fileWriter);

Код AWS для перехода с локального на S3

AmazonS3 conn = new AmazonS3Client(credentials);
conn.setEndpoint("xxx.com");
conn.putObject(
            bucket1.getName(),
            "request.json",
            new File("file.csv")
    );

Я ожидаю, что это произойдет за один раз, вместо того, чтобы записывать в файл и затем перемещать файл на S3.

Если вы готовы сделать это на python вместо java, у меня есть пример, который, я думаю, сработает (экспорт в CSV в памяти, прямая загрузка в S3, не оставляя постоянного файла в ОС). Дайте мне знать, если вам интересно, я напишу ответ с примером и как это работает.

Adam Bethke 27.12.2018 20:23

@AdamBethke Можно ли это сделать, ограничив объем памяти и запустив цикл для добавления этих буферизованных данных в файл в ОС? В противном случае с данными, которые у нас есть, мы попали бы в исключение памяти.

Nav_cfc 07.01.2019 12:31

Понял, что мой первоначальный комментарий был немного неправильным - его нет в памяти, но это временный файл (управляемый процессом) на диске. Я разместил это как ответ; Я использую аналогичный процесс для частой передачи дампа размером 6 ГБ, и он не использует значительный объем памяти. Если это не то, что вы ищете, дайте мне знать, и я удалю ответ.

Adam Bethke 08.01.2019 14:05

По сути, он делает то, что я пытаюсь сделать. Вместо того, чтобы хранить его в явно указанном месте, создается временный файл, в него записываются данные, затем этот же файл перемещается на S3, а затем временный файл удаляется. Значит, это не имеет отношения к памяти?

Nav_cfc 08.01.2019 17:06

Да, это правильно. У него будут некоторые накладные расходы на память, потому что он генерирует процесс, но он должен быть минимальным.

Adam Bethke 08.01.2019 19:26

@AdamBethke Я пытался запустить ваш код. Он очень похож на тот, что у меня есть. Временный файл будет иметь случайное расположение, и я указал его явно. Таким образом, ваш код занимает почти столько же времени, что и мой. У вас есть идеи, чтобы он был в памяти для размера буфера?

Nav_cfc 10.01.2019 09:39

К сожалению, если я правильно понимаю, я не уверен, как вы это сделаете. Основываясь на ваших проблемах с попаданием в исключение нехватки памяти, я думаю, вы, вероятно, попали в точку, где вам нужно либо перейти на экземпляр с большим объемом памяти, чтобы быть в порядке со скоростью. или разработать решение для потоковой передачи. В определенный момент для передачи большого количества данных потребуется приличное количество времени, и выбор изменится с «могу ли я сделать это быстрее» на «как мне оптимизировать способ / что еще мы блокируем». .

Adam Bethke 10.01.2019 12:32
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
4
7
3 683
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Я не пробовал этого, но думаю, у вас должно получиться.

Вместо того, чтобы передавать FileWriter в copyOut(), вы можете передать любой OutputStream. Вы также можете предоставить InputStream методу putObject, а не файл.

Итак, вам просто нужно преобразовать ваш OutputStream в InputStream, для чего существует ряд методов (например, см. эта почта), или вы можете использовать что-то вроде Библиотека EasyStream.

Вы должны использовать PutObject с InputStream.

Вы можете использовать PipedOutputStream и PipedInputStream для перенаправления вывода на ввод putObject

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);

Используйте in в качестве аргумента для PutObject и начните запись в out в другом потоке.

Можете ли вы помочь мне с кодом для putobject и pipedinputstream с двумя потоками. Я попытался использовать pipedstream, но у меня возникли трудности с потоками при огромных объемах данных. Трудности чтения и записи, поскольку выходной поток использовался для закрытия, когда входной поток был запущен. Небольшой фрагмент кода был бы полезен.

Nav_cfc 21.12.2018 20:10

Закрытие PipedOutputStrem не должно влиять на связанный PipedInputStream, вам нужно будет явно вызвать close для них обоих. С другой стороны, я не думаю, что даже использование методов InputStream поможет, поскольку это позволяет избежать полного чтения файла в памяти перед его отправкой на S3, однако даже потоковым методам в S3 API нужен content-length в ObjectMetaData, иначе он считывает весь поток в памяти. Забавно то, что для получения длины контента нам нужно прочитать весь поток. Это меня так сильно беспокоит.

11thdimension 21.12.2018 21:12

Значит, используя метод потока, мы не можем ограничить объем данных в памяти? Разве мы не можем просто написать цикл, в котором мы читаем количество данных «x», пока оно не будет прочитано полностью? И заодно читать данные 'x' во входной поток для s3?

Nav_cfc 22.12.2018 06:50
Ответ принят как подходящий

Если вы готовы сделать это на Python, вот пример, который должен работать:

import boto
import gzip
import psycopg2
import tempfile

# database connection setup
connection = psycopg2.connect('postgresql://scott:tiger@localhost/mydatabase')
connection.autocommit = True
cursor = connection.cursor()

# aws connection setup
s3_connection = boto.connect_s3('<aws access key>', '<aws secret key>')
bucket = s3_connection.get_bucket('<bucket>')

with tempfile.NamedTemporaryFile() as t:
    with gzip.GzipFile(t.name, mode='wb') as g:
        cursor.copy_expert("COPY ({0}) TO STDOUT WITH CSV HEADER".format('<select_query>'), g)
    key = boto.s3.key.Key(bucket, '<s3_key>')
    key.set_contents_from_filename(g.name)

В этом процессе используется модуль tempfile в Python, который позволяет вам создать файл, который будет использоваться, а затем удаляться в процессе. Диспетчер контекста (with tempfile...) упрощает управление процессом записи файла, поэтому вам не нужно удалять его вручную. В зависимости от того, как вы настроили временный файл, вы можете сделать файл доступным или никогда не видимым для пользователей системы. По сути, вы транслируете оператор SELECT в STDOUT, а затем записываете STDOUT во временный файл. Вы по-прежнему обязаны своей базе данных за инструкцию SELECT с точки зрения управления памятью, скорости и доступа.

Преимущество состоит в том, что вам не нужно хранить весь файл в памяти при попытке передать его на S3; недостатки в том, что вам нужно достаточно места на диске для временного хранения файла, и что это, очевидно, медленнее, потому что вы записываете на диск, а не делаете все это в памяти.

Также следует отметить, что я сохранил шаг, на котором python сжимает файл с помощью gzip перед загрузкой. Я сделал это, чтобы сэкономить место при загрузке; это особенно полезно, если вы загружаете таблицу с большим количеством повторяющихся данных.

В качестве отступления: вы должны использовать нет как есть в среде, где вы открыты для SQL-инъекций; есть лучшие способы создания команды COPY, если это часть вашего варианта использования.

Я выполняю описанную выше процедуру в своем коде, но у меня возникают проблемы с очень большими файлами, так как я считаю, что локального хранилища моего Composer недостаточно. Как бы вы справились с этим, когда вашего локального хранилища недостаточно?

Minato 30.01.2020 07:28

Я быстро подумал, что вам нужно реализовать решение для потоковой передачи, подобное этому вопросу: stackoverflow.com/q/31031463/6591849; Тем не менее, потоковые решения всегда будут более сложными для обработки / я не уверен, какой правильный подход будет для сжатия файла во время потоковой передачи (возможно, это какая-то форма решения для фрагментов)

Adam Bethke 30.01.2020 13:49

Вот мое решение кода Python для перемещения всех таблиц в БД с именем dvdrental в PostgreSQL в корзину S3.

import boto3
import json
import psycopg2

s3_client = boto3.client('s3')

# database connection setup
connection = psycopg2.connect(
                database = "dvdrental",
                user = "postgres",
                password = "****")
connection.autocommit = True
cursor = connection.cursor()

# Get all the table names from the database
query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 
'public' and table_type = 'BASE TABLE';"
cursor.execute(query)
table_names = cursor.fetchall()
table_names_list = [row[0] for row in table_names]

# Get the contents of each table in json format and move it to the s3 bucket
for name in table_names_list:
  query2 = "SELECT array_to_json(array_agg(row_to_json(n_alias))) from (select * 
  from {}) n_alias;".format(name)
  cursor.execute(query2)
  file_content = cursor.fetchall()
  data = json.dumps(file_content).encode('UTF-8')
  s3_client.put_object(Body=data, Bucket='dvdrentalbucket', 
  Key='{}.json'.format(name))



# closing database connection.
cursor.close()
connection.close()
print("PostgreSQL connection is closed")

Другие вопросы по теме