У меня есть работающий и протестированный кластер Kafka, и я пытаюсь использовать скрипт Python для отправки сообщений брокерам. Это работает, когда я использую оболочку Python3 и вызываю метод производителя, однако, когда я помещаю те же самые команды в файл python и выполняю его - кажется, что скрипт зависает.
Я использую библиотеку kafka-python для потребителя и производителя. Когда я использую оболочку Python3, я вижу, что сообщения появляются в теме с помощью инструмента Kafka GUI 2.0.4. Я пробовал различные циклы и операторы в коде Python, но, похоже, ничто не заставляет его «запускаться» до конца.
>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>
И это работает, и байты появляются в данных топика брокера.
Когда я помещаю тот же код, что и выше, в файл Python .py и выполняю его с помощью Python3, он завершается, но никакие данные не отправляются брокеру Kafka. Ошибка тоже не показывается.
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')
Как видите, он возвращает будущее.
Клиенты Kafka будут группировать записи, они не сразу отправляют одну запись за раз, и чтобы это сделать, вам нужно будет подождать или сбросить буфер производителя, чтобы он был отправлен до выхода из приложения. Другими словами, интерактивный терминал хранит данные производителя в памяти, работая в фоновом режиме, и, наоборот, отбрасывает эти данные.
future = producer.send(...)
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
Или просто поставьте producer.flush()
, если вас не волнуют метаданные или захват будущего.