Python Producer может отправлять через оболочку, но не .py

У меня есть работающий и протестированный кластер Kafka, и я пытаюсь использовать скрипт Python для отправки сообщений брокерам. Это работает, когда я использую оболочку Python3 и вызываю метод производителя, однако, когда я помещаю те же самые команды в файл python и выполняю его - кажется, что скрипт зависает.

Я использую библиотеку kafka-python для потребителя и производителя. Когда я использую оболочку Python3, я вижу, что сообщения появляются в теме с помощью инструмента Kafka GUI 2.0.4. Я пробовал различные циклы и операторы в коде Python, но, похоже, ничто не заставляет его «запускаться» до конца.

>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>

И это работает, и байты появляются в данных топика брокера.

Когда я помещаю тот же код, что и выше, в файл Python .py и выполняю его с помощью Python3, он завершается, но никакие данные не отправляются брокеру Kafka. Ошибка тоже не показывается.

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
1 465
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Как видите, он возвращает будущее.

Клиенты Kafka будут группировать записи, они не сразу отправляют одну запись за раз, и чтобы это сделать, вам нужно будет подождать или сбросить буфер производителя, чтобы он был отправлен до выхода из приложения. Другими словами, интерактивный терминал хранит данные производителя в памяти, работая в фоновом режиме, и, наоборот, отбрасывает эти данные.

Как документы, показать

future = producer.send(...)

try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

Или просто поставьте producer.flush(), если вас не волнуют метаданные или захват будущего.

Другие вопросы по теме