Вычисление дельт в apache beam с использованием обработки с отслеживанием состояния

У меня есть несколько входных потоков, которые могут со временем отправлять обновления. Если происходит обновление, мне нужно рассчитать дельту, чтобы иметь возможность обрабатывать его дальше. Суммируя:

Input: 10 -> State: 10, Output: 10
Input: 12 -> State: 12, Output:  2
Input:  5 -> State:  5, Output: -7

Я прочитал обработка с сохранением состояния и своевременная обработка, чтобы понять, как работать с таким состоянием в приложении Apache Beam, но я не могу понять:

  1. Есть ли 100% гарантия, что мой DoFn с отслеживанием состояния не будет обрабатывать элементы с одним и тем же ключом параллельно?
  2. Я хочу убедиться, что состояние сохраняется, когда мое приложение перезапускается или выходит из строя, чтобы я мог начать с правильного начального значения. Как мне убедиться, что мой DoFn «очищается» (сохраняется в хранилище данных) перед выключением?

Для №2 мне было интересно, может ли это сработать при использовании глобальных окон:

public class Delta extends DoFn<KV<String, Integer>, Integer> {
    @StateId("state")
    private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value();

    @TimerId("timer")
    private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(ProcessContext context,
                        BoundedWindow window,
                        @StateId("state") ValueState<Integer> state,
                        @TimerId("timer") Timer myTimer) {
        // Assign the timer to the end of the current window, which is a global window
        // Not sure if this always triggers when the application stops...
        myTimer.set(window.maxTimestamp());

        int value = context.element().getValue();
        int acc = getOrInitialize(state.read());
        int delta = value - acc;
        state.write(value);
        context.output(delta);
    }

    @OnTimer("timer")
    public void onTimer(OnTimerContext context,
                        @StateId("state") ValueState<Integer> state) {
        // Persist value of state here
    }

    private int getOrInitialize(Integer a) {
        // Get initial value of state here
        return (a != null) ? a : 0;
    }
}
1
0
338
2

Ответы 2

  1. да
  2. Я не думаю, что ваш подход с таймером будет работать без какой-либо настройки BoundedWindow. @StartBundle / @Setup и @FinishBundle должны быть лучшими местами для восстановления и контрольной точки. Я не рекомендую @Teardown, потому что его вызов не гарантируется.

Я изучал документацию для @StartBundle и @FinishBundle, но похоже, что они не поддерживают аннотацию StateId для обеспечения доступа к состоянию.

Sander 12.08.2018 11:31

Извините, я этого не осознавал. Один из обходных путей - использовать внешнюю базу данных (например, Cloud Datastore, Cloud BigTable и т. д.), Но на этом этапе вам, вероятно, вообще не нужно использовать Apache Beam. Другой обходной путь - использовать таймеры обработки для значений контрольных точек. Однако при перезапуске конвейера не все гарантируется.

Jiayuan Ma 13.08.2018 00:49

На самом деле я планирую использовать Cloud Datastore для сохранения «текущего» значения для каждого ключа. Проблема, которую я пытаюсь решить, связана с условиями гонки, возникающими, когда один из наших источников публикует сразу много обновлений. Почему мне не нужно использовать Beam при работе с базой данных?

Sander 13.08.2018 11:08

Datastore поддерживает транзакции: вы можете читать, вычислять дельту и сохранять новое значение в транзакции. Если фиксация не удалась, должна быть некоторая логика повтора. Это может сделать простой веб-сервис. Вам не обязательно использовать Apache Beam. Это не повредит, если вы все еще хотите его использовать.

Jiayuan Ma 13.08.2018 19:54

Что ж, вычисление дельты - это лишь небольшая часть более крупного конвейера, с которым, к сожалению, веб-сервис не может легко справиться.

Sander 14.08.2018 11:36
  1. Обработка с отслеживанием состояния распараллеливается для каждого ключа и окна. Два элемента для одного и того же ключа, но разные окна могут обрабатываться параллельно. Но все ваши элементы находятся в глобальном окне, поэтому в вашем случае это то же самое, что и индивидуальный параллелизм.

  2. Использование таймеров - правильная идея, чтобы очистить ваше состояние. Таймер времени события в конце глобального окна никогда не сработает во время нормальной работы конвейера, но будет срабатывать в сценарии «слива», в котором все водяные знаки перемещаются на бесконечность. Drain - это функция Cloud Dataflow, которая была предложена в качестве переносимой концепции для Beam, но вам следует выяснить, есть ли у вашего выбранного бегуна такая функция.

Другие вопросы по теме