Я пытаюсь понять, как работают ThreadPoolExecutor и ProcessPoolExecutors. В этом тесте я предполагал, что задачи, связанные с ЦП, такие как увеличение счетчика, не получат выгоды от выполнения на ThreadPoolExecutors, поскольку он не освобождает GIL и может использовать только один процесс одновременно.
@measure_execution_time
def cpu_slow_function(item):
start = time.time()
duration = random()
counter = 1
while time.time() - start < duration:
counter += 1
return item, counter
def test_thread_pool__cpu_bound():
"""
100 tasks of average .5 seconds each, would take 50 seconds to complete sequentially.
"""
items = list(range(100))
with ThreadPoolExecutor(max_workers=100) as executor:
results = list(executor.map(cpu_slow_function, items))
for index, (result, counter) in enumerate(results):
assert result == index
assert counter >= 0.0
К моему удивлению, этот тест занимает около 5 секунд. По моим предположениям, это должно занять ~50 секунд, 100 задач в среднем по 0,5 секунды каждая.
Что мне не хватает?
Ваша cpu_slow_function ошибочна. Вы измеряете реальное время. Из-за этого каждая функция будет выполняться одинаково. Даже если GIL задержит одну функцию, скажем, в 10 раз, это повлияет только на значение счетчика. Однако время выполнения будет таким же, как и при многопроцессорной обработке.
Другими словами: это не функция, привязанная к процессору. Время выполнения практически не зависит от скорости процессора, почти полностью от затраченного времени.






GIL не препятствует одновременному запуску потоков Python. Это только предотвращает выполнение байт-кода более чем одним потоком в любой момент времени.
В любой момент в вашей программе будет один рабочий процесс, который на самом деле находится в процессе выполнения байт-кода, и 99 рабочих процессов, которые все ждут возможности выполнить свой следующий байт-код, а тем временем часы time.time() тикают. (т. е. проходит реальное время) для всех из них.
Вы можете увидеть эффект GIL в следующем варианте вашего кода:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import perf_counter, time
from os import cpu_count
def func(item: int) -> tuple[int, ...]:
start = time()
counter = 1
while time() - start < 0.5:
counter += 1
return item, counter
def main():
n = max(2, cpu_count() or 2) - 1
for executor in ProcessPoolExecutor, ThreadPoolExecutor:
with executor(n) as exe:
begin = perf_counter()
for _ in exe.map(func, range(100)):
pass
duration = perf_counter() - begin
print(executor.__name__, f"{duration:.4f}s")
if __name__ == "__main__":
main()
Для лучшего статистического анализа следует использовать постоянную, а не переменную (псевдослучайную) длительность. Здесь мы используем 0,5 с.
Кроме того, вам необходимо убедиться, что пулы процессов и потоков имеют одинаковый размер. Этот размер основан на количестве процессоров минус 1.
Выход:
ProcessPoolExecutor 7.5492s
ThreadPoolExecutor 7.8219s
Мы видим, что в этом случае многопоточность немного медленнее, чем многопроцессорность, поскольку функция func() полностью привязана к процессору.
Примечание:
Протестировано на Apple Silicon M2, где os.cpu_count() == 8.
GIL не блокирует весь поток. Ваше предположение о том, что общая продолжительность должна составлять 50 секунд, ошибочно.