Я пытаюсь перенести CSV-файлы из Google Cloud Storage (GCS), которые были экспортированы из BigQuery, в экземпляр PostgreSQL Google Cloud sql с помощью скрипта Python.
Я надеялся использовать Google API, но нашел это в документация:
Importing CSV data using the Cloud SQL Admin API is not supported for PostgreSQL instances.
В качестве альтернативы я мог бы использовать библиотеку psycopg2 и передавать строки файла csv в экземпляр SQL. Я могу сделать это тремя способами
Меня беспокоит, что эти CSV-файлы могут содержать миллионы строк, и запуск этого процесса для любого из трех упомянутых выше вариантов кажется мне плохой идеей.
Какие у меня есть альтернативы? По сути, у меня есть необработанные данные в BigQuery, на которых мы выполняем некоторую предварительную обработку перед экспортом в GCS при подготовке к импорту в экземпляр PostgreSQL. Мне нужно экспортировать эти предварительно обработанные данные из BigQuery в экземпляр PostgreSQL.
Это не дубликат этого вопрос, поскольку я предпочитаю искать решение, которое экспортирует данные из BigQuery в экземпляр PostgreSQL, будь то через GCS или напрямую.
У меня тоже нет уважительной причины. Это должно было быть частью быстрого и грязного теста для другой части проекта. Надеялся сделать это, не настроив конвейер потока данных. Я никогда раньше не использовал Dataflow.
Судя по звукам, это был бы очень простой конвейер. Приятно то, что он масштабируется для вас и имеет собственные источники / приемники для BigQuery и CloudSQL.
В качестве альтернативы вы можете загрузить набор данных в pandas, и у него есть собственные методы для отправки их в соединение SQL, например psycopg2.
Возможный дубликат Как импортировать данные файла CSV в таблицу PostgreSQL?
Также на стороне примечания кто-нибудь знает, почему импорт csv не поддерживается для экземпляров PostgreSQL?






Вы можете выполнить процесс импорта с помощью Облачный поток данных, как это было предложено @GrahamPolley. Это правда, что это решение требует дополнительной работы (знакомство с Dataflow, настройка всего и т. д.). Даже с учетом дополнительной работы это было бы предпочтительным решением для вашей ситуации. Однако доступны и другие решения, одно из которых я объясню ниже.
Этот учебник по экспорт BigQuery в Google Datastore является хорошим примером для настройки процесса миграции с помощью Dataflow.
Альтернативное решение Cloud Dataflow
Cloud SQL для PostgreSQL не поддерживает импорт с .CSV, но поддерживает файлы .SQL.
The file type for the specified uri.
SQL: The file contains SQL statements.
CSV: The file contains CSV data. Importing CSV data using the Cloud SQL Admin API is not supported for PostgreSQL instances.
Прямым решением было бы преобразовать файл .CSV в .SQL с помощью какого-либо инструмента (Google не предоставляет тот, о котором я знаю, но их много в Интернете), а затем импортировать в PostgreSQL.
Если вы хотите реализовать это решение более «программным» способом, я бы предложил использовать Облачные функции, вот пример того, как я бы попытался это сделать:
.CSV. Если это так, используйте API csv-to-sql (пример API здесь) для преобразования файла в .SQL.Спасибо за ваш ответ. Я согласен с тем, что настройка конвейера потока данных - наиболее правильный способ решить эту проблему. Поэтому отмечу ваш ответ как принятый. Я нашел другой способ сделать это, и подробно опишу его в ответе, который позволил мне использовать остальную часть кода, который я уже написал.
Я обнаружил, что модуль pyscopg2 имеет copy_from (), который позволяет загружать весь файл csv вместо потоковой передачи строк по отдельности.
Обратной стороной использования этого метода является то, что файл csv по-прежнему необходимо загрузить из GCS и сохранить локально.
вот подробности использования pyscopg2 'copy_from ()'. (От здесь)
import psycopg2
conn = psycopg2.connect("host=localhost dbname=postgres user=postgres")
cur = conn.cursor()
with open('user_accounts.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cur.copy_from(f, 'users', sep=',')
conn.commit()
Вы можете просто использовать класс, чтобы текст, который вы извлекаете из Интернета, вел себя как файл. Я использовал это несколько раз.
import io
import sys
class IteratorFile(io.TextIOBase):
""" given an iterator which yields strings,
return a file like object for reading those strings """
def __init__(self, obj):
elements = "{}|" * len(obj[0])
elements = (unicode(elements[:-1]).format(*x) for x in obj)
self._it = elements
self._f = io.cStringIO()
def read(self, length=sys.maxsize):
try:
while self._f.tell() < length:
self._f.write(next(self._it) + "\n")
except StopIteration as e:
# soak up StopIteration. this block is not necessary because
# of finally, but just to be explicit
pass
except Exception as e:
print("uncaught exception: {}".format(e))
finally:
self._f.seek(0)
data = self._f.read(length)
# save the remainder for next read
remainder = self._f.read()
self._f.seek(0)
self._f.truncate(0)
self._f.write(remainder)
return data
def readline(self):
return next(self._it)
Это необходимо для того, чтобы файл не загружался локально.
Да, это обрабатывает двоичные данные в памяти как файл.
Перед тем как начать, вы должны убедиться:
The database and table you are importing into must already exist on your Cloud SQL instance.
CSV file format requirements CSV files must have one line for each row of data and have comma-separated fields.
Затем вы можете представить импортировать данные в экземпляр Cloud SQL с помощью файла CSV в корзине GCS, выполнив следующие шаги [GCLOUD]
gcloud sql instances describe [INSTANCE_NAME]
Скопируйте поле serviceAccountEmailAddress.
Добавьте сервисную учетную запись в ACL корзины в качестве писателя:
gsutil acl ch -u [SERVICE_ACCOUNT_ADDRESS]:W gs://[BUCKET_NAME]
gsutil acl ch -u [SERVICE_ACCOUNT_ADDRESS]:R gs://[BUCKET_NAME]/[IMPORT_FILE_NAME]
gcloud sql import csv [INSTANCE_NAME] gs://[BUCKET_NAME]/[FILE_NAME] \
--database=[DATABASE_NAME] --table=[TABLE_NAME]
gsutil acl ch -d [SERVICE_ACCOUNT_ADDRESS] gs://[BUCKET_NAME]
Почему бы не использовать Cloud Dataflow? Похоже, хороший вариант для этого.