Исключение возникает в threading._wait_for_tstate_lock
, когда я передаю огромные данные между Process
и Thread
через multiprocessing.Queue
.
Мой минимальный рабочий пример сначала выглядит немного сложным — извините. Я объясню. Оригинальное приложение загружает в оперативную память множество (не столь важных) файлов. Это делается в отдельном процессе для экономии ресурсов. Основной поток графического интерфейса не должен зависать.
Графический интерфейс запускает отдельный Thread
, чтобы предотвратить зависание цикла событий графического интерфейса.
Затем этот отдельный Thread
запускает один Process
, который должен выполнять свою работу.
а) Это Thread
создает экземпляр multiprocess.Queue
(имейте в виду, что это multiprocessing
, а не threading
!)
б) Это предоставляется Process
для обмена данными с Process
обратно на Thread
.
Process
выполняет некоторую работу (3 шага) и .put()
результат в multiprocessing.Queue
.
Когда Process
заканчивается, Thread
снова вступает во владение и собирает данные из Queue
, сохраняет их в своем собственном атрибуте MyThread.result
.
Thread
указывает основному циклу/потоку GUI вызывать функцию обратного вызова, если у него есть время.
Функция обратного вызова (MyWindow::callback_thread_finished()
) получает результаты от MyWindow.thread.result
.
Проблема в том, что если данные, помещаемые в Queue
, слишком велики, что-то происходит, чего я не понимаю, - MyThread
никогда не заканчивается. Я должен отменить приложение через Strg+C.
Я получил некоторые подсказки из документов. Но моя проблема в том, что я не полностью понял документацию. Но у меня такое ощущение, что ключ к моим проблемам можно найти там. См. два красных прямоугольника в "Трубы и очереди" (документы Python 3.5). Это полный вывод
MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
t.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
util._exit_function()
File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
_run_finalizers()
File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
finalizer()
File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
thread.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Это минимальный рабочий пример
#!/usr/bin/env python3
import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib
class MyThread (threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
def run(self):
print('Running MyThread...')
self.result = []
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
process.join()
while not queue.empty():
process_result = queue.get()
self.result.append(process_result)
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess (multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x'*102048))
print('MyProcess stoppd.')
class MyWindow (Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self)
self.connect('destroy', Gtk.main_quit)
GLib.timeout_add(2000, self.do_start)
def do_start(self):
print('MyWindow::do_start()')
# The process need to be started from a separate thread
# to prevent the main thread (which is the gui main loop)
# from freezing while waiting for the process result.
self.thread = MyThread(self.callback_thread_finished)
self.thread.start()
def callback_thread_finished(self):
result = self.thread.result
for r in result:
print('{} {}...'.format(r[0], r[1][:10]))
if __name__ == '__main__':
win = MyWindow()
win.show_all()
Gtk.main()
Возможный дубликат, но совершенно другой и ИМО без ответа для моей ситуации: Thread._wait_for_tstate_lock() никогда не возвращается.
Используя Управляющий делами, изменив строку 22, чтобы queue = multiprocessing.Manager().Queue()
решить проблему. Но я не знаю почему. Моя цель этого вопроса - понять, что стоит за этим, а не только заставить мой код работать. Даже я на самом деле не знаю, что такое Manager()
и есть ли у него другие последствия (вызывающие проблемы).
Согласно второму предупреждающему окну в документации, на которую вы ссылаетесь, вы можете попасть в тупик, если присоединитесь к процессу до обработки всех элементов в очереди. Таким образом, запуск процесса и немедленное присоединение к нему и обработка элементов в очереди тогда — неправильный порядок шагов. Вы должны запустить процесс, затем получить элементы, и только тогда, когда все элементы будут получены, вы можете вызвать метод соединения. Определите некоторое сигнальное значение, чтобы сигнализировать о том, что процесс завершил отправку данных через очередь. None
например, если это не может быть обычным значением, которое вы ожидаете от процесса.
class MyThread(threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
self.result = []
def run(self):
print('Running MyThread...')
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
while True:
process_result = queue.get()
if process_result is None:
break
self.result.append(process_result)
process.join()
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x' * 102048))
self.queue.put(None)
print('MyProcess stoppd.')
Дополнительный вопрос: я не понимаю, почему я вообще должен звонить .join()
в таком случае. Когда поток все еще получает все данные от Queue
, включая дозорного None
, все в порядке. Так почему же в таком случае Thread
должен ждать Process
?
Нет, это не тратит ресурсы, потому что queue.get()
блокируется до тех пор, пока в очереди действительно что-то не появится. Это не занятая петля. Он запускается ровно один раз для каждого элемента в очереди.
Вам не нужно вызывать join()
здесь, но это выглядит немного чище и может сделать некоторую очистку тогда, а не позже.
Дополнительный вопрос: в моем воображении
while True
тратить системные ресурсы. Не лучше ли делать небольшойtime.sleep()
вызов в каждой итерации?