Приостановка обработки на Flink KeyedStream

У меня есть потоковое приложение Flink, которому нужна возможность «приостанавливать» и «возобновлять» обработку определенного потока с ключами. «Обработка» означает просто выполнение простого обнаружения аномалий в потоке.

Поток, о котором мы думаем, работает так:

Поток команд, ProcessCommand, PauseCommand или ResumeCommand, каждый с id, который используется для KeyBy.

ProcessCommands проверит, приостановлен ли ключ перед обработкой, и буферизирует, если нет.

PauseCommands приостановит обработку ключа.

ResumeCommands возобновит обработку ключа и очистит буфер.

Кажется ли этот поток разумным, и если да, смогу ли я для этого использовать что-то вроде оператора split?

Пример потока, временные метки отдельных записей опущены:

[{command: process, id: 1}, {command: pause, id: 1}, {command: process, id: 1}, {command: resume, id: 1}, {command: process, id: 1}]

Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=> 
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=> 
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off 

Итак, ваш фактический поток (который содержит записи вместе с id) будет получать команды как часть записи, и обработка потока будет контролироваться с помощью этих команд, верно?

shriyog 28.10.2018 14:11

Да, именно так @shriyog. У каждой записи есть id и command.

austin_ce 29.10.2018 15:31

В чем разница между командами process и pause? Не могли бы вы привести пример, демонстрирующий поток команд ввода и ожидаемое поведение?

shriyog 29.10.2018 18:37

@shriyog, эти правки лучше объясняют? Спасибо за помощь!

austin_ce 29.10.2018 19:17
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
4
510
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Этого можно добиться с помощью Оператор окна Flink. Во-первых, создайте поток на основе POJO или tuple, применив операцию map.

Затем, в соответствии с вашими потребностями, вы можете использовать keyBy в этом потоке, чтобы получить keyedStream.

Теперь, используя комбинацию основанных на времени бесконечных window, trigger и window function, вы можете добиться переключения вашего командного потока.

По сути, вы можете использовать windows в качестве своего буфера, который после получения записи о паузе хранит записи процесса до тех пор, пока не будет получена запись возобновления. Вы бы написали настраиваемый триггер, который вытесняет окно (буфер) в соответствии с вашим сценарием.

Ниже приведена индивидуальная реализация Trigger с переопределенным методом onElement().

/**
 * We trigger the window processing as per command inside the record. The
 * process records are buffered when a pause record is received and the
 * buffer is evicted once resume record is received. If no pause record is
 * received earlier, then for each process record the buffer is evicted.
 */
@Override
public TriggerResult onElement(Tuple2<Integer, String> element, long timestamp, Window window,
        TriggerContext context) throws Exception {
    if (element.f1.equals("pause")) {
        paused = true;
        return TriggerResult.CONTINUE;
    } else if (element.f1.equals("resume")) {
        paused = false;
        return TriggerResult.FIRE_AND_PURGE;
    } else if (paused) // paused is a ValueState per keyed stream.
        return TriggerResult.CONTINUE;
    return TriggerResult.FIRE_AND_PURGE;
}

Посмотрите полный рабочий пример в этом репозиторий github

Огромное спасибо! Я выполню это и приму ответ!

austin_ce 29.10.2018 22:58

Все предельно ясно - мой единственный вопрос касается переменной transientpaused. Так же разумно сохранить это в ValueState, не так ли?

austin_ce 30.10.2018 16:39

Ага, на самом деле это правильный путь. Здесь нам может помочь управляемое состояние с ключом ci.apache.org/projects/flink/flink-docs-stable/dev/stream/st‌ ate /…

shriyog 30.10.2018 17:28

Отлично, вот что я использую! Еще раз спасибо за помощь.

austin_ce 30.10.2018 17:39

Расширяя этот вопрос, есть идея, как можно буферизовать одни сообщения, но пропустить другие? Поэтому мы бы добавили команду «process-passthrough», чтобы пропустить окно буферизации и продолжить работу вниз по потоку.

austin_ce 30.10.2018 22:31

Подумайте об использовании потока split и оконной обработке элементов process, а затем присоединитесь к потоку process-passthrough для последующей обработки.

austin_ce 30.10.2018 22:48

Я думаю, что сквозные записи будут вести себя так же, как записи процессов, поскольку окно (содержащее 1 запись) будет вытеснено для каждой записи. Но если вы хотите, чтобы записи process-passthrough пропускали весь этап вытеснения окна, лучшим вариантом будет splitting поток.

shriyog 31.10.2018 12:50

Другие вопросы по теме