Здравствуйте, я использую следующую конфигурацию для Kafka, используя Docker Compose
compose_kafka.yml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.1.10
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
kafka_manager:
image: kafkamanager/kafka-manager
container_name: kafka-manager
restart: always
ports:
- "9000:9000"
environment:
ZK_HOSTS: "192.168.1.10:2181"
APPLICATION_SECRET: "random-secret"
Я создаю производителя, который отправляет сообщения на сервер Kafka.
Создать.py
from faker import Faker
fake = Faker()
class Registered_user:
def get_registered_user():
return {
"name": fake.name(),
"address": fake.address(),
"created_at": fake.year()
}
Producer_registered_user.py
import time
import json
from kafka import KafkaProducer
from fake_data import Generate
def json_serializer(data):
return json.dumps(data).encode("utf-8")
producer = KafkaProducer(bootstrap_servers='192.168.1.10:9092',
value_serializer=json_serializer)
if __name__ == '__main__':
while 1 == 1:
user = Generate.Registered_user.get_registered_user()
producer.send('registered_user', user)
print(user)
time.sleep(4)
Но потребитель не получает никаких сообщений:
Consumer_registered_user.py
import json
from kafka import KafkaConsumer
if __name__ == '__main__':
consumer = KafkaConsumer(
bootstrap_servers='192.168.1.10:9092',
auto_offset_reset = "from-beginning",
group_id = "consumer-group-a"
)
for message in consumer:
print("User = {}".format(json.loads(message.value)))
Я также проверил, указана ли тема в темах:
и если Кафка получил сообщения:
Не могли бы вы помочь мне с этой проблемой?
Спасибо за быстрый ответ! Я проверил (см. отредактированное сообщение), и они существуют.
Используйте Kafka Console Consumer для проверки получения сообщений — docker exec -it kafka kafka-console-consumer.sh --bootstrap-server 192.168.1.10:9092 --topic registered_user --from-beginning
Спасибо, Дживан Эби, за быстрый ответ! В скорректированном сообщении я вижу, что сообщения отображаются. Но скрипт (также помещенный в пост) и CMAK не отображали сообщения (или количество сообщений). Как я могу это исправить?
Проверьте конфигурацию потребительского приложения,
Образец:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'topic_1', 'topic_2',
bootstrap_servers='192.168.1.10:9092',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group'
)
#Make sure the consumer is successfully subscribing to all required topics
print(consumer.subscription())
for message in consumer:
print(f"Received from {message.topic}: {message.value.decode('utf-8')}")
Добавьте тему к вашему потребителю:
Consumer_registered_user.py
import json
from kafka import KafkaConsumer
if __name__ == '__main__':
consumer = KafkaConsumer(
'registered_user', # Specify the topic you want to consume
bootstrap_servers='192.168.1.10:9092',
auto_offset_reset = "earliest", # Correct value
group_id = "consumer-group-a"
)
for message in consumer:
print("User = {}".format(json.loads(message.value.decode('utf-8'))))
Проблемы:
Отсутствует подписка на тему. Вам необходимо передать имя темы («registered_user») в качестве первого аргумента KafkaConsumer, чтобы гарантировать, что ваш потребитель подпишется на эту тему.
Установка auto_offset_reset = "earliest"
гарантирует, что при отсутствии зафиксированных смещений потребитель начнет читать с самого раннего доступного сообщения в теме.
Ссылка — https://docs.confluent.io/kafka-clients/python/current/overview.html
Спасибо! Потребитель работает. CMAK (менеджер Kafka по-прежнему не обеспечивает правильное представление сообщений.
@Erikhoeven - это можно найти путем отладки вашего приложения CMAK.
Убедитесь, что ваша тема есть в списке —
docker exec -it kafka kafka-topics.sh --list --bootstrap-server 192.168.1.10:9092