Неблокирующее чтение в подпроцессе. PIPE в Python

Я использую модуль подпроцесса для запуска подпроцесса и подключения к его потоку вывода (стандартный вывод). Я хочу иметь возможность выполнять неблокирующее чтение на его стандартном выходе. Есть ли способ сделать .readline неблокирующим или проверить, есть ли данные в потоке, прежде чем я вызываю .readline? Я бы хотел, чтобы это было портативным или, по крайней мере, работало под Windows и Linux.

Вот как я это делаю сейчас (блокируется на .readline, если данные недоступны):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

(Исходя из Google?) Все PIPEs зайдут в тупик, когда один из буферов PIPE будет заполнен и не прочитан. например stdout тупик при заполнении stderr. Никогда не передавайте ТРУБУ, которую не собираетесь читать.

Nasser Al-Wohaibi 07.05.2014 15:07

@ NasserAl-Wohaibi Значит ли это, что лучше всегда создавать файлы?

Charlie Parker 28.02.2019 03:18

мне было любопытно понять, почему это вообще блокируется ... Я спрашиваю, потому что видел комментарий: To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read()

Charlie Parker 01.03.2019 22:31

Он «по замыслу» ожидает получения входных данных.

Mathieu Pagé 01.03.2019 22:34

связанные: stackoverflow.com/q/19880190/240515

user240515 09.05.2019 05:03
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
537
5
245 145
29
Перейти к ответу Данный вопрос помечен как решенный

Ответы 29

Одно из решений - сделать другой процесс для выполнения вашего чтения процесса или сделать поток процесса с таймаутом.

Вот поточная версия функции тайм-аута:

http://code.activestate.com/recipes/473878/

Однако нужно ли вам читать стандартный вывод по мере его поступления? Другое решение может заключаться в том, чтобы выгрузить вывод в файл и дождаться завершения процесса с использованием p.wait ().

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

похоже, что поток Рецпи не завершится после тайм-аута, и его убийство зависит от возможности убить подпроцесс (sg. в противном случае не имеет отношения в этом отношении), который он читает (вещь, которую вы должны иметь возможность, но на всякий случай вы не можете ..) .

n611x007 09.09.2013 11:08

Модуль Выбрать помогает вам определить, где находится следующий полезный вход.

Тем не менее, вам почти всегда больше нравится отдельные темы. Один выполняет блокировку чтения stdin, другой - везде, где вы не хотите блокировать.

Я думаю, что этот ответ бесполезен по двум причинам: (a) Модуль Выбрать не будет работать на каналах под Windows (как четко указано в предоставленной ссылке), что противоречит намерениям OP иметь портативное решение. (b) Асинхронные потоки не допускают синхронного диалога между родительским и дочерним процессами. Что, если родительский процесс хочет отправить следующее действие в соответствии со следующей строкой, прочитанной из дочернего процесса ?!

ThomasH 15.07.2009 02:32

select также бесполезен, поскольку чтение Python будет блокироваться даже после выбора, потому что он не имеет стандартной семантики C и не будет возвращать частичные данные.

Helmut Grohne 27.01.2011 17:51

Отдельный thresd для чтения из дочернего вывода решил мою проблему, которая была похожа на эту. Если вам нужно синхронное взаимодействие, я думаю, вы не можете использовать это решение (если вы не знаете, какой результат ожидать). Я бы принял этот ответ

Emiliano 18.02.2011 10:20

Попробуйте модуль asyncproc. Например:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Модуль заботится обо всех потоках, как это было предложено С.Лоттом.

Абсолютно блестящий. Намного проще, чем необработанный модуль подпроцесса. У меня отлично работает на Ubuntu.

Cerin 02.12.2010 15:30

