Модульный тест Flink на предмет истечения срока TTL состояния

Я пытаюсь увеличить время обработки моего OneInputStreamOperatorTestHarness, чтобы истек срок действия TTL.

    Set<Integer> setA = new HashSet<>(Arrays.asList(1, 2, 3));
    Set<Integer> setB = new HashSet<>(Collections.singletonList(4));

    Mockito.when(configFetcher.getNumbers())
        .thenReturn(setA)
        .thenReturn(setB);
    OneInputStreamOperatorTestHarness<TypeA, TypeA> testHarness =
        new OneInputStreamOperatorTestHarness<>(new StreamFilter<>(filterA));
    testHarness.setup();
    testHarness.open();
    testHarness.setProcessingTime(testTimeMs);
    testHarness.setStateTtlProcessingTime(testTimeMs);

    // getNumbers() will be called for the first time here
    testHarness.processElement(elementA, testTimeMs);

    // expire the TTL
    long ttlExpireTimeMs = testTimeMs + Duration.ofMinutes(5).toMillis();
    testHarness.setProcessingTime(ttlExpireTimeMs);
    testHarness.setStateTtlProcessingTime(ttlExpireTimeMs);

    // getNumbers() called for second time only if state is empty. 
    // Assuming TTL will have expired, state will be empty meaning we fetch the setB. 
    // But the problem here is that the state is not clearing
    testHarness.processElement(elementB, ttlExpireTimeMs);

getNumbers() вызывается впервые для заполнения состояния, но даже после того, как я увеличил время обработки, состояние не истекает и не очищается. Следовательно, второй метод getNumbers() не вызывается для возврата setB. Я что-то упускаю?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
59
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Состояние TTL работает только для потоков с ключом, поэтому я считаю, что вам нужно использовать KeyedOneInputStreamOperatorTestHarness.

Другие вопросы по теме