Python sqlite3 и параллелизм

У меня есть программа Python, в которой используется модуль «threading». Каждую секунду моя программа запускает новый поток, который извлекает некоторые данные из Интернета и сохраняет эти данные на моем жестком диске. Я хотел бы использовать sqlite3 для хранения этих результатов, но не могу заставить его работать. Похоже, проблема в следующей строке:

conn = sqlite3.connect("mydatabase.db")
  • Если я помещаю эту строку кода в каждый поток, я получаю OperationalError, сообщающую мне, что файл базы данных заблокирован. Я предполагаю, что это означает, что другой поток открыл mydatabase.db через соединение sqlite3 и заблокировал его.
  • Если я помещаю эту строку кода в основную программу и передаю объект соединения (conn) каждому потоку, я получаю ProgrammingError, говорящую, что объекты SQLite, созданные в потоке, могут использоваться только в том же потоке.

Раньше я сохранял все свои результаты в файлах CSV, и у меня не было ни одной из этих проблем с блокировкой файлов. Надеюсь, это станет возможным с sqlite. Любые идеи?

Я хотел бы отметить, что более свежие версии Python включают новые версии sqlite3, которые должны решить эту проблему.

Ryan Fugger 01.05.2012 21:45

@RyanFugger, знаете ли вы, какая самая ранняя версия поддерживает это? Я использую 2.7

notbad.jpeg 26.06.2013 02:34

@RyanFugger AFAIK нет предварительно созданной версии, которая содержит более новую версию SQLite3, в которой это исправлено. Однако вы можете построить его самостоятельно.

shezi 08.11.2013 17:36
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
92
3
88 653
14
Перейти к ответу Данный вопрос помечен как решенный

Ответы 14

Ответ принят как подходящий

Вы можете использовать шаблон потребитель-производитель. Например, вы можете создать очередь, разделяемую между потоками. Первый поток, который получает данные из Интернета, помещает эти данные в общую очередь. Другой поток, которому принадлежит соединение с базой данных, удаляет данные из очереди и передает их в базу данных.

FWIW: более поздние версии sqlite утверждают, что вы можете обмениваться соединениями и объектами между потоками (кроме курсоров), но на практике я обнаружил иное.

Richard Levasseur 14.02.2009 05:22
Здесь - пример того, о чем говорил выше Евгений Лазин.
dugres 03.05.2009 12:21

Скрытие вашей базы данных за общей очередью - действительно плохое решение этого вопроса, потому что SQL в целом и SQLite, в частности, уже имеют встроенные механизмы блокировки, которые, вероятно, намного более усовершенствованы, чем все, что вы можете создать самостоятельно.

shezi 08.11.2013 17:35

Вопрос нужно прочитать, на тот момент не было встроенных запорных механизмов. Во многих современных встроенных базах данных этот механизм отсутствует по причинам производительности (например, LevelDB).

Evgeny Lazin 15.11.2013 18:39

Мне нравится ответ Евгения: очереди - это вообще лучший способ реализовать межпотоковое взаимодействие. Для полноты картины вот еще несколько вариантов:

  • Закройте соединение с БД, когда порожденные потоки закончат его использовать. Это исправит ваш OperationalError, но открывать и закрывать такие соединения, как правило, нельзя из-за накладных расходов на производительность.
  • Не используйте дочерние потоки. Если задача, выполняемая один раз в секунду, достаточно легкая, вам может сойти с рук выполнение выборки и сохранения, а затем засыпание до подходящего момента. Это нежелательно, поскольку операции выборки и сохранения могут занимать> 1 секунды, и вы теряете преимущества мультиплексированных ресурсов, которые у вас есть при многопоточном подходе.

Переключитесь на многопроцессорность. Он намного лучше, хорошо масштабируется, может выходить за рамки использования нескольких ядер за счет использования нескольких процессоров, а интерфейс такой же, как при использовании модуля потоковой передачи Python.

Или, как предложил Али, просто используйте Механизм пула потоков SQLAlchemy. Он сделает все за вас автоматически и имеет множество дополнительных функций, просто процитирую некоторые из них:

  1. SQLAlchemy включает диалекты для SQLite, Postgres, MySQL, Oracle, MS-SQL, Firebird, MaxDB, MS Access, Sybase и Informix; IBM также выпустила драйвер DB2. Таким образом, вам не нужно переписывать свое приложение, если вы решите отказаться от SQLite.
  2. Система Unit Of Work, центральная часть Object Relational Mapper (ORM) SQLAlchemy, организует ожидающие операции создания / вставки / обновления / удаления в очереди и сбрасывает их все в одном пакете. Для этого он выполняет топологическую «сортировку зависимостей» всех измененных элементов в очереди, чтобы соблюдать ограничения внешнего ключа, и группирует избыточные операторы вместе, где они иногда могут быть дополнительно объединены в пакеты. Это обеспечивает максимальную эффективность и безопасность транзакций и сводит к минимуму вероятность возникновения взаимоблокировок.

