KStreams: Как узнать (исходную) тему записи?

у меня есть следующее

//Config setup
Properties props = ...; //setup

List<String> topicList = Arrays.asList({"A", "B", "C"});

StreamBuilder builder = new StreamBuilder();
KStream<String, String> source = builder.stream(topicList);

source
  .map((k,v) -> {

    //How can i get the topic of the record here

  })
  .to((k,v,r) -> {//busy code for topic routing});

new KafkaStream(builder.build(), properties).start();
docs.confluent.io/current/streams/…
Michael G. Noll 02.06.2019 15:31
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
1
270
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы можете получить нужное вам название темы, используя ProcessorContext.topic(). Чтобы получить доступ к ProcessorContext, используйте KStream.process(), предоставив ему соответствующую реализацию Процессор.

Также вы можете использовать KStream.transform():

KStream<InputKeyType, InputValueType> stream2 = stream.transform(new TransformerSupplier<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {
            @Override
            public Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>> get() {
                return new Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {
                    private ProcessorContext context;

                    @Override
                    public void init(ProcessorContext context) {
                        this.context = context;
                    }

                    @Override
                    public KeyValue<OutputKeyType, OutputValueType> transform(InputKeyType key, InputValueType value) {

                        this.context.topic() // topic name you need
                        // logic here
                        return new KeyValue<>(OutputKeyType key, OutputValueType value);

                    }

                    @Override
                    public void close() {

                    }
                };
            }
        });

Другие вопросы по теме