Я пытаюсь загрузить 2800 CSV-файлов в RDS Postgres с помощью команды COPY. Моя программа, приведенная ниже, во многом основана на этом, и она (1) выводит список всех объектов S3 (2) создает таблицу в Postgres (3) пытается КОПИРОВАТЬ один файл в таблицу, которую я создал как POC .
import boto3
import psycopg2
S3_BUCKET = "arapbi"
S3_FOLDER = "polygon/tickers/"
s3 = boto3.resource("s3")
my_bucket = s3.Bucket(S3_BUCKET)
object_list = []
for obj in my_bucket.objects.filter(Prefix=S3_FOLDER):
object_list.append(obj)
conn_string = "postgresql://user:[email protected]:5432/arapbi"
def write_sql(file):
sql = f"""
COPY tickers
FROM '{file}'
DELIMITER ',' CSV;
"""
return sql
table_create_sql = """
CREATE TABLE IF NOT EXISTS public.tickers ( ticker varchar(20),
timestamp timestamp,
open double precision,
close double precision,
volume_weighted_average_price double precision,
volume double precision,
transactions double precision,
date date
)"""
# Create the table
pg_conn = psycopg2.connect(conn_string, database = "arapbi")
cur = pg_conn.cursor()
cur.execute(table_create_sql)
pg_conn.commit()
cur.close()
pg_conn.close()
# attempt to upload one file to the table
sql_copy = write_sql(object_list[-1].key)
pg_conn = psycopg2.connect(conn_string, database = "arapbi")
cur = pg_conn.cursor()
cur.execute()
pg_conn.commit()
cur.close()
pg_conn.close()
sql_copy
в данном случае это
COPY tickers
FROM 'polygon/tickers/dt=2023-04-24/2023-04-24.csv'
DELIMITER ',' CSV;
Когда я запускаю часть, которая должна КОПИРОВАТЬ файл в Postgres, я получаю следующую ошибку:
FeatureNotSupported: COPY from a file is not supported
HINT: Anyone can COPY to stdout or from stdin. psql's \copy command also works for anyone.
Единственный другой пример, который я смог найти в Интернете, был FeatureNotSupported: COPY из файла не поддерживается, который не был решен.
Я все еще работаю над этим и сообщу ответ, если доберусь туда первым. Мне любопытно, есть ли у кого-нибудь еще такая же рабочая нагрузка, как у меня (необходимость копирования CSV-файлов из S3 в RDS Postgres) и как они ее решили.
Огромное спасибо Адриану Клаверу и Торну за то, что указали мне правильное направление. Просто для полноты картины я добавил свое новое обновленное решение ниже:
conn_string = f"postgresql://{user}:{password}@arapbi20240406153310133100000001.c368i8aq0xtu.us-west-2.rds.amazonaws.com:5432/arapbi"
pg_conn = psycopg2.connect(conn_string, database = "arapbi")
for i, file in enumerate(object_list[]):
cur = pg_conn.cursor()
output = StringIO()
obj = object_list[i].key
bucket_name = object_list[i].bucket_name
df = pd.read_csv(f"s3a://{bucket_name}/{obj}").drop("Unnamed: 0", axis=1)
output.write(df.to_csv(index=False, header=False, na_rep='NaN'))
output.seek(0)
cur.copy_expert(f"COPY tickers FROM STDIN WITH CSV HEADER", output)
pg_conn.commit()
cur.close()
n_records = str(df.count())
print(f"loaded {n_records} records from s3a://{bucket_name}/{obj}")
pg_conn.close()
Прочтите его и соответствующим образом обновите мой сценарий. Спасибо!
Команда postgres COPY
зависит от того, доступны ли файлы серверу. Обычно это означает, что файлы находятся на самом сервере, поэтому удаленное использование COPY с файлом не работает. RDS не может получить доступ к пути к файлу S3, поэтому команда копирования не работает (на самом деле я подозреваю, что она отключена в RDS). Копирование из STDIN по-прежнему поддерживается, как указано в сообщении об ошибке:
HINT: Anyone can COPY to stdout or from stdin.
https://www.postgresql.org/docs/current/sql-copy.html
Как упоминалось в комментарии @adrian klaver, psycopg имеет несколько вспомогательных функций копирования для копирования — copy_from
и copy_expert
обе принимают в качестве аргумента файловоподобный объект (например, CSV).
Чтение документации psycopg2 copy_expert было бы хорошим началом.