Или, если вы ленивы, как я, можете использовать SQLAlchemy. Он будет обрабатывать потоки за вас, (с использованием локального потока и некоторого пула соединений), и способ, которым он это делает, даже настраиваемый.

В качестве дополнительного бонуса, если / когда вы поймете / решите, что использование Sqlite для любого параллельного приложения будет катастрофой, вам не придется менять свой код для использования MySQL, Postgres или чего-либо еще. Вы можете просто переключиться.

Почему нигде на официальном сайте не указана версия Python?

Display Name 24.09.2015 20:37

Вам необходимо спроектировать параллелизм для вашей программы. SQLite имеет четкие ограничения, и вам необходимо соблюдать их, см. Часто задаваемые вопросы (также следующий вопрос).

Вам вообще не следует использовать для этого потоки. Это тривиальная задача для скрученный, и она, скорее всего, в любом случае уведет вас значительно дальше.

Используйте только один поток, и завершение запроса инициирует событие для выполнения записи.

twisted позаботится о расписании, обратных вызовах и т. д. за вас. Он передаст вам весь результат в виде строки, или вы можете запустить его через потоковый процессор (у меня есть twitter API и Friendfeed API, которые оба запускают события для вызывающих, поскольку результаты все еще загружаются).

В зависимости от того, что вы делаете со своими данными, вы можете просто выгрузить полный результат в sqlite по мере его завершения, приготовить его и выгрузить или приготовить его во время чтения и выгрузить в конце.

У меня очень простое приложение, которое делает что-то похожее на то, что вы хотите от github. Я называю это pfetch (параллельная выборка). Он захватывает различные страницы по расписанию, передает результаты в файл и, при необходимости, запускает сценарий после успешного завершения каждой из них. Он также делает некоторые причудливые вещи, такие как условные GET, но все же может быть хорошей базой для всего, что вы делаете.

Scrapy кажется потенциальным ответом на мой вопрос. Его домашняя страница точно описывает мою задачу. (Хотя я еще не уверен, насколько стабилен код.)

Я бы посмотрел на модуль Python y_serial для сохранения данных: http://yserial.sourceforge.net

который обрабатывает проблемы взаимоблокировки, связанные с одной базой данных SQLite. Если спрос на параллелизм становится большим, можно легко настроить класс Farm для многих баз данных, чтобы распределить нагрузку в течение стохастического времени.

Надеюсь, это поможет вашему проекту ... он должен быть достаточно простым, чтобы реализовать его за 10 минут.

Следующее найдено на mail.python.org.pipermail.1239789

Я нашел решение. Я не знаю, почему в документации Python нет ни слова об этой опции. Итак, мы должны добавить новый аргумент ключевого слова в функцию подключения и мы сможем создавать из него курсоры в другом потоке. Так что используйте:

sqlite.connect(":memory:", check_same_thread = False)

отлично работает для меня. Конечно, с этого момента мне нужно заботиться безопасного многопоточного доступа к БД. В любом случае спасибо всем за попытку помочь.

(С GIL действительно не так много возможностей для настоящего многопоточного доступа к базе данных, в любом случае, что я видел)

Erik Aronesty 08.02.2018 01:29
ПРЕДУПРЕЖДЕНИЕ: В документах Python есть это, чтобы сказать о опции check_same_thread: «При использовании нескольких потоков с одним и тем же соединением операции записи должны быть сериализованы пользователем, чтобы избежать повреждения данных». Итак, да, вы может используете SQLite с несколькими потоками, если ваш код гарантирует, что только один поток может писать в базу данных в любой момент времени. В противном случае вы можете повредить свою базу данных.
Ajedi32 02.09.2020 19:07

Вопреки распространенному мнению, более новые версии sqlite3 делать поддерживают доступ из нескольких потоков.

Это можно включить с помощью необязательного ключевого аргумента check_same_thread:

sqlite.connect(":memory:", check_same_thread=False)

Я столкнулся с непредсказуемыми исключениями, и даже Python вылетает с этой опцией (Python 2.7 в Windows 32).

reclosedev 04.05.2012 15:19

Согласно документы, в многопоточном режиме ни одно соединение с базой данных не может использоваться в нескольких потоках. Также есть сериализованный режим

Casebash 13.05.2013 04:40

Может кто-нибудь сослаться на конкретную часть документации здесь?

Medeiros 27.09.2013 07:15

Ничего, только что нашел: http://sqlite.org/compile.html#threadsafe

Medeiros 27.09.2013 07:17

@reclosedev, можешь подробнее рассказать о своих исключениях?

FrEaKmAn 15.09.2014 20:40

@FrEaKmAn, извините, это было давно, тоже нет: memory: database. После этого я не использовал соединение sqlite в нескольких потоках.

