Apache Kafka получить список потребителей по определенной теме

Поскольку это может быть гость из названия, есть ли способ получить список потребителей по определенной теме в java? До сих пор я могу получить список таких тем

    final ListTopicsResult listTopicsResult = adminClient.listTopics();
    KafkaFuture<Set<String>> kafkaFuture = listTopicsResult.names();
    Set<String> map = kafkaFuture.get();

но я не нашел способ получить список потребителей по каждой теме

stackoverflow.com/questions/32697999/kafka-consumer-list может подойти вам?
Stefan 01.05.2019 16:29

@Stefan, спасибо за ответ, я уже видел это, но ничего полезного, нет информации о потребителях по теме

Ares91 01.05.2019 16:45
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
3
2
6 501
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Потребители не привязаны к теме. Они привязаны к группам потребителей

Чтобы получить всех потребителей темы, вы должны сначала перечислить все группы, а затем отфильтровать тему в каждой группе.

Который будет начинаться с AdminClient.listConsumerGroups, за которым следует AdminClient.describeConsumerGroups

Это дает вам список описаний, содержащих «членов», где вы можете найти темы

https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/admin/ConsumerGroupDescription.html

Примечание. Существуют внешние инструменты, которые значительно упрощают эту задачу, например Hortonworks SMM https://docs.hortonworks.com/HDPDocuments/SMM/SMM-1.2.0/monitoring-kafka-clusters/content/smm-monitoring-consumers.html.

Ответ принят как подходящий

Недавно я решал ту же проблему для своего клиентского инструмента kafka. Это непросто, но единственный способ, который я нашел из кода, следующий:

Properties props = ...//here you put your properties
AdminClient kafkaClient = AdminClient.create(props);

//Here you get all the consumer groups
List<String> groupIds = kafkaClient.listConsumerGroups().all().get().
                       stream().map(s -> s.groupId()).collect(Collectors.toList()); 

//Here you get all the descriptions for the groups
Map<String, ConsumerGroupDescription> groups = kafkaClient.
                                               describeConsumerGroups(groupIds).all().get();
for (final String groupId : groupIds) {
    ConsumerGroupDescription descr = groups.get(groupId);
    //find if any description is connected to the topic with topicName
    Optional<TopicPartition> tp = descr.members().stream().
                                  map(s -> s.assignment().topicPartitions()).
                                  flatMap(coll -> coll.stream()).
                                  filter(s -> s.topic().equals(topicName)).findAny();
            if (tp.isPresent()) {
                //you found the consumer, so collect the group id somewhere
            }
} 

Этот API доступен с версии 2.0. Вероятно, есть лучший способ, но я не смог его найти. Вы также можете найти код на моем битбакет

Я знаю, что эта тема немного устарела, но в настоящее время я работаю над информационной панелью, в которой необходимо перечислить темы с соответствующими группами потребителей. Ответ Катя в порядке, но он по-прежнему не будет отображать TopicPartitions для групп потребителей без активных участников. Я наткнулся на это решение, надеюсь, оно поможет:

Properties props = ...//here you put your properties
AdminClient kafkaClient = AdminClient.create(props);

Map<TopicPartition, OffsetAndMetadata> offsets = kafkaClient
    .listConsumerGroupOffsets("consumer_group_name")
    .partitionsToOffsetAndMetadata()
    .get();

TopicPartition на карте смещений может быть проанализирован для конкретной темы. Этот список можно повторить для всего списка groupId из ответа Катя.

Другие вопросы по теме