Project Reactor и модель памяти Java

Я пытаюсь понять, какие гарантии в отношении видимости данных Projectactor обеспечивает код приложения. Например, Я ожидал, что приведенный ниже код не сработает, но этого не произойдет после миллиона итераций. Я изменяю состояние типичного POJO в потоке A и читаю его из потока B. Гарантирует ли Reactor, что изменения POJO видны в потоке?

public class Main {
    public static void main(String[] args) {
        Integer result = Flux.range(1, 1_000_000)
                .map(i -> {
                    Data data = new Data();
                    data.setValue(i);
                    data.setValueThreeTimes(i);
                    data.setValueObj(i + i);
                    return data;
                })
                .parallel(250)
                .runOn(Schedulers.newParallel("par", 500))
                .map(d -> {
                    d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
                    return d;
                })
                .sequential()
                .parallel(250)
                .runOn(Schedulers.newParallel("par", 500))
                .map(d -> {
                    d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
                    return d;
                })
                //                .sequential()
                .map(d -> {
                    if (d.getValue() * 3 != d.getValueThreeTimes()) throw new RuntimeException("data corrupt error");
                    return d;
                })
                .reduce(() -> 0, (Integer sum, Data d) -> sum + d.getValueObj() + d.getValue())
                .sequential()
                .blockLast();
    }

    static class Data {
        private int value;
        private int valueThreeTimes;
        private Integer valueObj;

        public int getValueThreeTimes() {
            return valueThreeTimes;
        }

        public void setValueThreeTimes(int valueThreeTimes) {
            this.valueThreeTimes = valueThreeTimes;
        }

        public int getValue() {
            return value;
        }

        @Override
        public String toString() {
            return "Data{" +
                    "value = " + value +
                    ", valueObj = " + valueObj +
                    '}';
        }

        public void setValue(int value) {
            this.value = value;
        }

        public Integer getValueObj() {
            return valueObj;
        }

        public void setValueObj(Integer valueObj) {
            this.valueObj = valueObj;
        }
    }

    private static <T> T identityWithThreadLogging(T el, String operation) {
        System.out.println(operation + " -- " + el + " -- " +
                Thread.currentThread().getName());
        return el;
    }
}

Вы не пробовали убрать sequential из цепочки?

Antoniossss 01.11.2018 19:22

Только что пробовал без sequential. Возвращает правильный результат. Похоже, что при перемещении элементов от одного оператора к другому применяется ограждение памяти.

Rowan A 01.11.2018 19:30

Идентифицируйте реализацию, но, возможно, шаги являются последовательными, а конкретная операция в шаге параллельна - таким образом, ваш "барьер"

Antoniossss 01.11.2018 19:41
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
5
3
678
1

Ответы 1

Спецификация реактивных потоков требует, чтобы для Flux или Mono (Publisher) события onNext были последовательными.

parallel() - это ParallelFlux, который немного смягчает это за счет разделения и завоевания: вы получаете несколько «рельсов», каждая из которых по отдельности придерживается спецификации, но в целом нет (параллелизация между рельсами).

В свою очередь, sequential() возвращается в мир Flux и вводит барьер памяти, чтобы гарантировать, что результирующая последовательность соответствует спецификации RS.

Если бы мне пришлось создать простой POJO и вызвать несколько сеттеров на этапе карты, а затем начать обработку на рельсах parallel, будут ли значения из вызова сеттера видны в параллельных рельсах? Должны ли мы использовать поля volatile или final при использовании реактора, чтобы получить согласованное представление данных по мере их изменения в потоке?

Rowan A 08.11.2018 08:19

Этапы перед parallel () будут выполняться в том же потоке или использовать volatile для запуска барьеров памяти.

Simon Baslé 08.11.2018 17:04

Другие вопросы по теме