У меня есть несколько входных потоков, которые могут со временем отправлять обновления. Если происходит обновление, мне нужно рассчитать дельту, чтобы иметь возможность обрабатывать его дальше. Суммируя:
Input: 10 -> State: 10, Output: 10
Input: 12 -> State: 12, Output: 2
Input: 5 -> State: 5, Output: -7
Я прочитал обработка с сохранением состояния и своевременная обработка, чтобы понять, как работать с таким состоянием в приложении Apache Beam, но я не могу понять:
Для №2 мне было интересно, может ли это сработать при использовании глобальных окон:
public class Delta extends DoFn<KV<String, Integer>, Integer> {
@StateId("state")
private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value();
@TimerId("timer")
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(ProcessContext context,
BoundedWindow window,
@StateId("state") ValueState<Integer> state,
@TimerId("timer") Timer myTimer) {
// Assign the timer to the end of the current window, which is a global window
// Not sure if this always triggers when the application stops...
myTimer.set(window.maxTimestamp());
int value = context.element().getValue();
int acc = getOrInitialize(state.read());
int delta = value - acc;
state.write(value);
context.output(delta);
}
@OnTimer("timer")
public void onTimer(OnTimerContext context,
@StateId("state") ValueState<Integer> state) {
// Persist value of state here
}
private int getOrInitialize(Integer a) {
// Get initial value of state here
return (a != null) ? a : 0;
}
}




BoundedWindow. @StartBundle / @Setup и @FinishBundle должны быть лучшими местами для восстановления и контрольной точки. Я не рекомендую @Teardown, потому что его вызов не гарантируется.Извините, я этого не осознавал. Один из обходных путей - использовать внешнюю базу данных (например, Cloud Datastore, Cloud BigTable и т. д.), Но на этом этапе вам, вероятно, вообще не нужно использовать Apache Beam. Другой обходной путь - использовать таймеры обработки для значений контрольных точек. Однако при перезапуске конвейера не все гарантируется.
На самом деле я планирую использовать Cloud Datastore для сохранения «текущего» значения для каждого ключа. Проблема, которую я пытаюсь решить, связана с условиями гонки, возникающими, когда один из наших источников публикует сразу много обновлений. Почему мне не нужно использовать Beam при работе с базой данных?
Datastore поддерживает транзакции: вы можете читать, вычислять дельту и сохранять новое значение в транзакции. Если фиксация не удалась, должна быть некоторая логика повтора. Это может сделать простой веб-сервис. Вам не обязательно использовать Apache Beam. Это не повредит, если вы все еще хотите его использовать.
Что ж, вычисление дельты - это лишь небольшая часть более крупного конвейера, с которым, к сожалению, веб-сервис не может легко справиться.
Обработка с отслеживанием состояния распараллеливается для каждого ключа и окна. Два элемента для одного и того же ключа, но разные окна могут обрабатываться параллельно. Но все ваши элементы находятся в глобальном окне, поэтому в вашем случае это то же самое, что и индивидуальный параллелизм.
Использование таймеров - правильная идея, чтобы очистить ваше состояние. Таймер времени события в конце глобального окна никогда не сработает во время нормальной работы конвейера, но будет срабатывать в сценарии «слива», в котором все водяные знаки перемещаются на бесконечность. Drain - это функция Cloud Dataflow, которая была предложена в качестве переносимой концепции для Beam, но вам следует выяснить, есть ли у вашего выбранного бегуна такая функция.
Я изучал документацию для
@StartBundleи@FinishBundle, но похоже, что они не поддерживают аннотациюStateIdдля обеспечения доступа к состоянию.