Как непрерывно просматривать самые низкие элементы из списка отсортированных потоков

Я начал изучать Java Stream, и мне интересно, можно ли только просмотреть первый элемент потока, не извлекая его.

Например, у меня есть несколько потоков, и в каждом из них есть целые числа, отсортированные в неубывающем порядке, и я хочу получить отсортированный список всех целых чисел, поэтому я думаю об использовании PrioirtyQueue<Stream>, отсортированного в неубывающем порядке. также.

Однако, чтобы заставить PrioirtyQueue<Stream> сортировать потоки, мне нужно передать компаратор для потока, чтобы сравнить потоки по их первому элементу, и я не уверен, как просмотреть первый элемент в каждом потоке.

Например, у меня есть следующие потоки.

[1, 2, 3, 5],
[0, 2, 4, 6]

Я хочу написать функцию getNextInteger(), которая обрабатывает список отсортированные потоки.

Каждый раз, когда я вызываю метод, он возвращает следующее наименьшее целое число, поэтому результатом может быть [0,1,2,2], если я вызову метод 4 раз.

Я хочу использовать PriorityQueue для сортировки потоков по их первому значению, извлекать наименьшее из них и повторно ставить поток в очередь, если он не пуст.

Звучит так, как будто вы ожидаете, что PriorityQueue будет переупорядочивать свое содержимое по мере его изменения. Я не думаю, что стандартные реализации сделают это. Или дело в том, что если первый элемент потока меньше, чем первый элемент другого потока, все элементы первого потока будут меньше, чем все элементы второго потока?

tgdavies 16.05.2022 03:11

Короткий ответ: нет. Это невозможно сделать.

Stephen C 16.05.2022 03:18

обновил вопрос, чтобы быть более точным. Спасибо!

Sammy 16.05.2022 03:32
it returns the next smallest integer - без выполнения любого из этих потоков. Это невозможно. Вам нужен кастомный Iterator, а не очередь потоков.
Alexander Ivanchenko 16.05.2022 03:36

Я понимаю. как будет выглядеть пользовательский Iterator для обработки нескольких потоков?

Sammy 16.05.2022 03:41
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
5
64
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Поток — это средство итерации по источнику данных, предназначенное для обработки данных, а не для их хранения.

Поэтому ваш вопрос в корне некорректен. Краткий ответ: нет.

Это не структура данных, вы не можете получить доступ к элементам в поток так же, как к элементам в List или в Queue.

Взгляните на документация:

Collections and streams, while bearing some superficial similarities, have different goals. Collections are primarily concerned with the efficient management of, and access to, their elements. By contrast, streams do not provide a means to directly access or manipulate their elements, and are instead concerned with declaratively describing their source and the computational operations which will be performed in aggregate on that source.

Как я уже сказал, поток — это средство итерации, но потоковый конвейер также отличается от Iterator. Iterator позволяет извлекать элементы один за другим. И наоборот, потоковый конвейер будет либо выполняться и давать результат (в виде одного значения или набора значений) и закрываться, либо не будет выполняться. Это будет зависеть от того, имеет ли поток терминальную операцию или нет.

Например, этот поток действителен, он нормально скомпилируется, но не будет выполнен:

Stream.of("a", "b", "c").map(String::toUpperCase);

Потому что ему не хватает терминальной операции.

Каждый поток должен иметь источник и один терминальная операция, который инициирует выполнение конвейера и выдает результат. Промежуточные операции, такие как map() и filter(), которые предназначены для преобразования потока, являются необязательными.

Вы не можете получить данные из поток без их обработки. И как только он будет обработан, он больше не может быть использован.

В качестве возможного решения этой проблемы вы можете подумать о том, чтобы обернуть поток объектом, который будет поддерживать отдельно первый элемент из источника потока и сам поток.

public record StreamWrapper(int first, IntStream stream) {}

Такой подход можно использовать, достаточно будет сравнивать потоки по одному значению, которое должно быть извлечено из источника потока (если источник потока позволяет это) одновременно с генерацией потока.


Обновлять

I want to write a function getNextInteger(), that handles a list of sorted streams.

Every time I call the method, it returns the next smallest integer, so the result might be [0,1,2,2] if I call the method 4 times.

Эта задача не подходит для потоков. Разве что можно слепить тот факт, что данные в каждом потоке уже отсортированы.

Если объединить все потоки в один и применить сортировку, это не вызовет гигантского удара по производительности, как могло показаться вначале. Для сортировки поток данных сбрасывает все элементы в массив, который в этом случае будет состоять из отсортированных подмассивов. Поскольку массив ссылочного типа будет отсортирован с использованием Тимсорт, реализация алгоритма обнаружит все эти отсортированные фрагменты. т.е. сортировка массива, состоящего из частично отсортированных подмассивов, — это не то же самое, что сортировка всех этих данных с нуля. Следовательно, мы можем рассматривать его как возможный вариант:

List<Stream<Integer>> streams =
List.of(Stream.of(1, 3), Stream.of(5), Stream.of(2, 6, 7),
        Stream.of(4, 9, 10), Stream.of(8));
        
streams.stream()
    .flatMap(Function.identity())
    .sorted()
    .forEach(num -> System.out.print(num + " "));

Выведет вывод:

