FLINK CEP (Java 8) - постоянная «идентификация» посредством сопоставления с шаблоном

Я пытаюсь использовать FLINK-CEP для измерения времени, которое требуется ставке на рынке от BidState.OPEN до BidState.Closed. Я получаю поток данных ставок с идентификаторами и состояниями, и в его нынешнем виде я сопоставляю все «открытые» заявки со всеми «закрытыми» заявками.

У меня есть условие в patternStream.process, которое позволяет спаривать только открывающие и закрывающие ставки с одним и тем же идентификатором, как и должно быть. Однако это кажется неправильным, так как количество совпадений растет невероятно быстро, и у меня есть ощущение, что это можно сделать с помощью шаблонов. Итак, есть ли способ убедиться, что и «начальный», и «конечный» объекты имеют одинаковый идентификатор?

AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
//Is it possible to make sure that start.BidID == end.BidID in the pattern?
Pattern<BidEvent, ?> pattern = Pattern.<BidEvent>begin("start", skipStrategy).where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) {
                return value.getState() == BidState.OPENED;
            }
        }).followedByAny("end").where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) throws Exception {
                return value.getState() == BidState.CLOSED; // && value.getBidID == start.getBidID?
            }
        }).within(timeout);

PatternStream<BidEvent> patternStream = CEP.pattern(BidEventDataStream, pattern);

patternStream.process(new PatternProcessFunction<BidEvent, MatchingDuration>() {
    @Override
    public void processMatch(Map<String
            , List<BidEvent>> map
            , Context context
            , Collector<MatchingDuration> collector) {

        BidEvent start = map.get("start").get(0);
        BidEvent end = map.get("end").get(0);
        if (start.getBidId() == end.getBidId()){ // Make sure opening and closing bid is the same. Can this be done in the pattern?
            collector.collect(new MatchingDuration(start.getBidId(), (end.getTimestamp() - start.getTimestamp())));
        }
    }
}).addSink(matchingDurationSinkFunction);
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
119
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я понял, как получить желаемое поведение: BidEventDataStreamдолжен быть зашифрован для сопоставления с образцом для объектов с одним и тем же ключом. Никаких изменений в коде вопроса не требуется, однако BidEventDataStream необходимо отредактировать, чтобы получить BidEvent.getBidId():

BidEventDataStream.keyBy(new KeySelector<BidEvent, Long>() {
                    @Override
                    public Long getKey(BidEventvalue) {
                        return value.getBidId();
                    }
                })

Другие вопросы по теме