Я новичок в клиенте kafka на python, мне нужна помощь, чтобы описать темы с использованием клиента.
Я смог перечислить все свои темы кафки, используя следующий код: -
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['kafka1'])
topicList = consumer.topics()
Интересно, что для Java эта функциональность (describeTopics()
) находится в пределах KafkaAdminCLient.java.
Итак, я пытался найти эквивалент того же самого на Python и обнаружил файл репозиторий кода kafka-python.
Документация (встроенные комментарии) в эквиваленте admin-client в пакете kafka-python говорит следующее:
describe topics functionality is in ClusterMetadata Note: if implemented here, send the request to the controller
Затем я переключился на файл cluster.py в том же репозитории. Он содержит функцию topics()
, которую вы использовали для получения списка тем, и следующие две функции, которые могут помочь вам реализовать функциональность describe
:
partitions_for_topic()
- Возвращает набор всех разделов для темы (независимо от того, доступны они или нет)available_partitions_for_topic()
- Вернуть набор разделов с известными лидерамиПримечание: Я не пробовал это сам, поэтому я не совсем уверен, будет ли поведение идентично тому, что вы увидите в результате для команды kafka-topics --describe ...
, но попробовать стоит.
Надеюсь, это поможет!
Хорошо. Справедливо. Я попытаюсь найти что-нибудь, но похоже, что библиотеки Python немного отличаются и не совсем аналогичны.
После обращения к нескольким статьям и примерам кода я смог сделать это с помощью description_configs, используя confluent_kafka.
Ссылка 1 [Сливающийся кафка-питон] Ссылка 2 Git-образец
Ниже мой пример кода!!
from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource
import confluent_kafka
import concurrent.futures
#Creation of config
conf = {'bootstrap.servers': 'kafka1','session.timeout.ms': 6000}
adminClient = AdminClient(conf)
topic_configResource = adminClient.describe_configs([ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "myTopic")])
for j in concurrent.futures.as_completed(iter(topic_configResource.values())):
config_response = j.result(timeout=1)
Я нашел, как это сделать с помощью kafka-python:
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
KAFKA_URL = "localhost:9092" # kafka broker
KAFKA_TOPIC = "test" # topic name
admin_client = KafkaAdminClient(bootstrap_servers=[KAFKA_URL])
configs = admin_client.describe_configs(config_resources=[ConfigResource(ConfigResourceType.TOPIC, KAFKA_TOPIC)])
config_list = configs.resources[0][4]
В config_list (список кортежей) у вас есть все конфиги по теме.
Я не могу использовать ConfigResourceType, есть идеи?
В Kafka-Python 2.0 конфиги представляют собой список DescribeConfigsResponse_v2, поэтому вам нужно: configs[0].resources[0][4]
См.: https://docs.confluent.io/current/clients/confluent-kafka-python/
kafka.admin.TopicMetadata.partitions предоставляет: confluent_kafka.admin.PartitionMetadata (идентификатор раздела, лидер, реплики, isrs)
from confluent_kafka.admin import AdminClient
kafka_admin = AdminClient({"bootstrap.servers": bootstrap_servers})
for topic in topics:
x = kafka_admin.list_topics(topic=topic)
print x.topics, '\n'
for key, value in x.topics.items():
for keyy, valuey in value.partitions.items():
print keyy, ' Partition id : ', valuey, 'leader : ', valuey.leader,' replica: ', valuey.replicas
Спасибо за объяснение Лалит. Я посмотрел на доступные методы не повезло. На самом деле я искал период хранения для каждой темы, который мы получаем через kafka-topics.sh --describe.