asyncproc не работает в окнах, а окна не поддерживают os.WNOHANG :-(

Bryan Oakley 11.01.2011 01:01

asyncproc - это GPL, что еще больше ограничивает его использование :-(

Bryan Oakley 17.02.2011 01:28

Спасибо. Одна маленькая вещь: кажется, что замена вкладок на 8 пробелов в asyncproc.py - это правильный путь :)

benjaoming 11.11.2012 18:49

Не похоже, что вы можете получить код возврата запущенного вами процесса через модуль asyncproc; только результат, который он сгенерировал.

grayaii 27.10.2015 17:17

"asyncproc - это GPL, что еще больше ограничивает его использование :-(", вы сторонник проприетарного программного обеспечения? @BryanOakley

bsound 15.02.2021 19:35

@bsound: Да, компании должны иметь возможность писать несвободные программы. Но независимо от моих личных убеждений, это факт, что GPL ограничивает использование программного обеспечения под этой лицензией. Многие компании не разрешают использование программного обеспечения GPL из-за его вирусной природы. По определению это означает, что существуют ограничения на использование программ GPL. Это не политическое заявление, это просто факт.

Bryan Oakley 15.02.2021 19:40

У меня часто была подобная проблема; Программы Python, которые я пишу часто, должны иметь возможность выполнять некоторые основные функции, одновременно принимая ввод пользователя из командной строки (stdin). Простое размещение функций обработки пользовательского ввода в другом потоке не решает проблемы, поскольку readline() блокируется и не имеет тайм-аута. Если основная функциональность завершена и больше нет необходимости ждать дальнейшего ввода данных пользователем, я обычно хочу, чтобы моя программа завершилась, но это невозможно, потому что readline() все еще блокируется в другом потоке, ожидающем строки. Решение, которое я нашел для этой проблемы, - сделать stdin неблокирующим файлом с помощью модуля fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

На мой взгляд, это немного чище, чем использование модулей select или signal для решения этой проблемы, но опять же, это работает только в UNIX ...

Согласно документации, fcntl () может получать либо файловый дескриптор, либо объект, имеющий метод .fileno ().

Denilson Sá Maia 27.04.2010 23:10

Использование строки чтения кажется неправильным в Python 2. См. Ответ anonnn stackoverflow.com/questions/375427/…

Catalin Iacob 27.10.2010 18:19

Пожалуйста, не используйте занятые петли. Используйте poll () с таймаутом для ожидания данных.

Ivo Danihelka 14.02.2011 00:10
Ответ Джесси неверен. По словам Гуидо, readline некорректно работает с неблокирующим режимом и не будет работать до Python 3000. bugs.python.org/issue1175#msg56041 Если вы хотите использовать fcntl для установки неблокирующего режима файла, вы должны использовать низкоуровневый os.read () и самостоятельно отделите строки. Смешивание fcntl с высокоуровневыми вызовами, выполняющими буферизацию строки, вызывает проблемы.
anonnn 26.10.2010 20:49

@Stefano, что означает buffer_size?

cat 22.02.2016 07:15

@cat: на ваш выбор, у меня обычно 1024 - это количество байтов, которое нужно прочитать за один раз, поэтому установите его меньше или больше в соответствии с вашим ожидаемым размером данных!

Stefano 24.02.2016 15:29

@Stefano Да, я не понимал, что это может быть произвольный буквальный

cat 24.02.2016 16:37

есть ли причина, по которой библиотека docs.python.org/3/library/asyncio-api-index.html не упоминается или не используется? Разве это не годится для этого?

Charlie Parker 01.03.2019 22:27

Просто измените readline () на read, а затем добавьте логику для разделения и отдачи. Отлично работает!

Paul Kenjora 31.10.2019 02:34

Используйте select & read (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Для readline () - например:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

не хорошо. select не должен работать в окнах с файловыми дескрипторами, согласно документы

n611x007 09.09.2013 11:10

МОЙ БОГ. Считайте мегабайты или, возможно, гигабайты по одному символу за раз ... это худшая идея, которую я видел за долгое время ... разумеется, этот код не работает, потому что proc.stdout.read() независимо от того, насколько мал аргумент это блокирующий звонок.

wvxvw 03.07.2019 14:26
OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed
nmz787 31.10.2019 08:43
Ответ принят как подходящий

fcntl, select, asyncproc в этом случае не помогут.

Надежный способ чтения потока без блокировки независимо от операционной системы - использовать Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

Да, это работает для меня, хотя я многое удалил. Он включает в себя передовой опыт, но не всегда необходим. Python 3.x 2.X compat и close_fds можно не указывать, но они все равно будут работать. Но просто знайте, что все делает, и не копируйте это вслепую, даже если это просто работает! (На самом деле самое простое решение - использовать поток и делать строку чтения, как это сделал Себ, Qeues - это просто простой способ получить данные, есть и другие, потоки - ответ!)

Aki 22.02.2012 17:19

Внутри потока вызов out.readline блокирует поток и основной поток, и мне нужно дождаться возврата строки чтения, прежде чем все остальное продолжится. Есть какой-нибудь простой способ обойти это? (Я читаю несколько строк из своего процесса, который также является еще одним файлом .py, который выполняет БД и тому подобное)

Justin 09.04.2012 23:00

@Justin: out.readline не блокирует основной поток, он выполняется в другом потоке.

jfs 15.04.2012 22:45

close_fds определенно не то, что вы хотели бы слепо копировать в свое приложение ...

shoosh 13.06.2013 13:55

что, если я не смогу завершить подпроцесс, например. из-за исключений? поток stdout-reader не умрет, а python зависнет, даже если основной поток завершился, не так ли? как можно было обойти это? python 2.x не поддерживает уничтожение потоков, что еще хуже, не поддерживает их прерывание. :( (очевидно, что нужно обрабатывать исключения, чтобы гарантировать завершение подпроцесса, но на всякий случай, что вы можете сделать?)

n611x007 09.09.2013 10:51

@naxa: обратите внимание на daemon=True: процесс python не зависнет, если основной поток завершен.

jfs 09.09.2013 11:21

Я создал несколько дружественных оберток этого в пакете shelljobpypi.python.org/pypi/shelljob

edA-qa mort-ora-y 31.10.2013 14:07

Я запустил Popen в отдельном потоке, поэтому я могу «занято ожидание» в этом потоке, используя: .... t.start() while p.poll() == None: time.sleep(0.1) В этом случае мой графический интерфейс не будет блокироваться (я использую TKinter). Так что я также могу использовать parent.after(100, self.consume) для имитации опроса типа событий. В методе потребления я в конечном итоге использую метод q.get () для извлечения данных из очереди. работает как шарм! Хотя некоторые говорят, что можно зайти в тупик, используя wait() или poll() в сочетании с stdout PIPE ???

danger89 26.11.2014 17:23

@ dangerous89: да. Вы можете заблокироваться, если ваш родитель заблокирован на .wait(), но ваш ребенок ждет, пока вы прочитаете его вывод (не ваш случай, но q.get() в обратном вызове GUI также неверен). Самый простой вариант - использовать .communicate() в отдельном потоке. Вот пара примеров кода, которые показывают, как вы можете читать вывод подпроцесса без «блокировки» графического интерфейса: 1. с нитками 2. нет потоков (POSIX). Если что-то непонятно, спросить

jfs 01.12.2014 19:17

какова здесь цель close_fds?

dashesy 05.02.2015 07:23

@dashesy: ​​чтобы избежать утечки файловых дескрипторов родителей. Продемонстрированное поведение используется по умолчанию в Python 3.

jfs 05.02.2015 08:08

@ Дж. Ф. Себастьян: имеет смысл. Это единственное решение, которое не блокирует, кажется, на 2.7 другого выхода нет (пробовал выбрать, fcntl)! Есть идеи, почему я не могу получить результат процесса sgid? Он отлично работает в терминале, но здесь я получаю no output yet, чего не может быть.

dashesy 05.02.2015 20:43

@dashesy: ​​select, fcntl должны работать в системах POSIX. Если вы не понимаете, почему "еще нет вывода" является всегда возможностью для данного решения; задать новый вопрос. Обязательно прочтите Как спросить и stackoverflow.com/help/mcve

jfs 05.02.2015 21:01

@ J.F.Sebastian, извините за мою отчаянную попытку, я думал, вы уже знаете ответ, похоже, вы знаете :) Я задам новый вопрос в следующий раз. Кстати, после использования setbuf(stdout, NULL) в исполняемом файле он работал (большинство работало, даже select и fcntl), как шарм, я понятия не имею, почему \n не сбрасывал stdout, но я не удивлюсь, если это как-то связано с suid и selinux.

dashesy 05.02.2015 21:58

@dashesy: ​​если stdout подпроцесса перенаправлен в канал, то он буферизируется по блоку (он буферизуется по строке, если stdout является терминалом (tty)) для программ на основе stdio, написанных на C. См. Подпроцесс программы Python C зависает в строке «for line in iter»

jfs 05.02.2015 22:08

@ J.F.Sebastian не знал об этом, наивно, я всегда предполагал поведение, которое видно в tty, но теперь это имеет смысл. Это означает, что интеллектуальные приложения должны смотреть на тип stdout, если они производят вещи, которые являются строками и могут использоваться в каналах / grep.

dashesy 06.02.2015 20:39

по какой причине вы выполняете out.close () в выводе в очередь? Разве это не должно быть задачей объекта Popen?

badideas 18.02.2015 04:51

@zaphod: это делается по той же причине, по которой оператор with используется для обычных файлов: чтобы не полагаться на сложную сборку мусора для освобождения ресурсов, т.е. даже если p.stdout закрыт, когда Popen() собирает мусор: Я предпочитаю явный простой и детерминированный out.close() (хотя вместо этого мне следовало использовать with out: в потоке - похоже, он работает на Python 2.7 и Python 3).

jfs 18.02.2015 05:08

Это решение не подходит для систем майских кодов (системы с 16-48 ядрами). GIL стал играть роль переключателя контекста.

Anton Medvedev 05.06.2015 11:13

Лучше использовать неблокирующий ввод-вывод, как описано ниже.

Anton Medvedev 05.06.2015 11:14

@AntonMedvedev: 1. Неважно, сколько там процессоров. Речь идет о вводе-выводе (Python выпускает GIL при блокировании операций ввода-вывода). 2. На момент ответа альтернативы потокам в stdlib для переносимого решения не было. И это спорно IO ли блокирующая лучше (безусловно), чем потоки. Разумный подход - изучить компромиссы в вашем конкретном случае.

jfs 05.06.2015 13:27

вы пишете «Windows и Linux», это исключает OSX? Я просто попробовал это на OSX с запуском ffmpeg, который, похоже, имеет проблемы. Я собираюсь протестировать это более подробно, если только вы не скажете мне, что это не может работать в OSX.

P.R. 25.06.2015 02:51

@ P.R .: OP спрашивает об этих операционках, поэтому они прямо упоминаются. Он должен работать и на OS X.

jfs 25.06.2015 02:54

@nights: "У меня не работает" не очень информативен. Создайте минимальный пример кода, опишите словами, что вы ожидаете получить и что вы получите вместо этого, шаг за шагом, какая у вас ОС, версия Python и опубликуйте это как отдельный вопрос.

jfs 16.01.2017 14:03

Не отключайте потоки для многопроцессорности и многопроцессорности. Очередь с этим ответом. Завершение подпроцесса привело к тому, что курсор stdout переместился для меня в начало экрана.

Bryce Guinta 21.06.2017 20:02

@jfs - неблокирующий ввод-вывод должен быть асинхронным?

uzay95 26.10.2017 09:17

@ uzay95 Я не совсем понимаю, о чем вы спрашиваете. Эти понятия тесно связаны. Ответ показывает, как реализовать неблокирующее чтение поверх синхронного блокирующего вызова readline (). Интерфейс также асинхронный (во время ввода-вывода могут происходить другие вещи).

jfs 27.10.2017 15:02

Мне кажется, что я натыкаюсь на эту ветку примерно раз в месяц ... Я до сих пор не понимаю, почему утверждение «fcntl не поможет в этом случае». Я, должно быть, реализовал это 10-20 раз к настоящему времени, используя fnctl для установки os.O_NONBLOCKos.read(), а не readline()). В целом кажется, что все работает, как ожидалось.

cheshirekow 25.07.2018 19:18

@cheshirekow поддерживает ли Windows fcntl? Есть ли readline () на Python 2 поддерживает неблокирующий режим?

jfs 25.07.2018 19:44

@jfs не уверен насчет окон ... ты знаешь? поэтому ты сказал, что это не поможет? Также обратите внимание, что я сказал с os.read(), а не с readline().

cheshirekow 04.08.2018 02:01
os.read() доступен в Windows, но os.O_NONBLOCK задокументирован как специфичный для Unix, поэтому я не ожидал, что он там будет работать.
Christopher Barber 11.08.2018 23:27

Когда я попытался использовать предоставленную логику в ответ, данные stderr не смогли их получить.

Karthi1234 23.01.2019 16:01

Похоже, данные stderr также собираются в stdout

Karthi1234 23.01.2019 16:08

@ Karthi1234 нет, stdout в ответе идет в трубу. stderr здесь не идет в трубу. Другое дело ваш код.

jfs 23.01.2019 18:52

есть ли причина, по которой библиотека docs.python.org/3/library/asyncio-api-index.html не упоминается или не используется? Разве это не годится для этого?

Charlie Parker 01.03.2019 22:26

@CharlieParker прокрутите вниз до мой другой ответ

jfs 01.03.2019 22:28

Я добавляю эту задачу, чтобы прочитать какой-то подпроцесс. Открыть stdout. Вот мое неблокирующее решение для чтения:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

Согласно документы, fcntl не работает в Windows.

n611x007 09.09.2013 11:13

@anatolytechtonik используйте вместо этого msvcrt.kbhit()

cat 22.02.2016 06:43

Вы можете сделать это очень легко в Скрученный. В зависимости от вашей существующей базы кода, это может быть не так просто в использовании, но если вы создаете искаженное приложение, такие вещи становятся почти тривиальными. Вы создаете класс ProcessProtocol и переопределяете метод outReceived(). Twisted (в зависимости от используемого реактора) обычно представляет собой просто большой цикл select() с установленными обратными вызовами для обработки данных из разных файловых дескрипторов (часто сетевых сокетов). Таким образом, метод outReceived() просто устанавливает обратный вызов для обработки данных, поступающих от STDOUT. Вот простой пример, демонстрирующий это поведение:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

В Скрученная документация есть полезная информация по этому поводу.

Если вы создаете все свое приложение на основе Twisted, асинхронная связь с другими процессами, локальными или удаленными, становится действительно элегантной. С другой стороны, если ваша программа не построена на Twisted, это не принесет особой пользы. Надеюсь, это может быть полезно для других читателей, даже если это не применимо к вашему конкретному приложению.

не хорошо. select не должен работать в окнах с файловыми дескрипторами, согласно документы

n611x007 09.09.2013 11:10

@naxa Я не думаю, что select(), о котором он говорит, - это то же самое, что и вы. Я предполагаю это, потому что Twisted работает в Windows ...

notbad.jpeg 30.09.2013 01:25

Я добавил аналогичное решение на основе asyncio из stdlib.

jfs 20.12.2013 09:51

«Скрученный (в зависимости от используемого реактора) - это обычно просто большой цикл select ()» означает, что есть несколько реакторов, между которыми можно выбирать. select() - самый портативный для unix и unix-like, но есть также два реактора для Windows: twistedmatrix.com/documents/current/core/howto/…

clacke 18.04.2016 22:52

Отказ от ответственности: это работает только для торнадо

Вы можете сделать это, установив для fd неблокирующий режим, а затем используя ioloop для регистрации обратных вызовов. Я упаковал это в яйцо под названием tornado_subprocess, и вы можете установить его через PyPI:

easy_install tornado_subprocess

теперь вы можете сделать что-то вроде этого:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

вы также можете использовать его с RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

Спасибо за приятную фичу! Чтобы прояснить, почему мы не можем просто использовать threading.Thread для создания новых неблокирующих процессов? Я использовал его в on_message экземпляра веб-сокета Tornado, и он отлично справился со своей задачей.

VisioN 27.11.2012 02:46

Зарезка потоков в торнадо в основном не рекомендуется. они подходят для небольших, непродолжительных функций. Об этом можно прочитать здесь: stackoverflow.com/questions/7846323/tornado-web-and-threadsgithub.com/facebook/tornado/wiki/Threading-and-concurrency

Vukasin Toroman 02.12.2012 02:25

@VukasinToroman, ты действительно спас меня этим. Большое спасибо за модуль подпроцесса торнадо :)

