Быстрая загрузка данных из Kafka в MSSQL

Я прочитал данные из Kafka, и теперь мне нужно загрузить данные на сервер MSSQL.

Я пробовал несколько методов.

Этот метод загружает только 500 строк за 1 минуту:

while max_offset < end_offset-1:
    msg = consumer.poll() 
    max_offset += 1
if msg is not None:
    if msg.error():
        print(f'Error while receiving message: {msg.error()}')
    else:
        value = msg.value()
        offset = msg.offset()
        try: 
            cursor = conn_str.cursor()
            df = pd.json_normalize(value)
            for index, row in df.iterrows():
                cursor.execute("INSERT INTO TABLE_NAME (COLUMN_1, COLUMN_2, COLUMN_3, COLUMN_4, COLUMN_5, COLUMN_6) values(?,?,?,?,?,?)", row.column_1, row.column_2, row.column_3, row.column_4, row.column_5, row.column_6)
                conn_str.commit()
            
                print(f'Received message:\n{df}')
        except Exception as e:
            print(f'Error message: {e}')
else:
    print('No messages received')

Второй метод, который я пробовал, был:

messages_to_insert = []
batch_size = 10000
counter = 0

while max_offset < end_offset-1:
    msg = consumer.poll()
    max_offset += 1
    if msg is not None:
        if msg.error():
           
        else:
         
            row = msg.value()
            offset = msg.offset()
         
            messages_to_insert.append((row["field1"], row["field2"], row["field3"], row["field4"], row["field5"], row["field6"]))
            counter += 1
           
            if counter >= batch_size:
                cursor.executemany("INSERT INTO <table_name> (field1, field2, field3, field4, field5, field6) values(?,?,?,?,?,?)", messages_to_insert)
                conn_str.commit()
                messages_to_insert = []
                counter = 0

if messages_to_insert:
    cursor.executemany("INSERT INTO <table_name> (field1, field2, field3, field4, field5, field6) values(?,?,?,?,?,?)", messages_to_insert)
    conn_str.commit()

Это метод немного быстрее.

И последний был загружен в файл .csv. В файл .csv загружается очень быстро, а в mssql загружается слишком медленно

messages_to_insert = []
batch_size = 10000
counter = 0

