Как работает пул соединений в многопроцессорной обработке Python?

Предположим, что при использовании многопроцессорной обработки Python я создаю 10 процессов, разветвляя основной процесс. Мы знаем, что дочерний процесс наследует состояние родительского процесса в своем собственном пространстве памяти.

У меня есть движок SQLAlchemy, созданный в родительском процессе с размером пула 5. Во время разветвления в пуле нет никаких соединений.

Поскольку дочерние процессы имеют собственное пространство памяти, это означает, что унаследованный движок должен быть доступен только им, верно?

  1. Если это так, означает ли это общее количество активных возможных соединений будет 10 [количество процессов] * 5 [размер_пула] = 50?
  2. Если нет, то как здесь работает пул соединений?
    import multiprocessing
    import os
    
    # Create an engine with a connection pool in the parent process
    engine = create_engine('sqlite:///example.db', pool_size=5)
    
    def initializer():
        # Connections in the pool are gracefully removed!
        engine.dispose(close=False)
    
    def get_stuff():
      for _ in range(5):
         with engine.connect() as conn:
            conn.execute(text("..."))
    
    
    with multiprocessing.Pool(10,initializer=initializer) as pool:
        results = [pool.apply_async(get_stuff) for _ in range(10)]
        # Collect results
        output = [result.get() for result in results]
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
89
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

multiprocessing.Pool создает пул подпроцессов (воркеров). Рабочие процессы передаются по конвейеру и разветвляются основным процессом, создающим пул. В вашем случае у вас работает 10 рабочих подпроцессов, способных принимать задачи в порядке очереди, когда вы загружаете пул такими вызовами, какpool.apply().

Вот пример одного из таких процессов. У вас должно быть 10 из них, работающих в вашей системе.

64163 ttys001    0:00.12 /Library/Frameworks/Python.framework/Versions/3.11/Resources/Python.app/Contents/MacOS/Python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=7, pipe_handle=30) --multiprocessing-fork

Как вы заметили, дочерние подпроцессы имеют доступ к родительским ресурсам. Однако я не могу вам сказать, как они взаимодействуют с другим менеджером приложений.

Когда задачи добавляются в пул с помощью pool.apply(), перед выполнением запускается инициализатор. Я вижу только один вызов pool.apply() в операторе with multiprocessing.Pool(), что означает, что выполняется только одна задача. Возможно, вы захотите использовать pool.map() для параллельного выполнения нескольких задач.

Привет @igmartin, спасибо за ответ. Ты прав, мне следовало поставить pool.map или pool.apply_async. Я выбираю pool.apply_sync(func), потому что pool.map(func,func_argument) ожидает аргументы функции, которые в моем случае не используются!

Sanjay S 26.06.2024 07:48
Ответ принят как подходящий

Ваш вопрос немного сбивает с толку, потому что вы переплетаете два сценария туда и обратно.

  • (Случай 2) Один процесс порождает другой дочерний процесс.
  • (Случай 11) Один процесс использует многопроцессорность для создания 10 дочерних процессов.

Кроме того, точное максимальное количество подключений немного сложно, поскольку существует настройка переполнения. В пуле по умолчанию максимальное количество разрешенных подключений установлено на pool_size + max_overflow. Грубая идея заключается в том, что вы можете предотвратить постоянное зависание множества соединений, но при этом разрешить некоторый всплеск соединений, если есть спрос. Подробности можно найти здесь: sqlalchemy.pool.QueuePool.params.max_overflow

Теперь для обработки случаев:

  • (Случай 2)
    • Максимальное количество одновременных подключений = 2 процесса * (5 подключений на пул + 10 соединений переполнения на пул) = 30
  • (случай 11)
    • Максимальное количество одновременных подключений: 11 процессов * (5 соединений на пул + 10 соединений переполнения на пул) = 165 (вероятно, если это произойдет, мне придется плохо)

Обратите внимание: если вы не используете многопоточность или asyncio в процессах, вы не превысите одно соединение на процесс, даже если движок это позволит. Каждый раз, когда вы подключаетесь в процессе, вы просто получаете то же самое соединение обратно или в первый раз будет установлено новое, при условии, что вы используете настройки по умолчанию.

Наконец, насколько я понимаю, теоретически вы будете использовать большое количество процессов, 10, в многопроцессорном пуле в Python с SQLAlchemy только в том случае, если вы извлекаете данные в каждом процессе, а затем выполняете большой объем работы с интенсивным использованием ЦП. Если большая часть работы просто ожидает базу данных, вы можете просто использовать потоки.

Пример

