Состояние ключа принудительной очистки Flink для дескриптора состояния

В настоящее время я разрабатываю оператор (приемник), который использует ключевое состояние flink. Серверная часть состояния основана на куче. Время жизни штата установлено на 24 часа. Вариант использования оператора такой: сначала мы перехватываем запрос и сохраняем что-то в valueState, затем перехватываем ответ и выполняем некоторую логику с запросом и ответом. 

Чего я боюсь, так это того, что штат может вырасти в среде производства гораздо быстрее, чем я ожидаю. Поэтому я подумал о добавлении некоторого свойства FeatureFlag, чтобы иметь возможность быстро отключить оператора, если что-то пойдет не так.

Насколько я понимаю, flink, похоже, не любит случаи, когда вы добавляете и удаляете операторы с состоянием из графа заданий от контрольной точки к контрольной точке, поэтому я изменил свой оператор: если свойство истинно - оператор работает так, как ожидалось, если оно ложно - он немедленно вернется и пропустит его функциональность.

Это должно сработать, но я понятия не имею, как очистить состояние, которое уже было бы создано, если бы я решил отключить FeatureFlag.

Из-за ключевой природы состояния я не могу просто вызвать state.clear(), потому что нет ключевого контекста, и я не хочу ждать 24 часа с слишком большим состоянием. Я думал об изменении ttlConfig на 1 минуту вместо 24 часов (например), если FeatureFlag имеет значение false, но я проверил исходный код и понял, что если состояние по дескриптору уже существовало в stateBackend, оно будет возвращено, и новая конфигурация ttl никогда не будет использоваться. .

Так

  1. Похоже, я не могу изменить ttl для существующего состояния.
  2. Кажется, я не могу очистить состояние, потому что могу никогда не получить второе сообщение с тем же ключом, чтобы получить необходимый контекст ключа для состояния.

Есть ли способы принудительно очистить состояние с помощью stateDescriptor? Или мне остается только подождать, пока TTL очистит состояние?

P.S. Вопрос, похоже, связан с Как очищается состояние потока Flink для неактивных ключей?

но я не могу использовать тот же подход из-за

  1. Природа ключевого состояния. Принимая во внимание, что ValueState — это всего лишь серверная часть состояния с текущим ключом обработки, я сомневаюсь, что обратный вызов сработает.
  2. Поскольку ресурсы ограничены, я думаю, что хранение миллионов обратных вызовов приведет к тем же проблемам с памятью и размером контрольной точки, поэтому проще просто подождать, пока не достигнет ttl.

Прежде чем выполнять какие-либо трюки с вертолетом для поддержки очистки состояния быстрее, чем было настроено ранее для TTL, я бы убедился, что это действительно проблема. Вы можете использовать RocksDB для поддержки (почти) неограниченного размера состояния и/или быть осторожным с размером того, что вы храните в состоянии.

kkrugler 10.04.2024 22:38

@kkrugler Я много читал о RocksDB, но, к сожалению, в моем случае это не вариант. я не могу использовать его из-за некоторых ограничений инфраструктуры.

midikko 11.04.2024 09:32
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
2
74
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Один из вариантов, который вы не упомянули, заключается в следующем: вместо того, чтобы удерживать состояние клавиши в чем-то вроде KeyedProcessFunction, используйте KeyedBroadcastProcessFunction. Затем, когда вы захотите очистить какое-то состояние, передайте событие этому оператору. В ответ на это входящее широковещательное событие метод processBroadcastElement может вызвать applyToKeyedState для переданного объекта KeyedBroadcastProcessFunction.Context. Этот метод applyToKeyedState может использовать KeyedStateFunction для перебора состояния ключа для всех ключей и очистки состояния для любых ключей, которые он хочет. очистить.

Подробности см.

Если вы можете допустить некоторое время простоя, другим решением может быть остановка задания с точкой сохранения, использование API процессора состояний для очистки состояния некоторых ключей, а затем перезапуск с измененной точки сохранения.

Другие вопросы по теме