У меня есть 2 ГБ данных json из экспорта Mongodb. Я пытаюсь провести некоторую аналитику с этими данными и отправить эти аналитические данные в удаленную таблицу Postgres. Это мой код:
import json
from psycopg2 import connect, Error
import dateutil.parser as parser
from datetime import datetime
record_list = []
for line in open('december.json', 'r'):
record_list.append(json.loads(line))
#print(record_list[0])
devices = []
for key, values in record_list[0].items():
if key == "deviceSerial":
devices.append(values)
test_list = list(set(devices))
device_data = []
for x in test_list:
device_1 = [i for i in record_list if (i['deviceSerial'] == x)]
datetime_object_first = parser.parse(device_1[0]['createdAt'],fuzzy=True)
datetime_object_last = parser.parse(device_1[len(device_1)-1]['createdAt'],fuzzy=True)
devices = device_1[0]['deviceSerial']
device_model = device_1[0]['device']
device_Usage=round((datetime.timestamp(datetime_object_last)-datetime.timestamp(datetime_object_first))/60/1000,3)
#calculating device usage
device_data_elements = {
'deviceModel': device_model,
'deviceSerial':devices,
'deviceUsageInMin':device_Usage
}
device_data.append(device_data_elements)
if type(device_data) == list:
first_record = device_data[0]
columns = list(first_record.keys())
#print ("\ncolumn names:", columns)
#print(device_data)
table_name = "test_data"
sql_string = 'INSERT INTO {} '.format( table_name )
sql_string += "(" + ', '.join(columns) + ")\nVALUES "
# enumerate over the record
for i, record_dict in enumerate(device_data):
# iterate over the values of each record dict object
values = []
for col_names, val in record_dict.items():
if type(val) == str:
val = val.replace("'", "''")
val = "'" + val + "'"
values += [ str(val) ]
# join the list of values and enclose record in parenthesis
sql_string += "(" + ', '.join(values) + "),\n"
# remove the last comma and end statement with a semicolon
sql_string = sql_string[:-2] + ";"
print ("\nSQL statement:")
print (sql_string)
try:
# declare a new PostgreSQL connection object
conn = connect(
dbname = "events_data",
user = "chao",
host = "localhost",
# attempt to connect for 10 seconds then raise exception
connect_timeout = 10
)
cur = conn.cursor()
print ("\ncreated cursor object:", cur)
#Post data into postgres table
except (Exception, Error) as err:
print ("\npsycopg2 connect error:", err)
conn = None
cur = None
if cur != None:
try:
cur.execute( sql_string )
conn.commit()
print ('\nfinished INSERT INTO execution')
except (Exception, Error) as error:
print("\nexecute_sql() error:", error)
conn.rollback()
# close the cursor and connection
cur.close()
conn.close()
Я делаю некоторые расчеты (аналитику) с помощью этого скрипта. Так что в скрипте есть цикл for. Когда длина данных меньше, скрипт успешно вставляет этот анализ в Postgres. Если длина данных велика, это занимает слишком много времени, и я ждал даже 12 часов, но безуспешно. Прямо сейчас мой скрипт работает локально и считывает данные также локально. Каков наилучший подход для чтения и обработки больших объемов данных и публикации аналитики в таблице Postgres. Это мой пример данных json.
[
{
"createdAt": "Fri Nov 27 2020 08:07:39 GMT+0000 ",
"sessionId": null,
"text": null,
"device": null,
"deviceSerial": null
},
{
"createdAt": "Tue Sep 01 2020 06:59:18 GMT+0000",
"sessionId": null,
"text": "Approve",
"device": "Android",
"deviceSerial": null
},
{
"createdAt": "Wed Sep 02 2020 08:40:10 GMT+0000",
"pageTitle": "submit option",
"sessionId": null,
"text": "launchComponent",
"device": "Android",
"deviceSerial": "636363636890"
},
{
"createdAt": "Wed Sep 02 2020 08:40:11",
"pageTitle": "quick check",
"sessionId": "88958d89c65f4fcea56e148a5a2838cfhdhdhd",
"text": "",
"device": "Android",
"deviceSerial": "6625839827"
}
]





Я бы рекомендовал выгружать JSON в Postgres и выполнять анализ в Postgres. Это то, в чем хорош Postgres. Обработка JSON не требуется, вы можете напрямую преобразовать массив JSON в строки Postgres.
Один из вариантов — создать таблицу с одним столбцом jsonb и вставить каждый элемент в виде строки, используя jsonb_array_elements.
create table devices_json (
data jsonb
)
insert into devices_json (data)
select * from jsonb_array_elements('
[
{
"createdAt": "Fri Nov 27 2020 08:07:39 GMT+0000 ",
"sessionId": null,
"text": null,
"device": null,
"deviceSerial": null
},
...and so on...
]
')
Затем проведите анализ в Postgres, используя его функции JSON.
Если поля хорошо известны, вы можете использовать json_populate_recordset для вставки полей в отдельные столбцы традиционной таблицы SQL. Тогда у вас есть традиционная таблица SQL, с которой может быть проще работать.
-- NOTE Postgres columns are case-sensitive, so they must be quoted to
-- ensure they exactly match the JSON keys
create table devices (
"createdAt" timestamp,
"sessionId" text,
"text" text,
device text,
"deviceSerial" text,
"pageTitle" text
);
insert into devices
select * from json_populate_recordset(NULL::devices, '
[
{
"createdAt": "Fri Nov 27 2020 08:07:39 GMT+0000 ",
"sessionId": null,
"text": null,
"device": null,
"deviceSerial": null
},
...and so on...
]
')
Вы также можете сделать гибрид из двух: сбросить массив JSON в столбец jsonb и разделить его на отдельные столбцы внутри Postgres.
Спасибо @Schwern. Это действительно хорошая помощь. Я хочу сделать свою аналитику динамической. Например, я хочу отсортировать столбец deviceSerial и рассчитать его использование по разнице отдельно от последнего и первого индекса. Это моя аналитика. Их будет тысячи строки данных. Я хочу опубликовать эту аналитику в другой таблице. Я думал снова получить эту строку Postgres в Python, сделать некоторые вычисления и отправить результат в Postgres. Будет ли это хорошим подходом? Или я могу сделать это, не получать данные из Postgres. Вместо этого с помощью команды python для Postgres.
Массовые вставки в Posgres лучше всего делать с помощью cursor.copy_from.
Преобразуйте свой json в нужную структуру в Python, затем запишите в файл памяти io.TextIO. Вставьте это в БД одним вызовом copy_from(). Выполните seek(0) для объекта TextIO перед copy_from.
import io
import psycopg2
buf = io.StringIO()
buf.write('1|{"x":0}\n')
buf.write('2|{"y":0}\n')
buf.seek(0)
with psycopg2.connect(service = "my_service") as con:
cur = con.cursor()
cur.execute("create table if not exists t (a int, b jsonb)")
cur.copy_from(buf, "t", sep = "|")
print(cur.rowcount)
Вставка в postgres не может быть выполнена быстрее.
cursor.copy_expert может сделать то же самое, но с большей гибкостью.
С уважением Нильс
Не создавайте SQL как строки. Это небезопасно. Используйте SQL-композицию psycopg2. «Я делаю некоторые расчеты (аналитику) с помощью этого скрипта». Вместо этого рассмотрите возможность вставки данных в Postgres и выполнения анализа в Postgres.