Я использую модуль подпроцесса для запуска подпроцесса и подключения к его потоку вывода (стандартный вывод). Я хочу иметь возможность выполнять неблокирующее чтение на его стандартном выходе. Есть ли способ сделать .readline неблокирующим или проверить, есть ли данные в потоке, прежде чем я вызываю .readline? Я бы хотел, чтобы это было портативным или, по крайней мере, работало под Windows и Linux.
Вот как я это делаю сейчас (блокируется на .readline, если данные недоступны):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
@ NasserAl-Wohaibi Значит ли это, что лучше всегда создавать файлы?
мне было любопытно понять, почему это вообще блокируется ... Я спрашиваю, потому что видел комментарий: To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read()
Он «по замыслу» ожидает получения входных данных.
связанные: stackoverflow.com/q/19880190/240515






Одно из решений - сделать другой процесс для выполнения вашего чтения процесса или сделать поток процесса с таймаутом.
Вот поточная версия функции тайм-аута:
http://code.activestate.com/recipes/473878/
Однако нужно ли вам читать стандартный вывод по мере его поступления? Другое решение может заключаться в том, чтобы выгрузить вывод в файл и дождаться завершения процесса с использованием p.wait ().
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
похоже, что поток Рецпи не завершится после тайм-аута, и его убийство зависит от возможности убить подпроцесс (sg. в противном случае не имеет отношения в этом отношении), который он читает (вещь, которую вы должны иметь возможность, но на всякий случай вы не можете ..) .
Модуль Выбрать помогает вам определить, где находится следующий полезный вход.
Тем не менее, вам почти всегда больше нравится отдельные темы. Один выполняет блокировку чтения stdin, другой - везде, где вы не хотите блокировать.
Я думаю, что этот ответ бесполезен по двум причинам: (a) Модуль Выбрать не будет работать на каналах под Windows (как четко указано в предоставленной ссылке), что противоречит намерениям OP иметь портативное решение. (b) Асинхронные потоки не допускают синхронного диалога между родительским и дочерним процессами. Что, если родительский процесс хочет отправить следующее действие в соответствии со следующей строкой, прочитанной из дочернего процесса ?!
select также бесполезен, поскольку чтение Python будет блокироваться даже после выбора, потому что он не имеет стандартной семантики C и не будет возвращать частичные данные.
Отдельный thresd для чтения из дочернего вывода решил мою проблему, которая была похожа на эту. Если вам нужно синхронное взаимодействие, я думаю, вы не можете использовать это решение (если вы не знаете, какой результат ожидать). Я бы принял этот ответ
Попробуйте модуль asyncproc. Например:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
Модуль заботится обо всех потоках, как это было предложено С.Лоттом.
Абсолютно блестящий. Намного проще, чем необработанный модуль подпроцесса. У меня отлично работает на Ubuntu.
asyncproc не работает в окнах, а окна не поддерживают os.WNOHANG :-(
asyncproc - это GPL, что еще больше ограничивает его использование :-(
Спасибо. Одна маленькая вещь: кажется, что замена вкладок на 8 пробелов в asyncproc.py - это правильный путь :)
Не похоже, что вы можете получить код возврата запущенного вами процесса через модуль asyncproc; только результат, который он сгенерировал.
"asyncproc - это GPL, что еще больше ограничивает его использование :-(", вы сторонник проприетарного программного обеспечения? @BryanOakley
@bsound: Да, компании должны иметь возможность писать несвободные программы. Но независимо от моих личных убеждений, это факт, что GPL ограничивает использование программного обеспечения под этой лицензией. Многие компании не разрешают использование программного обеспечения GPL из-за его вирусной природы. По определению это означает, что существуют ограничения на использование программ GPL. Это не политическое заявление, это просто факт.
У меня часто была подобная проблема; Программы Python, которые я пишу часто, должны иметь возможность выполнять некоторые основные функции, одновременно принимая ввод пользователя из командной строки (stdin). Простое размещение функций обработки пользовательского ввода в другом потоке не решает проблемы, поскольку readline() блокируется и не имеет тайм-аута. Если основная функциональность завершена и больше нет необходимости ждать дальнейшего ввода данных пользователем, я обычно хочу, чтобы моя программа завершилась, но это невозможно, потому что readline() все еще блокируется в другом потоке, ожидающем строки. Решение, которое я нашел для этой проблемы, - сделать stdin неблокирующим файлом с помощью модуля fcntl:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
На мой взгляд, это немного чище, чем использование модулей select или signal для решения этой проблемы, но опять же, это работает только в UNIX ...
Согласно документации, fcntl () может получать либо файловый дескриптор, либо объект, имеющий метод .fileno ().
Использование строки чтения кажется неправильным в Python 2. См. Ответ anonnn stackoverflow.com/questions/375427/…
Пожалуйста, не используйте занятые петли. Используйте poll () с таймаутом для ожидания данных.
@Stefano, что означает buffer_size?
@cat: на ваш выбор, у меня обычно 1024 - это количество байтов, которое нужно прочитать за один раз, поэтому установите его меньше или больше в соответствии с вашим ожидаемым размером данных!
@Stefano Да, я не понимал, что это может быть произвольный буквальный
есть ли причина, по которой библиотека docs.python.org/3/library/asyncio-api-index.html не упоминается или не используется? Разве это не годится для этого?
Просто измените readline () на read, а затем добавьте логику для разделения и отдачи. Отлично работает!
Используйте select & read (1).
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
Для readline () - например:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
не хорошо. select не должен работать в окнах с файловыми дескрипторами, согласно документы
МОЙ БОГ. Считайте мегабайты или, возможно, гигабайты по одному символу за раз ... это худшая идея, которую я видел за долгое время ... разумеется, этот код не работает, потому что proc.stdout.read() независимо от того, насколько мал аргумент это блокирующий звонок.
OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failedfcntl, select, asyncproc в этом случае не помогут.
Надежный способ чтения потока без блокировки независимо от операционной системы - использовать Queue.get_nowait():
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
Да, это работает для меня, хотя я многое удалил. Он включает в себя передовой опыт, но не всегда необходим. Python 3.x 2.X compat и close_fds можно не указывать, но они все равно будут работать. Но просто знайте, что все делает, и не копируйте это вслепую, даже если это просто работает! (На самом деле самое простое решение - использовать поток и делать строку чтения, как это сделал Себ, Qeues - это просто простой способ получить данные, есть и другие, потоки - ответ!)
Внутри потока вызов out.readline блокирует поток и основной поток, и мне нужно дождаться возврата строки чтения, прежде чем все остальное продолжится. Есть какой-нибудь простой способ обойти это? (Я читаю несколько строк из своего процесса, который также является еще одним файлом .py, который выполняет БД и тому подобное)
@Justin: out.readline не блокирует основной поток, он выполняется в другом потоке.
close_fds определенно не то, что вы хотели бы слепо копировать в свое приложение ...
что, если я не смогу завершить подпроцесс, например. из-за исключений? поток stdout-reader не умрет, а python зависнет, даже если основной поток завершился, не так ли? как можно было обойти это? python 2.x не поддерживает уничтожение потоков, что еще хуже, не поддерживает их прерывание. :( (очевидно, что нужно обрабатывать исключения, чтобы гарантировать завершение подпроцесса, но на всякий случай, что вы можете сделать?)
@naxa: обратите внимание на daemon=True: процесс python не зависнет, если основной поток завершен.
Я создал несколько дружественных оберток этого в пакете shelljobpypi.python.org/pypi/shelljob
Я запустил Popen в отдельном потоке, поэтому я могу «занято ожидание» в этом потоке, используя: .... t.start() while p.poll() == None: time.sleep(0.1) В этом случае мой графический интерфейс не будет блокироваться (я использую TKinter). Так что я также могу использовать parent.after(100, self.consume) для имитации опроса типа событий. В методе потребления я в конечном итоге использую метод q.get () для извлечения данных из очереди. работает как шарм! Хотя некоторые говорят, что можно зайти в тупик, используя wait() или poll() в сочетании с stdout PIPE ???
@ dangerous89: да. Вы можете заблокироваться, если ваш родитель заблокирован на .wait(), но ваш ребенок ждет, пока вы прочитаете его вывод (не ваш случай, но q.get() в обратном вызове GUI также неверен). Самый простой вариант - использовать .communicate() в отдельном потоке. Вот пара примеров кода, которые показывают, как вы можете читать вывод подпроцесса без «блокировки» графического интерфейса: 1. с нитками 2. нет потоков (POSIX). Если что-то непонятно, спросить
какова здесь цель close_fds?
@dashesy: чтобы избежать утечки файловых дескрипторов родителей. Продемонстрированное поведение используется по умолчанию в Python 3.
@ Дж. Ф. Себастьян: имеет смысл. Это единственное решение, которое не блокирует, кажется, на 2.7 другого выхода нет (пробовал выбрать, fcntl)! Есть идеи, почему я не могу получить результат процесса sgid? Он отлично работает в терминале, но здесь я получаю no output yet, чего не может быть.
@dashesy: select, fcntl должны работать в системах POSIX. Если вы не понимаете, почему "еще нет вывода" является всегда возможностью для данного решения; задать новый вопрос. Обязательно прочтите Как спросить и stackoverflow.com/help/mcve
@ J.F.Sebastian, извините за мою отчаянную попытку, я думал, вы уже знаете ответ, похоже, вы знаете :) Я задам новый вопрос в следующий раз. Кстати, после использования setbuf(stdout, NULL) в исполняемом файле он работал (большинство работало, даже select и fcntl), как шарм, я понятия не имею, почему \n не сбрасывал stdout, но я не удивлюсь, если это как-то связано с suid и selinux.
@dashesy: если stdout подпроцесса перенаправлен в канал, то он буферизируется по блоку (он буферизуется по строке, если stdout является терминалом (tty)) для программ на основе stdio, написанных на C. См. Подпроцесс программы Python C зависает в строке «for line in iter»
@ J.F.Sebastian не знал об этом, наивно, я всегда предполагал поведение, которое видно в tty, но теперь это имеет смысл. Это означает, что интеллектуальные приложения должны смотреть на тип stdout, если они производят вещи, которые являются строками и могут использоваться в каналах / grep.
по какой причине вы выполняете out.close () в выводе в очередь? Разве это не должно быть задачей объекта Popen?
@zaphod: это делается по той же причине, по которой оператор with используется для обычных файлов: чтобы не полагаться на сложную сборку мусора для освобождения ресурсов, т.е. даже если p.stdout закрыт, когда Popen() собирает мусор: Я предпочитаю явный простой и детерминированный out.close() (хотя вместо этого мне следовало использовать with out: в потоке - похоже, он работает на Python 2.7 и Python 3).
Это решение не подходит для систем майских кодов (системы с 16-48 ядрами). GIL стал играть роль переключателя контекста.
Лучше использовать неблокирующий ввод-вывод, как описано ниже.
@AntonMedvedev: 1. Неважно, сколько там процессоров. Речь идет о вводе-выводе (Python выпускает GIL при блокировании операций ввода-вывода). 2. На момент ответа альтернативы потокам в stdlib для переносимого решения не было. И это спорно IO ли блокирующая лучше (безусловно), чем потоки. Разумный подход - изучить компромиссы в вашем конкретном случае.
вы пишете «Windows и Linux», это исключает OSX? Я просто попробовал это на OSX с запуском ffmpeg, который, похоже, имеет проблемы. Я собираюсь протестировать это более подробно, если только вы не скажете мне, что это не может работать в OSX.
@ P.R .: OP спрашивает об этих операционках, поэтому они прямо упоминаются. Он должен работать и на OS X.
@nights: "У меня не работает" не очень информативен. Создайте минимальный пример кода, опишите словами, что вы ожидаете получить и что вы получите вместо этого, шаг за шагом, какая у вас ОС, версия Python и опубликуйте это как отдельный вопрос.
Не отключайте потоки для многопроцессорности и многопроцессорности. Очередь с этим ответом. Завершение подпроцесса привело к тому, что курсор stdout переместился для меня в начало экрана.
@jfs - неблокирующий ввод-вывод должен быть асинхронным?
@ uzay95 Я не совсем понимаю, о чем вы спрашиваете. Эти понятия тесно связаны. Ответ показывает, как реализовать неблокирующее чтение поверх синхронного блокирующего вызова readline (). Интерфейс также асинхронный (во время ввода-вывода могут происходить другие вещи).
Мне кажется, что я натыкаюсь на эту ветку примерно раз в месяц ... Я до сих пор не понимаю, почему утверждение «fcntl не поможет в этом случае». Я, должно быть, реализовал это 10-20 раз к настоящему времени, используя fnctl для установки os.O_NONBLOCK (с os.read(), а не readline()). В целом кажется, что все работает, как ожидалось.
@cheshirekow поддерживает ли Windows fcntl? Есть ли readline () на Python 2 поддерживает неблокирующий режим?
@jfs не уверен насчет окон ... ты знаешь? поэтому ты сказал, что это не поможет? Также обратите внимание, что я сказал с os.read(), а не с readline().
os.read() доступен в Windows, но os.O_NONBLOCK задокументирован как специфичный для Unix, поэтому я не ожидал, что он там будет работать.
Когда я попытался использовать предоставленную логику в ответ, данные stderr не смогли их получить.
Похоже, данные stderr также собираются в stdout
@ Karthi1234 нет, stdout в ответе идет в трубу. stderr здесь не идет в трубу. Другое дело ваш код.
есть ли причина, по которой библиотека docs.python.org/3/library/asyncio-api-index.html не упоминается или не используется? Разве это не годится для этого?
@CharlieParker прокрутите вниз до мой другой ответ
Я добавляю эту задачу, чтобы прочитать какой-то подпроцесс. Открыть stdout. Вот мое неблокирующее решение для чтения:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Согласно документы, fcntl не работает в Windows.
@anatolytechtonik используйте вместо этого msvcrt.kbhit()
Вы можете сделать это очень легко в Скрученный. В зависимости от вашей существующей базы кода, это может быть не так просто в использовании, но если вы создаете искаженное приложение, такие вещи становятся почти тривиальными. Вы создаете класс ProcessProtocol и переопределяете метод outReceived(). Twisted (в зависимости от используемого реактора) обычно представляет собой просто большой цикл select() с установленными обратными вызовами для обработки данных из разных файловых дескрипторов (часто сетевых сокетов). Таким образом, метод outReceived() просто устанавливает обратный вызов для обработки данных, поступающих от STDOUT. Вот простой пример, демонстрирующий это поведение:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
В Скрученная документация есть полезная информация по этому поводу.
Если вы создаете все свое приложение на основе Twisted, асинхронная связь с другими процессами, локальными или удаленными, становится действительно элегантной. С другой стороны, если ваша программа не построена на Twisted, это не принесет особой пользы. Надеюсь, это может быть полезно для других читателей, даже если это не применимо к вашему конкретному приложению.
не хорошо. select не должен работать в окнах с файловыми дескрипторами, согласно документы
@naxa Я не думаю, что select(), о котором он говорит, - это то же самое, что и вы. Я предполагаю это, потому что Twisted работает в Windows ...
Я добавил аналогичное решение на основе asyncio из stdlib.
«Скрученный (в зависимости от используемого реактора) - это обычно просто большой цикл select ()» означает, что есть несколько реакторов, между которыми можно выбирать. select() - самый портативный для unix и unix-like, но есть также два реактора для Windows: twistedmatrix.com/documents/current/core/howto/…
Отказ от ответственности: это работает только для торнадо
Вы можете сделать это, установив для fd неблокирующий режим, а затем используя ioloop для регистрации обратных вызовов. Я упаковал это в яйцо под названием tornado_subprocess, и вы можете установить его через PyPI:
easy_install tornado_subprocess
теперь вы можете сделать что-то вроде этого:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
вы также можете использовать его с RequestHandler
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
Спасибо за приятную фичу! Чтобы прояснить, почему мы не можем просто использовать threading.Thread для создания новых неблокирующих процессов? Я использовал его в on_message экземпляра веб-сокета Tornado, и он отлично справился со своей задачей.
Зарезка потоков в торнадо в основном не рекомендуется. они подходят для небольших, непродолжительных функций. Об этом можно прочитать здесь: stackoverflow.com/questions/7846323/tornado-web-and-threadsgithub.com/facebook/tornado/wiki/Threading-and-concurrency
@VukasinToroman, ты действительно спас меня этим. Большое спасибо за модуль подпроцесса торнадо :)
это работает в окнах? (обратите внимание, что select с файловыми дескрипторами, не)
Эта библиотека не использует вызов select. Я не пробовал это под Windows, но у вас, вероятно, возникнут проблемы, поскольку библиотека использует модуль fcntl. Вкратце: нет, это, вероятно, не будет работать под Windows.
Существующие решения у меня не работали (подробности ниже). Что в итоге сработало, так это реализовать readline с помощью read (1) (на основе этот ответ). Последний не блокирует:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
Почему существующие решения не сработали:
1. q.get_nowait() из моего ответа никогда не должен блокировать, в этом смысл его использования. 2. Поток, выполняющий строку чтения (enqueue_output() функция), завершается при EOF, например, включая случай, когда процесс создания вывода завершается. Если вы верите, что это не так; пожалуйста, предоставьте полный пример минимального кода, который показывает иное (возможно, как новый вопрос).
@sebastian Я потратил час или больше, пытаясь придумать минимальный пример. В конце концов, я должен согласиться, что ваш ответ касается всех случаев. Я предполагаю, что раньше у меня это не сработало, потому что, когда я пытался убить процесс создания вывода, он уже был убит и выдал ошибку, которую трудно отладить. Час был потрачен не зря, потому что, придумывая минимальный пример, я мог придумать более простое решение.
Не могли бы вы опубликовать и более простое решение? :) (если он отличается от Себастьяна)
@ dangerous89: Думаю, dcmpid = myprocess.
В состоянии после вызова read () (сразу после while True): out никогда не будет пустой строкой, потому что вы читаете как минимум строку / байты длиной 1.
Я создал библиотеку на основе Решение Дж. Ф. Себастьяна. Вы можете использовать это.
Обновлено: эта реализация все еще блокируется. Вместо этого используйте отвечать Дж. Ф. Себастьяна.
Я попробовал главный ответ, но дополнительный риск и поддержка кода потока вызывали беспокойство.
Просматривая io модуль (и ограничиваясь 2.6), я нашел BufferedReader. Это мое неблокирующее решение без потоков.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
вы пробовали for line in iter(p.stdout.readline, ""): # do stuff with the line? Он не имеет потоков (однопоточный) и блокируется, когда ваш код блокируется.
@ j-f-sebastian Да, в конце концов я вернулся к твоему ответу. Моя реализация все еще иногда блокируется. Я отредактирую свой ответ, чтобы предупредить других, чтобы они не шли по этому пути.
Вот мой код, используемый для быстрого перехвата каждого вывода подпроцесса, включая частичные строки. Он качает одновременно и stdout, и stderr в почти правильном порядке.
Протестировано и корректно работает на Python 2.7 linux и windows.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"] = "1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR= = ", line, "=? = "
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
Один из немногих ответов, которые позволяют читать то, что не обязательно заканчивается новой строкой.
Работая с ответом Дж.Ф. Себастьяна и несколькими другими источниками, я собрал простой диспетчер подпроцессов. Он обеспечивает запрос неблокирующего чтения, а также параллельное выполнение нескольких процессов. Он не использует никаких вызовов, специфичных для ОС (насколько я знаю), и поэтому должен работать где угодно.
Он доступен из pypi, поэтому просто pip install shelljob. Обратитесь к страница проекта за примерами и полными документами.
Python 3.4 представляет новый предварительный API для асинхронного ввода-вывода - asyncio модуль.
Подход аналогичен Ответ на основе twisted от @Bryan Ward - определите протокол, и его методы вызываются, как только данные будут готовы:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
См. «Подпроцесс» в документации.
Существует высокоуровневый интерфейс asyncio.create_subprocess_exec(), который возвращает Process объекты, что позволяет читать строку асинхронно с использованием StreamReader.readline() сопрограмма.
(с async / await Синтаксис Python 3.5+):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill() выполняет следующие задачи:
При необходимости каждый шаг может быть ограничен таймаутом в секундах.
Когда я пробую что-то подобное с помощью сопрограмм python 3.4, я получаю вывод только после того, как весь скрипт будет запущен. Я хотел бы, чтобы строка вывода была напечатана, как только подпроцесс напечатает строку. Вот что у меня есть: pastebin.com/qPssFGep.
@ flutefreak7: проблемы с буферизацией не имеют отношения к текущему вопросу. Перейдите по ссылке, чтобы узнать о возможных решениях.
Благодарность! Решил проблему для моего скрипта, просто используя print(text, flush=True), чтобы напечатанный текст был немедленно доступен наблюдателю, вызывающему readline. Когда я тестировал его с исполняемым файлом на основе Fortran, я действительно хотел обернуть / посмотреть, он не буферизует его вывод, поэтому ведет себя так, как ожидалось.
Можно ли разрешить подпроцессу сохраняться и выполнять дальнейшие операции чтения / записи. readline_and_kill во втором сценарии работает очень похоже на subprocess.comunicate в том смысле, что завершает процесс после одной операции чтения / записи. Я также вижу, что вы используете одиночный канал, stdout, который обрабатывается подпроцессом как неблокирующий. Пытаюсь использовать как stdout, так и stderrЯ нахожу, что в конечном итоге блокирую.
@Carel код в ответе работает так, как задумано, как явно описано в ответе. При желании можно реализовать другое поведение. Оба канала одинаково неблокируемы, если используются, вот пример как читать из обоих каналов одновременно.
зачем заморачиваться потоком очереди? В отличие от readline (), BufferedReader.read 1 () не блокирует ожидание \ r \ n, он возвращает как можно скорее, если есть какие-либо выходные данные.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
Вернется ли он как можно скорее, если ничего не будет? Если этого не происходит, происходит блокировка.
@ Матье Паж прав. read1 заблокирует первый базовый блок чтения, что происходит, когда канал все еще открыт, но вход недоступен.
В моем случае мне понадобился модуль регистрации, который улавливает вывод фоновых приложений и дополняет его (добавляя отметки времени, цвета и т. д.).
В итоге я получил фоновый поток, который выполняет фактический ввод-вывод. Следующий код предназначен только для платформ POSIX. Я снял несущественные части.
Если кто-то собирается использовать этого зверя в течение длительного времени, подумайте об управлении открытыми дескрипторами. В моем случае это не было большой проблемой.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
Вот модуль, который поддерживает неблокирующее чтение и фоновую запись в python:
https://pypi.python.org/pypi/python-nonblock
Предоставляет функцию,
nonblock_read, который будет читать данные из потока, если они доступны, в противном случае возвращает пустую строку (или None, если поток закрыт на другой стороне и все возможные данные были прочитаны)
Вы также можете рассмотреть модуль python-subprocess2,
https://pypi.python.org/pypi/python-subprocess2
который добавляет к модулю подпроцесса. Поэтому к объекту, возвращаемому из "subprocess.Popen", добавляется дополнительный метод runInBackground. Это запускает поток и возвращает объект, который будет автоматически заполнен, когда материал будет записан в stdout / stderr, без блокировки вашего основного потока.
Наслаждаться!
Я хотел бы попробовать этот модуль неблокирующий, но я относительно новичок в некоторых процедурах Linux. Как именно мне установить эти подпрограммы? Я использую Raspbian Jessie, разновидность Debian Linux для Raspberry Pi. Я попробовал sudo apt-get install nonblock и python-nonblock, и оба вызвали ошибку - не найдено. Я скачал zip-файл с сайта pypi.python.org/pypi/python-nonblock, но не знаю, что с ним делать. Спасибо .... РДК
Добавление этого ответа здесь, поскольку он предоставляет возможность устанавливать неблокирующие каналы в Windows и Unix.
Все детали ctypes - спасибо Ответ @ techtonik.
Существует немного измененная версия, которая может использоваться как в системах Unix, так и в Windows.
Таким образом, вы можете использовать одну и ту же функцию и исключение для кода Unix и Windows.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
Чтобы избежать чтения неполных данных, я написал свой собственный генератор строки чтения (который возвращает байтовую строку для каждой строки).
Это генератор, так что вы можете, например ...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
(1) этот комментарий указывает, что readline() не работает с неблокирующими каналами (например, установленными с помощью fcntl) на Python 2 - как вы думаете, это больше не правильно? (мой ответ содержит ссылку (fcntl), которая предоставляет ту же информацию, но теперь кажется удаленной). (2) Посмотрите, как multiprocessing.connection.Pipe использует SetNamedPipeHandleState.
Я тестировал это только на Python3. Но эту информацию тоже видели и ожидаем, что она останется в силе. Я также написал свой собственный код для использования вместо строки чтения, я обновил свой ответ, включив его.
Эта версия неблокирующего чтения не требует специальных модулей и будет работать "из коробки" в большинстве дистрибутивов Linux.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
Это пример запуска интерактивной команды в подпроцессе, а стандартный вывод является интерактивным с использованием псевдотерминала. Вы можете обратиться к: https://stackoverflow.com/a/43012138/3555925
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Моя проблема немного отличается, поскольку я хотел собрать как stdout, так и stderr из запущенного процесса, но в конечном итоге то же самое, поскольку я хотел отобразить вывод в виджете в том виде, в каком он был сгенерирован.
Я не хотел прибегать ко многим из предлагаемых обходных путей с использованием очередей или дополнительных потоков, поскольку они не должны быть необходимы для выполнения такой общей задачи, как запуск другого скрипта и сбор его вывода.
Прочитав предлагаемые решения и документы Python, я решил свою проблему с помощью приведенной ниже реализации. Да, это работает только для POSIX, поскольку я использую вызов функции select.
Я согласен с тем, что документация сбивает с толку, а реализация неудобна для такой типичной задачи по написанию сценариев. Я считаю, что более старые версии python имеют разные значения по умолчанию для Popen и разные объяснения, что создает большую путаницу. Кажется, это хорошо работает как для Python 2.7.12, так и для 3.5.2.
Ключ состоял в том, чтобы настроить bufsize=1 для буферизации строк, а затем universal_newlines=True для обработки как текстового файла, а не двоичного файла, который, кажется, становится значением по умолчанию при настройке bufsize=1.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, DEBUG и VERBOSE - это просто макросы, которые выводят вывод на терминал.
Это решение IMHO на 99,99% эффективно, поскольку оно все еще использует функцию блокировки readline, поэтому мы предполагаем, что подпроцесс хорош и выводит полные строки.
Я приветствую отзывы об улучшении решения, поскольку я все еще новичок в Python.
В этом конкретном случае вы можете установить stderr = subprocess.STDOUT в конструкторе Popen и получить весь вывод из cmd.stdout.readline ().
Хороший наглядный пример. Возникли проблемы с select.select (), но это решило их для меня.
Это решение использует модуль select для «чтения любых доступных данных» из потока ввода-вывода. Эта функция сначала блокируется до тех пор, пока данные не станут доступными, но затем считывает только те данные, которые доступны, и не блокируется дальше.
Учитывая тот факт, что он использует модуль select, это работает только в Unix.
Код полностью совместим с PEP8.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
Я также столкнулся с проблемой, описанной Джесси, и решил ее, используя "select", как это делали Брэдли, Энди и другие, но в режиме блокировки, чтобы избежать цикла занятости. Он использует фиктивную трубу в качестве поддельного стандартного ввода. Блоки select и ждут, пока будут готовы либо стандартный ввод, либо канал. Когда клавиша нажата, stdin разблокирует выбор, и значение клавиши можно получить с помощью read (1). Когда другой поток записывает в канал, канал разблокирует выбор, и это можно рассматривать как указание на то, что потребность в stdin отпала. Вот некоторый справочный код:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
ПРИМЕЧАНИЕ. Для того, чтобы эта функция работала в Windows, трубу следует заменить на сокет. Пока не пробовал, но по документации должно работать.
У меня есть проблема с исходным запросом, но я не хотел вызывать потоки. Я смешал решение Джесси с прямым read() из канала и моим собственным обработчиком буфера для чтения строк (однако мой подпроцесс - ping - всегда записывал полные строки <системного размера страницы). Я избегаю ожидания, если читаю только в часах io, зарегистрированных в gobject. В наши дни я обычно запускаю код в MainLoop gobject, чтобы избежать потоков.
def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
Наблюдатель
def watch(f, *other):
print 'reading',f.read()
return True
И основная программа устанавливает пинг, а затем вызывает почтовый цикл gobject.
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
Любая другая работа связана с обратными вызовами в gobject.
В современном Python дела обстоят намного лучше.
Вот простая дочерняя программа hello.py:
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
И программа для взаимодействия с ним:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
Это распечатывает:
b'hello bob\n'
b'hello alice\n'
Обратите внимание, что фактический шаблон, который также присутствует почти во всех предыдущих ответах, как здесь, так и в связанных вопросах, заключается в том, чтобы установить дескриптор файла stdout дочернего элемента на неблокирующий, а затем опросить его в некотором цикле выбора. В наши дни, конечно, этот цикл обеспечивается asyncio.
Попробуйте мы ожидаем, альтернативу ожидание для Windows.
import wexpect
p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.') // regex pattern of any character
output_str = p.after()
В Unix-подобных системах и Python 3.5+ есть os.set_blocking, который делает именно то, что говорит.
import os
import time
import subprocess
cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
# first iteration always produces empty byte string in non-blocking mode
for i in range(2):
line = p.stdout.readline()
print(i, line)
time.sleep(0.5)
if time.time() > start + 5:
break
p.terminate()
Это выводит:
1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'
os.set_blocking прокомментировал это:
0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
Это, безусловно, самое элегантное решение, спасибо, что сделали мой день (на самом деле ночь ^^)
Очень элегантно и очень эффективно. Спасибо за это решение, оно отлично работает!
Спасибо! Это отлично работает при использовании каналов Popen с Selector, чтобы убедиться, что он никогда не заблокируется.
Вот простое решение, основанное на потоках, которые:
select).stdout, так и stderr асинхронно.asyncio (что может конфликтовать с другими библиотеками).printer.py
import time
import sys
sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)
reader.py
import queue
import subprocess
import sys
import threading
def enqueue_stream(stream, queue, type):
for line in iter(stream.readline, b''):
queue.put(str(type) + line.decode('utf-8'))
stream.close()
def enqueue_process(process, queue):
process.wait()
queue.put('x')
p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()
while True:
line = q.get()
if line[0] == 'x':
break
if line[0] == '2': # stderr
sys.stdout.write("\033[0;31m") # ANSI red color
sys.stdout.write(line[1:])
if line[0] == '2':
sys.stdout.write("\033[0m") # reset ANSI code
sys.stdout.flush()
tp.join()
to.join()
te.join()
(Исходя из Google?) Все PIPEs зайдут в тупик, когда один из буферов PIPE будет заполнен и не прочитан. например stdout тупик при заполнении stderr. Никогда не передавайте ТРУБУ, которую не собираетесь читать.