В настоящее время я разрабатываю оператор (приемник), который использует ключевое состояние flink. Серверная часть состояния основана на куче. Время жизни штата установлено на 24 часа. Вариант использования оператора такой: сначала мы перехватываем запрос и сохраняем что-то в valueState, затем перехватываем ответ и выполняем некоторую логику с запросом и ответом.
Чего я боюсь, так это того, что штат может вырасти в среде производства гораздо быстрее, чем я ожидаю. Поэтому я подумал о добавлении некоторого свойства FeatureFlag, чтобы иметь возможность быстро отключить оператора, если что-то пойдет не так.
Насколько я понимаю, flink, похоже, не любит случаи, когда вы добавляете и удаляете операторы с состоянием из графа заданий от контрольной точки к контрольной точке, поэтому я изменил свой оператор: если свойство истинно - оператор работает так, как ожидалось, если оно ложно - он немедленно вернется и пропустит его функциональность.
Это должно сработать, но я понятия не имею, как очистить состояние, которое уже было бы создано, если бы я решил отключить FeatureFlag.
Из-за ключевой природы состояния я не могу просто вызвать state.clear(), потому что нет ключевого контекста, и я не хочу ждать 24 часа с слишком большим состоянием. Я думал об изменении ttlConfig на 1 минуту вместо 24 часов (например), если FeatureFlag имеет значение false, но я проверил исходный код и понял, что если состояние по дескриптору уже существовало в stateBackend, оно будет возвращено, и новая конфигурация ttl никогда не будет использоваться. .
Так
Есть ли способы принудительно очистить состояние с помощью stateDescriptor? Или мне остается только подождать, пока TTL очистит состояние?
P.S. Вопрос, похоже, связан с Как очищается состояние потока Flink для неактивных ключей?
но я не могу использовать тот же подход из-за
@kkrugler Я много читал о RocksDB, но, к сожалению, в моем случае это не вариант. я не могу использовать его из-за некоторых ограничений инфраструктуры.
Один из вариантов, который вы не упомянули, заключается в следующем: вместо того, чтобы удерживать состояние клавиши в чем-то вроде KeyedProcessFunction
, используйте KeyedBroadcastProcessFunction
. Затем, когда вы захотите очистить какое-то состояние, передайте событие этому оператору. В ответ на это входящее широковещательное событие метод processBroadcastElement
может вызвать applyToKeyedState
для переданного объекта KeyedBroadcastProcessFunction.Context
. Этот метод applyToKeyedState
может использовать KeyedStateFunction
для перебора состояния ключа для всех ключей и очистки состояния для любых ключей, которые он хочет. очистить.
Подробности см.
Если вы можете допустить некоторое время простоя, другим решением может быть остановка задания с точкой сохранения, использование API процессора состояний для очистки состояния некоторых ключей, а затем перезапуск с измененной точки сохранения.
Прежде чем выполнять какие-либо трюки с вертолетом для поддержки очистки состояния быстрее, чем было настроено ранее для TTL, я бы убедился, что это действительно проблема. Вы можете использовать RocksDB для поддержки (почти) неограниченного размера состояния и/или быть осторожным с размером того, что вы храните в состоянии.