Как безопасно использовать потоки Java без методов isFinite() и isOrdered()?

Существует вопрос о том, должны ли методы Java возвращать Коллекции или потоки, на который Брайан Гетц отвечает, что даже для конечных последовательностей обычно следует отдавать предпочтение потокам.

Но мне кажется, что в настоящее время многие операции над потоками, которые приходят из других мест, не могут быть безопасно выполнены, а защитные кодовые охранники невозможны, потому что потоки не раскрывают, являются ли они бесконечными или неупорядоченными.

Если параллельность была проблемой для операций, которые я хочу выполнить в Stream(), я могу вызвать isParallel() для проверки или последовательности, чтобы убедиться, что вычисления выполняются параллельно (если я не забыл).

Но если упорядоченность или конечность (размерность) имеют отношение к безопасности моей программы, я не могу писать меры безопасности.

Предполагая, что я использую библиотеку, реализующую этот фиктивный интерфейс:

public interface CoordinateServer {
    public Stream<Integer> coordinates();
    // example implementations:
    // finite, ordered, sequential
    // IntStream.range(0, 100).boxed()
    // final AtomicInteger atomic = new AtomicInteger();
    
    // // infinite, unordered, sequential
    // Stream.generate(() -> atomic2.incrementAndGet()) 

    // infinite, unordered, parallel
    // Stream.generate(() -> atomic2.incrementAndGet()).parallel()
    
    // finite, ordered, sequential, should-be-closed
    // Files.lines(Path.path("coordinates.txt")).map(Integer::parseInt)
}

Тогда какие операции я могу безопасно вызывать в этом потоке, чтобы написать правильный алгоритм?

Кажется, если я, возможно, захочу записать элементы в файл в качестве побочного эффекта, мне нужно беспокоиться о параллельности потока:

// if stream is parallel, which order will be written to file?
coordinates().peek(i -> {writeToFile(i)}).count();
// how should I remember to always add sequential() in  such cases?

А также, если он параллелен, на основе какого пула потоков он параллелен?

Если я хочу отсортировать поток (или другие операции без короткого замыкания), мне как-то нужно быть осторожным с тем, что он бесконечен:

coordinates().sorted().limit(1000).collect(toList()); // will this terminate?
coordinates().allMatch(x -> x > 0); // will this terminate?

Я могу ввести ограничение перед сортировкой, но какое это должно быть магическое число, если я ожидаю конечный поток неизвестного размера?

Наконец, может быть, я хочу вычислить параллельно, чтобы сэкономить время, а затем собрать результат:

// will result list maintain the same order as sequential?
coordinates().map(i -> complexLookup(i)).parallel().collect(toList());

Но если поток не упорядочен (в этой версии библиотеки), то результат может быть искажен из-за параллельной обработки. Но как я могу защититься от этого, кроме как не использовать параллель (что противоречит цели производительности)?

Коллекции явно указывают, являются ли они конечными или бесконечными, имеют порядок или нет, и они не несут с собой режим обработки или пулы потоков. Это кажется ценным свойством для API.

Кроме того, Потоки иногда нужно закрывать, но чаще всего нет. Если я использую поток из метода (или из параметра метода), должен ли я обычно вызывать close?

Кроме того, потоки могли быть уже потреблены, и было бы хорошо иметь возможность корректно обрабатывать этот случай, поэтому было бы хорошо использовать проверить, не был ли уже использован поток;

Я бы хотел, чтобы какой-нибудь фрагмент кода можно было использовать для проверки предположений о потоке перед его обработкой, например >

Stream<X> stream = fooLibrary.getStream();
Stream<X> safeStream = StreamPreconditions(
    stream, 
    /*maxThreshold or elements before IllegalArgumentException*/
    10_000,
    /* fail with IllegalArgumentException if not ordered */
    true
    )

Я думаю, многое зависит от этого. Подождал бы, пока кто-то вроде Хольгера ответит на этот вопрос, если не считать его широким.

Naman 10.06.2019 18:05

Я думаю, вы могли бы использовать для этого характеристики потока - см. вопрос это для более подробной информации.

Oleksandr Pyrohov 10.06.2019 18:23

