Multiprocessing.Queue с большими данными вызывает _wait_for_tstate_lock

Исключение возникает в threading._wait_for_tstate_lock, когда я передаю огромные данные между Process и Thread через multiprocessing.Queue.

Мой минимальный рабочий пример сначала выглядит немного сложным — извините. Я объясню. Оригинальное приложение загружает в оперативную память множество (не столь важных) файлов. Это делается в отдельном процессе для экономии ресурсов. Основной поток графического интерфейса не должен зависать.

  1. Графический интерфейс запускает отдельный Thread, чтобы предотвратить зависание цикла событий графического интерфейса.

  2. Затем этот отдельный Thread запускает один Process, который должен выполнять свою работу.

а) Это Thread создает экземпляр multiprocess.Queue (имейте в виду, что это multiprocessing, а не threading!)

б) Это предоставляется Process для обмена данными с Process обратно на Thread.

  1. Process выполняет некоторую работу (3 шага) и .put() результат в multiprocessing.Queue.

  2. Когда Process заканчивается, Thread снова вступает во владение и собирает данные из Queue, сохраняет их в своем собственном атрибуте MyThread.result.

  3. Thread указывает основному циклу/потоку GUI вызывать функцию обратного вызова, если у него есть время.

  4. Функция обратного вызова (MyWindow::callback_thread_finished()) получает результаты от MyWindow.thread.result.

Проблема в том, что если данные, помещаемые в Queue, слишком велики, что-то происходит, чего я не понимаю, - MyThread никогда не заканчивается. Я должен отменить приложение через Strg+C.

Я получил некоторые подсказки из документов. Но моя проблема в том, что я не полностью понял документацию. Но у меня такое ощущение, что ключ к моим проблемам можно найти там. См. два красных прямоугольника в "Трубы и очереди" (документы Python 3.5). Это полный вывод

MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
    t.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Это минимальный рабочий пример

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib


class MyThread (threading.Thread):
    """This thread just starts the process."""
    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback

    def run(self):
        print('Running MyThread...')
        self.result = []

        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        process.join()

        while not queue.empty():
            process_result = queue.get()
            self.result.append(process_result)
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess (multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x'*102048))
        print('MyProcess stoppd.')

class MyWindow (Gtk.Window):
    def __init__(self):
        Gtk.Window.__init__(self)
        self.connect('destroy', Gtk.main_quit)
        GLib.timeout_add(2000, self.do_start)

    def do_start(self):
        print('MyWindow::do_start()')
        # The process need to be started from a separate thread
        # to prevent the main thread (which is the gui main loop)
        # from freezing while waiting for the process result.
        self.thread = MyThread(self.callback_thread_finished)
        self.thread.start()

    def callback_thread_finished(self):
        result = self.thread.result
        for r in result:
            print('{} {}...'.format(r[0], r[1][:10]))

if __name__ == '__main__':
    win = MyWindow()
    win.show_all()
    Gtk.main()

Возможный дубликат, но совершенно другой и ИМО без ответа для моей ситуации: Thread._wait_for_tstate_lock() никогда не возвращается.

Обходной путь

Используя Управляющий делами, изменив строку 22, чтобы queue = multiprocessing.Manager().Queue() решить проблему. Но я не знаю почему. Моя цель этого вопроса - понять, что стоит за этим, а не только заставить мой код работать. Даже я на самом деле не знаю, что такое Manager() и есть ли у него другие последствия (вызывающие проблемы).

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
6
0
1 730
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Согласно второму предупреждающему окну в документации, на которую вы ссылаетесь, вы можете попасть в тупик, если присоединитесь к процессу до обработки всех элементов в очереди. Таким образом, запуск процесса и немедленное присоединение к нему и обработка элементов в очереди тогда — неправильный порядок шагов. Вы должны запустить процесс, затем получить элементы, и только тогда, когда все элементы будут получены, вы можете вызвать метод соединения. Определите некоторое сигнальное значение, чтобы сигнализировать о том, что процесс завершил отправку данных через очередь. None например, если это не может быть обычным значением, которое вы ожидаете от процесса.

class MyThread(threading.Thread):
    """This thread just starts the process."""

    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback
        self.result = []

    def run(self):
        print('Running MyThread...')
        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        while True:
            process_result = queue.get()
            if process_result is None:
                break
            self.result.append(process_result)
        process.join()
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x' * 102048))
        self.queue.put(None)
        print('MyProcess stoppd.')

Дополнительный вопрос: в моем воображении while True тратить системные ресурсы. Не лучше ли делать небольшой time.sleep() вызов в каждой итерации?

buhtz 27.05.2019 12:36

Дополнительный вопрос: я не понимаю, почему я вообще должен звонить .join() в таком случае. Когда поток все еще получает все данные от Queue, включая дозорного None, все в порядке. Так почему же в таком случае Thread должен ждать Process?

buhtz 27.05.2019 12:39

Нет, это не тратит ресурсы, потому что queue.get() блокируется до тех пор, пока в очереди действительно что-то не появится. Это не занятая петля. Он запускается ровно один раз для каждого элемента в очереди.

BlackJack 27.05.2019 12:41

Вам не нужно вызывать join() здесь, но это выглядит немного чище и может сделать некоторую очистку тогда, а не позже.

BlackJack 27.05.2019 12:42

Другие вопросы по теме