with open('data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['COLUMN_1', 'COLUMN_2', 'COLUMN_3', 'COLUMN_4', 'COLUMN_5', 'COLUMN_6'])

    while max_offset < end_offset-1:
        msg = consumer.poll()
        max_offset += 1
        if msg is not None:
            if msg.error():
                logger.error(f'Error while receiving message: {msg.error()}')
            else:
                row = msg.value()
                offset = msg.offset()

                # Write row to file
                writer.writerow([row['field1'], row['field2'], row['field3'], row['field4'], row['field5'], field6])

file_path = 'data.csv'


with open(file_path, 'r') as f:
    lines = f.readlines()
    lines.pop(0)
data = [tuple(line.strip().split(',')) for line in lines]
placeholders = ','.join('?' * len(data[0]))


    query = f"INSERT INTO <table_name> (COLUMN_1, COLUMN_2, COLUMN_3, COLUMN_4, COLUMN_5, COLUMN_6) VALUES ({placeholders})"


    cursor = conn_str.cursor()
    cursor.executemany(query, data)
    cursor.commit()

conn.close()               

И попытался загрузить файл .csv после этого с помощью SSIS, но скорость загрузки была такой же.

Вы можете попробовать задачу массовой вставки SSIS.

Yitzhak Khabinsky 03.04.2023 17:27

Что ты вообще здесь спрашиваешь? В вашем вопросе нет вопроса.

Thom A 03.04.2023 17:27

@ThomA, как ускорить загрузку, это мой вопрос

Denys 03.04.2023 17:56

Все ваши попытки, похоже, основаны на методах RBAR, @Denys, вы INSERT один ряд за мучительным рядом. Однако службы SSIS должны быть довольно быстрыми, поскольку они объединяют данные в транзакцию. Как долго это "слишком долго"? Сколько времени на самом деле занимают эти процессы?

Thom A 03.04.2023 18:00

@ThomA SSIS загружает 10 000 строк примерно за 15 минут, у python примерно такая же скорость. но топик Кафка может иметь миллионы записей и поэтому скорость, которая сейчас мне кажется медленной. в сам файл csv, загрузка из Kafka очень быстрая

Denys 03.04.2023 18:16

У вас есть что-то вроде плохо написанного триггера на вашем столе? Слишком много индексов? 15 минут на вставку 10 000 строк кричат, что есть более серьезная проблема.

Thom A 03.04.2023 18:17

нет ни триггеров, ни индексов, самая обычная таблица

Denys 03.04.2023 18:23

Если вы хотите, чтобы это было быстрее, не используйте Python... Для этого вы можете использовать Kafka Connect JDBC Sink. Или, если вам нужен Python, не создавайте однострочный фрейм данных для каждого потребляемого события. Файлы CSV также будут медленными

OneCricketeer 03.04.2023 20:02

@OneCricketeer К сожалению, я не могу использовать Kafka Connect JDBC Sink, попробую загрузить

Denys 04.04.2023 14:04

Да, пакетная загрузка — это лучшее, что вы можете сделать (JDBC Sink делает то же самое). Однако вам необходимо отключить автоматическую коммит-фиксацию Kafka для потребителей, а также использовать транзакции (которых у вас в настоящее время нет), среди прочего, для обработки ошибок. Тем не менее, Python будет медленнее, чем другие языки, такие как Java, C#, Go и т. д.

OneCricketeer 04.04.2023 18:36

@OneCricketeer В данный момент я пытаюсь использовать sqlalchemy для пакетной загрузки.

Denys 05.04.2023 14:16

Хорошо. Как уже упоминалось, вы также захотите использовать транзакции docs.sqlalchemy.org/en/20/orm/session_transaction.html

OneCricketeer 05.04.2023 16:06
but the load speed was the same. что-то не так с вашим кодом, базой данных или способом измерения времени. 10Krows вообще не данные и определенно не требуют 1 минуты. Даже 1 секунды будет слишком много. Небольшие задержки должны быть заметны при 1 млн строк. Возможно диск тормозит, индексов нет, таблица заблокирована другими транзакциями?
Panagiotis Kanavos 06.04.2023 10:03
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
13
57
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Код ниже отлично работает для меня:

values_list.append(decoded_msg)

if len(values_list) == 10000:
    df = pd.DataFrame(values_list)
    column_order = ['col1', 'col2', 'col3']
    df = df.reindex(columns=column_order)
   
    with engine.connect() as connection:
        
        print(df)
        try:
            df.to_sql('TABLE_NAME', con=engine, if_exists='append', index=False, dtype = {'COL1': Integer(), 'COL2': String(), 'COL3': String()}, schema='SCHEMA')
        except Exception as e:
            print("An error occurred while inserting data: ", e)
        connection.commit()

    values_list = []

Это медленный код, полная противоположность задачи массового импорта SSIS. to_sql вставляет строки одну за другой (если вы не включили fastexecutemany) с полным протоколированием. Массовый импорт BULK INSERT, bcp и SSIS использует минимальное ведение журнала, при котором регистрируются только страницы данных, а не отдельные INSERT. Загрузка данных CSV в фрейм данных происходит еще медленнее — сначала вам нужно загрузить все данные в память, а затем отправить их в базу данных.

Panagiotis Kanavos 06.04.2023 09:54

Быстрый способ загрузки данных — использовать команду bcp или команду BULK INSERT SQL для импорта данных непосредственно из файла CSV или плоского файла. 10K строк — это вообще не данные, они могут легко поместиться в кеш процессора.

Panagiotis Kanavos 06.04.2023 10:05

При создании движка я добавил параметр engine = create_engine("mssql+pyodbc:///?odbc_connect = {}".format(para‌​ms), fast_executemany=True)

Denys 06.04.2023 10:17

Это все еще медленный путь. Совершенно невозможно, чтобы 10 тысяч строк заняли даже минуту без серьезных проблем. Что происходит с сервером? Вы проверили Activity Monitor? Есть ли заблокированные сеансы? Используется ли целевая таблица другими соединениями одновременно, что может привести к блокировке? Если вы импортируете непосредственно в занятую таблицу, у которой нет соответствующих индексов, любая команда SELECT может легко установить общие блокировки на всю таблицу. Ваши INSERT должны дождаться освобождения замков. Вы пробовали импортировать данные в отдельную промежуточную таблицу?

Panagiotis Kanavos 06.04.2023 11:11

Другие вопросы по теме