James Gentes 23.05.2013 23:14

это работает в окнах? (обратите внимание, что select с файловыми дескрипторами, не)

n611x007 09.09.2013 11:11

Эта библиотека не использует вызов select. Я не пробовал это под Windows, но у вас, вероятно, возникнут проблемы, поскольку библиотека использует модуль fcntl. Вкратце: нет, это, вероятно, не будет работать под Windows.

Vukasin Toroman 10.09.2013 13:41

Существующие решения у меня не работали (подробности ниже). Что в итоге сработало, так это реализовать readline с помощью read (1) (на основе этот ответ). Последний не блокирует:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Почему существующие решения не сработали:

  1. Решения, для которых требуется строка чтения (включая решения на основе очереди), всегда блокируются. Трудно (невозможно?) Убить поток, выполняющий readline. Он уничтожается только тогда, когда завершается процесс, создавший его, но не когда завершается процесс, производящий выходные данные.
  2. Смешивание низкоуровневого fcntl с высокоуровневыми вызовами readline может работать некорректно, как указал anonnn.
  3. Использование select.poll () удобно, но не работает в Windows согласно документации python.
  4. Использование сторонних библиотек кажется излишним для этой задачи и добавляет дополнительные зависимости.

1. q.get_nowait() из моего ответа никогда не должен блокировать, в этом смысл его использования. 2. Поток, выполняющий строку чтения (enqueue_output() функция), завершается при EOF, например, включая случай, когда процесс создания вывода завершается. Если вы верите, что это не так; пожалуйста, предоставьте полный пример минимального кода, который показывает иное (возможно, как новый вопрос).

