Можно ли повторно использовать GlobalKTable?

Я хочу ReKey GlobalKTable (возможно, при его инициализации, поскольку я считаю, что они читаются только после создания).

Это возможно?

У меня есть две темы, с которыми я работаю в приложении Spring/Java Kafka Streams. Первый не уплотнен, второй есть. Оба используют Avro для своих ключей и значений.

Приложение передает записи из первой (несжатой) темы и прикрепляет дополнительные данные из сжатой темы через KStream#leftJoin. Сжатая тема была перенесена в приложение как GlobalKTable, создана с помощью StreamsBuilder#globalTable() и должна оставаться такой (мне нужна каждая запись из всех разделов темы, доступных в каждом экземпляре приложения).

Я знаю, что говорят о поддержке соединений без первичного ключа (https://issues.apache.org/jira/browse/KAFKA-3705), но, насколько мне известно, я пока не могу этого сделать...

@Configuration
@EnableKafkaStreams
public class StreamsConfig {

  @Autowired
  private MyCustomSerdes serdes;

  @Bean
  public KStream<AvroKeyOne, AvroValueOne> reKeyJoin(StreamsBuilder streamsBuilder) {

    GlobalKTable<AvroKeyOne, AvroValueOne> globalTable = streamsBuilder.globalTable("topicOne", Consumed.with(
      serdes.getAvroKeyOne()
      serdes.getAvroValueOne()
    ));

    KStream<AvroKeyTwo, AvroValueOne> kStream = streamsBuilder.stream("topicTwo", Consumed.with(
      serdes.getAvroKeyTwo(),
      serdes.getAvroValueOne()
    ));

    kStream.join(
      globalTable,
       /**
        * the KeyValueMapper. I need to rekey the Global table as well to the
        * corresponding String (which it's data will have) if I want this join
        * to return results
        */
      (streamKey, streamValue) -> {return streamKey.getNewStringKey()},
      (/**ValueJoiner Deal**/)
    );
  }

}
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
6
0
833
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

I want to ReKey a GlobalKTable (probably while initializing it, as I believe they are read only once created).

Is this possible?

Прямой поддержки этому сегодня нет. Вы уже упоминали о предстоящей работе, такой как добавление поддержка глобальных таблиц для соединений без первичного ключа, но это пока недоступно.

Что вы могли бы сделать сегодня: вы можете повторно ввести (переразбить) исходную тему Kafka в новую тему, а затем прочитать тему с измененным ключом в свой глобальный KTable. Возможно, это вариант для вас.

Изменение исходной темы - это мысль. Хотя каждый экземпляр приложения будет потреблять и переустанавливать только записи в разделе, которому он назначен (во всяком случае, я так понимаю, я ошибаюсь в этом?). Проблема здесь в том, что мне нужны все данные из темы (по всем разделам) в каждом экземпляре приложения, а не только данные в назначенном ему разделе, поэтому я использую GlobalKTable

Casey 18.04.2019 21:43

Повторное разбиение может быть выполнено с помощью обычного KStream/KTable, что приведет к созданию новой перераспределенной темы Kafka. Здесь не имеет значения, может ли экземпляр вашего приложения видеть только «свои» ключи — если только для самого переразметки не требуется глобальная информация (это будет редкий случай). Затем прочитайте раздел с переразметкой в ​​GlobalKTable, что делает все данные доступными для каждого экземпляра в вашем приложении.

Michael G. Noll 23.04.2019 13:47

Другие вопросы по теме