У меня есть вариант использования для переноса выбранных данных из Postgres в Amazon S3. Это должно произойти за один шаг. Я пишу программу на Java, чтобы это сделать.
Я нашел способ скопировать данные за 2 шага. Я использую библиотеку CopyManager и метод copyOut для получения данных в моем локальном хранилище. После этого я перемещаю тот же файл в S3 с помощью Java.
код postgres для получения данных в моем локальном
CopyManager copyManager = new CopyManager((BaseConnection) con);
FileWriter fileWriter = new FileWriter("file.csv");
copyManager.copyOut("COPY (SELECT stmt) TO STDOUT WITH DELIMITER '\t' CSV HEADER", fileWriter);
Код AWS для перехода с локального на S3
AmazonS3 conn = new AmazonS3Client(credentials);
conn.setEndpoint("xxx.com");
conn.putObject(
bucket1.getName(),
"request.json",
new File("file.csv")
);
Я ожидаю, что это произойдет за один раз, вместо того, чтобы записывать в файл и затем перемещать файл на S3.
@AdamBethke Можно ли это сделать, ограничив объем памяти и запустив цикл для добавления этих буферизованных данных в файл в ОС? В противном случае с данными, которые у нас есть, мы попали бы в исключение памяти.
Понял, что мой первоначальный комментарий был немного неправильным - его нет в памяти, но это временный файл (управляемый процессом) на диске. Я разместил это как ответ; Я использую аналогичный процесс для частой передачи дампа размером 6 ГБ, и он не использует значительный объем памяти. Если это не то, что вы ищете, дайте мне знать, и я удалю ответ.
По сути, он делает то, что я пытаюсь сделать. Вместо того, чтобы хранить его в явно указанном месте, создается временный файл, в него записываются данные, затем этот же файл перемещается на S3, а затем временный файл удаляется. Значит, это не имеет отношения к памяти?
Да, это правильно. У него будут некоторые накладные расходы на память, потому что он генерирует процесс, но он должен быть минимальным.
@AdamBethke Я пытался запустить ваш код. Он очень похож на тот, что у меня есть. Временный файл будет иметь случайное расположение, и я указал его явно. Таким образом, ваш код занимает почти столько же времени, что и мой. У вас есть идеи, чтобы он был в памяти для размера буфера?
К сожалению, если я правильно понимаю, я не уверен, как вы это сделаете. Основываясь на ваших проблемах с попаданием в исключение нехватки памяти, я думаю, вы, вероятно, попали в точку, где вам нужно либо перейти на экземпляр с большим объемом памяти, чтобы быть в порядке со скоростью. или разработать решение для потоковой передачи. В определенный момент для передачи большого количества данных потребуется приличное количество времени, и выбор изменится с «могу ли я сделать это быстрее» на «как мне оптимизировать способ / что еще мы блокируем». .




