Я хочу ReKey GlobalKTable (возможно, при его инициализации, поскольку я считаю, что они читаются только после создания).
Это возможно?
У меня есть две темы, с которыми я работаю в приложении Spring/Java Kafka Streams. Первый не уплотнен, второй есть. Оба используют Avro для своих ключей и значений.
Приложение передает записи из первой (несжатой) темы и прикрепляет дополнительные данные из сжатой темы через KStream#leftJoin. Сжатая тема была перенесена в приложение как GlobalKTable, создана с помощью StreamsBuilder#globalTable() и должна оставаться такой (мне нужна каждая запись из всех разделов темы, доступных в каждом экземпляре приложения).
Я знаю, что говорят о поддержке соединений без первичного ключа (https://issues.apache.org/jira/browse/KAFKA-3705), но, насколько мне известно, я пока не могу этого сделать...
@Configuration
@EnableKafkaStreams
public class StreamsConfig {
@Autowired
private MyCustomSerdes serdes;
@Bean
public KStream<AvroKeyOne, AvroValueOne> reKeyJoin(StreamsBuilder streamsBuilder) {
GlobalKTable<AvroKeyOne, AvroValueOne> globalTable = streamsBuilder.globalTable("topicOne", Consumed.with(
serdes.getAvroKeyOne()
serdes.getAvroValueOne()
));
KStream<AvroKeyTwo, AvroValueOne> kStream = streamsBuilder.stream("topicTwo", Consumed.with(
serdes.getAvroKeyTwo(),
serdes.getAvroValueOne()
));
kStream.join(
globalTable,
/**
* the KeyValueMapper. I need to rekey the Global table as well to the
* corresponding String (which it's data will have) if I want this join
* to return results
*/
(streamKey, streamValue) -> {return streamKey.getNewStringKey()},
(/**ValueJoiner Deal**/)
);
}
}




I want to ReKey a GlobalKTable (probably while initializing it, as I believe they are read only once created).
Is this possible?
Прямой поддержки этому сегодня нет. Вы уже упоминали о предстоящей работе, такой как добавление поддержка глобальных таблиц для соединений без первичного ключа, но это пока недоступно.
Что вы могли бы сделать сегодня: вы можете повторно ввести (переразбить) исходную тему Kafka в новую тему, а затем прочитать тему с измененным ключом в свой глобальный KTable. Возможно, это вариант для вас.
Повторное разбиение может быть выполнено с помощью обычного KStream/KTable, что приведет к созданию новой перераспределенной темы Kafka. Здесь не имеет значения, может ли экземпляр вашего приложения видеть только «свои» ключи — если только для самого переразметки не требуется глобальная информация (это будет редкий случай). Затем прочитайте раздел с переразметкой в GlobalKTable, что делает все данные доступными для каждого экземпляра в вашем приложении.
Изменение исходной темы - это мысль. Хотя каждый экземпляр приложения будет потреблять и переустанавливать только записи в разделе, которому он назначен (во всяком случае, я так понимаю, я ошибаюсь в этом?). Проблема здесь в том, что мне нужны все данные из темы (по всем разделам) в каждом экземпляре приложения, а не только данные в назначенном ему разделе, поэтому я использую
GlobalKTable