Сложная агрегация

У меня есть данные в теме, которые необходимо учитывать на нескольких уровнях, и во всем коде и статьях упоминается только пример подсчета слов.

Примером данных может быть:

серийный номер: 123 страна: мы дата: 05.01.2018 штат: Нью-Йорк город: нью-йорк посетителей: 5

серийный номер: 123 страна: мы дата: 06.01.2018 штат: Нью-Йорк город: Квинс посетителей: 10

серийный номер: 456 дата: 06.01.2018 страна: мы штат: Нью-Йорк город: Квинс посетителей: 27

серийный номер: 123 дата: 06.01.2018 страна: мы штат: Нью-Йорк город: нью-йорк посетителей: 867

Я сделал фильтр, groupBy, но совокупность? Извините за Java 8 и & mix, я предпочитаю 8, но в то же время изучаю его.

KTable<String, CountryVisitorModel> countryStream1 = inStream
    .filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
    .groupBy((key, value) -> value.serial)
    .aggregate(
            new Initializer<CountryVisitorModel>() {

            public CountryVisitorModelapply() {
                return new CountryVisitorModel();
            }
        },
        new Aggregator<String, InputModel, CountryVisitorModel>() {

            @Override
            public CountryVisitorModelapply(String key, InputModel value, CountryVisitorModel aggregate) {

    aggregate.serial = value.serial;
    aggregate.country_name = value.country_name;
    aggregate.city_name = value.city_name;

    aggregate.country_count++;
    aggregate.city_count++;
    aggregate.ip_count++;

        //
    return aggregate;
       }
},
Materialized.with(stringSerde, visitorSerde));

Для всех одинаковых serial_id (это будет группа по) посчитайте общее количество посетителей по этому:

серия страна штат город total_num_visitors

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
341
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Если каждая запись способствует ровно одному подсчету, я бы рекомендовал branch() поток и подсчет для каждого подпотока:

KStream stream = builder.stream(...)
KStream[] subStreams = stream.branch(...);

// each record of `stream` will be contained in exactly _one_ `substream`
subStream[0].grouByKey().count(); // or aggregate() instead of count()
subStream[1].grouByKey().count();
// ...

Если ветвление не работает, потому что одна запись должна пройти несколько подсчетов, вы можете «транслировать» и фильтровать:

KStream stream = builder.stream(...)

// each record in `stream` will be "duplicated" and sent to all `filters`
stream.filter(...).grouByKey().count(); // or aggregate() instead of count()
stream.filter(...).grouByKey().count();
// ...

Несколько раз использовать один и тот же объект KStream и применять несколько операторов (в нашем случае filter() каждая запись будет «транслироваться» всем операторам). Обратите внимание, что запись (т. е. объекты) в этом случае физически не копируется, но для вызова каждого filter() используется один и тот же объект входной записи.

Другие вопросы по теме