reclosedev 15.09.2014 22:33

@FrEaKmAn, я столкнулся с этим, с дампом ядра процесса python при многопоточном доступе. Поведение было непредсказуемым, никаких исключений зарегистрировано не было. Если я правильно помню, это было верно как для чтения, так и для записи. Это единственное, что я видел до сих пор, когда Python вылетал из строя: D. Я не пробовал это с sqlite, скомпилированным в потокобезопасном режиме, но в то время у меня не было свободы перекомпилировать sqlite системы по умолчанию. В итоге я сделал что-то похожее на то, что предложил Эрик, и отключил совместимость потоков.

verboze 25.02.2015 20:29

@verboze U не нужно ломать текущий модуль sqlite3. Получите копию, затем поместите в соответствующую папку в вашем проекте, а затем импортируйте из нее. С другой стороны, возможность пропускать проверки потоков должна выполняться приложением, которое его создает, а не новыми, запрашивающими доступ: C

m3nda 21.06.2016 10:29

В python3.6 check_same_thread, похоже, пока работает на win32 с множеством потоков.

Erik Aronesty 08.02.2018 01:28
ПРЕДУПРЕЖДЕНИЕ: В документах Python есть это, чтобы сказать о опции check_same_thread: «При использовании нескольких потоков с одним и тем же соединением операции записи должны быть сериализованы пользователем, чтобы избежать повреждения данных». Итак, да, вы может используете SQLite с несколькими потоками, если ваш код гарантирует, что только один поток может писать в базу данных в любой момент времени. В противном случае вы можете повредить свою базу данных.
Ajedi32 02.09.2020 19:07

@ Ajedi32 как насчет одновременного чтения и письма? Документация по этому поводу не ясна, и я могу интерпретировать ее так или иначе. У вас есть опыт использования модуля sqlite3 Python с 1 потоком записи и одним или несколькими потоками одновременного чтения? Будет сбой или нормально работать?

blubberdiblub 31.03.2021 12:30

Используйте threading.Lock ()

Укажите, что делает следующий код и где его следует использовать.

Ali Akhtari 09.04.2020 03:30

Наиболее вероятная причина, по которой вы получаете ошибки с заблокированными базами данных, заключается в том, что вы должны выпустить

conn.commit()

после завершения операции с базой данных. Если вы этого не сделаете, ваша база данных будет заблокирована от записи и останется такой же. Остальные потоки, ожидающие записи, будут отключены по истечении некоторого времени (по умолчанию установлено 5 секунд, подробности см. В http://docs.python.org/2/library/sqlite3.html#sqlite3.connect).

Пример правильной и одновременной вставки будет следующим:

import threading, sqlite3
class InsertionThread(threading.Thread):

    def __init__(self, number):
        super(InsertionThread, self).__init__()
        self.number = number

    def run(self):
        conn = sqlite3.connect('yourdb.db', timeout=5)
        conn.execute('CREATE TABLE IF NOT EXISTS threadcount (threadnum, count);')
        conn.commit()

        for i in range(1000):
            conn.execute("INSERT INTO threadcount VALUES (?, ?);", (self.number, i))
            conn.commit()

# create as many of these as you wish
# but be careful to set the timeout value appropriately: thread switching in
# python takes some time
for i in range(2):
    t = InsertionThread(i)
    t.start()

Если вам нравится SQLite или у вас есть другие инструменты, которые работают с базами данных SQLite, или вы хотите заменить файлы CSV файлами SQLite db, или вам нужно сделать что-то редкое, например межплатформенный IPC, то SQLite - отличный инструмент и очень подходит для этой цели. Не позволяйте заставлять себя использовать другое решение, если оно вам не подходит!

Я не смог найти никаких тестов ни в одном из приведенных выше ответов, поэтому я написал тест для тестирования всего.

Пробовал 3 подхода

  1. Последовательное чтение и запись из базы данных SQLite
  2. Использование ThreadPoolExecutor для чтения / записи
  3. Использование ProcessPoolExecutor для чтения / записи

Результаты и выводы теста следующие.

  1. Последовательное чтение / последовательная запись работают лучше всего
  2. Если вы должны обрабатывать параллельно, используйте ProcessPoolExecutor для параллельного чтения.
  3. Не выполняйте никаких операций записи ни с помощью ThreadPoolExecutor, ни с помощью ProcessPoolExecutor, так как вы столкнетесь с ошибками блокировки базы данных, и вам придется повторить попытку вставки фрагмента снова.

Вы можете найти код и полное решение для тестов в моем SO-ответе ЗДЕСЬ Надеюсь, это поможет!

Вам необходимо использовать session.close() после каждая транзакция в базе данных, чтобы использовать тот же курсор в том же потоке, не используя тот же курсор в многопоточности, которая вызывает эту ошибку.

Другие вопросы по теме