Flink Job design — работа с гибридной темой Kafka

У меня есть тема Kafka, содержащая несколько типов событий. (Дано)
События представляют собой документы JSON.

Назовем типы событий: A,B,C,D,E.

Я могу определить тип, используя поле, которое есть в каждом событии.

Я хочу иметь задание Flink, которое будет обрабатывать события A и B отдельно (используя окно сеанса), C ​​и D должны перейти в другой тип окна, а событие D должно быть удалено.

Могу ли я реализовать такой дизайн во Flink?

Спасибо

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
52
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Если да, то вы можете воспользоваться поддержкой Flink для Side Outputs и использовать ее как средство для сопоставления каждого из отдельных типов с отдельным потоком и работать с ними отдельно (или объединять их в дальнейшем и т. д.).

По сути:

  • Прочитайте свои данные из темы Kafka (через KafkaSource)
  • Сопоставьте/обработайте ваши данные с помощью побочных результатов, чтобы получить каждый конкретный тип.
  • Постройте график должностей так, чтобы он соответствовал тому, что вам нужно (объединение и оконные блоки и т. д.).

Это может выглядеть примерно так:

val events = streamEnv
  .fromSource(KafkaSource.build(...))
  .process(YourTypeSeparatorOperator())

// Example: Getting A & B events
val a = events.getSideOutput(Tags.a)
val b = events.getSideOutput(Tags.b)

// Union this stream (and act on it via windowing, etc.)
val ab = a.union(b)

// Likewise perform operations necessary for C & D types here

// Eventually merge all of these separate streams together if needed

В приведенном выше примере YourTypeSeparatorOperator() на самом деле будет использовать побочные выходы и в зависимости от типа вашего события выводить их на назначенный боковой выход:

// Example OutputTag
object Tags{
    val a = OutputTag("a", TypeInformation.of(YourClass::class.java))
    val b = OutputTag("b", TypeInformation.of(YourClass::class.java))
    val c = OutputTag("c", TypeInformation.of(YourClass::class.java))
    val d = OutputTag("d", TypeInformation.of(YourClass::class.java))     
}

// Usage
override fun processElement(...) {
   ...
   when (message.type) {
      "a" -> context.output(Tags.a, message)
      "b" -> context.output(Tags.b, message)
      "c" -> context.output(Tags.c, message)
      "d" -> context.output(Tags.d, message)
   }
}

Спасибо, Рион. Я попробую это направление и обновлю пост.

balderman 01.05.2024 18:49

Если вы предпочитаете использовать API Table/SQL, это легко сделать. Например.,

WITH use_session_windows AS
  (select * from events where events.type = 'a' or events.type = 'b')
SELECT window_start, window_end, count(*) AS cnt
  FROM TABLE(
    SESSION(DATA => TABLE use_session_windows,
            TIMECOL => DESCRIPTOR(rowtime),  
            GAP => INTERVAL '30' MINUTES))
  GROUP BY window_start, window_end;

Другие вопросы по теме