У меня есть 2 базы данных postgres с одинаковой схемой, но в 2 разных схемах. Я пишу скрипт на Python с целью частично экспортировать данные из одной из таблиц и импортировать результат в ту же таблицу, но в другую базу данных (например, select from A where f=123). Схема большая (в ней много столбцов разных типов, некоторые могут быть нулевыми, некоторые нет. Есть типы дат и строковые поля, которые могут содержать предложения, псевдозапросы и имена файлов), и могут быть тысячи столбцов. строки в таблице.
Я выбрал подход к экспорту данных из таблицы в CSV-файл, а затем к импорту данных из CSV-файла во вторую таблицу базы данных.
Я использую psycopg2 lib для работы с Postgres в Python вместе с csv lib для чтения и записи CSV-файлов.
Я реализовал первую версию. Проблема заключалась в следующем: некоторые столбцы в строке пусты, когда я читаю данные таблицы в python, пустые поля имеют значение None, когда поле может быть null, а где поле не может быть null, значение "" пусто строка, а при экспорте в CSV все значения None и "" вставляются как пустые строки в CSV-файл. Например, ряд будет выглядеть так 1234,,,,,1,,. И когда я пытаюсь импортировать файл в таблицу postgres, все пустые значения в csv преобразуются в null и пытаются вставить таким образом, но это не удалось, потому что поля, которые не могут быть null, не принимают это значение. Ниже вы можете увидеть мой код, и после этого кода я вставил улучшение, которое я сделал, чтобы избежать этой проблемы.
import psycopg2
import csv
def export_table(filename, tablename):
conn = psycopg2.connect(....)
cur = conn.cursor()
cur.execute(f'SELECT * FROM {tablename} where f=123')
rows = cur.fetchall()
with open(filename, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
for row in rows:
writer.writerow(row)
cur.close()
conn.close()
def import_table(filename, tablename):
conn = psycopg2.connect(..second db data)
cur = conn.cursor()
with open(filename, 'r') as csvfile:
cur.copy_expert(
f"COPY {tablename} FROM STDIN WITH (FORMAT CSV)",
csvfile
)
conn.commit()
cur.close()
conn.close()
Пробовал добавлять csv.QUOTE_MINIMAL, csv.QUOTE_NONNUMERIC - мне не помогло.
Поскольку мне не удалось импортировать данные с помощью этого кода, я попытался попробовать еще одну вещь.
Я добавил ручную функцию для цитирования:
def quote_field(field):
if isinstance(field, str):
if field == '':
return '""'
elif any(c in field for c in (',', '"', '\n')):
return '"' + field.replace('"', '""') + '"'
return field
И обновил часть импорта следующим образом:
with open(filename, 'w', newline='') as csvfile:
writer = csv.writer(csvfile, quoting=csv.QUOTE_NONE, quotechar='', escapechar='\\')
for row in rows:
writer.writerow([quote_field(field) for field in row])
Я попытался запустить код, он вставил нулевые значения в csv, поскольку значения "" и None помещаются в csv как пустые поля. Таким образом, строка в csv будет выглядеть так 1234,,,"","",,,,,"",,,,,, и для некоторых случаев это будет успешно работать, данные будут импортированы правильно. Но иногда по какой-то причине сгенерированный csv не импортируется вообще или частично. Чтобы проверить это, я попытался использовать DataGrip для импорта данных из файла csv вручную, для некоторых данных он также импортировал их частично (например, 20 строк из 1000), а для некоторых данных он вообще не импортировался. Я проверил CSV на достоверность, они были действительны. Я думаю, что в части импорта есть ошибка, но я не знаю, где она и почему она так себя ведет. Нужна помощь с этим.
Вариант 2: Используйте функции psycopg2 COPY для экспорта.
@AdrianKlaver спасибо за ответ. Как я понял postgres_fdw это расширение, которое должно быть установлено в postgres, но я не понимаю, как можно частично скопировать данные из таблицы на одном сервере в таблицу на другом сервере. Кроме того, это часть работы по автоматизации, поэтому мне нужен работающий скрипт. Я использую copy_expert, который, я думаю, неявно использует copy_to
1) postgres_fdw. Insert into local_tbl select * from remote_tbl where <some_filters> 2) copy_expert может быть от до. Вы можете использовать его, чтобы сделать COPY (SELECT * FROM tablename where f=123) TO STDOUT WITH CSV HEADER. К вашему сведению, не используйте строки f для построения запроса. Безопасный способ — использовать модуль sql.
Создайте таблицы:
create table csv_null(id integer not null, fld1 varchar);
insert into csv_null values (1, 'test'), (2, ''), (3, null), (4, 'cat');
create table csv_null_2 as select * from csv_null limit 0;
\pset null
Null display is "NULL".
select * from csv_null;
id | fld1
----+------
1 | test
2 |
3 | NULL
4 | cat
Код Python:
import io
import psycopg2
con = psycopg2.connect(dbname = "test", host='localhost', user='postgres', port=5432)
buffer = io.StringIO()
cur = con.cursor()
cur.copy_expert('copy (select * from csv_null ) TO STDOUT WITH CSV HEADER', buffer)
buffer.seek(0)
cur.copy_expert('copy csv_null_2 from STDIN WITH CSV HEADER', buffer)
con.commit()
cur.execute("select * from csv_null_2")
cur.fetchall()
[(1, 'test'), (2, ''), (3, None), (4, 'cat')]
В psql:
select * from csv_null_2 ;
id | fld1
----+------
1 | test
2 |
3 | NULL
4 | cat
Cur.copy_expert('copy (select * from csv_null ) TO STDOUT WITH CSV HEADER', buffer) будет производить вывод, который cur.copy_expert('copy csv_null_2 from STDIN WITH CSV HEADER', buffer) будет потреблять правильно. Вы не прыгаете с одного контекста Python csv на Postgres COPY.
ОБНОВЛЯТЬ
Таблица изменена, чтобы иметь столбец NOT NULL:
alter table csv_null add column fld2 varchar not null default '';
update csv_null set fld2 = 'not null' where id in (1,4);
alter table csv_null_2 add column fld2 varchar not null default '';
truncate csv_null_2;
Код Python:
buffer = io.StringIO()
cur.copy_expert('copy (select * from csv_null ) TO STDOUT WITH CSV HEADER', buffer)
buffer.seek(0)
cur.copy_expert('copy csv_null_2 from STDIN WITH CSV HEADER', buffer)
con.commit()
cur.execute("select * from csv_null_2")
cur.fetchall()
[(2, '', ''), (3, None, ''), (1, 'test', 'not null'), (4, 'cat', 'not null')]
В psql:
select * from csv_null_2 ;
id | fld1 | fld2
----+------+----------
2 | |
3 | NULL |
1 | test | not null
4 | cat | not null
Спасибо за ответ! Здесь поле fld1 в основном принимает все значения, включая null. Но в моем случае сбойными полями были те, которые содержат значения varchar, но не принимают null, поэтому они в основном имеют ограничения not null. Будет ли ваш код работать, если вы добавите третий столбец, который является varchar not null?
Кроме того, вы сохраняете экспортированные данные в буфере, который находится в памяти, что, как я полагаю, создает риск переполнения в случае слишком большого количества строк. (также таблицы в моей базе данных имеют около 90 столбцов, поэтому в моем случае риск переполнения значительно выше).
Согласно copy_expert: copy_expert(sql, file, size=8192) ...file — файлоподобный объект для чтения или записи (в соответствии с sql), чтобы вы могли записать его в файл на диске, а затем использовать этот файл в cur.copy_expert('copy <table> from ...')..
См. ОБНОВЛЕНИЕ для случая NOT NULL.
Я видел ваше обновление, это решение решает мою проблему.
Хотя причиной частичной вставки данных из csv были проблемы с самой таблицей (она не позволяла вставлять в некоторых случаях). Я провел некоторое тестирование, мое решение и ваша работа одинаковы, но ваше более лаконично заботится об удалении временного файла. Спасибо за решение! Просто вопрос, который я хотел задать, этот буферный файл, который он использует, имеет ли он какие-либо ограничения на использование пространства?
AFAIK, объем памяти. Все подробности можно узнать здесь io модуль.
Почему бы просто не использовать postgres_fdw и напрямую импортировать из одной базы данных в другую?