У меня есть тема Kafka, содержащая несколько типов событий. (Дано)
События представляют собой документы JSON.
Назовем типы событий: A,B,C,D,E.
Я могу определить тип, используя поле, которое есть в каждом событии.
Я хочу иметь задание Flink, которое будет обрабатывать события A и B отдельно (используя окно сеанса), C и D должны перейти в другой тип окна, а событие D должно быть удалено.
Могу ли я реализовать такой дизайн во Flink?
Спасибо

Если да, то вы можете воспользоваться поддержкой Flink для Side Outputs и использовать ее как средство для сопоставления каждого из отдельных типов с отдельным потоком и работать с ними отдельно (или объединять их в дальнейшем и т. д.).
По сути:
Это может выглядеть примерно так:
val events = streamEnv
.fromSource(KafkaSource.build(...))
.process(YourTypeSeparatorOperator())
// Example: Getting A & B events
val a = events.getSideOutput(Tags.a)
val b = events.getSideOutput(Tags.b)
// Union this stream (and act on it via windowing, etc.)
val ab = a.union(b)
// Likewise perform operations necessary for C & D types here
// Eventually merge all of these separate streams together if needed
В приведенном выше примере YourTypeSeparatorOperator() на самом деле будет использовать побочные выходы и в зависимости от типа вашего события выводить их на назначенный боковой выход:
// Example OutputTag
object Tags{
val a = OutputTag("a", TypeInformation.of(YourClass::class.java))
val b = OutputTag("b", TypeInformation.of(YourClass::class.java))
val c = OutputTag("c", TypeInformation.of(YourClass::class.java))
val d = OutputTag("d", TypeInformation.of(YourClass::class.java))
}
// Usage
override fun processElement(...) {
...
when (message.type) {
"a" -> context.output(Tags.a, message)
"b" -> context.output(Tags.b, message)
"c" -> context.output(Tags.c, message)
"d" -> context.output(Tags.d, message)
}
}
Если вы предпочитаете использовать API Table/SQL, это легко сделать. Например.,
WITH use_session_windows AS
(select * from events where events.type = 'a' or events.type = 'b')
SELECT window_start, window_end, count(*) AS cnt
FROM TABLE(
SESSION(DATA => TABLE use_session_windows,
TIMECOL => DESCRIPTOR(rowtime),
GAP => INTERVAL '30' MINUTES))
GROUP BY window_start, window_end;
Спасибо, Рион. Я попробую это направление и обновлю пост.