Как мне обрабатывать несколько потоков в Java?

Я пытаюсь запустить процесс и что-то делать с его потоками ввода, вывода и ошибок. Очевидный способ сделать это - использовать что-то вроде select(), но единственное, что я могу найти в Java, который делает это, - это Selector.select(), который принимает Channel. Невозможно получить Channel от InputStream или OutputStream (FileStream имеет метод getChannel(), но здесь это не помогает)

Итак, вместо этого я написал код для опроса всех потоков:

while( !out_eof || !err_eof )
{
    while( out_str.available() )
    {
        if ( (bytes = out_str.read(buf)) != -1 )
        {
            // Do something with output stream
        }
        else
            out_eof = true;
    }
    while( err_str.available() )
    {
        if ( (bytes = err_str.read(buf)) != -1 )
        {
            // Do something with error stream
        }
        else
            err_eof = true;
    }
    sleep(100);
}

который работает, за исключением того, что он никогда не прекращается. Когда один из потоков достигает конца файла, available() возвращает ноль, поэтому read() не вызывается, и мы никогда не получаем возврат -1, который указывал бы на EOF.

Одним из решений может быть неблокирующий способ обнаружения EOF. Я нигде не вижу ни одного в документации. Или есть лучший способ делать то, что я хочу делать?

Я вижу вот этот вопрос: текст ссылки и хотя это не совсем то, что я хочу, я, вероятно, смогу использовать эту идею создания отдельных потоков для каждого потока для конкретной проблемы, которая у меня сейчас есть. Но ведь это не единственный способ сделать это? Конечно, должен быть способ читать из нескольких потоков без использования потока для каждого?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
5
0
6 171
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Как вы сказали, решение изложено в этом ответе - это традиционный способ чтения как stdout, так и stderr из процесса. Можно использовать поток за потоком, даже если это немного раздражает.

Вам действительно нужно будет пойти по пути создания потока для каждого потока, который вы хотите отслеживать. Если ваш вариант использования позволяет комбинировать как stdout, так и stderr рассматриваемого процесса, вам нужен только один поток, в противном случае необходимы два.

Мне потребовалось некоторое время, чтобы исправить это в одном из наших проектов, где мне нужно запустить внешний процесс, взять его результат и что-то с ним сделать, в то же время ища ошибки и завершение процесса, а также иметь возможность завершить его. когда пользователь java-приложения отменяет операцию.

Я создал довольно простой класс для инкапсуляции наблюдающей части, чей метод run () выглядит примерно так:

public void run() {
    BufferedReader tStreamReader = null;
    try {
        while (externalCommand == null && !shouldHalt) {
            logger.warning("ExtProcMonitor("
                           + (watchStdErr ? "err" : "out")
                           + ") Sleeping until external command is found");
            Thread.sleep(500);
        }
        if (externalCommand == null) {
            return;
        }
        tStreamReader =
                new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream()
                        : externalCommand.getInputStream()));
        String tLine;
        while ((tLine = tStreamReader.readLine()) != null) {
            logger.severe(tLine);
            if (filter != null) {
                if (filter.matches(tLine)) {
                    informFilterListeners(tLine);
                    return;
                }
            }
        }
    } catch (IOException e) {
        logger.logExceptionMessage(e, "IOException stderr");
    } catch (InterruptedException e) {
        logger.logExceptionMessage(e, "InterruptedException waiting for external process");
    } finally {
        if (tStreamReader != null) {
            try {
                tStreamReader.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }
}

На вызывающей стороне это выглядит так:

    Thread tExtMonitorThread = new Thread(new Runnable() {

        public void run() {
            try {
                while (externalCommand == null) {
                    getLogger().warning("Monitor: Sleeping until external command is found");
                    Thread.sleep(500);
                    if (isStopRequested()) {
                        getLogger()
                                .warning("Terminating external process on user request");
                        if (externalCommand != null) {
                            externalCommand.destroy();
                        }
                        return;
                    }
                }
                int tReturnCode = externalCommand.waitFor();
                getLogger().warning("External command exited with code " + tReturnCode);
            } catch (InterruptedException e) {
                getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit");
            }
        }
    }, "ExtCommandWaiter");

    ExternalProcessOutputHandlerThread tExtErrThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true);
    ExternalProcessOutputHandlerThread tExtOutThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true);
    tExtMonitorThread.start();
    tExtOutThread.start();
    tExtErrThread.start();
    tExtErrThread.setFilter(new FilterFunctor() {

        public boolean matches(Object o) {
            String tLine = (String)o;
            return tLine.indexOf("Error") > -1;
        }
    });

    FilterListener tListener = new FilterListener() {
        private boolean abortFlag = false;

        public boolean shouldAbort() {
            return abortFlag;
        }

        public void matched(String aLine) {
            abortFlag = abortFlag || (aLine.indexOf("Error") > -1);
        }

    };

    tExtErrThread.addFilterListener(tListener);
    externalCommand = new ProcessBuilder(aCommand).start();
    tExtErrThread.setProcess(externalCommand);
    try {
        tExtMonitorThread.join();
        tExtErrThread.join();
        tExtOutThread.join();
    } catch (InterruptedException e) {
        // when this happens try to bring the external process down 
        getLogger().severe("Aborted because auf InterruptedException.");
        getLogger().severe("Killing external command...");
        externalCommand.destroy();
        getLogger().severe("External command killed.");
        externalCommand = null;
        return -42;
    }
    int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue();

    externalCommand = null;
    try {
        getLogger().warning("command exit code: " + tRetVal);
    } catch (IllegalThreadStateException ex) {
        getLogger().warning("command exit code: unknown");
    }
    return tRetVal;

К сожалению, мне не нужно использовать автономный исполняемый пример, но, возможно, это поможет. Если бы мне пришлось сделать это снова, я бы еще раз посмотрел на использование метода Thread.interrupt () вместо самодельного флага остановки (не забудьте объявить его нестабильным!), Но я оставлю это на другой раз. :)

Другие вопросы по теме