jfs 03.04.2013 13:32

@sebastian Я потратил час или больше, пытаясь придумать минимальный пример. В конце концов, я должен согласиться, что ваш ответ касается всех случаев. Я предполагаю, что раньше у меня это не сработало, потому что, когда я пытался убить процесс создания вывода, он уже был убит и выдал ошибку, которую трудно отладить. Час был потрачен не зря, потому что, придумывая минимальный пример, я мог придумать более простое решение.

Vikram Pudi 05.04.2013 14:11

Не могли бы вы опубликовать и более простое решение? :) (если он отличается от Себастьяна)

n611x007 09.09.2013 11:33

@ dangerous89: Думаю, dcmpid = myprocess.

ViFI 18.11.2016 19:00

В состоянии после вызова read () (сразу после while True): out никогда не будет пустой строкой, потому что вы читаете как минимум строку / байты длиной 1.

sergzach 05.03.2019 22:52

Я создал библиотеку на основе Решение Дж. Ф. Себастьяна. Вы можете использовать это.

https://github.com/cenkalti/what

Обновлено: эта реализация все еще блокируется. Вместо этого используйте отвечать Дж. Ф. Себастьяна.

Я попробовал главный ответ, но дополнительный риск и поддержка кода потока вызывали беспокойство.

Просматривая io модуль (и ограничиваясь 2.6), я нашел BufferedReader. Это мое неблокирующее решение без потоков.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

