Я не могу заставить работать TTL состояния оператора. Я установил TTL на 5 минут, поэтому по истечении этого времени я ожидаю, что состояние будет очищено. Когда я замечаю, что состояние пусто из-за истечения срока TTL, оно должно обновить состояние.
public static class TestFilter extends RichFilterFunction<A>
implements CheckpointedFunction {
final Helper helper;
final ConfigFetcher configFetcher;
private final ListStateDescriptor<Integer> listStateDescriptor =
new ListStateDescriptor<>(
"state_descriptor", TypeInformation.of(new TypeHint<>() {}));
@SuppressWarnings("NullAway.Init")
private transient ListState<Integer> listState;
TestFilter(
Helper helper,
ConfigFetcher configFetcher) {
this.helper = helper;
this.configFetcher = configFetcher;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
StateTtlConfig stateTTLConfig =
StateTtlConfig.newBuilder(Time.minutes(5))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
stateDescriptor.enableTimeToLive(stateTTLConfig);
state =
context.getOperatorStateStore().getListState(stateDescriptor);
}
@Override
public void snapshotState(FunctionSnapshotContext context) {}
@Override
public boolean filter(A element) throws Exception {
Set<Integer> ids = new HashSet<>();
if (listState.get() == null) {
ids = configFetcher.getIds();
listState.update(new ArrayList<>(ids));
} else {
listState.get().forEach(ids::add);
if (ids.isEmpty()) {
ids = configFetcher.getIds();
listState.update(new ArrayList<>(ids));
}
}
return ids.contains(element)
}
Когда я проверяю это, я замечаю, что через 5 минут состояние не очищается/обновляется. Кто-нибудь знает, почему?
Функция initializeState()
чаще используется в ситуациях, когда вам может потребоваться обработать пользовательскую логику восстановления или инициализировать содержимое состояния. Возможно, вы захотите определить свое состояние в функции open()
, которая обычно используется для начальной настройки оператора, аналогично следующему:
public class TestFun extends RichFilterFunction<A> implements CheckpointedFunction {
@SuppressWarnings("NullAway.Init")
private transient ListState<Integer> listState;
@Override
public void open(Configuration parameters) throws Exception {
StateTtlConfig stateTTLConfig =
StateTtlConfig.newBuilder(Time.minutes(5))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
stateDescriptor.enableTimeToLive(stateTTLConfig);
listState = runtimeContext.getListState(stateDescriptor);
}
}
Кроме того, похоже, что ваш предыдущий пример ссылается на state
, однако вы, похоже, определяете только listState
. Если у вас есть несколько ссылок на каждый из них, вам необходимо убедиться, что вы устанавливаете/читаете правильную ссылку.
Я не осознавал, что ваш оператор в данном случае не был оператором с ключом. Как упомянул @David Anderson, это не поддерживается вне ключевого состояния. Вы можете явно указать свой поток до этого, чтобы использовать его (что может быть очень жестким подходом), если вам это абсолютно необходимо.
Я считаю, что TTL состояния поддерживается только для состояния с ключом. Реализация не обрабатывает состояние оператора.
Вот почему в документации State TTL является подразделом «Использование ключевого состояния»: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/ штат/#state-time-to-live-ttl
Однако я не могу получить состояние оператора из open(). RuntimeContext.getListState выдает мне ошибки, которые мне нужны для использования ключевого состояния. Причина, по которой я использовал CheckpointedFunction, заключается в том, что мне нужно состояние без ключа. Есть ли способ получить состояние оператора внутри open()?