Потоки kafka - как установить новый ключ для KTable

Я новичок в Kafka Streams, использую версию 1.0.0. Я хотел бы установить новый ключ для KTable из одного из значений.

При использовании KStream это можно сделать с помощью метода selectKey () следующим образом.

kstream.selectKey ((k,v) -> v.newKey)

Однако такой метод отсутствует в KTable. Единственный способ - преобразовать данный KTable в KStream. Есть мысли по этому поводу? Это изменение ключа против дизайна KTable?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
7
0
5 871
5
Перейти к ответу Данный вопрос помечен как решенный

Ответы 5

Ответ принят как подходящий

Если вы хотите установить новый ключ, вам нужно перегруппировать KTable:

KTable newTable = table.groupBy(/*put select key function here*/)
                       .aggregate(...);

Поскольку ключ должен быть уникальным для KTable (в отличие от KStream), необходимо указать функцию агрегирования, которая объединяет все записи с одним и тем же (новым) ключом в одно значение.

вы можете проверить свой ответ, пожалуйста? Я не знаю аргументов в пользу дизайна Kafka Streams API, но это звучит логично, исходя из способа распараллеливания Kafka Streams.

yuranos 31.03.2021 00:50

То, что вы говорите, правильно. Однако обратите внимание, что вы можете «пропустить» этап агрегации, просто применив агрегатор (k,v,a) -> v, который просто слепо выбирает «последнее» значение - это «последнее» значение будет в порядке смещения темы повторного разбиения (что groupBy() подразумевает ), но, конечно, порядок будет "недетерминированным" по той причине, что вы выложили ...

Matthias J. Sax 31.03.2021 03:33

Некоторые другие предлагаемые ответы, к сожалению, следуют этому шаблону, но на самом деле он сломан ... (то есть недетерминированный ...)

Matthias J. Sax 31.03.2021 03:37

@ Ответ Матиаса привел меня по правильному пути, но я подумал, что здесь может помочь образец кода

final KTable<String, User> usersKeyedByApplicationIDKTable = usersKTable.groupBy(
        // First, going to set the new key to the user's application id
        (userId, user) -> KeyValue.pair(user.getApplicationID().toString(), user)
).aggregate(
        // Initiate the aggregate value
        () -> null,
        // adder (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user,
        // subtractor (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user
);

Документация KGroupedTable aggregate (): https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate-org.apache.kafka.streams.kstream.Initializer-org.apache.kafka.streams. kstream.Aggregator-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Materialized-

при вычитании не следует ли возвращать нуль?

Tudor 31.03.2020 10:23

Программа, которую вы пишете, является недетерминированной ... Та же проблема относится к подходу @Jackson Oliveira: если у вас есть две восходящие записи, которые сопоставляются с одним и тем же новым ключом, вы не знаете, какая из них окажется в таблице ...

Matthias J. Sax 31.03.2021 03:36

Ответ @yuaranos очень хорошо объясняет, почему ..

Matthias J. Sax 31.03.2021 03:36

Код @Allen Underwood мне помог, пришлось внести некоторые изменения, если ключом является пользовательский Pojo. Поскольку я получал исключение приведения класса. Ниже код работал

usersKTable.groupBy((k, v) -> KeyValue.pair(v.getCompositeKey(), v),Grouped.with(compositeKeySerde,valueSerde))
                .aggregate(
                        () -> null,
                        (applicationId, value, aggValue) -> value,
                        (applicationId, value, aggValue) -> value,
                        Materialized.with(compositeKeySerde, valueSerde)
                );

Нашел другой простой способ, но не уверен, что он эффективен. Преобразуйте таблицу в поток, а затем с помощью клавиши выбора измените ключ. Переместите этот поток в новую тему, а затем позвольте таблице читать из новой темы.

Sumeet 05.02.2020 11:56

Программа, которую вы пишете, не является детерминированной ... Та же проблема относится к подходу @Jackson Oliveira: если у вас есть две восходящие записи, которые сопоставляются с одним и тем же новым ключом, вы не знаете, какая из них окажется в таблице ....

Matthias J. Sax 31.03.2021 03:35

Ответ @yuaranos очень хорошо объясняет, почему ...

Matthias J. Sax 31.03.2021 03:35

Для тех, кто использует confluent 5.5. +, Есть метод, позволяющий извлечь ключ из потока и напрямую преобразовать его в KTable:

       KTable<String, User> userTable = builder
            .stream("topic_name", Consumed.with(userIdSerde, userSerde))
            .selectKey((key, value) -> key.getUserId())             
            .toTable( Materialized.with(stringIdSerde, userSerde));

Подробности можно узнать здесь

Что ж, пока это «работает», нужно учитывать одну важную вещь. Если во входной таблице есть две строки, которые соответствуют одному и тому же новому ключу, нет гарантии, в каком порядке обе записи появятся в таблице результатов. Таким образом, это может быть недетерминированная программа.

Matthias J. Sax 26.09.2020 00:33

Я не думаю, что описание @Matthias достаточно точным / подробным. Это правильно, но основная причина такого ограничения (существует и для синтаксиса ksqlDBCREATE TABLE) выходит за рамки простого факта, что ключи должны быть уникальными для KTable.

Уникальность сама по себе не ограничивает KTables. В конце концов, любая основная тема может содержать и часто содержит сообщения с одинаковыми ключами. У KTable с этим проблем нет. Он просто применяет последнее состояние для каждого ключа. Это имеет множество последствий, включая тот факт, что KTable, созданный из агрегированной функции, может создавать несколько сообщений в своей выходной теме на основе одного входного сообщения ... Но давайте вернемся к вашему вопросу.

Итак, KTable необходимо знать, какое сообщение для определенного ключа является последним сообщением, то есть это последнее состояние ключа.

Какие гарантии заказа есть у Kafka? Правильно, по разделам.

Что происходит, когда сообщения изменяются? Верно, они будут распределены по разделам, очень отличным от входного сообщения.

Итак, исходные сообщения с одним и тем же ключом были правильно сохранены самим брокером в том же разделе (если вы не сделали ничего необычного / глупого с вашим пользовательским Partitioner) Таким образом, KTable всегда может определить последнее состояние.

Но что произойдет, если сообщения перепрограммированы внутри приложения Kafka Streams на лету?

Они снова будут распространяться по разделам, но теперь с другим ключом, и если ваше приложение масштабировано и у вас есть несколько задач, работающих параллельно, вы просто не можете гарантировать, что последнее сообщение по новому ключу на самом деле является последним сообщением, поскольку оно было сохранено в исходной теме. У отдельных задач нет такой координации. А они не могут. В противном случае это не будет эффективно.

В результате KTable потеряет свою основную семантику, если такое изменение ключей было разрешено.

Другие вопросы по теме