Шаблон Apache Flink CEP для нескольких типов событий

В настоящее время я работаю над семестровым проектом, в котором я должен распознать серию из трех событий. Нравится P -> R -> P

У нас есть два разных типа событий, которые потребляются через коннектор Kafka в одной и той же теме.

Я создал родительский класс под названием Event, от которого наследуются два других типа.

Коннектор Kafka десериализует JSON с помощью EventSchema в родительский класс Event.

val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)

Выглядит выкройка так:

val pattern = Pattern
  .begin[Event]("before")
  .subtype(classOf[Position])
  .next("recognized")
  .subtype(classOf[Recognized])
  .next("after")
  .subtype(classOf[Position])

Текущая проблема заключается в том, что если я отправлю три сообщения в соответствующем формате, шаблон не будет распознан.

Что еще пробовал .. Я изменил схему вот так:

val pattern = Pattern
  .begin[Event]("before")
  .where(e => e.getType == "position")
  .next("recognized")
  .where(e => e.getType == "recognition")
  .next("after")
  .where(e => e.getType == "position")

Этот шаблон работает, но позже я не могу преобразовать класс Event в положение или распознавание ..

Что мне здесь не хватает?

Может быть, элементы, которые вы передаете в шаблон, являются событиями?

Jiayi Liao 17.01.2019 03:47

Это верно, но разве нельзя иметь разные типы событий, упорядочить их по возрастанию от времени события и найти закономерность внутри? Если все события связаны с одной темой или каждое событие имеет свою собственную тему, это не должно иметь смысла.

Daniel Eisenreich 17.01.2019 10:24

Вы инициализировали объекты с подтипом при десериализации из кафки?

Jiayi Liao 17.01.2019 10:30

Я просто сериализую его как событие с val kafkaSource = new FlinkKafkaConsumer("sp", new EventSchema, properties), потому что во время выполнения несколько типов находятся в одной теме .. Но могу ли я объединить несколько kafkaSources с разными типами в один?

Daniel Eisenreich 17.01.2019 12:42

Можете ли вы поместить сюда коды EventSchema? Пробовал по вашим описаниям, но работает.

Jiayi Liao 17.01.2019 14:38
class EventSchema extends AbstractDeserializationSchema[Event] { val mapper = new ObjectMapper() override def deserialize(bytes: Array[Byte]): Event = mapper.readValue(bytes, classOf[Event]) }
Daniel Eisenreich 17.01.2019 14:40

@bupt_ljy, можете ли вы разместить свой код где-нибудь еще, чтобы я мог проверить, какая часть отличается?

Daniel Eisenreich 17.01.2019 14:54
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
7
507
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Согласно комментариям, я думаю, вам следует вернуть экземпляры подтипа вместо Event. Вот мой пример кода для вас:

val event = mapper.readValue(bytes, classOf[Event])
event.getType match {
  case "position" => mapper.readValue(bytes, classOf[Position])
  case "recognition" => mapper.readValue(bytes, classOf[Recognized])
  case _ =>
}

Я успешно попробовал пример из тестового примера в CEPITCase.java.

DataStream<Event> input = env.fromElements(
  new Event(1, "foo", 4.0),
  new SubEvent(2, "foo", 4.0, 1.0),
  new SubEvent(3, "foo", 4.0, 1.0),
  new SubEvent(4, "foo", 4.0, 1.0),
  new Event(5, "middle", 5.0)
);

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").subtype(SubEvent.class)
.followedByAny("middle").subtype(SubEvent.class)
.followedByAny("end").subtype(SubEvent.class);

Ты гений! Только одна вещь, которая не сработала в вашем примере ... Когда я использую asInstanceOf, возникает исключение, что я не могу преобразовать событие в позицию ... Из-за этого я обменялся этим с mapper.readValue(bytes, classOf[Position]) и просто повторно десериализовал его. Если бы вы могли отредактировать это в своем сообщении, я приму это как ответ! Еще раз спасибо!

Daniel Eisenreich 18.01.2019 09:35

@DanielEisenreich Это то изменение, которое вам нужно?

Jiayi Liao 18.01.2019 09:46

Другие вопросы по теме