вы пробовали for line in iter(p.stdout.readline, ""): # do stuff with the line? Он не имеет потоков (однопоточный) и блокируется, когда ваш код блокируется.

jfs 10.11.2013 07:20

@ j-f-sebastian Да, в конце концов я вернулся к твоему ответу. Моя реализация все еще иногда блокируется. Я отредактирую свой ответ, чтобы предупредить других, чтобы они не шли по этому пути.

romc 27.11.2013 20:22

Вот мой код, используемый для быстрого перехвата каждого вывода подпроцесса, включая частичные строки. Он качает одновременно и stdout, и stderr в почти правильном порядке.

Протестировано и корректно работает на Python 2.7 linux и windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"] = "1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR= = ", line, "=? = "
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

Один из немногих ответов, которые позволяют читать то, что не обязательно заканчивается новой строкой.

totaam 26.11.2014 00:40

Работая с ответом Дж.Ф. Себастьяна и несколькими другими источниками, я собрал простой диспетчер подпроцессов. Он обеспечивает запрос неблокирующего чтения, а также параллельное выполнение нескольких процессов. Он не использует никаких вызовов, специфичных для ОС (насколько я знаю), и поэтому должен работать где угодно.

Он доступен из pypi, поэтому просто pip install shelljob. Обратитесь к страница проекта за примерами и полными документами.

