Предположим, что при использовании многопроцессорной обработки Python я создаю 10 процессов, разветвляя основной процесс. Мы знаем, что дочерний процесс наследует состояние родительского процесса в своем собственном пространстве памяти.
У меня есть движок SQLAlchemy, созданный в родительском процессе с размером пула 5. Во время разветвления в пуле нет никаких соединений.
Поскольку дочерние процессы имеют собственное пространство памяти, это означает, что унаследованный движок должен быть доступен только им, верно?
import multiprocessing
import os
# Create an engine with a connection pool in the parent process
engine = create_engine('sqlite:///example.db', pool_size=5)
def initializer():
# Connections in the pool are gracefully removed!
engine.dispose(close=False)
def get_stuff():
for _ in range(5):
with engine.connect() as conn:
conn.execute(text("..."))
with multiprocessing.Pool(10,initializer=initializer) as pool:
results = [pool.apply_async(get_stuff) for _ in range(10)]
# Collect results
output = [result.get() for result in results]
multiprocessing.Pool
создает пул подпроцессов (воркеров). Рабочие процессы передаются по конвейеру и разветвляются основным процессом, создающим пул. В вашем случае у вас работает 10 рабочих подпроцессов, способных принимать задачи в порядке очереди, когда вы загружаете пул такими вызовами, какpool.apply().
Вот пример одного из таких процессов. У вас должно быть 10 из них, работающих в вашей системе.
64163 ttys001 0:00.12 /Library/Frameworks/Python.framework/Versions/3.11/Resources/Python.app/Contents/MacOS/Python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=7, pipe_handle=30) --multiprocessing-fork
Как вы заметили, дочерние подпроцессы имеют доступ к родительским ресурсам. Однако я не могу вам сказать, как они взаимодействуют с другим менеджером приложений.
Когда задачи добавляются в пул с помощью pool.apply()
, перед выполнением запускается инициализатор. Я вижу только один вызов pool.apply()
в операторе with multiprocessing.Pool()
, что означает, что выполняется только одна задача. Возможно, вы захотите использовать pool.map()
для параллельного выполнения нескольких задач.
Ваш вопрос немного сбивает с толку, потому что вы переплетаете два сценария туда и обратно.
Кроме того, точное максимальное количество подключений немного сложно, поскольку существует настройка переполнения. В пуле по умолчанию максимальное количество разрешенных подключений установлено на pool_size + max_overflow
. Грубая идея заключается в том, что вы можете предотвратить постоянное зависание множества соединений, но при этом разрешить некоторый всплеск соединений, если есть спрос. Подробности можно найти здесь: sqlalchemy.pool.QueuePool.params.max_overflow
Теперь для обработки случаев:
Обратите внимание: если вы не используете многопоточность или asyncio в процессах, вы не превысите одно соединение на процесс, даже если движок это позволит. Каждый раз, когда вы подключаетесь в процессе, вы просто получаете то же самое соединение обратно или в первый раз будет установлено новое, при условии, что вы используете настройки по умолчанию.
Наконец, насколько я понимаю, теоретически вы будете использовать большое количество процессов, 10, в многопроцессорном пуле в Python с SQLAlchemy только в том случае, если вы извлекаете данные в каждом процессе, а затем выполняете большой объем работы с интенсивным использованием ЦП. Если большая часть работы просто ожидает базу данных, вы можете просто использовать потоки.
Вот пример, в котором один процесс используется для запуска двух дочерних/подпроцессов. Каждый из них пытается переполнить пул. Если вы посмотрите на выходные данные отладки, включенные с помощью echo_pool = "debug"
, вы увидите, что сразу создается 30 соединений (2 дочерних процесса * (5 размеров пула + 10 максимального переполнения)). Как только лимит достигнут, лишние потоки (намеренно на 5 больше лимита) должны ждать 30 секунд, pool_timeout=30
, чтобы пул предоставил им соединение, а не создал больше. Спящий режим составляет всего 5 секунд, поэтому, когда более ранние потоки завершают работу, лишние потоки получают соединения и не истекают по тайм-ауту.
import os
import math
from sqlalchemy import (
Column,
Integer,
create_engine,
)
from sqlalchemy.orm import (
declarative_base,
)
from sqlalchemy.sql import select
import threading
import multiprocessing
import concurrent
import time
POOL_SIZE = 5
MAX_OVERFLOW = 10
TOTAL_LIMIT = MAX_OVERFLOW + POOL_SIZE
NUM_SUBPROCESSES = 2
def get_engine(env):
URI = f"sqlite:///example.db"
return create_engine(
URI,
echo_pool='debug',
pool_size=POOL_SIZE,
max_overflow=MAX_OVERFLOW,
# This is the default and should let workers still finish
# but show the queuing.
pool_timeout=30,
)
def is_prime(n):
""" Simple is_prime taken straight from Python doc concurrent.futures example. """
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
engine = get_engine(os.environ)
def initializer():
# Connections in the pool are gracefully removed!
engine.dispose(close=False)
def stall(index):
with engine.connect() as conn:
print (f"Connected in process:thread = {os.getpid()}:{threading.get_ident()}")
time.sleep(5)
# Just feed the index in and out of the database, doing nothing.
db_result = conn.execute(select(index)).scalar()
# Then check if prime.
return is_prime(db_result)
def fill_up_pool():
results = {}
# Start up threads in subprocess.
with concurrent.futures.ThreadPoolExecutor(max_workers=TOTAL_LIMIT+5) as executor:
future_to_index = {executor.submit(stall, x): x for x in range(TOTAL_LIMIT+5)}
for future in concurrent.futures.as_completed(future_to_index):
index = future_to_index[future]
try:
result = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (index, exc))
else:
print (f"{index} is {'prime' if result else 'composite'}")
results[index] = result
return results
# Start up subprocesses.
with multiprocessing.Pool(NUM_SUBPROCESSES,initializer=initializer) as pool:
sub_results = [pool.apply_async(fill_up_pool, ()) for _ in range(NUM_SUBPROCESSES)]
# Collect result and print.
print ([result.get() for result in sub_results])
Отладочный вывод после фильтрации по Created new connection
(ровно 30, как и предполагалось):
2024-06-26 17:36:36,193 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf5b0>
2024-06-26 17:36:36,193 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf5b0>
2024-06-26 17:36:36,194 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf880>
2024-06-26 17:36:36,194 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801120>
2024-06-26 17:36:36,194 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf880>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801b70>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801120>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8023e0>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801b70>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8023e0>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802890>
2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802c50>
2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802890>
2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803010>
2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802b60>
2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8033d0>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803010>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803790>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803a60>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8033d0>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803790>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803f10>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803b50>
2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803f10>
2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8310>
2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a86d0>
2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8310>
2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8a90>
2024-06-26 17:36:36,199 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a86d0>
2024-06-26 17:36:36,199 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8a90>```
Привет, Ян! Спасибо и извините за путаницу, я отредактировал вопрос, правильный вопрос - это второй случай (случай 11), который вы объяснили. Итак, если я правильно понял, каждый процесс будет иметь свой собственный набор соединений с максимальным пределом, равным значению Pool_size + max_overflow, верно? Кроме того, в вашем уравнении используются 11 процессов: 1 родительский + 10 дочерних процессов?
@SanjayS Я добавил пример, но да, то, что вы сказали, верно. В моем примере родительский процесс не использует никаких соединений, но применяется тот же верхний предел pool_size + max_overflow
. Это только по умолчанию Pool
, остальные пулы ведут себя по-другому.
Спасибо @Ian, прекрасно объяснил! Например, вы пропустили дополнительный вопрос, который я упомянул в предыдущем комментарии.
@SanjayS Да, в число 11 входили родитель (1) и дети (10). Например, если вы читаете исходные данные в родительском элементе и отправляете эти данные дочерним элементам, то родительский элемент также будет использовать соединения.
Понятно! Спасибо @Ian
Привет @igmartin, спасибо за ответ. Ты прав, мне следовало поставить
pool.map
илиpool.apply_async
. Я выбираюpool.apply_sync(func)
, потому чтоpool.map(func,func_argument)
ожидает аргументы функции, которые в моем случае не используются!