Я пытаюсь использовать FLINK-CEP для измерения времени, которое требуется ставке на рынке от BidState.OPEN
до BidState.Closed
. Я получаю поток данных ставок с идентификаторами и состояниями, и в его нынешнем виде я сопоставляю все «открытые» заявки со всеми «закрытыми» заявками.
У меня есть условие в patternStream.process
, которое позволяет спаривать только открывающие и закрывающие ставки с одним и тем же идентификатором, как и должно быть. Однако это кажется неправильным, так как количество совпадений растет невероятно быстро, и у меня есть ощущение, что это можно сделать с помощью шаблонов. Итак, есть ли способ убедиться, что и «начальный», и «конечный» объекты имеют одинаковый идентификатор?
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
//Is it possible to make sure that start.BidID == end.BidID in the pattern?
Pattern<BidEvent, ?> pattern = Pattern.<BidEvent>begin("start", skipStrategy).where(
new SimpleCondition<BidEvent>() {
@Override
public boolean filter(BidEvent value) {
return value.getState() == BidState.OPENED;
}
}).followedByAny("end").where(
new SimpleCondition<BidEvent>() {
@Override
public boolean filter(BidEvent value) throws Exception {
return value.getState() == BidState.CLOSED; // && value.getBidID == start.getBidID?
}
}).within(timeout);
PatternStream<BidEvent> patternStream = CEP.pattern(BidEventDataStream, pattern);
patternStream.process(new PatternProcessFunction<BidEvent, MatchingDuration>() {
@Override
public void processMatch(Map<String
, List<BidEvent>> map
, Context context
, Collector<MatchingDuration> collector) {
BidEvent start = map.get("start").get(0);
BidEvent end = map.get("end").get(0);
if (start.getBidId() == end.getBidId()){ // Make sure opening and closing bid is the same. Can this be done in the pattern?
collector.collect(new MatchingDuration(start.getBidId(), (end.getTimestamp() - start.getTimestamp())));
}
}
}).addSink(matchingDurationSinkFunction);
Я понял, как получить желаемое поведение: BidEventDataStream
должен быть зашифрован для сопоставления с образцом для объектов с одним и тем же ключом. Никаких изменений в коде вопроса не требуется, однако BidEventDataStream
необходимо отредактировать, чтобы получить BidEvent.getBidId()
:
BidEventDataStream.keyBy(new KeySelector<BidEvent, Long>() {
@Override
public Long getKey(BidEventvalue) {
return value.getBidId();
}
})