Ошибка многопроцессорности и пула соединений: невозможно выбрать объект «psycopg2.extensions.connection»

Я пытаюсь передать объект базы данных, который использует пул соединений в Postgres, другому объекту класса, но продолжаю получать ошибку.

if __name__ == "__main__":
cores = calculate_number_of_cores()
pretty_print(f"Number of cores ==> {cores}")

db_manager = DatabaseManager()
with ProcessPoolExecutor(cores) as executor:
    while True:
        if LOCAL_RUN:
            pretty_print("ALERT: Doing a local run of the automation with limited capabilities.")

        list_of_clients = db_manager.get_clients()
        print(list_of_clients)
        random.shuffle(list_of_clients)

        list_of_not_attempted_clients_domains = db_manager.get_not_attempted_domains_that_matches_random_client_tags()
        group_of_clients_handlers = {}
        pretty_print(list_of_not_attempted_clients_domains)

        # no matches
        if not list_of_not_attempted_clients_domains:
            sleep = 60 * 10
            pretty_print(f'No matches found. Sleeping for {sleep}s')
            time.sleep(sleep)
            continue

        for client in list_of_clients:
            client_id = client[0]
            client_name = client[1]
            group_of_clients_handlers[client_id] = [ClientsHandler(db_manager), client_name]

        try:
            list(executor.map(
                partial(run, group_of_clients_handlers),
                list_of_not_attempted_clients_domains
            ))
            # for not_attempted_clients_domains in list_of_not_attempted_clients_domains:
            #     futures = executor.submit(run(group_of_clients_handlers, not_attempted_clients_domains))
            #     for future in concurrent.futures.as_completed(futures):
            #         pretty_print("Loading futures....")
            #         print(future.result())

            time.sleep(60)
            pretty_print("sleeping for 60secs")
        except Exception as err:
            pretty_print(err)

Я использую многопроцессорный модуль в Python. Моя цель — иметь готовые соединения, которые мы используем для каждого запроса к базе данных, вместо того, чтобы всегда подключаться к базе данных для каждого выполнения или процесса.

Я должен передать db_manager функции run, так как я не хочу создавать там новое соединение. Но из-за этого я получаю сообщение об ошибке: невозможно выбрать объект «psycopg2.extensions.connection».

Я даже не знаю точно, что это значит. Единственное, что я знаю, это то, что когда я вручную отправляю функцию run исполнителю, код работает правильно. Это закомментированный код.

Я не знаю другого способа справиться с этим. Я вроде как знаю, откуда возникает ошибка, но как ее решить, чтобы она не мешала моей настройке, — это такая боль.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
100
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Полученная вами ошибка означает, что объекты connection нельзя сериализовать с помощью pickle. Пиклирование объектов происходит на некоторых платформах, когда вы используете многопроцессорность для создания дочерних процессов, подробнее об этом можно прочитать в этом ответе.

Однако маринование – не главная ваша проблема. Даже если вы избегаете использования Pickle при создании дочерних процессов, совместное использование связей psycopg между процессами не сработает. Согласно документации psycopg2, вы не должны разделять связи между процессами.

Если вам нужен предварительно инициализированный пул соединений, а операции с базой данных занимают много времени, попробуйте использовать threading вместо multiprocessing, поскольку согласно документации psycopg2 (ссылка выше) соединения являются потокобезопасными. С помощью потоков вы сможете реализовать свою программу по своему желанию, с предварительно инициализированным пулом соединений в основном потоке и запросами, делегированными другим потокам, использующим этот пул.

То есть вы хотите сказать, что для каждого дочернего процесса я должен создать новое соединение? Большая часть моего кода привязана к процессору, поэтому переход на многопоточность является дорогостоящим и неэффективным.

Alexander Obidiegwu 30.04.2024 02:40

Да, совместное использование пула соединений между процессами — плохая идея и не рекомендуется в официальной документации, поэтому я бы не пошел по этому пути. В вашем случае, я думаю, вам следует просто инициализировать все процессы в вашем пуле с помощью отдельного пула соединений psycopg (см. параметр initializer в ProcessPoolExecutor), а затем повторно использовать этот пул инициализированных процессов для всех ваших запросов.

hnwoh 30.04.2024 03:02

Разве использование инициализатора не равнозначно созданию отдельного соединения для каждого дочернего процесса, поскольку он выполняет функцию, предусмотренную в инициализаторе для каждого дочернего процесса?

Alexander Obidiegwu 30.04.2024 05:31

@AlexanderObidiegwu Инициализатор будет выполняться только один раз для каждого дочернего элемента, поэтому это действительно лучшее место для создания соединения, просто убедитесь, что вы сохранили его в глобальном масштабе. global db_managerdb_manager= ..., это не должно быть узким местом, поскольку пул создает процессы только лениво (только когда есть необходимые задачи и нет свободных рабочих)

Ahmed AEK 30.04.2024 08:23

@AlexanderObidiegwu Почему многопоточность неэффективна для кода, привязанного к процессору?

jjanes 30.04.2024 14:01

@jjanes Потому что он не может использовать преимущества нескольких ядер из-за GIL

Alexander Obidiegwu 30.04.2024 21:55

Другие вопросы по теме