У меня есть потоковое приложение Flink, которому нужна возможность «приостанавливать» и «возобновлять» обработку определенного потока с ключами. «Обработка» означает просто выполнение простого обнаружения аномалий в потоке.
Поток, о котором мы думаем, работает так:
Поток команд, ProcessCommand
, PauseCommand
или ResumeCommand
, каждый с id
, который используется для KeyBy
.
ProcessCommands
проверит, приостановлен ли ключ перед обработкой, и буферизирует, если нет.
PauseCommands
приостановит обработку ключа.
ResumeCommands
возобновит обработку ключа и очистит буфер.
Кажется ли этот поток разумным, и если да, смогу ли я для этого использовать что-то вроде оператора split
?
Пример потока, временные метки отдельных записей опущены:
[{command: process, id: 1}, {command: pause, id: 1}, {command: process, id: 1}, {command: resume, id: 1}, {command: process, id: 1}]
Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=>
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=>
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off
Да, именно так @shriyog. У каждой записи есть id
и command
.
В чем разница между командами process
и pause
? Не могли бы вы привести пример, демонстрирующий поток команд ввода и ожидаемое поведение?
@shriyog, эти правки лучше объясняют? Спасибо за помощь!
Этого можно добиться с помощью Оператор окна Flink. Во-первых, создайте поток на основе POJO
или tuple
, применив операцию map
.
Затем, в соответствии с вашими потребностями, вы можете использовать keyBy
в этом потоке, чтобы получить keyedStream
.
Теперь, используя комбинацию основанных на времени бесконечных window
, trigger
и window function
, вы можете добиться переключения вашего командного потока.
По сути, вы можете использовать windows
в качестве своего буфера, который после получения записи о паузе хранит записи процесса до тех пор, пока не будет получена запись возобновления. Вы бы написали настраиваемый триггер, который вытесняет окно (буфер) в соответствии с вашим сценарием.
Ниже приведена индивидуальная реализация Trigger
с переопределенным методом onElement()
.
/**
* We trigger the window processing as per command inside the record. The
* process records are buffered when a pause record is received and the
* buffer is evicted once resume record is received. If no pause record is
* received earlier, then for each process record the buffer is evicted.
*/
@Override
public TriggerResult onElement(Tuple2<Integer, String> element, long timestamp, Window window,
TriggerContext context) throws Exception {
if (element.f1.equals("pause")) {
paused = true;
return TriggerResult.CONTINUE;
} else if (element.f1.equals("resume")) {
paused = false;
return TriggerResult.FIRE_AND_PURGE;
} else if (paused) // paused is a ValueState per keyed stream.
return TriggerResult.CONTINUE;
return TriggerResult.FIRE_AND_PURGE;
}
Посмотрите полный рабочий пример в этом репозиторий github
Огромное спасибо! Я выполню это и приму ответ!
Все предельно ясно - мой единственный вопрос касается переменной transient
paused
. Так же разумно сохранить это в ValueState
, не так ли?
Ага, на самом деле это правильный путь. Здесь нам может помочь управляемое состояние с ключом ci.apache.org/projects/flink/flink-docs-stable/dev/stream/st ate /…
Отлично, вот что я использую! Еще раз спасибо за помощь.
Расширяя этот вопрос, есть идея, как можно буферизовать одни сообщения, но пропустить другие? Поэтому мы бы добавили команду «process-passthrough», чтобы пропустить окно буферизации и продолжить работу вниз по потоку.
Подумайте об использовании потока split
и оконной обработке элементов process
, а затем присоединитесь к потоку process-passthrough
для последующей обработки.
Я думаю, что сквозные записи будут вести себя так же, как записи процессов, поскольку окно (содержащее 1 запись) будет вытеснено для каждой записи. Но если вы хотите, чтобы записи process-passthrough
пропускали весь этап вытеснения окна, лучшим вариантом будет splitting
поток.
Итак, ваш фактический поток (который содержит записи вместе с
id
) будет получать команды как часть записи, и обработка потока будет контролироваться с помощью этих команд, верно?