Как заставить работать TTL состояния оператора Flink

Я не могу заставить работать TTL состояния оператора. Я установил TTL на 5 минут, поэтому по истечении этого времени я ожидаю, что состояние будет очищено. Когда я замечаю, что состояние пусто из-за истечения срока TTL, оно должно обновить состояние.

public static class TestFilter extends RichFilterFunction<A>
      implements CheckpointedFunction {
    final Helper helper;
    final ConfigFetcher configFetcher;
    private final ListStateDescriptor<Integer> listStateDescriptor =
        new ListStateDescriptor<>(
            "state_descriptor", TypeInformation.of(new TypeHint<>() {}));

    @SuppressWarnings("NullAway.Init")
    private transient ListState<Integer> listState;

    TestFilter(
        Helper helper,
        ConfigFetcher configFetcher) {
      this.helper = helper;
      this.configFetcher = configFetcher;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
      StateTtlConfig stateTTLConfig =
          StateTtlConfig.newBuilder(Time.minutes(5))
              .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
              .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
              .build();
      stateDescriptor.enableTimeToLive(stateTTLConfig);
      state =          
       context.getOperatorStateStore().getListState(stateDescriptor);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {}

    @Override
    public boolean filter(A element) throws Exception {
        Set<Integer> ids = new HashSet<>();
        if (listState.get() == null) {
          ids = configFetcher.getIds();
          listState.update(new ArrayList<>(ids));
        } else {
          listState.get().forEach(ids::add);
          if (ids.isEmpty()) {
            ids = configFetcher.getIds();
            listState.update(new ArrayList<>(ids));
          }
        }
        return ids.contains(element)
    
    }

Когда я проверяю это, я замечаю, что через 5 минут состояние не очищается/обновляется. Кто-нибудь знает, почему?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
73
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Функция initializeState() чаще используется в ситуациях, когда вам может потребоваться обработать пользовательскую логику восстановления или инициализировать содержимое состояния. Возможно, вы захотите определить свое состояние в функции open(), которая обычно используется для начальной настройки оператора, аналогично следующему:

public class TestFun extends RichFilterFunction<A> implements CheckpointedFunction {
    @SuppressWarnings("NullAway.Init")
    private transient ListState<Integer> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig stateTTLConfig =
          StateTtlConfig.newBuilder(Time.minutes(5))
              .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
              .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
              .build();
      stateDescriptor.enableTimeToLive(stateTTLConfig);
      listState = runtimeContext.getListState(stateDescriptor);
    }
}

Кроме того, похоже, что ваш предыдущий пример ссылается на state, однако вы, похоже, определяете только listState. Если у вас есть несколько ссылок на каждый из них, вам необходимо убедиться, что вы устанавливаете/читаете правильную ссылку.

Однако я не могу получить состояние оператора из open(). RuntimeContext.getListState выдает мне ошибки, которые мне нужны для использования ключевого состояния. Причина, по которой я использовал CheckpointedFunction, заключается в том, что мне нужно состояние без ключа. Есть ли способ получить состояние оператора внутри open()?

Scott 28.02.2024 18:48

Я не осознавал, что ваш оператор в данном случае не был оператором с ключом. Как упомянул @David Anderson, это не поддерживается вне ключевого состояния. Вы можете явно указать свой поток до этого, чтобы использовать его (что может быть очень жестким подходом), если вам это абсолютно необходимо.

Rion Williams 01.03.2024 18:15
Ответ принят как подходящий

Я считаю, что TTL состояния поддерживается только для состояния с ключом. Реализация не обрабатывает состояние оператора.

Вот почему в документации State TTL является подразделом «Использование ключевого состояния»: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/ штат/#state-time-to-live-ttl

Другие вопросы по теме