Я пытаюсь передать объект базы данных, который использует пул соединений в Postgres, другому объекту класса, но продолжаю получать ошибку.
if __name__ == "__main__":
cores = calculate_number_of_cores()
pretty_print(f"Number of cores ==> {cores}")
db_manager = DatabaseManager()
with ProcessPoolExecutor(cores) as executor:
while True:
if LOCAL_RUN:
pretty_print("ALERT: Doing a local run of the automation with limited capabilities.")
list_of_clients = db_manager.get_clients()
print(list_of_clients)
random.shuffle(list_of_clients)
list_of_not_attempted_clients_domains = db_manager.get_not_attempted_domains_that_matches_random_client_tags()
group_of_clients_handlers = {}
pretty_print(list_of_not_attempted_clients_domains)
# no matches
if not list_of_not_attempted_clients_domains:
sleep = 60 * 10
pretty_print(f'No matches found. Sleeping for {sleep}s')
time.sleep(sleep)
continue
for client in list_of_clients:
client_id = client[0]
client_name = client[1]
group_of_clients_handlers[client_id] = [ClientsHandler(db_manager), client_name]
try:
list(executor.map(
partial(run, group_of_clients_handlers),
list_of_not_attempted_clients_domains
))
# for not_attempted_clients_domains in list_of_not_attempted_clients_domains:
# futures = executor.submit(run(group_of_clients_handlers, not_attempted_clients_domains))
# for future in concurrent.futures.as_completed(futures):
# pretty_print("Loading futures....")
# print(future.result())
time.sleep(60)
pretty_print("sleeping for 60secs")
except Exception as err:
pretty_print(err)
Я использую многопроцессорный модуль в Python. Моя цель — иметь готовые соединения, которые мы используем для каждого запроса к базе данных, вместо того, чтобы всегда подключаться к базе данных для каждого выполнения или процесса.
Я должен передать db_manager
функции run
, так как я не хочу создавать там новое соединение. Но из-за этого я получаю сообщение об ошибке: невозможно выбрать объект «psycopg2.extensions.connection».
Я даже не знаю точно, что это значит. Единственное, что я знаю, это то, что когда я вручную отправляю функцию run
исполнителю, код работает правильно. Это закомментированный код.
Я не знаю другого способа справиться с этим. Я вроде как знаю, откуда возникает ошибка, но как ее решить, чтобы она не мешала моей настройке, — это такая боль.
Полученная вами ошибка означает, что объекты connection
нельзя сериализовать с помощью pickle
. Пиклирование объектов происходит на некоторых платформах, когда вы используете многопроцессорность для создания дочерних процессов, подробнее об этом можно прочитать в этом ответе.
Однако маринование – не главная ваша проблема. Даже если вы избегаете использования Pickle при создании дочерних процессов, совместное использование связей psycopg
между процессами не сработает. Согласно документации psycopg2, вы не должны разделять связи между процессами.
Если вам нужен предварительно инициализированный пул соединений, а операции с базой данных занимают много времени, попробуйте использовать threading
вместо multiprocessing
, поскольку согласно документации psycopg2
(ссылка выше) соединения являются потокобезопасными. С помощью потоков вы сможете реализовать свою программу по своему желанию, с предварительно инициализированным пулом соединений в основном потоке и запросами, делегированными другим потокам, использующим этот пул.
Да, совместное использование пула соединений между процессами — плохая идея и не рекомендуется в официальной документации, поэтому я бы не пошел по этому пути. В вашем случае, я думаю, вам следует просто инициализировать все процессы в вашем пуле с помощью отдельного пула соединений psycopg (см. параметр initializer
в ProcessPoolExecutor
), а затем повторно использовать этот пул инициализированных процессов для всех ваших запросов.
Разве использование инициализатора не равнозначно созданию отдельного соединения для каждого дочернего процесса, поскольку он выполняет функцию, предусмотренную в инициализаторе для каждого дочернего процесса?
@AlexanderObidiegwu Инициализатор будет выполняться только один раз для каждого дочернего элемента, поэтому это действительно лучшее место для создания соединения, просто убедитесь, что вы сохранили его в глобальном масштабе. global db_manager
db_manager= ...
, это не должно быть узким местом, поскольку пул создает процессы только лениво (только когда есть необходимые задачи и нет свободных рабочих)
@AlexanderObidiegwu Почему многопоточность неэффективна для кода, привязанного к процессору?
@jjanes Потому что он не может использовать преимущества нескольких ядер из-за GIL
То есть вы хотите сказать, что для каждого дочернего процесса я должен создать новое соединение? Большая часть моего кода привязана к процессору, поэтому переход на многопоточность является дорогостоящим и неэффективным.