Вот пример, в котором один процесс используется для запуска двух дочерних/подпроцессов. Каждый из них пытается переполнить пул. Если вы посмотрите на выходные данные отладки, включенные с помощью echo_pool = "debug", вы увидите, что сразу создается 30 соединений (2 дочерних процесса * (5 размеров пула + 10 максимального переполнения)). Как только лимит достигнут, лишние потоки (намеренно на 5 больше лимита) должны ждать 30 секунд, pool_timeout=30, чтобы пул предоставил им соединение, а не создал больше. Спящий режим составляет всего 5 секунд, поэтому, когда более ранние потоки завершают работу, лишние потоки получают соединения и не истекают по тайм-ауту.

import os
import math

from sqlalchemy import (
    Column,
    Integer,
    create_engine,
)
from sqlalchemy.orm import (
    declarative_base,
)
from sqlalchemy.sql import select
import threading
import multiprocessing
import concurrent
import time

POOL_SIZE = 5
MAX_OVERFLOW = 10
TOTAL_LIMIT = MAX_OVERFLOW + POOL_SIZE
NUM_SUBPROCESSES = 2


def get_engine(env):
    URI = f"sqlite:///example.db"
    return create_engine(
        URI,
        echo_pool='debug',
        pool_size=POOL_SIZE,
        max_overflow=MAX_OVERFLOW,
        # This is the default and should let workers still finish
        # but show the queuing.
        pool_timeout=30,
    )


def is_prime(n):
    """ Simple is_prime taken straight from Python doc concurrent.futures example. """
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


engine = get_engine(os.environ)


def initializer():
    # Connections in the pool are gracefully removed!
    engine.dispose(close=False)

def stall(index):
    with engine.connect() as conn:
        print (f"Connected in process:thread = {os.getpid()}:{threading.get_ident()}")
        time.sleep(5)
        # Just feed the index in and out of the database, doing nothing.
        db_result = conn.execute(select(index)).scalar()
        # Then check if prime.
        return is_prime(db_result)

def fill_up_pool():
    results = {}
    # Start up threads in subprocess.
    with concurrent.futures.ThreadPoolExecutor(max_workers=TOTAL_LIMIT+5) as executor:
        future_to_index = {executor.submit(stall, x): x for x in range(TOTAL_LIMIT+5)}
        for future in concurrent.futures.as_completed(future_to_index):

            index = future_to_index[future]
            try:
                result = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (index, exc))
            else:
                print (f"{index} is {'prime' if result else 'composite'}")
            results[index] = result
    return results


# Start up subprocesses.
with multiprocessing.Pool(NUM_SUBPROCESSES,initializer=initializer) as pool:
    sub_results = [pool.apply_async(fill_up_pool, ()) for _ in range(NUM_SUBPROCESSES)]
    # Collect result and print.
    print ([result.get() for result in sub_results])

Отладочный вывод после фильтрации по Created new connection (ровно 30, как и предполагалось):

2024-06-26 17:36:36,193 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf5b0>
2024-06-26 17:36:36,193 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf5b0>
2024-06-26 17:36:36,194 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf880>
2024-06-26 17:36:36,194 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801120>
2024-06-26 17:36:36,194 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f7bf880>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801b70>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801120>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8023e0>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f801b70>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8023e0>
2024-06-26 17:36:36,195 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802890>
2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802c50>
2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802890>
2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803010>
2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f802b60>
2024-06-26 17:36:36,196 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8033d0>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803010>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803790>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803a60>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f8033d0>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803790>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803f10>
2024-06-26 17:36:36,197 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803b50>
2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79f803f10>
2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8310>
2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a86d0>
2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8310>
2024-06-26 17:36:36,198 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8a90>
2024-06-26 17:36:36,199 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a86d0>
2024-06-26 17:36:36,199 DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <sqlite3.Connection object at 0x7fc79c4a8a90>```

Привет, Ян! Спасибо и извините за путаницу, я отредактировал вопрос, правильный вопрос - это второй случай (случай 11), который вы объяснили. Итак, если я правильно понял, каждый процесс будет иметь свой собственный набор соединений с максимальным пределом, равным значению Pool_size + max_overflow, верно? Кроме того, в вашем уравнении используются 11 процессов: 1 родительский + 10 дочерних процессов?

Sanjay S 26.06.2024 07:49

@SanjayS Я добавил пример, но да, то, что вы сказали, верно. В моем примере родительский процесс не использует никаких соединений, но применяется тот же верхний предел pool_size + max_overflow. Это только по умолчанию Pool, остальные пулы ведут себя по-другому.

Ian Wilson 26.06.2024 19:42

Спасибо @Ian, прекрасно объяснил! Например, вы пропустили дополнительный вопрос, который я упомянул в предыдущем комментарии.

Sanjay S 26.06.2024 20:31

@SanjayS Да, в число 11 входили родитель (1) и дети (10). Например, если вы читаете исходные данные в родительском элементе и отправляете эти данные дочерним элементам, то родительский элемент также будет использовать соединения.

Ian Wilson 26.06.2024 23:06

Понятно! Спасибо @Ian

Sanjay S 27.06.2024 06:43

Другие вопросы по теме