Детали реализации разделителя потоков

Изучая исходный код WrappingSpliterator::trySplit, я был очень введен в заблуждение его реализацией:

    @Override
    public Spliterator<P_OUT> trySplit() {
        if (isParallel && buffer == null && !finished) {
            init();

            Spliterator<P_IN> split = spliterator.trySplit();
            return (split == null) ? null : wrap(split);
        }
        else
            return null;
    }

И если вам интересно, почему это важно, то, например, это:

Arrays.asList(1,2,3,4,5)
      .stream()
      .filter(x -> x != 1)
      .spliterator();

использует его. Насколько я понимаю, добавление любой промежуточной операции к потоку вызовет запуск этого кода.

В основном этот метод говорит о том, что если поток не является параллельным, рассматривайте этот Spliterator как тот, который вообще не может быть разделен. И это важно для меня. В одном из моих методов (вот как я добрался до этого кода) я получаю Stream в качестве входных данных и «анализирую» его на более мелкие части вручную с помощью trySplit. Например, вы можете подумать, что я пытаюсь сделать findLast из Stream.

И здесь мое желание разбиться на более мелкие куски обрывается, потому что как только я это делаю:

Spliterator<T> sp = stream.spliterator();
Spliterator<T> prefixSplit = sp.trySplit();

Я узнаю, что prefixSplit — это null, а это означает, что я в принципе не могу ничего сделать, кроме как потреблять все sp вместе с forEachRemaning.

И это немного странно, может быть имеет смысл, когда присутствует filter; потому что в этом случае единственный способ (в моем понимании) вернуть Spliterator — это использовать какой-то buffer, может быть, даже с предопределенным размером (очень похоже на Files::lines). Но почему это:

Arrays.asList(1,2,3,4)
      .stream()
      .sorted()
      .spliterator()
      .trySplit();

возвращает null — это то, чего я не понимаю. sorted — это операция с отслеживанием состояния, которая в любом случае буферизует элементы, фактически не уменьшая и не увеличивая их начальное количество, поэтому, по крайней мере, теоретически это может вернуть что-то отличное от null...

Ну... может потому что поток не параллельный? Очевидный вопрос... Вы пробовали Arrays.asList(1,2,3,4).parallelStream()......?

fps 02.04.2019 22:57

@FedericoPeraltaSchaffner, конечно... :) Но если убрать filter и поток тоже не параллельный, разделение сработает. В документации не сказано, что это должно происходить для параллельного потока.

Eugene 03.04.2019 06:52

из javadoc: * <p>Этот метод может возвращать {@code null} по любой причине, * включая пустоту, невозможность разделения после * начала обхода, ограничения структуры данных и * соображения эффективности.

Wisthler 03.04.2019 08:59

@Wisthler, да, я читал это, просто это кажется странным, наверное.

Eugene 03.04.2019 09:03

Когда вы не связываете промежуточную операцию, spliterator() просто вернет разделитель источника, то есть Arrays.asList(1,2,3,4,5) .stream() .spliterator() .getClass() == Arrays.spliterator(new Integer[] { 1,2,3,4,5 }) .getClass(). Такой разделитель даже не знает, является ли поток параллельным и существует ли вообще поток. И нет, Arrays.asList(1,2,3,4,5) .parallelStream() .filter(x -> x != 1) .spliterator();нет нуждается в буферизации.

Holger 03.04.2019 09:22

@Holger правильно, я видел, что исходный разделитель возвращается в случае отсутствия промежуточных операций, даже WrappingSpliterator «знает» об этом через spliteratorSupplier. Второй момент заключается в том, что я мысль это может быть реализовано с буфером, я посмотрел реализацию сейчас, и вы правы, он разделяет исходный сплитератор... так что вопрос в этом случае будет заключаться в том, что это преднамеренное решение для последовательный поток? вы случайно не знаете причин этого? Спасибо

Eugene 03.04.2019 09:33

Конечно, это обдуманное решение, так как кто-то вставил isParallel в строку if (isParallel && …. Есть ли причина (и я полагаю, вы имеете в виду «уважительную причину») для этого — другой вопрос. Я не вижу никакого преимущества в этом ограничении.

Holger 03.04.2019 09:51
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
6
7
132
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Когда вы вызываете spliterator() на Stream, есть только два возможных результата с текущей реализацией.

Если в потоке нет промежуточных операций, вы получите исходный сплитератор, который использовался для создания потока и чья возможность разделения полностью не зависит от параллельного состояния потока, так как на самом деле сплитератор ничего не знает о потоке.

В противном случае вы получите WrappingSpliterator, который будет инкапсулировать источник Spliterator и состояние конвейера, выраженное как PipelineHelper. Эта комбинация Spliterator и PipelineHelper не должна работать параллельно и, по сути, не будет работать в случае distinct(), так как WrappingSpliterator получит совершенно другую комбинацию, в зависимости от того, параллельен Поток или нет.

Однако для промежуточных операций без сохранения состояния это не имеет значения. Но, как обсуждалось в «Почему tryAdvance для stream.spliterator() может накапливать элементы в буфере?», WrappingSpliterator — это «универсальная реализация», которая не учитывает реальную природу конвейера, поэтому его ограничения являются надмножеством всех возможных ограничений всех поддерживаемых этапов конвейера. Таким образом, существования одного сценария, который не будет работать при игнорировании флага parallel, достаточно, чтобы запретить разделение для всех конвейеров, когда они не являются parallel.

Другие вопросы по теме