Я не пробовал этого, но думаю, у вас должно получиться.
Вместо того, чтобы передавать FileWriter в copyOut(), вы можете передать любой OutputStream. Вы также можете предоставить InputStream методу putObject, а не файл.
Итак, вам просто нужно преобразовать ваш OutputStream в InputStream, для чего существует ряд методов (например, см. эта почта), или вы можете использовать что-то вроде Библиотека EasyStream.
Вы должны использовать PutObject с InputStream.
Вы можете использовать PipedOutputStream и PipedInputStream для перенаправления вывода на ввод putObject
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream(in);
Используйте in в качестве аргумента для PutObject и начните запись в out в другом потоке.
Можете ли вы помочь мне с кодом для putobject и pipedinputstream с двумя потоками. Я попытался использовать pipedstream, но у меня возникли трудности с потоками при огромных объемах данных. Трудности чтения и записи, поскольку выходной поток использовался для закрытия, когда входной поток был запущен. Небольшой фрагмент кода был бы полезен.
Закрытие PipedOutputStrem не должно влиять на связанный PipedInputStream, вам нужно будет явно вызвать close для них обоих. С другой стороны, я не думаю, что даже использование методов InputStream поможет, поскольку это позволяет избежать полного чтения файла в памяти перед его отправкой на S3, однако даже потоковым методам в S3 API нужен content-length в ObjectMetaData, иначе он считывает весь поток в памяти. Забавно то, что для получения длины контента нам нужно прочитать весь поток. Это меня так сильно беспокоит.
Значит, используя метод потока, мы не можем ограничить объем данных в памяти? Разве мы не можем просто написать цикл, в котором мы читаем количество данных «x», пока оно не будет прочитано полностью? И заодно читать данные 'x' во входной поток для s3?
Если вы готовы сделать это на Python, вот пример, который должен работать:
import boto
import gzip
import psycopg2
import tempfile
# database connection setup
connection = psycopg2.connect('postgresql://scott:tiger@localhost/mydatabase')
connection.autocommit = True
cursor = connection.cursor()
# aws connection setup
s3_connection = boto.connect_s3('<aws access key>', '<aws secret key>')
bucket = s3_connection.get_bucket('<bucket>')
with tempfile.NamedTemporaryFile() as t:
with gzip.GzipFile(t.name, mode='wb') as g:
cursor.copy_expert("COPY ({0}) TO STDOUT WITH CSV HEADER".format('<select_query>'), g)
key = boto.s3.key.Key(bucket, '<s3_key>')
key.set_contents_from_filename(g.name)
В этом процессе используется модуль tempfile в Python, который позволяет вам создать файл, который будет использоваться, а затем удаляться в процессе. Диспетчер контекста (with tempfile...) упрощает управление процессом записи файла, поэтому вам не нужно удалять его вручную. В зависимости от того, как вы настроили временный файл, вы можете сделать файл доступным или никогда не видимым для пользователей системы. По сути, вы транслируете оператор SELECT в STDOUT, а затем записываете STDOUT во временный файл. Вы по-прежнему обязаны своей базе данных за инструкцию SELECT с точки зрения управления памятью, скорости и доступа.
Преимущество состоит в том, что вам не нужно хранить весь файл в памяти при попытке передать его на S3; недостатки в том, что вам нужно достаточно места на диске для временного хранения файла, и что это, очевидно, медленнее, потому что вы записываете на диск, а не делаете все это в памяти.
Также следует отметить, что я сохранил шаг, на котором python сжимает файл с помощью gzip перед загрузкой. Я сделал это, чтобы сэкономить место при загрузке; это особенно полезно, если вы загружаете таблицу с большим количеством повторяющихся данных.
В качестве отступления: вы должны использовать нет как есть в среде, где вы открыты для SQL-инъекций; есть лучшие способы создания команды COPY, если это часть вашего варианта использования.
Я выполняю описанную выше процедуру в своем коде, но у меня возникают проблемы с очень большими файлами, так как я считаю, что локального хранилища моего Composer недостаточно. Как бы вы справились с этим, когда вашего локального хранилища недостаточно?
Я быстро подумал, что вам нужно реализовать решение для потоковой передачи, подобное этому вопросу: stackoverflow.com/q/31031463/6591849; Тем не менее, потоковые решения всегда будут более сложными для обработки / я не уверен, какой правильный подход будет для сжатия файла во время потоковой передачи (возможно, это какая-то форма решения для фрагментов)
Вот мое решение кода Python для перемещения всех таблиц в БД с именем dvdrental в PostgreSQL в корзину S3.
import boto3
import json
import psycopg2
s3_client = boto3.client('s3')
# database connection setup
connection = psycopg2.connect(
database = "dvdrental",
user = "postgres",
password = "****")
connection.autocommit = True
cursor = connection.cursor()
# Get all the table names from the database
query = "SELECT table_name FROM information_schema.tables WHERE table_schema =
'public' and table_type = 'BASE TABLE';"
cursor.execute(query)
table_names = cursor.fetchall()
table_names_list = [row[0] for row in table_names]
# Get the contents of each table in json format and move it to the s3 bucket
for name in table_names_list:
query2 = "SELECT array_to_json(array_agg(row_to_json(n_alias))) from (select *
from {}) n_alias;".format(name)
cursor.execute(query2)
file_content = cursor.fetchall()
data = json.dumps(file_content).encode('UTF-8')
s3_client.put_object(Body=data, Bucket='dvdrentalbucket',
Key='{}.json'.format(name))
# closing database connection.
cursor.close()
connection.close()
print("PostgreSQL connection is closed")
Если вы готовы сделать это на python вместо java, у меня есть пример, который, я думаю, сработает (экспорт в CSV в памяти, прямая загрузка в S3, не оставляя постоянного файла в ОС). Дайте мне знать, если вам интересно, я напишу ответ с примером и как это работает.