Я прочитал данные из Kafka, и теперь мне нужно загрузить данные на сервер MSSQL.
Я пробовал несколько методов.
Этот метод загружает только 500 строк за 1 минуту:
while max_offset < end_offset-1:
msg = consumer.poll()
max_offset += 1
if msg is not None:
if msg.error():
print(f'Error while receiving message: {msg.error()}')
else:
value = msg.value()
offset = msg.offset()
try:
cursor = conn_str.cursor()
df = pd.json_normalize(value)
for index, row in df.iterrows():
cursor.execute("INSERT INTO TABLE_NAME (COLUMN_1, COLUMN_2, COLUMN_3, COLUMN_4, COLUMN_5, COLUMN_6) values(?,?,?,?,?,?)", row.column_1, row.column_2, row.column_3, row.column_4, row.column_5, row.column_6)
conn_str.commit()
print(f'Received message:\n{df}')
except Exception as e:
print(f'Error message: {e}')
else:
print('No messages received')
Второй метод, который я пробовал, был:
messages_to_insert = []
batch_size = 10000
counter = 0
while max_offset < end_offset-1:
msg = consumer.poll()
max_offset += 1
if msg is not None:
if msg.error():
else:
row = msg.value()
offset = msg.offset()
messages_to_insert.append((row["field1"], row["field2"], row["field3"], row["field4"], row["field5"], row["field6"]))
counter += 1
if counter >= batch_size:
cursor.executemany("INSERT INTO <table_name> (field1, field2, field3, field4, field5, field6) values(?,?,?,?,?,?)", messages_to_insert)
conn_str.commit()
messages_to_insert = []
counter = 0
if messages_to_insert:
cursor.executemany("INSERT INTO <table_name> (field1, field2, field3, field4, field5, field6) values(?,?,?,?,?,?)", messages_to_insert)
conn_str.commit()
Это метод немного быстрее.
И последний был загружен в файл .csv. В файл .csv загружается очень быстро, а в mssql загружается слишком медленно
messages_to_insert = []
batch_size = 10000
counter = 0
with open('data.csv', 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(['COLUMN_1', 'COLUMN_2', 'COLUMN_3', 'COLUMN_4', 'COLUMN_5', 'COLUMN_6'])
while max_offset < end_offset-1:
msg = consumer.poll()
max_offset += 1
if msg is not None:
if msg.error():
logger.error(f'Error while receiving message: {msg.error()}')
else:
row = msg.value()
offset = msg.offset()
# Write row to file
writer.writerow([row['field1'], row['field2'], row['field3'], row['field4'], row['field5'], field6])
file_path = 'data.csv'
with open(file_path, 'r') as f:
lines = f.readlines()
lines.pop(0)
data = [tuple(line.strip().split(',')) for line in lines]
placeholders = ','.join('?' * len(data[0]))
query = f"INSERT INTO <table_name> (COLUMN_1, COLUMN_2, COLUMN_3, COLUMN_4, COLUMN_5, COLUMN_6) VALUES ({placeholders})"
cursor = conn_str.cursor()
cursor.executemany(query, data)
cursor.commit()
conn.close()
И попытался загрузить файл .csv после этого с помощью SSIS, но скорость загрузки была такой же.
Что ты вообще здесь спрашиваешь? В вашем вопросе нет вопроса.
@ThomA, как ускорить загрузку, это мой вопрос
Все ваши попытки, похоже, основаны на методах RBAR, @Denys, вы INSERT
один ряд за мучительным рядом. Однако службы SSIS должны быть довольно быстрыми, поскольку они объединяют данные в транзакцию. Как долго это "слишком долго"? Сколько времени на самом деле занимают эти процессы?
@ThomA SSIS загружает 10 000 строк примерно за 15 минут, у python примерно такая же скорость. но топик Кафка может иметь миллионы записей и поэтому скорость, которая сейчас мне кажется медленной. в сам файл csv, загрузка из Kafka очень быстрая
У вас есть что-то вроде плохо написанного триггера на вашем столе? Слишком много индексов? 15 минут на вставку 10 000 строк кричат, что есть более серьезная проблема.
нет ни триггеров, ни индексов, самая обычная таблица
Если вы хотите, чтобы это было быстрее, не используйте Python... Для этого вы можете использовать Kafka Connect JDBC Sink. Или, если вам нужен Python, не создавайте однострочный фрейм данных для каждого потребляемого события. Файлы CSV также будут медленными
@OneCricketeer К сожалению, я не могу использовать Kafka Connect JDBC Sink, попробую загрузить
Да, пакетная загрузка — это лучшее, что вы можете сделать (JDBC Sink делает то же самое). Однако вам необходимо отключить автоматическую коммит-фиксацию Kafka для потребителей, а также использовать транзакции (которых у вас в настоящее время нет), среди прочего, для обработки ошибок. Тем не менее, Python будет медленнее, чем другие языки, такие как Java, C#, Go и т. д.
@OneCricketeer В данный момент я пытаюсь использовать sqlalchemy для пакетной загрузки.
Хорошо. Как уже упоминалось, вы также захотите использовать транзакции docs.sqlalchemy.org/en/20/orm/session_transaction.html
but the load speed was the same.
что-то не так с вашим кодом, базой данных или способом измерения времени. 10Krows вообще не данные и определенно не требуют 1 минуты. Даже 1 секунды будет слишком много. Небольшие задержки должны быть заметны при 1 млн строк. Возможно диск тормозит, индексов нет, таблица заблокирована другими транзакциями?
Код ниже отлично работает для меня:
values_list.append(decoded_msg)
if len(values_list) == 10000:
df = pd.DataFrame(values_list)
column_order = ['col1', 'col2', 'col3']
df = df.reindex(columns=column_order)
with engine.connect() as connection:
print(df)
try:
df.to_sql('TABLE_NAME', con=engine, if_exists='append', index=False, dtype = {'COL1': Integer(), 'COL2': String(), 'COL3': String()}, schema='SCHEMA')
except Exception as e:
print("An error occurred while inserting data: ", e)
connection.commit()
values_list = []
Это медленный код, полная противоположность задачи массового импорта SSIS. to_sql
вставляет строки одну за другой (если вы не включили fastexecutemany
) с полным протоколированием. Массовый импорт BULK INSERT, bcp и SSIS использует минимальное ведение журнала, при котором регистрируются только страницы данных, а не отдельные INSERT. Загрузка данных CSV в фрейм данных происходит еще медленнее — сначала вам нужно загрузить все данные в память, а затем отправить их в базу данных.
Быстрый способ загрузки данных — использовать команду bcp
или команду BULK INSERT
SQL для импорта данных непосредственно из файла CSV или плоского файла. 10K строк — это вообще не данные, они могут легко поместиться в кеш процессора.
При создании движка я добавил параметр engine = create_engine("mssql+pyodbc:///?odbc_connect = {}".format(params), fast_executemany=True)
Это все еще медленный путь. Совершенно невозможно, чтобы 10 тысяч строк заняли даже минуту без серьезных проблем. Что происходит с сервером? Вы проверили Activity Monitor
? Есть ли заблокированные сеансы? Используется ли целевая таблица другими соединениями одновременно, что может привести к блокировке? Если вы импортируете непосредственно в занятую таблицу, у которой нет соответствующих индексов, любая команда SELECT
может легко установить общие блокировки на всю таблицу. Ваши INSERT
должны дождаться освобождения замков. Вы пробовали импортировать данные в отдельную промежуточную таблицу?
Вы можете попробовать задачу массовой вставки SSIS.