Я пытаюсь увеличить время обработки моего OneInputStreamOperatorTestHarness, чтобы истек срок действия TTL.
Set<Integer> setA = new HashSet<>(Arrays.asList(1, 2, 3));
Set<Integer> setB = new HashSet<>(Collections.singletonList(4));
Mockito.when(configFetcher.getNumbers())
.thenReturn(setA)
.thenReturn(setB);
OneInputStreamOperatorTestHarness<TypeA, TypeA> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamFilter<>(filterA));
testHarness.setup();
testHarness.open();
testHarness.setProcessingTime(testTimeMs);
testHarness.setStateTtlProcessingTime(testTimeMs);
// getNumbers() will be called for the first time here
testHarness.processElement(elementA, testTimeMs);
// expire the TTL
long ttlExpireTimeMs = testTimeMs + Duration.ofMinutes(5).toMillis();
testHarness.setProcessingTime(ttlExpireTimeMs);
testHarness.setStateTtlProcessingTime(ttlExpireTimeMs);
// getNumbers() called for second time only if state is empty.
// Assuming TTL will have expired, state will be empty meaning we fetch the setB.
// But the problem here is that the state is not clearing
testHarness.processElement(elementB, ttlExpireTimeMs);
getNumbers() вызывается впервые для заполнения состояния, но даже после того, как я увеличил время обработки, состояние не истекает и не очищается. Следовательно, второй метод getNumbers() не вызывается для возврата setB. Я что-то упускаю?
Состояние TTL работает только для потоков с ключом, поэтому я считаю, что вам нужно использовать KeyedOneInputStreamOperatorTestHarness
.