У меня есть данные в теме, которые необходимо учитывать на нескольких уровнях, и во всем коде и статьях упоминается только пример подсчета слов.
Примером данных может быть:
серийный номер: 123 страна: мы дата: 05.01.2018 штат: Нью-Йорк город: нью-йорк посетителей: 5
серийный номер: 123 страна: мы дата: 06.01.2018 штат: Нью-Йорк город: Квинс посетителей: 10
серийный номер: 456 дата: 06.01.2018 страна: мы штат: Нью-Йорк город: Квинс посетителей: 27
серийный номер: 123 дата: 06.01.2018 страна: мы штат: Нью-Йорк город: нью-йорк посетителей: 867
Я сделал фильтр, groupBy, но совокупность? Извините за Java 8 и & mix, я предпочитаю 8, но в то же время изучаю его.
KTable<String, CountryVisitorModel> countryStream1 = inStream
.filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
.groupBy((key, value) -> value.serial)
.aggregate(
new Initializer<CountryVisitorModel>() {
public CountryVisitorModelapply() {
return new CountryVisitorModel();
}
},
new Aggregator<String, InputModel, CountryVisitorModel>() {
@Override
public CountryVisitorModelapply(String key, InputModel value, CountryVisitorModel aggregate) {
aggregate.serial = value.serial;
aggregate.country_name = value.country_name;
aggregate.city_name = value.city_name;
aggregate.country_count++;
aggregate.city_count++;
aggregate.ip_count++;
//
return aggregate;
}
},
Materialized.with(stringSerde, visitorSerde));
Для всех одинаковых serial_id (это будет группа по) посчитайте общее количество посетителей по этому:
серия страна штат город total_num_visitors




Если каждая запись способствует ровно одному подсчету, я бы рекомендовал branch() поток и подсчет для каждого подпотока:
KStream stream = builder.stream(...)
KStream[] subStreams = stream.branch(...);
// each record of `stream` will be contained in exactly _one_ `substream`
subStream[0].grouByKey().count(); // or aggregate() instead of count()
subStream[1].grouByKey().count();
// ...
Если ветвление не работает, потому что одна запись должна пройти несколько подсчетов, вы можете «транслировать» и фильтровать:
KStream stream = builder.stream(...)
// each record in `stream` will be "duplicated" and sent to all `filters`
stream.filter(...).grouByKey().count(); // or aggregate() instead of count()
stream.filter(...).grouByKey().count();
// ...
Несколько раз использовать один и тот же объект KStream и применять несколько операторов (в нашем случае filter() каждая запись будет «транслироваться» всем операторам). Обратите внимание, что запись (т. е. объекты) в этом случае физически не копируется, но для вызова каждого filter() используется один и тот же объект входной записи.