Как описать тему с помощью клиента kafka в Python

Я новичок в клиенте kafka на python, мне нужна помощь, чтобы описать темы с использованием клиента.

Я смог перечислить все свои темы кафки, используя следующий код: -

consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['kafka1'])
topicList = consumer.topics()
Анализ настроения постов в Twitter с помощью Python, Tweepy и Flair
Анализ настроения постов в Twitter с помощью Python, Tweepy и Flair
Анализ настроения текстовых сообщений может быть настолько сложным или простым, насколько вы его сделаете. Как и в любом ML-проекте, вы можете выбрать...
7 лайфхаков для начинающих Python-программистов
7 лайфхаков для начинающих Python-программистов
В этой статье мы расскажем о хитростях и советах по Python, которые должны быть известны разработчику Python.
Установка Apache Cassandra на Mac OS
Установка Apache Cassandra на Mac OS
Это краткое руководство по установке Apache Cassandra.
Сертификатная программа "Кванты Python": Бэктестер ансамблевых методов на основе ООП
Сертификатная программа "Кванты Python": Бэктестер ансамблевых методов на основе ООП
В одном из недавних постов я рассказал о том, как я использую навыки количественных исследований, которые я совершенствую в рамках программы TPQ...
Создание персонального файлового хранилища
Создание персонального файлового хранилища
Вы когда-нибудь хотели поделиться с кем-то файлом, но он содержал конфиденциальную информацию? Многие думают, что электронная почта безопасна, но это...
Создание приборной панели для анализа данных на GCP - часть I
Создание приборной панели для анализа данных на GCP - часть I
Недавно я столкнулся с интересной бизнес-задачей - визуализацией сбоев в цепочке поставок лекарств, которую могут просматривать врачи и...
3
0
4 174
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Интересно, что для Java эта функциональность (describeTopics()) находится в пределах KafkaAdminCLient.java.

Итак, я пытался найти эквивалент того же самого на Python и обнаружил файл репозиторий кода kafka-python.

Документация (встроенные комментарии) в эквиваленте admin-client в пакете kafka-python говорит следующее:

describe topics functionality is in ClusterMetadata
Note: if implemented here, send the request to the controller

Затем я переключился на файл cluster.py в том же репозитории. Он содержит функцию topics(), которую вы использовали для получения списка тем, и следующие две функции, которые могут помочь вам реализовать функциональность describe:

  1. partitions_for_topic() - Возвращает набор всех разделов для темы (независимо от того, доступны они или нет)
  2. available_partitions_for_topic() - Вернуть набор разделов с известными лидерами

Примечание: Я не пробовал это сам, поэтому я не совсем уверен, будет ли поведение идентично тому, что вы увидите в результате для команды kafka-topics --describe ..., но попробовать стоит.

Надеюсь, это поможет!

Спасибо за объяснение Лалит. Я посмотрел на доступные методы не повезло. На самом деле я искал период хранения для каждой темы, который мы получаем через kafka-topics.sh --describe.

Ashwin 23.05.2019 15:06

Хорошо. Справедливо. Я попытаюсь найти что-нибудь, но похоже, что библиотеки Python немного отличаются и не совсем аналогичны.

Lalit 23.05.2019 18:09
Ответ принят как подходящий

После обращения к нескольким статьям и примерам кода я смог сделать это с помощью description_configs, используя confluent_kafka.

Ссылка 1 [Сливающийся кафка-питон] Ссылка 2 Git-образец

Ниже мой пример кода!!

from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource
import confluent_kafka
import concurrent.futures

#Creation of config
conf = {'bootstrap.servers': 'kafka1','session.timeout.ms': 6000}
adminClient = AdminClient(conf)
topic_configResource = adminClient.describe_configs([ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "myTopic")])
    for j in concurrent.futures.as_completed(iter(topic_configResource.values())):
        config_response = j.result(timeout=1)

Я нашел, как это сделать с помощью kafka-python:

from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
KAFKA_URL = "localhost:9092" # kafka broker
KAFKA_TOPIC = "test" # topic name

admin_client = KafkaAdminClient(bootstrap_servers=[KAFKA_URL])
configs = admin_client.describe_configs(config_resources=[ConfigResource(ConfigResourceType.TOPIC, KAFKA_TOPIC)])
config_list = configs.resources[0][4]

В config_list (список кортежей) у вас есть все конфиги по теме.

Я не могу использовать ConfigResourceType, есть идеи?

mealesbia 25.08.2020 13:10

В Kafka-Python 2.0 конфиги представляют собой список DescribeConfigsResponse_v2, поэтому вам нужно: configs[0].resources[0][4]

Pickled 18.10.2021 15:19

См.: https://docs.confluent.io/current/clients/confluent-kafka-python/

  1. list_topics предоставляет confluent_kafka.admin.TopicMetadata (тема, перегородки)
  2. kafka.admin.TopicMetadata.partitions предоставляет: confluent_kafka.admin.PartitionMetadata (идентификатор раздела, лидер, реплики, isrs)

    from confluent_kafka.admin import AdminClient
    kafka_admin = AdminClient({"bootstrap.servers": bootstrap_servers})    
    for topic in topics:    
        x = kafka_admin.list_topics(topic=topic)    
        print x.topics, '\n'    
        for key, value in x.topics.items():    
            for keyy, valuey in value.partitions.items():    
                print keyy, ' Partition id : ', valuey, 'leader : ', valuey.leader,' replica: ', valuey.replicas
    

Другие вопросы по теме