Я новичок в Python и только начинаю работать с Kafka. Я использую библиотеку питон-кафка для связи с Kafka. Теперь у меня есть требование, что мне нужно создавать темы динамически, однако, если они существуют, мне не нужно их создавать.
Из чтения документов я вижу, что могу использовать KafkaAdminClient для создания и удаления тем, однако я не нахожу ни одной, чтобы проверить, существует ли тема.
KafkaAdminClient не предоставляет метод для составления списка тем, но вы можете получить список существующих тем, просто запросив метаданные кластера из KafkaКлиент.
Например, это напечатает все темы в кластере:
from kafka.client import KafkaClient
client = KafkaClient(bootstrap_servers='localhost:9092')
future = client.cluster.request_update()
client.poll(future=future)
metadata = client.cluster
print(metadata.topics())
Для более новых версий используйте
from kafka import KafkaClien
, так как структура проекта изменена.