Брокер Kafka не получает сообщение от производителя Python

Здравствуйте, я использую следующую конфигурацию для Kafka, используя Docker Compose

compose_kafka.yml

version: '3'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.1.10
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  kafka_manager:
    image: kafkamanager/kafka-manager
    container_name: kafka-manager
    restart: always
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "192.168.1.10:2181"
      APPLICATION_SECRET: "random-secret"

Я создаю производителя, который отправляет сообщения на сервер Kafka.

Создать.py

from faker import Faker

fake = Faker()

class Registered_user:

    def get_registered_user():
        return {
            "name": fake.name(),
            "address": fake.address(),
            "created_at": fake.year()
        }

Producer_registered_user.py

import time
import json
from kafka import KafkaProducer
from fake_data import Generate

  
def json_serializer(data):
    return json.dumps(data).encode("utf-8")


producer = KafkaProducer(bootstrap_servers='192.168.1.10:9092',
                         value_serializer=json_serializer)


if __name__ == '__main__':
    while 1 == 1:
        user = Generate.Registered_user.get_registered_user()
        producer.send('registered_user', user)
        print(user)
        time.sleep(4)

Но потребитель не получает никаких сообщений:

Consumer_registered_user.py

import json
from kafka import KafkaConsumer

if __name__ == '__main__':
    consumer = KafkaConsumer(
        bootstrap_servers='192.168.1.10:9092',
        auto_offset_reset = "from-beginning",
        group_id = "consumer-group-a"
    )

    for message in consumer:
        print("User = {}".format(json.loads(message.value)))

Я также проверил, указана ли тема в темах:

и если Кафка получил сообщения:

Не могли бы вы помочь мне с этой проблемой?

Убедитесь, что ваша тема есть в списке — docker exec -it kafka kafka-topics.sh --list --bootstrap-server 192.168.1.10:9092

Jeevan ebi 07.08.2024 15:38

Спасибо за быстрый ответ! Я проверил (см. отредактированное сообщение), и они существуют.

Erik hoeven 07.08.2024 15:53

Используйте Kafka Console Consumer для проверки получения сообщений — docker exec -it kafka kafka-console-consumer.sh --bootstrap-server 192.168.1.10:9092 --topic registered_user --from-beginning

Jeevan ebi 07.08.2024 15:54

Спасибо, Дживан Эби, за быстрый ответ! В скорректированном сообщении я вижу, что сообщения отображаются. Но скрипт (также помещенный в пост) и CMAK не отображали сообщения (или количество сообщений). Как я могу это исправить?

Erik hoeven 07.08.2024 16:47
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
4
51
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Проверьте конфигурацию потребительского приложения,

Образец:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'topic_1', 'topic_2',
    bootstrap_servers='192.168.1.10:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group'
)

#Make sure the consumer is successfully subscribing to all required topics
print(consumer.subscription()) 

for message in consumer:
    print(f"Received from {message.topic}: {message.value.decode('utf-8')}")

Добавьте тему к вашему потребителю:

Consumer_registered_user.py

import json
from kafka import KafkaConsumer

if __name__ == '__main__':
    consumer = KafkaConsumer(
        'registered_user',  # Specify the topic you want to consume
        bootstrap_servers='192.168.1.10:9092',
        auto_offset_reset = "earliest",  # Correct value
        group_id = "consumer-group-a"
    )

    for message in consumer:
        print("User = {}".format(json.loads(message.value.decode('utf-8'))))

Проблемы:

  1. Отсутствует подписка на тему. Вам необходимо передать имя темы («registered_user») в качестве первого аргумента KafkaConsumer, чтобы гарантировать, что ваш потребитель подпишется на эту тему.

  2. Установка auto_offset_reset = "earliest" гарантирует, что при отсутствии зафиксированных смещений потребитель начнет читать с самого раннего доступного сообщения в теме.

Ссылка — https://docs.confluent.io/kafka-clients/python/current/overview.html

Спасибо! Потребитель работает. CMAK (менеджер Kafka по-прежнему не обеспечивает правильное представление сообщений.

Erik hoeven 07.08.2024 19:06

@Erikhoeven - это можно найти путем отладки вашего приложения CMAK.

Jeevan ebi 09.08.2024 11:16

Другие вопросы по теме