Есть ли способ убить поток?

Можно ли завершить запущенный поток без установки / проверки каких-либо флагов / семафоров / и т. д.?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
816
0
867 979
29
Перейти к ответу Данный вопрос помечен как решенный

Ответы 29

Никогда не следует принудительно убивать поток, не сотрудничая с ним.

Удаление потока удаляет любые гарантии, которые блокируют настройку try / finally, поэтому вы можете оставить блокировки заблокированными, файлы открытыми и т. д.

Единственный раз, когда вы можете утверждать, что принудительное уничтожение потоков - это хорошая идея, - это быстрое уничтожение программы, но никогда не одиночные потоки.

Почему так сложно просто рассказать нить, убейте себя, когда закончите текущий цикл ... Я не понимаю.

Mehdi 08.04.2016 17:37

В ЦП нет встроенного механизма для идентификации «цикла» как такового, лучшее, на что вы можете надеяться, - это использовать какой-то сигнал, который код, который в данный момент находится внутри цикла, проверит после его выхода. Правильный способ обработки синхронизации потоков - совместные действия, приостановка, возобновление и завершение потоков - это функции, предназначенные для отладчиков и операционной системы, а не для кода приложения.

Lasse V. Karlsen 08.04.2016 21:39

@Mehdi: если я (лично) пишу код в ветке, да, я с вами согласен. Но бывают случаи, когда я использую сторонние библиотеки, и у меня нет доступа к циклу выполнения этого кода. Это один из вариантов использования запрошенной функции.

Dan H 13.10.2017 17:16

@DanH Это даже худшее со сторонним кодом, так как вы понятия не имеете, какой ущерб он может нанести. Если ваша сторонняя библиотека недостаточно надежна, чтобы ее нужно было убить, вам следует сделать одно из следующих действий: (1) попросить автора исправить проблему, (2) использовать что-нибудь еще. Если у вас действительно нет выбора, то размещение этого кода в отдельном процессе должно быть безопаснее, поскольку некоторые ресурсы используются совместно только в рамках одного процесса.

Phil1970 24.12.2018 22:47

Для этого нет официального API, нет.

Вам нужно использовать API платформы, чтобы убить поток, например pthread_kill или TerminateThread. Вы можете получить доступ к такому API, например через pythonwin или через ctypes.

Обратите внимание, что это небезопасно по своей сути. Это, вероятно, приведет к необратимому мусору (из локальных переменных фреймов стека, которые становятся мусором) и может привести к тупикам, если убиваемый поток имеет GIL в момент, когда он убит.

Это воля приводит к взаимоблокировкам, если рассматриваемый поток содержит GIL.

Matthias Urlichs 20.09.2015 18:09

Лучше не убивать нить. Одним из способов может быть введение блока «try» в цикл потока и выдача исключения, когда вы хотите остановить поток (например, break / return / ..., который останавливает ваш for / while / ...). Я использовал это в своем приложении, и он работает ...

Вы можете убить поток, установив трассировку в поток, который будет выходить из потока. См. Прикрепленную ссылку для одной из возможных реализаций.

Убить поток в Python

Я это уже видел. Это решение основано на проверке флага self.killed

Sudden Def 28.11.2008 11:59

Один из немногих ответов здесь, который действительно РАБОТАЕТ

Ponkadoodle 04.07.2013 05:53

Две проблемы с этим решением: (а) установка трассировщика с sys.settrace () замедлит работу вашего потока. В 10 раз медленнее, если это ограничение вычислений. (б) не повлияет на ваш поток, пока он находится в системном вызове.

Matthias Urlichs 22.06.2014 20:22

Другая проблема со связанным рецептом состоит в том, что он переопределяет метод start(), тогда как текущая документация явно заявляет: «Другими словами, Только переопределяет методы __init__() и run() этого класса» (при определении подкласса).

martineau 29.11.2020 02:44
Ответ принят как подходящий

Как правило, резкое завершение потока в Python и на любом языке - плохой шаблон. Подумайте о следующих случаях:

  • поток содержит критический ресурс, который должен быть закрыт должным образом
  • поток создал несколько других потоков, которые также необходимо убить.

Хороший способ справиться с этим, если вы можете себе это позволить (если вы управляете своими собственными потоками), - это иметь флаг exit_request, который каждый поток проверяет через регулярный интервал, чтобы увидеть, не пора ли ему выйти.

Например:

import threading

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self,  *args, **kwargs):
        super(StoppableThread, self).__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

В этом коде вы должны вызвать stop() в потоке, когда вы хотите, чтобы он завершился, и подождать, пока поток не завершится должным образом, используя join(). Поток должен регулярно проверять флаг остановки.

Однако бывают случаи, когда вам действительно нужно убить поток. Примером может служить упаковка внешней библиотеки, которая занята долгими вызовами, и вы хотите ее прервать.

Следующий код позволяет (с некоторыми ограничениями) вызывать исключение в потоке Python:

