Вставка данных JSON в удаленную таблицу Postgres с помощью Python Psycopg2 занимает слишком много времени. Правильный ли это подход?

У меня есть 2 ГБ данных json из экспорта Mongodb. Я пытаюсь провести некоторую аналитику с этими данными и отправить эти аналитические данные в удаленную таблицу Postgres. Это мой код:

import json

from psycopg2 import connect, Error
import dateutil.parser as parser
from datetime import datetime

record_list = []

for line in open('december.json', 'r'):
    record_list.append(json.loads(line))

    #print(record_list[0])
    


    devices = []

    for key, values in record_list[0].items():


       if key == "deviceSerial":
          devices.append(values)
    test_list = list(set(devices)) 
    device_data = []

    for x in test_list:
        device_1 = [i for i in record_list if (i['deviceSerial'] == x)] 
        datetime_object_first = parser.parse(device_1[0]['createdAt'],fuzzy=True) 
        datetime_object_last = parser.parse(device_1[len(device_1)-1]['createdAt'],fuzzy=True) 

        devices = device_1[0]['deviceSerial']
        device_model = device_1[0]['device']
        device_Usage=round((datetime.timestamp(datetime_object_last)-datetime.timestamp(datetime_object_first))/60/1000,3)


        #calculating device usage
        device_data_elements  = {

            'deviceModel': device_model,
            'deviceSerial':devices,
            'deviceUsageInMin':device_Usage
     
        }

        device_data.append(device_data_elements)

        if type(device_data) == list:



            first_record = device_data[0]


        columns = list(first_record.keys())

        #print ("\ncolumn names:", columns)

        #print(device_data)

        table_name = "test_data"
sql_string = 'INSERT INTO {} '.format( table_name )
sql_string += "(" + ', '.join(columns) + ")\nVALUES "



# enumerate over the record
for i, record_dict in enumerate(device_data):

    # iterate over the values of each record dict object
    values = []
    for col_names, val in record_dict.items():

        if type(val) == str:
          
            val = val.replace("'", "''")
            val = "'" + val + "'"

        values += [ str(val) ]

        # join the list of values and enclose record in parenthesis
    sql_string += "(" + ', '.join(values) + "),\n"

# remove the last comma and end statement with a semicolon
sql_string = sql_string[:-2] + ";"

print ("\nSQL statement:")
print (sql_string)

try:
    # declare a new PostgreSQL connection object
    conn = connect(
        dbname = "events_data",
        user = "chao",
        host = "localhost",
        
        # attempt to connect for 10 seconds then raise exception
        connect_timeout = 10
    )

    cur = conn.cursor()
    print ("\ncreated cursor object:", cur)
    
#Post data into postgres table
except (Exception, Error) as err:
    print ("\npsycopg2 connect error:", err)
    conn = None
    cur = None

if cur != None:

    try:
        
        cur.execute( sql_string )
        conn.commit()

        print ('\nfinished INSERT INTO execution')

    except (Exception, Error) as error:
        print("\nexecute_sql() error:", error)
        conn.rollback()

    # close the cursor and connection
    cur.close()
    conn.close()

Я делаю некоторые расчеты (аналитику) с помощью этого скрипта. Так что в скрипте есть цикл for. Когда длина данных меньше, скрипт успешно вставляет этот анализ в Postgres. Если длина данных велика, это занимает слишком много времени, и я ждал даже 12 часов, но безуспешно. Прямо сейчас мой скрипт работает локально и считывает данные также локально. Каков наилучший подход для чтения и обработки больших объемов данных и публикации аналитики в таблице Postgres. Это мой пример данных json.


