Я использую ThreadPoolExecutor Python для реализации асинхронного клиента. Время от времени сценарий отправляет синхронного клиента, вызываемого в пул потоков. Я хотел бы иметь возможность остановить цикл с помощью KeyboardInterrupt.
Я написал следующий код.
#!/usr/bin/env python
import numpy as np
import tritonclient.http as tritonclient
import argparse
import itertools
import logging
import random
import sys
import time
from concurrent.futures import ThreadPoolExecutor
distributions = {
'poisson': lambda w: random.expovariate(1/w),
'uniform': lambda w: random.uniform(0, 2*w),
}
class Client:
def __init__(self, url, model):
self.client = tritonclient.InferenceServerClient(url)
config = self.client.get_model_config(model)
self.inputs = config['input']
self.outputs = [output['name'] for output in config['output']]
self.model = model
def __call__(self):
inputs = []
for config in self.inputs:
assert config['data_type'] == 'TYPE_FP32'
shape = [1] + config['dims']
datatype = config['data_type'].removeprefix('TYPE_')
input = tritonclient.InferInput(config['name'], shape, datatype)
array = np.random.default_rng().random(shape, dtype=np.float32)
input.set_data_from_numpy(array)
inputs.append(input)
result = self.client.infer(self.model, inputs)
for output in self.outputs:
result.get_output(output)
def benchmark(fn):
t_i = time.time()
fn()
t_f = time.time()
print(t_i, t_f - t_i)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--distribution', choices=distributions.values(),
type=distributions.get, default=lambda w: w)
parser.add_argument('-n', '--nrequests', default=-1, type=int)
parser.add_argument('-o', '--open', '--open-loop', action='store_true')
parser.add_argument('-u', '--url', default='localhost:8000')
parser.add_argument('-v', '--verbose', action='count', default=0)
parser.add_argument('model')
rate = parser.add_mutually_exclusive_group()
rate.add_argument('-w', '--wait', '--delay', '-l', '--lambda',
default=0, type=float)
rate.add_argument('-r', '--rate', '-f', '--frequency', type=float)
args = parser.parse_args()
level = (logging.DEBUG if args.verbose > 1
else logging.INFO if args.verbose
else logging.WARNING)
logging.basicConfig(level=level)
if args.rate:
args.wait = 1/args.rate
logging.debug(args)
client = Client(args.url, args.model)
with ThreadPoolExecutor() as executor:
try:
for _ in (itertools.count() if args.nrequests < 0
else range(args.nrequests)):
if args.open:
executor.submit(benchmark, client)
else:
benchmark(client)
time.sleep(args.distribution(args.wait))
except KeyboardInterrupt:
pass
except BrokenPipeError:
pass
Он зависает на первом Control-C и требует еще двух, прежде чем наконец завершится со следующей ошибкой. Как бы то ни было, я впервые использовал
1717617460.23863 0.003475189208984375
1717617460.250774 0.0033867359161376953
1717617460.2690861 0.0033500194549560547
^C^CTraceback (most recent call last):
File "/data/pcoppock/mlos/apps/tritonclient", line 73, in <module>
with ThreadPoolExecutor() as executor:
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 649, in __exit__
self.shutdown(wait=True)
File "/usr/lib/python3.10/concurrent/futures/thread.py", line 235, in shutdown
t.join()
File "/usr/lib/python3.10/threading.py", line 1096, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
if lock.acquire(block, timeout):
KeyboardInterrupt
^CException ignored in: <module 'threading' from '/usr/lib/python3.10/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.10/threading.py", line 1537, in _shutdown
atexit_call()
File "/usr/lib/python3.10/concurrent/futures/thread.py", line 31, in _python_exit
t.join()
File "/usr/lib/python3.10/threading.py", line 1096, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
if lock.acquire(block, timeout):
KeyboardInterrupt:
linux$
Linux отправляет SIGINT
в произвольный поток, который перемещается в основной поток, но основной поток должен дождаться завершения всех потоков, прежде чем он завершится, поэтому вам нужен второй Ctrl-C.
нет чистого способа исправить это, вы можете захотеть обработать SIGINT и установить некоторое логическое значение, чтобы все потоки завершились, но поскольку вы используете синхронный ввод-вывод, вы не можете легко прервать текущий ввод-вывод.
единственный надежный способ - резко завершить приложение при нажатии Ctrl-C, заставив обработчик сигнала выполнить os._exit(), что убивает приложение без какой-либо очистки, ОС закроет все сокеты и освободит память для тебя.
import sys
import time
from concurrent.futures import ThreadPoolExecutor
import signal
import os
def signal_handler(sig, frame):
print("App killed by Ctrl-C", flush=True)
os._exit(0) # KILL THE APPLICATION RIGHT NOW !!
signal.signal(signal.SIGINT, signal_handler)
class Client:
def __init__(self):
pass
def __call__(self):
start = time.time()
while time.time() - start < 10:
pass
def benchmark(fn):
t_i = time.time()
fn()
t_f = time.time()
print(t_i, t_f - t_i)
if __name__ == '__main__':
client = Client()
with ThreadPoolExecutor() as executor:
try:
task = executor.submit(benchmark, client)
task.result()
except KeyboardInterrupt:
pass
except BrokenPipeError:
pass
хотя, как правило, убивать Python таким образом плохо, поскольку очистка не выполняется, альтернативой является либо дождаться завершения всех задач, либо установить логическое значение, которое потоки будут периодически проверять, и то, и другое не будет работать в вашем случае.
наконец, вы можете использовать task.result()
вместо сна в течение произвольного промежутка времени, чтобы убедиться, что вы правильно ожидаете все задачи и правильно отображаете исключения в основном потоке.
обратите внимание: если у вас есть какие-либо дочерние процессы из пулов процессов, они не будут очищены, вам необходимо правильно завершить их в обработчике сигналов, поэтому не используйте этот «трюк» как часть большого приложения.
С другой стороны, если вы просто хотите «остановить новые отправки», вы можете заставить обработчик установить логическое значение, которое ваш основной поток может прослушивать и избегать отправки новых задач.
Да, в основном я просто хочу остановить новые отправки, поэтому, вероятно, лучше использовать обработчик. Я до сих пор не понимаю, почему основной поток блокируется при ожидании завершения работы пула потоков. Почему после того, как сигнал доставлен в основной поток, блокируются другие потоки?
@PatrickCoppock область with
пула потоков будет блокироваться до тех пор, пока все рабочие не будут объединены, именно так работает пул: если Python не делает этого, приложение может завершиться, пока запросы все еще выполняются, что будет плохо.
Проблема заключалась в клиентском модуле Triton. Объекты Client не были потокобезопасными. После того, как я изменил код для инициализации нового клиента в каждом потоке, нажатие Ctrl-C в конечном итоге остановило программу.
Re: «...ChatGPT...» ChatGPT не знает, как решать проблемы. ChatGPT умеет создавать текст (включая текст исходного кода), который выглядит как ответ, который мог бы дать какой-то разумный человек. Но выглядеть как ответ и быть на самом деле правильным ответом — не всегда одно и то же.