def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadWithExc(threading.Thread):
    '''A thread class that supports raising an exception in the thread from
       another thread.
    '''
    def _get_my_tid(self):
        """determines this (self's) thread id

        CAREFUL: this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.
        """
        if not self.isAlive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        # TODO: in python 2.6, there's a simpler way to do: self.ident

        raise AssertionError("could not determine the thread's id")

    def raiseExc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:

            t = ThreadWithExc( ... )
            ...
            t.raiseExc( SomeException )
            while t.isAlive():
                time.sleep( 0.1 )
                t.raiseExc( SomeException )

        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.

        CAREFUL: this function is executed in the context of the
        caller thread, to raise an exception in the context of the
        thread represented by this instance.
        """
        _async_raise( self._get_my_tid(), exctype )

(На основе Убиваемые потоки Томера Филиба. Цитата о возвращаемом значении PyThreadState_SetAsyncExc, похоже, взята из старая версия Python.)

Как отмечено в документации, это не волшебная палочка, потому что, если поток занят вне интерпретатора Python, он не поймает прерывание.

Хороший образец использования этого кода - заставить поток перехватить определенное исключение и выполнить очистку. Таким образом, вы можете прервать выполнение задачи и при этом произвести надлежащую очистку.

Этот код не совсем работает. Вам нужно преобразовать tid в длинный, например: ctypes.pythonapi.PyThreadState_SetAsyncExc (ctypes.c_long (tid‌), None)

user205703 21.05.2011 11:28

@ Bluebird75: В первом примере зачем использовать threading.Event вместо простого логического атрибута stopped, который можно проверить изнутри потока?

Eric O Lebigot 14.07.2011 20:48

@ Bluebird75: Кроме того, я не уверен, что у меня есть аргумент о том, что потоки не должны прерываться внезапно, «потому что поток может содержать критический ресурс, который должен быть закрыт должным образом»: это также верно для основной программы и основных программ могут быть внезапно убиты пользователем (например, Ctrl-C в Unix) - в этом случае они пытаются обработать эту возможность как можно лучше. Итак, я не понимаю, что особенного в потоках, и почему они не должны подвергаться тому же обращению, что и основные программы (а именно, что они могут быть внезапно уничтожены). :) Не могли бы вы подробнее рассказать об этом?

Eric O Lebigot 14.07.2011 20:53

@EOL: особого преимущества в использовании потоковой передачи нет. Насколько я помню, событие здесь действительно.

Philippe F 02.08.2011 13:36

@EOL: Давайте возьмем случай потока, который открывает удаленный ресурс (соединение БД с другим сервером), у которого тайм-аут 5 минут, а максимальное количество подключений - 20 для каждого пользователя. Если ваша программа открывает 5 потоков с 5 подключениями и вы внезапно завершаете ее без надлежащего закрытия потока, ваша программа не сможет подключиться в 5-й раз, когда вы запустите ее с интервалом в 5 минут. Итак, в этом случае вам нужно правильное закрытие потока (и закрытие программы).

Philippe F 02.08.2011 13:43

@EOL: С другой стороны, если все ресурсы, которыми владеет поток, являются локальными ресурсами (открытыми файлами, сокетами), Linux достаточно хорош в очистке процесса, и это не дает утечек. У меня были случаи, когда я создавал сервер с помощью сокета, и если я жестко прерываю его с помощью Ctrl-C, я больше не могу запускать программу, потому что она не может привязать сокет. Мне нужно подождать 5 минут. Правильным решением было поймать Ctrl-C и выполнить чистое отключение сокета.

Philippe F 02.08.2011 13:45

@ Bluebird75: Спасибо за обсуждение. :) Использует ли упомянутая вами серверная программа потоки или это была основная программа (которая не освободила должным образом привязку сокета)?

Eric O Lebigot 03.08.2011 06:07

@ Bluebird75: кстати. вы можете использовать опцию сокета SO_REUSEADDR, чтобы избежать ошибки Address already in use.

Messa 11.09.2011 13:09

Примечание об этом ответе: по крайней мере, для меня (py2.6) мне пришлось передать None вместо 0 для случая res != 1, и мне пришлось вызвать ctypes.c_long(tid) и передать это любой функции ctypes, а не напрямую tid.

Walt W 18.11.2011 01:16

Стоит отметить, что _stop уже используется в библиотеке потоков Python 3. Таким образом, можно использовать другую переменную, иначе вы получите сообщение об ошибке.

diedthreetimes 18.01.2013 05:51

@Messa, SO_REUSEADDR не всегда работает, и, кроме того, это не относится к делу - сокеты - это просто пример шаблона, в котором может быть хорошей идеей правильно закрыть ресурсы. Конечно, вы можете выключить питание компьютера, а затем снова запустить его. Файловая система восстановится, если в ней есть журнал.

Prof. Falken 30.05.2013 10:44

Меня бы интересовал случай, когда «если поток занят вне интерпретатора python, он не поймает прерывание». Не ловит вообще, или поймает когда-нибудь позже? Я проверил его, и кажется, что в конце концов он его поймает, но через несколько секунд.

glglgl 01.09.2013 11:54

@glglgl: обсуждение выходит за рамки моих технических знаний. Изначально я просто скопировал / вставил какой-то код, который нашел в ActiveState. Я использую его только в составе набора тестов программы, поэтому для меня не будет большой проблемой, если он будет надежен только на 80%. Лучше всего спросить об этом comp.lang.python ...

Philippe F 02.09.2013 14:25

Я не согласен. Когда вы говорите «убить поток» - вы действительно имеете в виду «послать сигнал потоку». (Это может быть magiC# 9, а лучше другой). Во многих случаях это оправдано. Например, если поток находится в каком-то виде блокирующего системного вызова, можно было бы необходимость, чтобы иметь возможность отправить ему сигнал, чтобы вытащить его, даже в сочетании с «изящным» механизмом для выхода из потока [опроса].

Brad 13.09.2013 23:33

Я использую что-то вроде этого, чтобы дать своим потокам KeyboardInterrupt, чтобы у них была возможность очиститься. Если после этого они ВСЕ ЕЩЕ зависают, тогда подходит SystemExit или просто завершите процесс с терминала.

drevicko 21.11.2013 05:02

Для большинства мультипотоков требуется какой-то тип .join (). В этом примере не рассматривается блок обработки, если сигнал поступает в основной поток, такой как sigint.

dman 30.11.2013 06:54

Не был уверен в предложении в ответе «дождаться, пока поток завершится правильно, используя join ()». Насколько я понимаю, реализация - это thread1.join(), thread2.join() и т. д., Как показано здесь.

gary 20.06.2014 21:34

Что должно быть параметром отправки? Для меня он работает только для самого потока, в данном случае: self.thr = ThreadedTask(self.q), self.thr.start(), print(ThreadedTask.stopped(self.thr)), ThreadedTask.stop(self.thr), print(ThreadedTask.stopped(self.thr)), когда я печатаю его таким образом, первый выводит false, второй true. Все в порядке, но моя ветка продолжает работать. Я пишу простое серверное / клиентское приложение с графическим интерфейсом, и мой поток обрабатывает логику сервера, ожидая сокетов, принимает их и отвечает. И он продолжает принимать и отвечать по сокетам. Но остановился, правда

piggy 24.01.2015 14:50

Мне нравится подход StoppableThread. Я добавил класс class ThreadKilled(KeyboardInterrupt): pass и добавил def sleep(self, time): self._stop.wait(time); if self.stopped(): raise ThreadKilled. Таким образом, пока поток заменяет вызовы time.sleep(t) на self.sleep(t), поток может быть прерван из длительного сна. Оттуда я продолжил использовать Queue.Queue вместо threading.Event, поскольку это дает те же базовые функции (включая sleep(t)), но также позволяет потоку быть потребителем.

Ben 22.05.2015 15:33

Также я добавил методы killAndJoin, __enter__ и __exit__, где __exit__ вызывает killAndJoin. Таким образом, основной поток может использовать оператор with для группы рабочих потоков, чтобы они полностью остановились и очистили себя.

Ben 22.05.2015 15:36

В качестве технической детали: может ли кто-нибудь объяснить, почему PyThreadState_SetAsyncExc, похоже, не убивает поток while True: time.sleep(1) мгновенно? Внутреннее устройство time.sleep(1) не на Python, и поэтому ему нужно завершить работу в спящем режиме? Это кажется странным, учитывая, что вы можете ctrl+C из time.sleep; почему бы вам не прервать поток аналогичным образом?

Ben 22.05.2015 15:38

@Ben Это потому, что когда вы вызываете исключение в потоке, это исключение Python, к которому можно получить доступ только через интерпретатор Python. Поскольку time.sleep является внешней функцией C, которая освобождает GIL, она не поймет, что возникла исключительная ситуация, пока функция не вернется, не получит GIL повторно и не запустит интерпретатор байт-кода Python. Ctrl+C вызывает запуск сигнала системного уровня, который потом транслируется в исключение Python. Поэтому он работает с time.sleep.

eestrada 17.11.2015 01:54

@diedthreetimes: Я отредактировал ответ, чтобы исправить проблему с именованием _stop.

Jérôme 09.05.2017 22:48

Какой смысл использовать событие для _stop_event? Этого никогда не ждали? Почему бы просто не использовать логический атрибут в качестве флага?

Jérôme 09.05.2017 22:49

kill -9 не гуманно, но иногда нам приходится это делать (например, сейчас - я могу либо убить около десяти потоков, которые создал по ошибке, либо убить процесс хостинга ...)

user1854182 05.12.2017 12:56

Ваш ответ правильный, но вы упускаете здесь один момент: бывают ситуации, когда ваш поток может застрять в процессе завершения работы (хорошо, в большинстве случаев это из-за плохого дизайна и забытых вещей, но это всегда может случиться). Когда мне нужно разработать критический процесс, я реализую свои стандартные методы постепенной остановки и соединения, однако, если код достаточно важен, также стоит реализовать метод terminate (), который должным образом завершает () любой процесс, о котором идет речь. -be-kill процесс создан. Таким образом, что-то идет не так (например, какое-то непредвиденное исключение не обрабатывается должным образом во время (...)

DGoiko 18.01.2019 19:37

(...) прекращение. В таких случаях я могу обнаружить условие «правильного завершения» и безопасно завершить поток, если оно не выполняется, высвобождая все ресурсы более надежным и безопасным способом. Вы можете возразить, что правильный дизайн справится с этой ситуацией в процессе плавной остановки, и я полностью согласен с вами, на самом деле, я обычно делаю это в обоих случаях, когда мне нужно быть абсолютно уверенным, что вещь будет пуленепробиваемой.

DGoiko 18.01.2019 19:39

На самом деле должен быть стандартный библиотечный способ отправки асинхронных сигналов потокам Python. Если вы хотите сделать эту функцию безопасной, то нам нужно следовать примеру C с pselect или Haskell с маскировкой и скобками. В любом случае очень неудобно, когда поток выполняет операцию блокировки, и нет способа сказать ему, чтобы он остановил то, что он делает, не прибегая к хакам, таким как значения дозорных или переписывая механизм исключений Python для создания исключений (хотя стандартный библиотечный способ выброса исключения для потоков тоже работают).

CMCDragonkai 05.04.2019 06:24

@ Jérôme: На CPython простое логическое значение прекрасно, как написано. Но использование Event означает: 1) поток, которому необходимо некоторое время засыпать, может заменить time.sleep вызовом метода wait из Event на timeout, что позволит ему мгновенно проснуться, если ему будет приказано выйти, и 2) если он не -CPython интерпретаторы, которые JIT, GIL не существуют для обеспечения защиты, такой как обеспечение того, чтобы запись в одном потоке была видна другим (если код полностью скомпилирован, он может прочитать логическое значение один раз и предположить, что ему больше не нужно читать его снова, точно так же, как код C без атомики). У Event такой проблемы нет.

ShadowRanger 05.02.2020 17:22

Что, если я хочу поймать исключение с помощью этого кода? Следует ли это делать вне потока (try: t.start() except SomException...) или в функции, заключенной в поток? (def func(): try: ... except SomeException ...; t = ThreadWithExc(target=func) ...? Или еще где-нибудь?

Michele Piccolini 19.02.2020 15:25

Что касается подкласса threading.Thread, как это сделано выше, python stdlib документы говорит: «Никакие другие методы (кроме конструктора) не должны быть переопределены в подклассе. Другими словами, переопределяйте только методы __init __ () и run () этого класса». Не уверен, что смогу спать по ночам после прямого неповиновения документам python.

Jasha 30.12.2020 10:05

Если вы пытаетесь завершить всю программу, вы можете установить поток как «демон». видеть Thread.daemon

В этом нет никакого смысла. В документации четко сказано: «это должно быть установлено до вызова start (), иначе возникает RuntimeError». Таким образом, если я хочу убить поток, который изначально не был демоном, как я могу это использовать?

Raffi Khatchadourian 29.11.2011 23:21

Раффи, я думаю, он предлагает вам установить его заранее, зная, что при выходе из вашего основного потока вы также хотите, чтобы потоки демона завершились.

fantabolous 31.07.2014 11:06

Разве установка потока в качестве демона не является чем-то, что вы бы сделали в случае, если вы хотите, чтобы поток продолжал работать, даже если ваша основная программа завершается?

Michele Piccolini 19.02.2020 12:41

Вы, сэр, мой герой дня. Именно то, что я искал, и без лишних хлопот.

Blizz 11.03.2020 16:41

@MichelePiccolini: Все наоборот: потоки демона не поддерживают выполнение процесса, когда остальные отсутствуют.

Davis Herring 02.05.2020 04:57

Для меня это был лучший ответ, я просто хотел очистить потоки, когда родительский процесс завершает работу. Спасибо!

Lin Meyer 20.03.2021 16:49

This is a bad answer, see the comments

Вот как это сделать:

from threading import *

...

for thread in enumerate():
    if thread.isAlive():
        try:
            thread._Thread__stop()
        except:
            print(str(thread.getName()) + ' could not be terminated'))

Подождите несколько секунд, и ваш поток должен быть остановлен. Также проверьте метод thread._Thread__delete().

Я бы порекомендовал для удобства метод thread.quit(). Например, если у вас есть сокет в вашем потоке, я бы рекомендовал создать метод quit() в вашем классе дескриптора сокета, завершить сокет, а затем запустить thread._Thread__stop() внутри вашего quit().

Мне пришлось использовать self._Thread__stop () внутри моего объекта threading.Thread, чтобы он остановился. Я не понимаю, почему простой self.join (), подобный этому примеру (code.activestate.com/recipes/65448-thread-control-idiom), не работает.

harijay 06.10.2011 23:45

Были бы полезны более подробные сведения о том, что «это на самом деле не останавливает обсуждение».

2371 28.12.2011 02:40

По сути, вызов метода _Thread__stop не имеет никакого эффекта, кроме сообщения Python о том, что поток остановлен. Он действительно может продолжать работать. См. Пример gist.github.com/2787191.

Bluehorn 25.05.2012 14:26

Это совершенно неправильно. _Thread__stop() просто отмечает поток как остановленный, он фактически не останавливает поток! Никогда не делай этого. Читать.

dotancohen 21.07.2013 11:16

multiprocessing.Process может p.terminate()

В тех случаях, когда я хочу убить поток, но не хочу использовать флаги / блокировки / сигналы / семафоры / события / что угодно, я продвигаю потоки в полноценные процессы. Для кода, который использует всего несколько потоков, накладные расходы не так уж и плохи.

Например. это удобно, чтобы легко завершить вспомогательные "потоки", которые выполняют блокирующий ввод-вывод.

Преобразование тривиально: в соответствующем коде замените все threading.Thread на multiprocessing.Process и все queue.Queue на multiprocessing.Queue и добавьте необходимые вызовы p.terminate() в родительский процесс, который хочет убить свой дочерний p.

Смотрите Документация Python для multiprocessing.

Пример:

import multiprocessing
proc = multiprocessing.Process(target=your_proc_function, args=())
proc.start()
# Terminate the process
proc.terminate()  # sends a SIGTERM

Спасибо. Я заменил queue.Queue на multiprocessing.JoinableQueue и последовал этому ответу: stackoverflow.com/a/11984760/911207

David Braun 15.08.2014 21:30

Много страниц по этому поводу. Думаю, это очевидное решение для многих.

geotheory 12.01.2016 05:19
multiprocessing хорош, но имейте в виду, что аргументы подбираются к новому процессу. Так что, если один из аргументов - это что-то, что нельзя выбрать (например, logging.log), использование multiprocessing может быть плохой идеей.
Lyager 09.04.2018 12:10

Аргументы multiprocessing подобраны для нового процесса в Windows, но Linux использует разветвление для их копирования (Python 3.7, не знаю, какие еще версии). Таким образом, вы получите код, который работает в Linux, но вызывает ошибки рассола в Windows.

nyanpasu64 19.04.2019 01:21
multiprocessing с логированием - дело хитрое. Необходимо использовать QueueHandler (см. этот учебник). Я усвоил это на собственном горьком опыте.
Fanchen Bao 09.12.2019 03:10

В Python вы просто не можете убить поток напрямую.

Если вам на самом деле НЕ нужен поток (!), Вы можете вместо использования Пакет заправка использовать Пакет многопроцессорность. Здесь, чтобы убить процесс, вы можете просто вызвать метод:

yourProcess.terminate()  # kill the process!

Python убьет ваш процесс (в Unix через сигнал SIGTERM, а в Windows через вызов TerminateProcess()). Обратите внимание на то, чтобы использовать его при использовании Queue или Pipe! (это может привести к повреждению данных в очереди / конвейере)

Обратите внимание, что multiprocessing.Event и multiprocessing.Semaphore работают точно так же, как threading.Event и threading.Semaphore соответственно. По сути, первые - клоны последних.

Если вам ДЕЙСТВИТЕЛЬНО нужно использовать поток, нет способа убить его напрямую. Однако вы можете использовать "поток демона". Фактически, в Python поток может быть помечен как демон:

yourThread.daemon = True  # set the Thread as a "daemon thread"

Основная программа завершится, когда не останется живых потоков, не являющихся демонами. Другими словами, когда ваш основной поток (который, конечно, не является потоком демона) завершит свои операции, программа завершится, даже если некоторые потоки демона все еще работают.

Обратите внимание, что перед вызовом метода daemon необходимо установить поток как start()!

Конечно, вы можете и должны использовать daemon даже с multiprocessing. Здесь, когда основной процесс завершается, он пытается завершить все свои демонические дочерние процессы.

Наконец, обратите внимание, что sys.exit() и os.kill() не подходят.

Я не знаю, почему люди не голосуют за это. Что не так с этим ответом? Хотя это работает для меня.

fsevenm 10.09.2020 15:51

@fsevenm: процессы такие же, как потоки. Они выполняются в отдельных пространствах памяти, поэтому совместное использование глобальных переменных затруднено. а передача аргументов включает в себя их маринование и распаковку с другой стороны. Это плюс накладные расходы на запуск и выполнение отдельных процессов влечет за собой гораздо больше других накладных расходов, чем просто переключение потоков. Во многих отношениях это яблоко против апельсина, поэтому, наверное, поэтому - отвечу на ваш вопрос.

martineau 29.11.2020 02:36

Это основано на thread2 - убиваемые потоки (рецепт Python)

Вам нужно вызвать PyThreadState_SetasyncExc (), который доступен только через ctypes.

Это было протестировано только на Python 2.7.3, но, вероятно, будет работать с другими недавними выпусками 2.x.

import ctypes

def terminate_thread(thread):
    """Terminates a python thread from another thread.

    :param thread: a threading.Thread instance
    """
    if not thread.isAlive():
        return

    exc = ctypes.py_object(SystemExit)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(thread.ident), exc)
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

Я использую что-то вроде этого, чтобы дать своим потокам KeyboardInterrupt, чтобы у них была возможность очиститься. Если после этого они все еще зависают, тогда подходит SystemExit или просто завершите процесс с терминала.

drevicko 21.11.2013 05:01

Это работает, если поток в настоящее время выполняется. Это не работает, если поток находится в системном вызове; исключение будет автоматически проигнорировано.

Matthias Urlichs 22.06.2014 20:08

@MatthiasUrlichs есть идеи, как определить состояние выполнения потока, чтобы иметь возможность распечатать предупреждение или повторить попытку?

Johan Dahlin 10.07.2014 21:05

@JohanDahlin Вы можете немного подождать (что, если вы хотите повторить попытку, вам все равно нужно сделать), а затем выполните тест isAlive (). В любом случае, хотя это сработает, я также не буду гарантировать, что при этом не останется болтающихся ссылок. Хотя теоретически возможно сделать безопасное уничтожение потоков в CPython, разумное использование pthread_cleanup_push()/_pop() потребует много работы для правильной реализации и заметно замедлит работу интерпретатора.

Matthias Urlichs 11.07.2014 05:00
from ctypes import *
pthread = cdll.LoadLibrary("libpthread-2.15.so")
pthread.pthread_cancel(c_ulong(t.ident))

ттrong> - ваш объект Thread.

Прочтите исходный код Python (Modules/threadmodule.c и Python/thread_pthread.h), вы увидите, что Thread.ident относится к типу pthread_t, поэтому вы можете делать все, что pthread может делать в Python, используя libpthread.

И как это сделать в Windows?

iChux 11.02.2014 16:10

Вы не делаете; ни в Windows, ни в Linux тоже. Причина: рассматриваемый поток может содержать GIL, пока вы это делаете (Python освобождает GIL, когда вы вызываете C). Если это так, ваша программа мгновенно зайдет в тупик. Даже если этого не произойдет, наконец: блоки не будут выполняться и т. д., Так что это очень небезопасная идея.

Matthias Urlichs 22.06.2014 20:00

Я хочу добавить, что если вы читаете официальную документацию в библиотека потоковой передачи Python, рекомендуется избегать использования «демонических» потоков, когда вы не хотите, чтобы потоки завершались резко, с флагом Paolo Rovelli упомянул.

Из официальной документации:

Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signaling mechanism such as an Event.

Я думаю, что создание демонических потоков зависит от вашего приложения, но в целом (и на мой взгляд) лучше избегать их уничтожения или превращения в демонические. В многопроцессорной обработке вы можете использовать is_alive() для проверки статуса процесса и «завершить» для их завершения (также вы избегаете проблем с GIL). Но иногда вы можете обнаружить больше проблем, когда выполняете свой код в Windows.

И всегда помните, что если у вас есть «живые потоки», интерпретатор Python будет ждать их. (Благодаря этому демон может помочь вам, если неважно резко закончится).

Я не понимаю последний абзац.

tshepang 01.07.2014 19:39

@Tshepang Это означает, что если в вашем приложении есть какие-либо запущенные недемонические потоки, интерпретатор Python будет продолжать работать, пока потоки все не демон не станут Готово. Если вам все равно, заканчиваются ли поток (-ы) при завершении программы, то создание их демона может быть полезным.

Tom Myddeltyn 06.05.2016 19:17

Если вам действительно нужна возможность убить подзадачу, используйте альтернативную реализацию. multiprocessing и gevent оба поддерживают неизбирательное уничтожение «нити».

Потоковая обработка Python не поддерживает отмену. Даже не пытайся. Ваш код, скорее всего, зайдет в тупик, повредит память или вызовет утечку памяти, или будет иметь другие непреднамеренные "интересные" трудно поддающиеся отладке эффекты, которые происходят редко и недетерминированно.

… И да, я знаю, что оба не являются строго «многопоточными», но они оба работают, если ваш код соответствует (или может быть адаптирован) их модели.

Matthias Urlichs 20.09.2015 18:08

Определенно возможно реализовать метод Thread.stop, как показано в следующем примере кода:

import sys
import threading
import time


class StopThread(StopIteration):
    pass

threading.SystemExit = SystemExit, StopThread


class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace


class Thread3(threading.Thread):

    def _bootstrap(self, stop_thread=False):
        def stop():
            nonlocal stop_thread
            stop_thread = True
        self.stop = stop

        def tracer(*_):
            if stop_thread:
                raise StopThread()
            return tracer
        sys.settrace(tracer)
        super()._bootstrap()

###############################################################################


def main():
    test1 = Thread2(target=printer)
    test1.start()
    time.sleep(1)
    test1.stop()
    test1.join()
    test2 = Thread2(target=speed_test)
    test2.start()
    time.sleep(1)
    test2.stop()
    test2.join()
    test3 = Thread3(target=speed_test)
    test3.start()
    time.sleep(1)
    test3.stop()
    test3.join()


def printer():
    while True:
        print(time.time() % 1)
        time.sleep(0.1)


def speed_test(count=0):
    try:
        while True:
            count += 1
    except StopThread:
        print('Count =', count)

if __name__ == '__main__':
    main()

Класс Thread3, похоже, выполняет код примерно на 33% быстрее, чем класс Thread2.

Это умный способ ввести в поток проверки наличия установленного self.__stop. Обратите внимание, что, как и большинство других решений здесь, он фактически не прерывает блокирующий вызов, поскольку функция трассировки вызывается только при входе в новую локальную область. Также стоит отметить, что sys.settrace действительно предназначен для реализации отладчиков, профилей и т. д. И как таковой считается деталью реализации CPython, и его существование в других реализациях Python не гарантируется.

dano 05.09.2014 01:18

@dano: Одна из самых больших проблем с классом Thread2 заключается в том, что он запускает код примерно в десять раз медленнее. Некоторые люди все еще могут счесть это приемлемым.

Noctis Skytower 05.09.2014 18:20

+1 при этом значительно замедляет выполнение кода .. Я бы посоветовал автору этого решения включить эту информацию в ответ.

Vishal 06.09.2018 08:47

Как уже упоминали другие, нормой является установка флажка остановки. Для чего-то легкого (без подкласса Thread, без глобальной переменной) возможен лямбда-обратный вызов. (Обратите внимание на скобки в if stop().)

import threading
import time

def do_work(id, stop):
    print("I am thread", id)
    while True:
        print("I am thread {} doing something".format(id))
        if stop():
            print("  Exiting loop.")
            break
    print("Thread {}, signing off".format(id))


def main():
    stop_threads = False
    workers = []
    for id in range(0,3):
        tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
        workers.append(tmp)
        tmp.start()
    time.sleep(3)
    print('main: done sleeping; time to stop the threads.')
    stop_threads = True
    for worker in workers:
        worker.join()
    print('Finis.')

if __name__ == '__main__':
    main()

Замена print() на функцию pr(), которая всегда очищает (sys.stdout.flush()), может улучшить точность вывода оболочки.

(Проверено только в Windows / Eclipse / Python3.3)

Проверено на Linux / Python 2.7, отлично работает. Это должен быть официальный ответ, это намного проще.

Paul Kenjora 20.11.2017 07:15

Проверено на Linux Ubuntu Server 17.10 / Python 3.6.3 и работает.

Marcos 06.12.2017 22:40

Также проверено в 2.7. Такой хороший ответ!

silgon 12.04.2018 13:56

Что такое функция pr()?

alper 23.05.2020 15:51

@alper Вы создаете новую функцию, которая действует так же, как функция print, но flush обрабатывает вывод и называет ее pr.

Pyzard 03.01.2021 03:07

Кажется, это работает с pywin32 в Windows 7

my_thread = threading.Thread()
my_thread.start()
my_thread._Thread__stop()

Вы можете выполнить свою команду в процессе, а затем убить его, используя идентификатор процесса. Мне нужно было синхронизировать два потока, один из которых не возвращается сам по себе.

processIds = []

def executeRecord(command):
    print(command)

    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    processIds.append(process.pid)
    print(processIds[0])

    #Command that doesn't return by itself
    process.stdout.read().decode("utf-8")
    return;


def recordThread(command, timeOut):

    thread = Thread(target=executeRecord, args=(command,))
    thread.start()
    thread.join(timeOut)

    os.kill(processIds.pop(), signal.SIGINT)

    return;

Запустите дополнительный поток с помощью setDaemon (True).

def bootstrap(_filename):
    mb = ModelBootstrap(filename=_filename) # Has many Daemon threads. All get stopped automatically when main thread is stopped.

t = threading.Thread(target=bootstrap,args=('models.conf',))
t.setDaemon(False)

while True:
    t.start()
    time.sleep(10) # I am just allowing the sub-thread to run for 10 sec. You can listen on an event to stop execution.
    print('Thread stopped')
    break

Хотя он довольно старый, это может быть удобным решением для некоторых:

A little module that extends the threading's module functionality -- allows one thread to raise exceptions in the context of another thread. By raising SystemExit, you can finally kill python threads.

import threading
import ctypes     

def _async_raise(tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble, 
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class Thread(threading.Thread):
    def raise_exc(self, excobj):
        assert self.isAlive(), "thread must be started"
        for tid, tobj in threading._active.items():
            if tobj is self:
                _async_raise(tid, excobj)
                return

        # the thread was alive when we entered the loop, but was not found 
        # in the dict, hence it must have been already terminated. should we raise
        # an exception here? silently ignore?

    def terminate(self):
        # must raise the SystemExit type, instead of a SystemExit() instance
        # due to a bug in PyThreadState_SetAsyncExc
        self.raise_exc(SystemExit)

Таким образом, он позволяет «потоку вызывать исключения в контексте другого потока», и, таким образом, завершенный поток может обрабатывать завершение без регулярной проверки флага прерывания.

Однако, согласно его первоисточник, с этим кодом есть некоторые проблемы.

  • The exception will be raised only when executing python bytecode. If your thread calls a native/built-in blocking function, the exception will be raised only when execution returns to the python code.
    • There is also an issue if the built-in function internally calls PyErr_Clear(), which would effectively cancel your pending exception. You can try to raise it again.
  • Only exception types can be raised safely. Exception instances are likely to cause unexpected behavior, and are thus restricted.
  • I asked to expose this function in the built-in thread module, but since ctypes has become a standard library (as of 2.5), and this
    feature is not likely to be implementation-agnostic, it may be kept
    unexposed.

Питер Хинтьенс - один из основателей проекта ØMQ - говорит, что использование ØMQ и отказ от примитивов синхронизации, таких как блокировки, мьютексы, события и т. д., Является самым разумным и безопасным способом написания многопоточных программ:

http://zguide.zeromq.org/py:all#Multithreading-with-ZeroMQ

Это включает в себя сообщение дочернему потоку о том, что он должен отменить свою работу. Это можно сделать, оснастив поток ØMQ-сокетом и опросив этот сокет на предмет сообщения о том, что он должен отменить.

Ссылка также предоставляет пример многопоточного кода Python с ØMQ.

Чтобы убить поток, можно использовать следующий обходной путь:

kill_threads = False

def doSomething():
    global kill_threads
    while True:
        if kill_threads:
            thread.exit()
        ......
        ......

thread.start_new_thread(doSomething, ())

Это можно использовать даже для завершения потоков, код которых написан в другом модуле, из основного потока. Мы можем объявить глобальную переменную в этом модуле и использовать ее для завершения потоков, порожденных в этом модуле.

Я обычно использую это для завершения всех потоков при выходе из программы. Это может быть не лучший способ завершить поток / ы, но может помочь.

Недурно. Просто понять.

alyssaeliyah 22.10.2018 18:30

Если вы явно вызываете time.sleep() как часть вашего потока (например, опрашиваете какую-то внешнюю службу), улучшением метода Филиппа является использование тайм-аута в методе eventwait() везде, где вы sleep()

Например:

import threading

class KillableThread(threading.Thread):
    def __init__(self, sleep_interval=1):
        super().__init__()
        self._kill = threading.Event()
        self._interval = sleep_interval

    def run(self):
        while True:
            print("Do Something")

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

Затем запустить его

t = KillableThread(sleep_interval=5)
t.start()
# Every 5 seconds it prints:
#: Do Something
t.kill()
#: Killing Thread

Преимущество использования wait() вместо sleep()ing и регулярной проверки события заключается в том, что вы можете программировать в более длительных интервалах сна, поток останавливается почти сразу (когда в противном случае вы были бы sleep()ing) и, на мой взгляд, код для обработки выхода значительно проще.

почему этот пост был отклонен? Что не так с этим постом? Это похоже на то, что мне нужно ....

JDOaktown 29.08.2018 20:57

да, это намного лучше, чем тратить время на ожидание окончания длительного сна ()

Neil McGill 09.12.2020 18:18

Хотя этот пост был не тем, что мне было нужно (мне нужно безопасно прервать родителя от ребенка), я определенно использовал time.sleep в других частях моего кода и уменьшал интервал опроса, чтобы мой скрипт реагировал быстрее, однако в этом решении есть все преимущества создания небольшого интервала опроса без каких-либо недостатков (бесполезные вычисления). +1 Спасибо большое.

A Kareem 25.12.2020 15:53

Я очень опаздываю в эту игру, но я боролся с аналогичный вопрос, и следующее, похоже, отлично решает проблему для меня И позволяет мне выполнить базовую проверку и очистку состояния потока при выходе из демонизированного подпотока:

import threading
import time
import atexit

def do_work():

  i = 0
  @atexit.register
  def goodbye():
    print ("'CLEANLY' kill sub-thread with value: %s [THREAD: %s]" %
           (i, threading.currentThread().ident))

  while True:
    print i
    i += 1
    time.sleep(1)

t = threading.Thread(target=do_work)
t.daemon = True
t.start()

def after_timeout():
  print "KILL MAIN THREAD: %s" % threading.currentThread().ident
  raise SystemExit

threading.Timer(2, after_timeout).start()

Урожайность:

0
1
KILL MAIN THREAD: 140013208254208
'CLEANLY' kill sub-thread with value: 2 [THREAD: 140013674317568]

Это отличный ответ, который должен быть выше в списке

drootang 27.02.2020 17:09

Почему повышение SystemExit в потоке after_timeout что-то делает с основным потоком (который просто ожидает выхода первого в этом примере)?

Davis Herring 02.03.2020 20:08

@DavisHerring Я не совсем понимаю, к чему вы клоните. SystemExit убивает основной поток, как вы думаете, почему он НЕ БУДЕТ делать что-либо в основном потоке? Без этого вызова программа просто продолжит ждать дочернего потока. Вы также можете ctrl + c или использовать любые другие средства, чтобы убить основной поток, но это пример.

slumtrimpet 01.05.2020 19:51

@slumtrimpet: SystemExit имеет только два специальных свойства: он не производит обратную трассировку (когда какой-либо поток завершает работу, выбрасывая один), и если поток основной завершается, выбрасывая один, он устанавливает статус выхода (тем не менее ожидая другого не-демона). потоки для выхода).

Davis Herring 01.05.2020 21:17

@DavisHerring О, я понял. Полностью с вами согласен. Я неправильно запомнил, что мы здесь сделали, и неправильно понял ваш комментарий.

slumtrimpet 01.05.2020 21:56

-1 Основной поток продолжил работу и не был прерван SystemExit, поднятым из дочернего потока. Пришлось прервать скрипт из терминала с kill -9

A Kareem 25.12.2020 15:39

Для этого создана библиотека стопить. Хотя некоторые из перечисленных здесь предостережений все еще применимы, по крайней мере, эта библиотека представляет собой регулярный, повторяемый метод для достижения заявленной цели.

Предполагая, что вы хотите иметь несколько потоков одной и той же функции, это ИМХО самая простая реализация, чтобы остановить один по идентификатору:

import time
from threading import Thread

def doit(id=0):
    doit.stop=0
    print("start id:%d"%id)
    while 1:
        time.sleep(1)
        print(".")
        if doit.stop==id:
            doit.stop=0
            break
    print("end thread %d"%id)

t5=Thread(target=doit, args=(5,))
t6=Thread(target=doit, args=(6,))

t5.start() ; t6.start()
time.sleep(2)
doit.stop =5  #kill t5
time.sleep(2)
doit.stop =6  #kill t6

Здесь хорошо то, что вы можете иметь несколько одинаковых и разных функций и останавливать их все с помощью functionname.stop.

Если вы хотите иметь только один поток функции, вам не нужно запоминать идентификатор. Просто остановитесь, если doit.stop> 0.

Просто чтобы развить идею @ SCB (это именно то, что мне нужно) создать подкласс KillableThread с настраиваемой функцией:

from threading import Thread, Event

class KillableThread(Thread):
    def __init__(self, sleep_interval=1, target=None, name=None, args=(), kwargs = {}):
        super().__init__(None, target, name, args, kwargs)
        self._kill = Event()
        self._interval = sleep_interval
        print(self._target)

    def run(self):
        while True:
            # Call custom function with arguments
            self._target(*self._args)

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

if __name__ == '__main__':

    def print_msg(msg):
        print(msg)

    t = KillableThread(10, print_msg, args=("hello world"))
    t.start()
    time.sleep(6)
    print("About to kill thread")
    t.kill()

Естественно, как и в случае с @SBC, поток не дожидается запуска нового цикла до остановки. В этом примере вы увидите сообщение «Killing Thread», напечатанное сразу после «About to kill thread», вместо того, чтобы ждать еще 4 секунды для завершения потока (поскольку мы уже спали 6 секунд).

Второй аргумент в конструкторе KillableThread - это ваша пользовательская функция (здесь print_msg). Аргумент args - это аргументы, которые будут использоваться здесь при вызове функции (("hello world")).

Как упоминалось в отвечать @ Kozyarchuk, установка трассировки работает. Поскольку этот ответ не содержал кода, вот рабочий, готовый к использованию пример:

import sys, threading, time 

class TraceThread(threading.Thread): 
    def __init__(self, *args, **keywords): 
        threading.Thread.__init__(self, *args, **keywords) 
        self.killed = False
    def start(self): 
        self._run = self.run 
        self.run = self.settrace_and_run
        threading.Thread.start(self) 
    def settrace_and_run(self): 
        sys.settrace(self.globaltrace) 
        self._run()
    def globaltrace(self, frame, event, arg): 
        return self.localtrace if event == 'call' else None
    def localtrace(self, frame, event, arg): 
        if self.killed and event == 'line': 
            raise SystemExit() 
        return self.localtrace 

def f(): 
    while True: 
        print('1') 
        time.sleep(2)
        print('2') 
        time.sleep(2)
        print('3') 
        time.sleep(2)

t = TraceThread(target=f) 
t.start() 
time.sleep(2.5) 
t.killed = True

Он останавливается после того, как напечатал 1 и 2. 3 не печатается.

Вот еще один способ сделать это, но с чрезвычайно чистым и простым кодом, который будет работать в Python 3.7 в 2021 году:

import ctypes 

def kill_thread(thread):
    """
    thread: a threading.Thread object
    """
    thread_id = thread.ident
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit))
    if res > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
        print('Exception raise failure')

Адаптировано отсюда: https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/

Версия Python: 3.8

Используя поток демона для выполнения того, что мы хотели, если мы хотим, чтобы поток демона был завершен, все, что нам нужно, это выполнить выход из родительского потока, тогда система завершит поток демона, созданный родительским потоком.

Также поддерживает сопрограммы и функции сопрограмм.

def main():
    start_time = time.perf_counter()
    t1 = ExitThread(time.sleep, (10,), debug=False)
    t1.start()
    time.sleep(0.5)
    t1.exit()
    try:
        print(t1.result_future.result())
    except concurrent.futures.CancelledError:
        pass
    end_time = time.perf_counter()
    print(f"time cost {end_time - start_time:0.2f}")

ниже исходный код ExitThread

import concurrent.futures
import threading
import typing
import asyncio


class _WorkItem(object):
    """ concurrent\futures\thread.py

    """

    def __init__(self, future, fn, args, kwargs, *, debug=None):
        self._debug = debug
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if self._debug:
            print("ExitThread._WorkItem run")
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            coroutine = None
            if asyncio.iscoroutinefunction(self.fn):
                coroutine = self.fn(*self.args, **self.kwargs)
            elif asyncio.iscoroutine(self.fn):
                coroutine = self.fn
            if coroutine is None:
                result = self.fn(*self.args, **self.kwargs)
            else:
                result = asyncio.run(coroutine)
            if self._debug:
                print("_WorkItem done")
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)


class ExitThread:
    """ Like a stoppable thread

    Using coroutine for target then exit before running may cause RuntimeWarning.

    """

    def __init__(self, target: typing.Union[typing.Coroutine, typing.Callable] = None
                 , args=(), kwargs = {}, *, daemon=None, debug=None):
        #
        self._debug = debug
        self._parent_thread = threading.Thread(target=self._parent_thread_run, name = "ExitThread_parent_thread"
                                               , daemon=daemon)
        self._child_daemon_thread = None
        self.result_future = concurrent.futures.Future()
        self._workItem = _WorkItem(self.result_future, target, args, kwargs, debug=debug)
        self._parent_thread_exit_lock = threading.Lock()
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock_released = False  # When done it will be True
        self._started = False
        self._exited = False
        self.result_future.add_done_callback(self._release_parent_thread_exit_lock)

    def _parent_thread_run(self):
        self._child_daemon_thread = threading.Thread(target=self._child_daemon_thread_run
                                                     , name = "ExitThread_child_daemon_thread"
                                                     , daemon=True)
        self._child_daemon_thread.start()
        # Block manager thread
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock.release()
        if self._debug:
            print("ExitThread._parent_thread_run exit")

    def _release_parent_thread_exit_lock(self, _future):
        if self._debug:
            print(f"ExitThread._release_parent_thread_exit_lock {self._parent_thread_exit_lock_released} {_future}")
        if not self._parent_thread_exit_lock_released:
            self._parent_thread_exit_lock_released = True
            self._parent_thread_exit_lock.release()

    def _child_daemon_thread_run(self):
        self._workItem.run()

    def start(self):
        if self._debug:
            print(f"ExitThread.start {self._started}")
        if not self._started:
            self._started = True
            self._parent_thread.start()

    def exit(self):
        if self._debug:
            print(f"ExitThread.exit exited: {self._exited} lock_released: {self._parent_thread_exit_lock_released}")
        if self._parent_thread_exit_lock_released:
            return
        if not self._exited:
            self._exited = True
            if not self.result_future.cancel():
                if self.result_future.running():
                    self.result_future.set_exception(concurrent.futures.CancelledError())

Другие вопросы по теме