Python 3.4 представляет новый предварительный API для асинхронного ввода-вывода - asyncio модуль.

Подход аналогичен Ответ на основе twisted от @Bryan Ward - определите протокол, и его методы вызываются, как только данные будут готовы:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

См. «Подпроцесс» в документации.

Существует высокоуровневый интерфейс asyncio.create_subprocess_exec(), который возвращает Process объекты, что позволяет читать строку асинхронно с использованием StreamReader.readline() сопрограмма. (с async / await Синтаксис Python 3.5+):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() выполняет следующие задачи:

  • запустить подпроцесс, перенаправить его стандартный вывод в канал
  • асинхронно читать строку из stdout подпроцесса
  • убить подпроцесс
  • подождите, пока он выйдет

При необходимости каждый шаг может быть ограничен таймаутом в секундах.

Когда я пробую что-то подобное с помощью сопрограмм python 3.4, я получаю вывод только после того, как весь скрипт будет запущен. Я хотел бы, чтобы строка вывода была напечатана, как только подпроцесс напечатает строку. Вот что у меня есть: pastebin.com/qPssFGep.

flutefreak7 14.01.2016 20:06

@ flutefreak7: проблемы с буферизацией не имеют отношения к текущему вопросу. Перейдите по ссылке, чтобы узнать о возможных решениях.

jfs 14.01.2016 20:12

Благодарность! Решил проблему для моего скрипта, просто используя print(text, flush=True), чтобы напечатанный текст был немедленно доступен наблюдателю, вызывающему readline. Когда я тестировал его с исполняемым файлом на основе Fortran, я действительно хотел обернуть / посмотреть, он не буферизует его вывод, поэтому ведет себя так, как ожидалось.

flutefreak7 14.01.2016 22:34

Можно ли разрешить подпроцессу сохраняться и выполнять дальнейшие операции чтения / записи. readline_and_kill во втором сценарии работает очень похоже на subprocess.comunicate в том смысле, что завершает процесс после одной операции чтения / записи. Я также вижу, что вы используете одиночный канал, stdout, который обрабатывается подпроцессом как неблокирующий. Пытаюсь использовать как stdout, так и stderrЯ нахожу, что в конечном итоге блокирую.

Carel 11.05.2017 13:24

@Carel код в ответе работает так, как задумано, как явно описано в ответе. При желании можно реализовать другое поведение. Оба канала одинаково неблокируемы, если используются, вот пример как читать из обоих каналов одновременно.

