Завершение работы ThreadPoolExecutor с помощью KeyboardInterrupt

Я использую ThreadPoolExecutor Python для реализации асинхронного клиента. Время от времени сценарий отправляет синхронного клиента, вызываемого в пул потоков. Я хотел бы иметь возможность остановить цикл с помощью KeyboardInterrupt.

Я написал следующий код.

#!/usr/bin/env python

import numpy as np
import tritonclient.http as tritonclient

import argparse
import itertools
import logging
import random
import sys
import time
from concurrent.futures import ThreadPoolExecutor

distributions = {
    'poisson': lambda w: random.expovariate(1/w),
    'uniform': lambda w: random.uniform(0, 2*w),
}

class Client:
    def __init__(self, url, model):
        self.client = tritonclient.InferenceServerClient(url)
        config = self.client.get_model_config(model)
        self.inputs = config['input']
        self.outputs = [output['name'] for output in config['output']]
        self.model = model

    def __call__(self):
        inputs = []
        for config in self.inputs:
            assert config['data_type'] == 'TYPE_FP32'
            shape = [1] + config['dims']
            datatype = config['data_type'].removeprefix('TYPE_')
            input = tritonclient.InferInput(config['name'], shape, datatype)
            array = np.random.default_rng().random(shape, dtype=np.float32)
            input.set_data_from_numpy(array)
            inputs.append(input)
        result = self.client.infer(self.model, inputs)
        for output in self.outputs:
            result.get_output(output)

def benchmark(fn):
    t_i = time.time()
    fn()
    t_f = time.time()
    print(t_i, t_f - t_i)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--distribution', choices=distributions.values(),
                        type=distributions.get, default=lambda w: w)
    parser.add_argument('-n', '--nrequests', default=-1, type=int)
    parser.add_argument('-o', '--open', '--open-loop', action='store_true')
    parser.add_argument('-u', '--url', default='localhost:8000')
    parser.add_argument('-v', '--verbose', action='count', default=0)
    parser.add_argument('model')
    rate = parser.add_mutually_exclusive_group()
    rate.add_argument('-w', '--wait', '--delay', '-l', '--lambda',
                      default=0, type=float)
    rate.add_argument('-r', '--rate', '-f', '--frequency', type=float)
    args = parser.parse_args()

    level = (logging.DEBUG if args.verbose > 1
            else logging.INFO if args.verbose
            else logging.WARNING)
    logging.basicConfig(level=level)

    if args.rate:
        args.wait = 1/args.rate
    logging.debug(args)

    client = Client(args.url, args.model)

    with ThreadPoolExecutor() as executor:
        try:
            for _ in (itertools.count() if args.nrequests < 0
                        else range(args.nrequests)):
                if args.open:
                    executor.submit(benchmark, client)
                else:
                    benchmark(client)
                time.sleep(args.distribution(args.wait))
        except KeyboardInterrupt:
            pass
        except BrokenPipeError:
            pass

Он зависает на первом Control-C и требует еще двух, прежде чем наконец завершится со следующей ошибкой. Как бы то ни было, я впервые использовал

1717617460.23863 0.003475189208984375
1717617460.250774 0.0033867359161376953
1717617460.2690861 0.0033500194549560547
^C^CTraceback (most recent call last):
  File "/data/pcoppock/mlos/apps/tritonclient", line 73, in <module>
    with ThreadPoolExecutor() as executor:
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 649, in __exit__
    self.shutdown(wait=True)
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 235, in shutdown
    t.join()
  File "/usr/lib/python3.10/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt
^CException ignored in: <module 'threading' from '/usr/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1537, in _shutdown
    atexit_call()
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 31, in _python_exit
    t.join()
  File "/usr/lib/python3.10/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt: 

linux$ 

Re: «...ChatGPT...» ChatGPT не знает, как решать проблемы. ChatGPT умеет создавать текст (включая текст исходного кода), который выглядит как ответ, который мог бы дать какой-то разумный человек. Но выглядеть как ответ и быть на самом деле правильным ответом — не всегда одно и то же.

Solomon Slow 05.06.2024 22:20
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
1
79
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Linux отправляет SIGINT в произвольный поток, который перемещается в основной поток, но основной поток должен дождаться завершения всех потоков, прежде чем он завершится, поэтому вам нужен второй Ctrl-C.

нет чистого способа исправить это, вы можете захотеть обработать SIGINT и установить некоторое логическое значение, чтобы все потоки завершились, но поскольку вы используете синхронный ввод-вывод, вы не можете легко прервать текущий ввод-вывод.

единственный надежный способ - резко завершить приложение при нажатии Ctrl-C, заставив обработчик сигнала выполнить os._exit(), что убивает приложение без какой-либо очистки, ОС закроет все сокеты и освободит память для тебя.

import sys
import time
from concurrent.futures import ThreadPoolExecutor
import signal
import os

def signal_handler(sig, frame):
    print("App killed by Ctrl-C", flush=True)
    os._exit(0)  # KILL THE APPLICATION RIGHT NOW !!

signal.signal(signal.SIGINT, signal_handler)

class Client:
    def __init__(self):
        pass

    def __call__(self):
        start = time.time()
        while time.time() - start < 10:
            pass

def benchmark(fn):
    t_i = time.time()
    fn()
    t_f = time.time()
    print(t_i, t_f - t_i)

if __name__ == '__main__':
    client = Client()
    with ThreadPoolExecutor() as executor:
        try:
            task = executor.submit(benchmark, client)
            task.result()
        except KeyboardInterrupt:
            pass
        except BrokenPipeError:
            pass

хотя, как правило, убивать Python таким образом плохо, поскольку очистка не выполняется, альтернативой является либо дождаться завершения всех задач, либо установить логическое значение, которое потоки будут периодически проверять, и то, и другое не будет работать в вашем случае.

наконец, вы можете использовать task.result() вместо сна в течение произвольного промежутка времени, чтобы убедиться, что вы правильно ожидаете все задачи и правильно отображаете исключения в основном потоке.

обратите внимание: если у вас есть какие-либо дочерние процессы из пулов процессов, они не будут очищены, вам необходимо правильно завершить их в обработчике сигналов, поэтому не используйте этот «трюк» как часть большого приложения.


С другой стороны, если вы просто хотите «остановить новые отправки», вы можете заставить обработчик установить логическое значение, которое ваш основной поток может прослушивать и избегать отправки новых задач.

Да, в основном я просто хочу остановить новые отправки, поэтому, вероятно, лучше использовать обработчик. Я до сих пор не понимаю, почему основной поток блокируется при ожидании завершения работы пула потоков. Почему после того, как сигнал доставлен в основной поток, блокируются другие потоки?

Patrick Coppock 06.06.2024 01:24

@PatrickCoppock область with пула потоков будет блокироваться до тех пор, пока все рабочие не будут объединены, именно так работает пул: если Python не делает этого, приложение может завершиться, пока запросы все еще выполняются, что будет плохо.

Ahmed AEK 06.06.2024 08:07
Ответ принят как подходящий

Проблема заключалась в клиентском модуле Triton. Объекты Client не были потокобезопасными. После того, как я изменил код для инициализации нового клиента в каждом потоке, нажатие Ctrl-C в конечном итоге остановило программу.

Другие вопросы по теме