1 2 3 4 5 6 7 8 9 10 

Если печать (или сохранение в коллекцию) общих данных, отсортированных в порядке возрастания, не кажется удовлетворительной, и вы настаиваете на извлечении только одного значения в результате вызова метода, я повторю, что невозможно непрерывно извлекать значения одно за другим из поток.

Для этого вам понадобится Iterator, как предлагает документация:

However, if the provided stream operations do not offer the desired functionality, the BaseStream.iterator() and BaseStream.spliterator() operations can be used to perform a controlled traversal.

Вы можете реализовать custom iterator, который будет использовать PriorityQueue под капотом.

Я предполагаю, что потоки относятся к типу, реализующему Comparable, и потоки отсортированы (как в приведенном вами примере).

Итератор:

public class QueueBasedIterator<T extends Comparable<T>> implements Iterator<T> {
    private Queue<IteratorWrapper<T>> nextValues = new PriorityQueue<>();
    private List<Iterator> iterators = new ArrayList<>();
    
    @SafeVarargs
    public StreamBasedIterator(Stream<T>... streams) {
        this.iterators = Stream.of(streams).map(Stream::iterator)
            .collect(Collectors.toList());
        
        for (int i = 0; i < iterators.size(); i++) {
            Iterator<T> iterator = iterators.get(i);
            if (iterator.hasNext()) 
                nextValues.add(new IteratorWrapper<T>(i, iterator.next()));
        }
    }
    
    @Override
    public boolean hasNext() {
        return !nextValues.isEmpty();
    }
    
    @Override
    public T next() {
        if (nextValues.isEmpty()) {
            throw new NoSuchElementException();
        }
        
        IteratorWrapper<T> next = nextValues.remove();
        Iterator<T> iterator = iterators.get(next.getPosition());
        if (iterator.hasNext())
            nextValues.add(new IteratorWrapper<T>(next.getPosition(), iterator.next()));
        
        return next.getValue();
    }
}

ИтераторВраппер:

class IteratorWrapper<T extends Comparable<T>> implements Comparable<IteratorWrapper<T>> {
    private T value;
    private int position;
    
    public IteratorWrapper(int position, T value) {
        this.value = value;
        this.position = position;
    }
    
    public T getValue() {
        return value;
    }
    
    public int getPosition() {
        return position;
    }
    
    @Override
    public int compareTo(IteratorWrapper<T> o) {
        return this.value.compareTo(o.value);
    }
}

main() - демо

public static void main(String[] args) {
    QueueBasedIterator<Integer> iterator =
        new QueueBasedIterator<>(Stream.of(1, 3), Stream.of(5), Stream.of(2, 6, 7),
                                 Stream.of(4, 9, 10), Stream.of(8));
    
    while (iterator.hasNext()) {
        System.out.print(iterator.next() + " ");
    }
}

Выход

1 2 3 4 5 6 7 8 9 10

если я оборачиваю каждый поток в объект, поддерживающий первое значение, как мне получить первое значение? как вы сказали, мне нужно обработать поток, чтобы получить значение, что, если в каждом потоке есть миллионы целых чисел, я не уверен, что хочу обработать их все, чтобы получить первое целое число? Спасибо!

Sammy 16.05.2022 03:37

@Sammy Добавлен пример записи, которая будет хранить первое значение из источника и потока отдельно.

Alexander Ivanchenko 16.05.2022 03:41

Хорошо, я вижу. Итератор afaik работает только со своим потоком, как мне обработать несколько потоков и убедиться, что целое число, которое я получаю, является наименьшим среди потока? Или вы предлагаете мне сделать PriortyQueue<Iterator> ?

Sammy 16.05.2022 04:14

@ Сэмми, я не совсем понимаю твой вопрос. Я нет предлагаю работать с одним потоком, вы можете спутать меня с WJS. Я предоставил демонстрацию, в которой итератор принимает в качестве входных данных количество отсортированных потоков и обрабатывает их. Посмотрите, пожалуйста, и уточните вопрос

Alexander Ivanchenko 16.05.2022 04:23

I am wondering is it possible to only peek the first element of the stream without retrieving it.

Нет. Peek — это intermediate operation, как и map, sort и т. д. Они не заставляют поток начинать доставку данных. Для этого требуется terminal operation, например reduce, forEach, or collector и т. д., чтобы начать процесс потоковой передачи.

Это также позволяет делать такие вещи, как следующие, без сохранения каких-либо данных. Если бы это было так, то первый оператор (series) никогда бы не закончился, так как для него потребовалось бы бесконечное хранилище, так как не существует метода ограничения.

IntStream series = IntStream.iterate(0, i->i+1);
IntStream first10 = series.limit(10);
int[] toArray = first10.toArray();

System.out.println(Arrays.toString(toArray));

Отпечатки

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

В приведенном выше примере toArray() — это терминальная операция, которая запускает поток. После завершения поток исчерпывается, и ни одно из вышеупомянутых назначений (например, series, first10) не может быть использовано снова.

Я понимаю. имеет смысл иметь дело с одним таким потоком! Можно ли иметь дело с несколькими потоками, каждый из которых может содержать миллионы целых чисел, и объединять результат по мере обработки каждого потока?

Sammy 16.05.2022 03:47

Другие вопросы по теме