Это моя первая попытка многопроцессорной обработки с использованием библиотеки многопроцессорной обработки Python. Простая версия кода выглядит следующим образом:
import multiprocessing as mp
from dataclasses import dataclass
from typing import Dict, NoReturn
import time
import logging
import signal
import numpy as np
@dataclass
class TmpData:
name: str
value: int
def worker(name: str, data: TmpData) -> NoReturn:
logger_obj = mp.log_to_stderr()
logger_obj.setLevel(logging.INFO)
logger_obj.info(f"name: {name}; value: {data.value}")
if name == "XYZ":
raise RuntimeError("XYZ worker failed")
time.sleep(data.value)
def init_worker_processes() -> None:
signal.signal(signal.SIGINT, signal.SIG_IGN)
if __name__ == "__main__":
map_data: Dict[str, TmpData] = {
key: TmpData(name=key, value=np.random.randint(5, 15))
for key in ["ABC", "DEF", "XYZ"]
}
main_logger = logging.getLogger()
with mp.get_context("spawn").Pool(
processes=2,
initializer=init_worker_processes(),
) as pool:
results = []
for key in map_data:
try:
results.append(
pool.apply_async(
worker,
args=(
key,
map_data[key],
),
)
)
except KeyboardInterrupt:
pool.terminate()
pool.close()
pool.join()
for result in results:
try:
result.get()
except Exception as err:
main_logger.error(f"{err}")
Это выводит что-то вроде следующего:
[INFO/SpawnPoolWorker-2] name: ABC; value: 10
[INFO/SpawnPoolWorker-1] name: DEF; value: 10
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process exiting with exitcode 0
XYZ worker failed
Меня беспокоит то, что [INFO/SpawnPoolWorker-2] name: XYZ; value: 12 напечатано дважды. Я предполагаю, что это только проблема с печатью (а не 2 процесса, поскольку сообщение об ошибке сбоя рабочего XYZ приходит только один раз). Проблема не возникает, когда пул инициализируется тремя процессами.
Теперь я хочу понять, в чем основная причина и как ее исправить. Может ли кто-нибудь помочь мне понять, что я делаю неправильно и как это исправить?






Проблема в том, как добавить регистратор stderr. Когда вы вызываете mp.log_to_stderr, поток не удаляет существующие обработчики журнала, а добавляет дополнительный обработчик, который передается в стандартный вывод. Другими словами, каждый раз, когда вы запускаете def worker(...), вы добавляете дополнительные logging.StreamHandler к существующим обработчикам в регистраторе потоков.
Шаг за шагом:
logging.StreamHandlerlogging.StreamHandlerlogging.StreamHandler. Теперь он будет печатать все журналы дважды.Чтобы распечатать существующие обработчики журналов, вы можете использовать следующий фрагмент:
def worker(name: str, data: TmpData) -> NoReturn:
_ = mp.log_to_stderr()
# print existing logger handlers:
logger = mp.get_logger()
thread_name = mp.current_process().name
print (thread_name, logger.handlers)
Это выведет:
SpawnPoolWorker-1 [<StreamHandler <stderr> (NOTSET)>]
SpawnPoolWorker-2 [<StreamHandler <stderr> (NOTSET)>]
SpawnPoolWorker-1 [<StreamHandler <stderr> (NOTSET)>, <StreamHandler <stderr> (NOTSET)>]
Как видите, второй поток имеет два StreamHandler. Таким образом, он будет печатать каждый текст дважды (по одному разу для каждого обработчика).
Решение:
Правильный способ добавить новых регистраторов — сделать это в файле init_worker_processes.
def worker(name: str, data: TmpData) -> NoReturn:
# get existing logger which already has a stdout StreamHandler
logger = mp.get_logger()
logger.info(f"name: {name}; value: {data.value}")
if name == "XYZ":
raise RuntimeError("XYZ worker failed")
time.sleep(0.01)
def init_worker_processes() -> None:
# this only runs single time per each thread
logger = mp.log_to_stderr()
logger.setLevel(logging.INFO)
signal.signal(signal.SIGINT, signal.SIG_IGN)
Надеюсь это поможет.
PS: Также вам нужно будет исправить строки, связанные с инициализатором пула. Правильный способ — передать функцию init_worker_processes напрямую следующим образом:
with mp.get_context("spawn").Pool(
processes=2,
initializer=init_worker_processes,
) as pool:
Обратите внимание, что ОП неправильно передал initializer (они вызвали его в родительском процессе и передали возвращаемое значение None как initializer), поэтому им необходимо это исправить, чтобы ваши изменения могли выполняться в дочерних процессах.
Спасибо @ShadowRanger за указание; это произошло из-за автодополнения в редакторе :(
К вашему сведению,
initializer=init_worker_processes(),неверно; вы запустили этот инициализатор в родительском процессе, а затем передалиNoneв качестве фактического аргументаinitializer. Вы хотелиinitializer=init_worker_processes,, без скобок, так его называют рабочие при запуске.