Спасибо, я не знал о характеристиках сплиттератора. Тем не менее, они не похожи на что-то, что можно использовать в программировании приложений (скорее на детали реализации Stream).

tkruse 11.06.2019 01:38

возможно, это ответит на некоторые из ваших вопросов baeldung.com/java-поток-заказ

JavaMan 24.06.2019 13:04

Как говорит сам Брайан в опубликованном вами ответе: «Единственный случай, когда вы должны вернуть коллекцию, - это когда существуют строгие требования согласованности». Требование конечности является одним из них.

daniu 08.07.2019 07:08

@daniu: Но он прямо предлагает использовать Streams для конечных данных, а это означает, что конечность не является для него строгим требованием согласованности.

tkruse 08.07.2019 07:28

Если только вы не предполагаете, что автор метода знает все варианты использования (текущее и будущее) метода и, таким образом, знает, что для клиентов конечность не является требованием согласованности.

tkruse 08.07.2019 07:29
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
23
7
444
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Посмотрев немного на вещи (некоторые эксперименты и здесь), насколько я вижу, нет способа точно узнать, является ли поток конечным или нет.

Более того, иногда даже он не определяется, кроме как во время выполнения (например, в java 11 — IntStream.generate(() -> 1).takeWhile(x -> externalCondition(x))).

Что вы можете сделать, это:

  1. Вы можете с уверенностью узнать, является ли оно конечным, несколькими способами (обратите внимание, что получение false для них не означает, что оно бесконечно, а только то, что это может быть так):

    1. stream.spliterator().getExactSizeIfKnown() - если у этого есть известный точный размер, он конечен, иначе он вернет -1.

    2. stream.spliterator().hasCharacteristics(Spliterator.SIZED) - если это так, SIZED вернет true.

  2. Вы можете обезопасить себя, предполагая худшее (зависит от вашего случая).

    1. stream.sequential()/stream.parallel() - явно укажите предпочитаемый тип потребления.
    2. С потенциально бесконечным потоком предположите наихудший случай для каждого сценария.

      1. Например, предположим, что вы хотите прослушать поток твитов, пока не найдете один по Венкат — это потенциально бесконечная операция, но вы хотели бы подождать, пока такой твит не будет найден. Так что в этом случае просто нажмите stream.filter(tweet -> isByVenkat(tweet)).findAny() — он будет повторяться до тех пор, пока не появится такой твит (или навсегда).
      2. Другой сценарий, и, возможно, более распространенный, заключается в желании сделать что-то со всеми элементами или попробовать только определенное время (аналогично тайм-ауту). Для этого я бы рекомендовал всегда вызывать stream.limit(x) перед вызовом вашей операции (collect или allMatch или аналогичной), где x — количество попыток, которые вы готовы терпеть.

После всего этого я просто упомяну, что я думаю, что возврат потока, как правило, не очень хорошая идея, и я бы постарался избежать этого, если нет больших преимуществ.

Я думаю, что .splititerator() - это метод, а не общедоступное поле. Также вы можете скопировать проверку SIZED для ORDERED, я думаю? Я думаю, что должна быть возможность иметь счетчик для элементов по мере обработки потока, чтобы даже для потенциально бесконечных потоков он мог генерировать исключение, если испускается больше элементов, чем я максимально ожидал (конечно, за счет производительности). Еще хороший ответ до сих пор.

tkruse 26.06.2019 12:01

сплитератор - правильно. упорядочено — проблема в том, что его можно заказать только в том случае, если оно конечно, иначе это займет вечность (например, Stream.generate(random::nextInt).sorted() вызовет предупреждение intellij), поэтому проверка на упорядоченность немного избыточна. Вместо того, чтобы держать счетчик и вкл. сами, почему бы не использовать limit(x) по максимуму?

orirab 26.06.2019 12:09

Ограничение не говорит вам, что их было больше. Например, вызов Макса в очень длинном потоке, который может быть бесконечным, безопаснее генерировать исключение, чем возвращать неправильный номер.

tkruse 26.06.2019 14:16

Я не слишком уверен в этом - это очень зависит от вашего варианта использования, но я понимаю вашу точку зрения.

orirab 26.06.2019 14:22

Если это подходящий ответ, не могли бы вы принять его?

orirab 26.06.2019 23:02

Другие вопросы по теме