Я хочу иметь возможность подавлять любую печать на стандартный вывод в определенном потоке. Вот что я пробовал:
import sys, io, time
from threading import Thread
def do_thread_action():
# Disable stdout
sys.stdout = io.StringIO()
print("don't print this 1")
time.sleep(1)
print("don't print this 2")
time.sleep(1)
print("don't print this 3")
# Re-enable stdout
sys.stdout = sys.__stdout__
thread = Thread(target=do_thread_action)
thread.start()
time.sleep(1.5)
# Print this to stdout
print('Print this')
thread.join()
Однако это не работает, поскольку sys.stdout
является глобальным как для thread
, так и для основного потока.
Как подавить отпечатки внутри do_thread_action
внутри потока, но не подавить отпечатки за его пределами?
Есть ли у вас контроль над операторами печати в потоке? Если да, вы можете изменить параметр file
для печати в другой поток. Например, написав /null, вы можете добавить nullstream = open(os.devnull,"w")
, а затем file=nullstream
.
@ ipodtouch0218 нет, в моем реальном коде потоки запускают функции из библиотеки, которую я не могу редактировать. Я хочу подавить в потоке все, что отправляется на стандартный вывод
@Wilson, а что насчет этого решения тогда? Они создают прокси-объект для перехвата результата sys.stdout
только в одном потоке. Похоже, это то, что пытался сделать @jsbueno.
Итак, вот оно — просто замените объект sys.stdout
на объект с write
(и, чтобы охватить все случаи, flush
) методами, которые могут выбирать, куда должен идти вывод для текущего работающего потока.
И они могут проверить текущий поток, используя имя потока.
Вот почти «готовый к производству» класс, который может позаботиться обо всем, включая даже декораторы для исправления кодовых путей, которые должны быть защищены для печати:
import time
import threading
import io
from unittest import mock
import sys
delayed_outputs = None
class SelectOutput():
def __init__(self, config):
self.text_io = io.StringIO()
self.ns = threading.local()
self.config = config
def filter(self, func):
def wrapper( *args, **kwargs):
# in this example, thread_id is passed as
# a parameter, but one could also use
thread_id = threading.current_thread().name
if thread_id in self.config and self.config[thread_id] == "capture":
self.ns.stdout = self.text_io
else:
self.ns.stdout = sys.__stdout__
return func(*args, **kwargs)
return wrapper
def instrument(self, func):
def wrapper(*args, **kwargs):
#global all_outputs
with mock.patch("sys.stdout", self):
# all_outputs = tmp
return func(*args, **kwargs)
return wrapper
def write(self, text):
self.ns.stdout.write(text)
def flush(self):
self.ns.stdout.flush()
select_output = SelectOutput(config = {"2": "capture"})
@select_output.filter
def target(thread_id):
time.sleep(0.1 * thread_id)
print(f"At thread ID: {thread_id}")
time.sleep(0.2 * thread_id)
print(f"closing thread ID: {thread_id}")
@select_output.instrument
def instrumented():
threads = []
for thread_id in (1, 2, 3):
thread = threading.Thread(target=target, args=(thread_id,))
# name the threads so that each is identified when they are running:
thread.name = str(thread_id)
threads.append(thread)
thread.start()
[t.join() for t in threads]
def main():
global delayed_outputs
delayed_outputs = io.StringIO()
instrumented()
print("Delayed outputs:", select_output.text_io.getvalue())
if __name__ == "__main__":
main()
блестящее решение! В моей собственной реализации я удалил config
из класса и вместо этого разделил filter
на две оболочки, filter
и dont_filter
, каждая из которых могла быть применена к соответствующим целевым функциям.
Я не уверен, возможно ли это, но если это так, то способ сделать это - это обезьяний патч sys.stdout с локальным объектом потока. Я попробую это и вернусь с полным ответом, если посчитаю, что это может сработать.