[
    {
   "createdAt": "Fri Nov 27 2020 08:07:39 GMT+0000 ",
   "sessionId": null,
   "text": null,
   "device": null,
   "deviceSerial": null
 },
 {
   "createdAt": "Tue Sep 01 2020 06:59:18 GMT+0000",
   "sessionId": null,
   "text": "Approve",
   "device": "Android",
   "deviceSerial": null
 },
 {
   "createdAt": "Wed Sep 02 2020 08:40:10 GMT+0000",
   "pageTitle": "submit option",
   "sessionId": null,
   "text": "launchComponent",
   "device": "Android",
   "deviceSerial": "636363636890"
 },
 {
   "createdAt": "Wed Sep 02 2020 08:40:11",
   "pageTitle": "quick check",
   "sessionId": "88958d89c65f4fcea56e148a5a2838cfhdhdhd",
   "text": "",
   "device": "Android",
   "deviceSerial": "6625839827"
 }
]

Не создавайте SQL как строки. Это небезопасно. Используйте SQL-композицию psycopg2. «Я делаю некоторые расчеты (аналитику) с помощью этого скрипта». Вместо этого рассмотрите возможность вставки данных в Postgres и выполнения анализа в Postgres.

Schwern 10.12.2020 22:58
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
1
1 537
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

Я бы рекомендовал выгружать JSON в Postgres и выполнять анализ в Postgres. Это то, в чем хорош Postgres. Обработка JSON не требуется, вы можете напрямую преобразовать массив JSON в строки Postgres.

Один из вариантов — создать таблицу с одним столбцом jsonb и вставить каждый элемент в виде строки, используя jsonb_array_elements.

create table devices_json (
    data jsonb
)

insert into devices_json (data)
select * from jsonb_array_elements('
[
 {
   "createdAt": "Fri Nov 27 2020 08:07:39 GMT+0000 ",
   "sessionId": null,
   "text": null,
   "device": null,
   "deviceSerial": null
 },
 ...and so on...
]
')

Затем проведите анализ в Postgres, используя его функции JSON.

Если поля хорошо известны, вы можете использовать json_populate_recordset для вставки полей в отдельные столбцы традиционной таблицы SQL. Тогда у вас есть традиционная таблица SQL, с которой может быть проще работать.

-- NOTE Postgres columns are case-sensitive, so they must be quoted to
-- ensure they exactly match the JSON keys
create table devices (
   "createdAt" timestamp,
   "sessionId" text,
   "text" text,
   device text,
   "deviceSerial" text,
   "pageTitle" text
);

insert into devices
select * from json_populate_recordset(NULL::devices, '
[
 {
   "createdAt": "Fri Nov 27 2020 08:07:39 GMT+0000 ",
   "sessionId": null,
   "text": null,
   "device": null,
   "deviceSerial": null
 },
 ...and so on...
]
')

Попробуйте.

Вы также можете сделать гибрид из двух: сбросить массив JSON в столбец jsonb и разделить его на отдельные столбцы внутри Postgres.

Спасибо @Schwern. Это действительно хорошая помощь. Я хочу сделать свою аналитику динамической. Например, я хочу отсортировать столбец deviceSerial и рассчитать его использование по разнице отдельно от последнего и первого индекса. Это моя аналитика. Их будет тысячи строки данных. Я хочу опубликовать эту аналитику в другой таблице. Я думал снова получить эту строку Postgres в Python, сделать некоторые вычисления и отправить результат в Postgres. Будет ли это хорошим подходом? Или я могу сделать это, не получать данные из Postgres. Вместо этого с помощью команды python для Postgres.

Массовые вставки в Posgres лучше всего делать с помощью cursor.copy_from.

Преобразуйте свой json в нужную структуру в Python, затем запишите в файл памяти io.TextIO. Вставьте это в БД одним вызовом copy_from(). Выполните seek(0) для объекта TextIO перед copy_from.

import io
import psycopg2

buf = io.StringIO()
buf.write('1|{"x":0}\n')
buf.write('2|{"y":0}\n')
buf.seek(0)

with psycopg2.connect(service = "my_service") as con:
    cur = con.cursor()
    cur.execute("create table if not exists t (a int, b jsonb)")
    cur.copy_from(buf, "t", sep = "|")
    print(cur.rowcount)

Вставка в postgres не может быть выполнена быстрее.

cursor.copy_expert может сделать то же самое, но с большей гибкостью.

С уважением Нильс

Другие вопросы по теме