jfs 11.05.2017 17:37

зачем заморачиваться потоком очереди? В отличие от readline (), BufferedReader.read 1 () не блокирует ожидание \ r \ n, он возвращает как можно скорее, если есть какие-либо выходные данные.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

Вернется ли он как можно скорее, если ничего не будет? Если этого не происходит, происходит блокировка.

Mathieu Pagé 25.01.2015 19:02

@ Матье Паж прав. read1 заблокирует первый базовый блок чтения, что происходит, когда канал все еще открыт, но вход недоступен.

Jack O'Connor 22.08.2015 02:14

В моем случае мне понадобился модуль регистрации, который улавливает вывод фоновых приложений и дополняет его (добавляя отметки времени, цвета и т. д.).

В итоге я получил фоновый поток, который выполняет фактический ввод-вывод. Следующий код предназначен только для платформ POSIX. Я снял несущественные части.

Если кто-то собирается использовать этого зверя в течение длительного времени, подумайте об управлении открытыми дескрипторами. В моем случае это не было большой проблемой.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

Вот модуль, который поддерживает неблокирующее чтение и фоновую запись в python:

https://pypi.python.org/pypi/python-nonblock

Предоставляет функцию,

nonblock_read, который будет читать данные из потока, если они доступны, в противном случае возвращает пустую строку (или None, если поток закрыт на другой стороне и все возможные данные были прочитаны)

Вы также можете рассмотреть модуль python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

который добавляет к модулю подпроцесса. Поэтому к объекту, возвращаемому из "subprocess.Popen", добавляется дополнительный метод runInBackground. Это запускает поток и возвращает объект, который будет автоматически заполнен, когда материал будет записан в stdout / stderr, без блокировки вашего основного потока.

Наслаждаться!

Я хотел бы попробовать этот модуль неблокирующий, но я относительно новичок в некоторых процедурах Linux. Как именно мне установить эти подпрограммы? Я использую Raspbian Jessie, разновидность Debian Linux для Raspberry Pi. Я попробовал sudo apt-get install nonblock и python-nonblock, и оба вызвали ошибку - не найдено. Я скачал zip-файл с сайта pypi.python.org/pypi/python-nonblock, но не знаю, что с ним делать. Спасибо .... РДК

RDK 01.09.2017 21:27

Добавление этого ответа здесь, поскольку он предоставляет возможность устанавливать неблокирующие каналы в Windows и Unix.

Все детали ctypes - спасибо Ответ @ techtonik.

Существует немного измененная версия, которая может использоваться как в системах Unix, так и в Windows.

  • Python3-совместимый (требуется лишь незначительное изменение).
  • Включает версию posix и определяет исключение для использования в любом из них.

Таким образом, вы можете использовать одну и ту же функцию и исключение для кода Unix и Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Чтобы избежать чтения неполных данных, я написал свой собственный генератор строки чтения (который возвращает байтовую строку для каждой строки).

Это генератор, так что вы можете, например ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

(1) этот комментарий указывает, что readline() не работает с неблокирующими каналами (например, установленными с помощью fcntl) на Python 2 - как вы думаете, это больше не правильно? (мой ответ содержит ссылку (fcntl), которая предоставляет ту же информацию, но теперь кажется удаленной). (2) Посмотрите, как multiprocessing.connection.Pipe использует SetNamedPipeHandleState.

jfs 29.01.2016 12:53

Я тестировал это только на Python3. Но эту информацию тоже видели и ожидаем, что она останется в силе. Я также написал свой собственный код для использования вместо строки чтения, я обновил свой ответ, включив его.

ideasman42 29.01.2016 14:54

Эта версия неблокирующего чтения не требует специальных модулей и будет работать "из коробки" в большинстве дистрибутивов Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

Это пример запуска интерактивной команды в подпроцессе, а стандартный вывод является интерактивным с использованием псевдотерминала. Вы можете обратиться к: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

Моя проблема немного отличается, поскольку я хотел собрать как stdout, так и stderr из запущенного процесса, но в конечном итоге то же самое, поскольку я хотел отобразить вывод в виджете в том виде, в каком он был сгенерирован.

Я не хотел прибегать ко многим из предлагаемых обходных путей с использованием очередей или дополнительных потоков, поскольку они не должны быть необходимы для выполнения такой общей задачи, как запуск другого скрипта и сбор его вывода.

Прочитав предлагаемые решения и документы Python, я решил свою проблему с помощью приведенной ниже реализации. Да, это работает только для POSIX, поскольку я использую вызов функции select.

