Дублирующая печать с многопроцессорной обработкой

Это моя первая попытка многопроцессорной обработки с использованием библиотеки многопроцессорной обработки Python. Простая версия кода выглядит следующим образом:

import multiprocessing as mp
from dataclasses import dataclass
from typing import Dict, NoReturn
import time
import logging
import signal

import numpy as np


@dataclass
class TmpData:
    name: str
    value: int


def worker(name: str, data: TmpData) -> NoReturn:
    logger_obj = mp.log_to_stderr()
    logger_obj.setLevel(logging.INFO)
    logger_obj.info(f"name: {name}; value: {data.value}")

    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")

    time.sleep(data.value)


def init_worker_processes() -> None:
    signal.signal(signal.SIGINT, signal.SIG_IGN)


if __name__ == "__main__":
    map_data: Dict[str, TmpData] = {
        key: TmpData(name=key, value=np.random.randint(5, 15))
        for key in ["ABC", "DEF", "XYZ"]
    }

    main_logger = logging.getLogger()
    with mp.get_context("spawn").Pool(
        processes=2,
        initializer=init_worker_processes(),
    ) as pool:
        results = []
        for key in map_data:
            try:
                results.append(
                    pool.apply_async(
                        worker,
                        args=(
                            key,
                            map_data[key],
                        ),
                    )
                )
            except KeyboardInterrupt:
                pool.terminate()

        pool.close()
        pool.join()

        for result in results:
            try:
                result.get()
            except Exception as err:
                main_logger.error(f"{err}")

Это выводит что-то вроде следующего:

[INFO/SpawnPoolWorker-2] name: ABC; value: 10
[INFO/SpawnPoolWorker-1] name: DEF; value: 10
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process exiting with exitcode 0
XYZ worker failed

Меня беспокоит то, что [INFO/SpawnPoolWorker-2] name: XYZ; value: 12 напечатано дважды. Я предполагаю, что это только проблема с печатью (а не 2 процесса, поскольку сообщение об ошибке сбоя рабочего XYZ приходит только один раз). Проблема не возникает, когда пул инициализируется тремя процессами.

Теперь я хочу понять, в чем основная причина и как ее исправить. Может ли кто-нибудь помочь мне понять, что я делаю неправильно и как это исправить?

К вашему сведению, initializer=init_worker_processes(), неверно; вы запустили этот инициализатор в родительском процессе, а затем передали None в качестве фактического аргумента initializer. Вы хотели initializer=init_worker_processes,, без скобок, так его называют рабочие при запуске.

ShadowRanger 23.05.2024 18:37
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
1
50
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Проблема в том, как добавить регистратор stderr. Когда вы вызываете mp.log_to_stderr, поток не удаляет существующие обработчики журнала, а добавляет дополнительный обработчик, который передается в стандартный вывод. Другими словами, каждый раз, когда вы запускаете def worker(...), вы добавляете дополнительные logging.StreamHandler к существующим обработчикам в регистраторе потоков.

Шаг за шагом:

  1. Поток 1 берет первое задание и создает его первым logging.StreamHandler
  2. Поток 2 берет вторую работу и создает первую logging.StreamHandler
  3. Поток 1 завершает первое задание (как пример)
  4. Поток 1 берет третье задание и создает второе logging.StreamHandler. Теперь он будет печатать все журналы дважды.

Чтобы распечатать существующие обработчики журналов, вы можете использовать следующий фрагмент:

def worker(name: str, data: TmpData) -> NoReturn:    
    _ = mp.log_to_stderr()

    # print existing logger handlers:
    logger = mp.get_logger()    
    thread_name = mp.current_process().name
    print (thread_name, logger.handlers)

Это выведет:

SpawnPoolWorker-1 [<StreamHandler <stderr> (NOTSET)>]
SpawnPoolWorker-2 [<StreamHandler <stderr> (NOTSET)>]
SpawnPoolWorker-1 [<StreamHandler <stderr> (NOTSET)>, <StreamHandler <stderr> (NOTSET)>]

Как видите, второй поток имеет два StreamHandler. Таким образом, он будет печатать каждый текст дважды (по одному разу для каждого обработчика).

Решение:

Правильный способ добавить новых регистраторов — сделать это в файле init_worker_processes.

def worker(name: str, data: TmpData) -> NoReturn:
    # get existing logger which already has a stdout StreamHandler
    logger = mp.get_logger() 
    logger.info(f"name: {name}; value: {data.value}")

    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")

    time.sleep(0.01)


def init_worker_processes() -> None:
    # this only runs single time per each thread
    logger = mp.log_to_stderr() 
    logger.setLevel(logging.INFO)

    signal.signal(signal.SIGINT, signal.SIG_IGN)

Надеюсь это поможет.

PS: Также вам нужно будет исправить строки, связанные с инициализатором пула. Правильный способ — передать функцию init_worker_processes напрямую следующим образом:

    with mp.get_context("spawn").Pool(
        processes=2,
        initializer=init_worker_processes,
    ) as pool:

Обратите внимание, что ОП неправильно передал initializer (они вызвали его в родительском процессе и передали возвращаемое значение None как initializer), поэтому им необходимо это исправить, чтобы ваши изменения могли выполняться в дочерних процессах.

ShadowRanger 23.05.2024 18:38

Спасибо @ShadowRanger за указание; это произошло из-за автодополнения в редакторе :(

soumeng78 23.05.2024 19:52

Другие вопросы по теме