В настоящее время я работаю над семестровым проектом, в котором я должен распознать серию из трех событий. Нравится P -> R -> P
У нас есть два разных типа событий, которые потребляются через коннектор Kafka в одной и той же теме.
Я создал родительский класс под названием Event, от которого наследуются два других типа.
Коннектор Kafka десериализует JSON с помощью EventSchema в родительский класс Event.
val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)
Выглядит выкройка так:
val pattern = Pattern
.begin[Event]("before")
.subtype(classOf[Position])
.next("recognized")
.subtype(classOf[Recognized])
.next("after")
.subtype(classOf[Position])
Текущая проблема заключается в том, что если я отправлю три сообщения в соответствующем формате, шаблон не будет распознан.
Что еще пробовал .. Я изменил схему вот так:
val pattern = Pattern
.begin[Event]("before")
.where(e => e.getType == "position")
.next("recognized")
.where(e => e.getType == "recognition")
.next("after")
.where(e => e.getType == "position")
Этот шаблон работает, но позже я не могу преобразовать класс Event в положение или распознавание ..
Что мне здесь не хватает?
Это верно, но разве нельзя иметь разные типы событий, упорядочить их по возрастанию от времени события и найти закономерность внутри? Если все события связаны с одной темой или каждое событие имеет свою собственную тему, это не должно иметь смысла.
Вы инициализировали объекты с подтипом при десериализации из кафки?
Я просто сериализую его как событие с val kafkaSource = new FlinkKafkaConsumer("sp", new EventSchema, properties), потому что во время выполнения несколько типов находятся в одной теме .. Но могу ли я объединить несколько kafkaSources с разными типами в один?
Можете ли вы поместить сюда коды EventSchema? Пробовал по вашим описаниям, но работает.
class EventSchema extends AbstractDeserializationSchema[Event] { val mapper = new ObjectMapper() override def deserialize(bytes: Array[Byte]): Event = mapper.readValue(bytes, classOf[Event]) }@bupt_ljy, можете ли вы разместить свой код где-нибудь еще, чтобы я мог проверить, какая часть отличается?

Согласно комментариям, я думаю, вам следует вернуть экземпляры подтипа вместо Event. Вот мой пример кода для вас:
val event = mapper.readValue(bytes, classOf[Event])
event.getType match {
case "position" => mapper.readValue(bytes, classOf[Position])
case "recognition" => mapper.readValue(bytes, classOf[Recognized])
case _ =>
}
Я успешно попробовал пример из тестового примера в CEPITCase.java.
DataStream<Event> input = env.fromElements(
new Event(1, "foo", 4.0),
new SubEvent(2, "foo", 4.0, 1.0),
new SubEvent(3, "foo", 4.0, 1.0),
new SubEvent(4, "foo", 4.0, 1.0),
new Event(5, "middle", 5.0)
);
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").subtype(SubEvent.class)
.followedByAny("middle").subtype(SubEvent.class)
.followedByAny("end").subtype(SubEvent.class);
Ты гений! Только одна вещь, которая не сработала в вашем примере ... Когда я использую asInstanceOf, возникает исключение, что я не могу преобразовать событие в позицию ... Из-за этого я обменялся этим с mapper.readValue(bytes, classOf[Position]) и просто повторно десериализовал его. Если бы вы могли отредактировать это в своем сообщении, я приму это как ответ! Еще раз спасибо!
@DanielEisenreich Это то изменение, которое вам нужно?
Может быть, элементы, которые вы передаете в шаблон, являются событиями?