Я согласен с тем, что документация сбивает с толку, а реализация неудобна для такой типичной задачи по написанию сценариев. Я считаю, что более старые версии python имеют разные значения по умолчанию для Popen и разные объяснения, что создает большую путаницу. Кажется, это хорошо работает как для Python 2.7.12, так и для 3.5.2.

Ключ состоял в том, чтобы настроить bufsize=1 для буферизации строк, а затем universal_newlines=True для обработки как текстового файла, а не двоичного файла, который, кажется, становится значением по умолчанию при настройке bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG и VERBOSE - это просто макросы, которые выводят вывод на терминал.

Это решение IMHO на 99,99% эффективно, поскольку оно все еще использует функцию блокировки readline, поэтому мы предполагаем, что подпроцесс хорош и выводит полные строки.

Я приветствую отзывы об улучшении решения, поскольку я все еще новичок в Python.

В этом конкретном случае вы можете установить stderr = subprocess.STDOUT в конструкторе Popen и получить весь вывод из cmd.stdout.readline ().

Aaron 23.01.2018 22:48

Хороший наглядный пример. Возникли проблемы с select.select (), но это решило их для меня.

maharvey67 04.10.2019 04:41

Это решение использует модуль select для «чтения любых доступных данных» из потока ввода-вывода. Эта функция сначала блокируется до тех пор, пока данные не станут доступными, но затем считывает только те данные, которые доступны, и не блокируется дальше.

Учитывая тот факт, что он использует модуль select, это работает только в Unix.

Код полностью совместим с PEP8.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

Я также столкнулся с проблемой, описанной Джесси, и решил ее, используя "select", как это делали Брэдли, Энди и другие, но в режиме блокировки, чтобы избежать цикла занятости. Он использует фиктивную трубу в качестве поддельного стандартного ввода. Блоки select и ждут, пока будут готовы либо стандартный ввод, либо канал. Когда клавиша нажата, stdin разблокирует выбор, и значение клавиши можно получить с помощью read (1). Когда другой поток записывает в канал, канал разблокирует выбор, и это можно рассматривать как указание на то, что потребность в stdin отпала. Вот некоторый справочный код:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

ПРИМЕЧАНИЕ. Для того, чтобы эта функция работала в Windows, трубу следует заменить на сокет. Пока не пробовал, но по документации должно работать.

gonzaedu61 07.04.2018 10:56

У меня есть проблема с исходным запросом, но я не хотел вызывать потоки. Я смешал решение Джесси с прямым read() из канала и моим собственным обработчиком буфера для чтения строк (однако мой подпроцесс - ping - всегда записывал полные строки <системного размера страницы). Я избегаю ожидания, если читаю только в часах io, зарегистрированных в gobject. В наши дни я обычно запускаю код в MainLoop gobject, чтобы избежать потоков.

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

Наблюдатель

def watch(f, *other):
    print 'reading',f.read()
    return True

И основная программа устанавливает пинг, а затем вызывает почтовый цикл gobject.

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

Любая другая работа связана с обратными вызовами в gobject.

В современном Python дела обстоят намного лучше.

Вот простая дочерняя программа hello.py:

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

И программа для взаимодействия с ним:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

Это распечатывает:

b'hello bob\n'
b'hello alice\n'

Обратите внимание, что фактический шаблон, который также присутствует почти во всех предыдущих ответах, как здесь, так и в связанных вопросах, заключается в том, чтобы установить дескриптор файла stdout дочернего элемента на неблокирующий, а затем опросить его в некотором цикле выбора. В наши дни, конечно, этот цикл обеспечивается asyncio.

Попробуйте мы ожидаем, альтернативу ожидание для Windows.

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()

В Unix-подобных системах и Python 3.5+ есть os.set_blocking, который делает именно то, что говорит.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

Это выводит:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

os.set_blocking прокомментировал это:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''

Это, безусловно, самое элегантное решение, спасибо, что сделали мой день (на самом деле ночь ^^)

Coronon 14.11.2020 05:49

Очень элегантно и очень эффективно. Спасибо за это решение, оно отлично работает!

gromain 16.11.2020 16:42

Спасибо! Это отлично работает при использовании каналов Popen с Selector, чтобы убедиться, что он никогда не заблокируется.

jlh 27.01.2021 20:32

Вот простое решение, основанное на потоках, которые:

  • работает как в Linux, так и в Windows (не полагаясь на select).
  • читает как stdout, так и stderr асинхронно.
  • не полагается на активный опрос с произвольным временем ожидания (удобен для ЦП).
  • не использует asyncio (что может конфликтовать с другими библиотеками).
  • выполняется до завершения дочернего процесса.

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

Другие вопросы по теме