Я настроил кластер Kafka из 3 узлов с помощью docker-compose, затем создал 5 тем с 3 разделами и коэффициентом репликации 3. Я настроил производителей для подключения к порту каждого узла.
Сообщения идут из одного места в другое по порядку (как и должно быть), но после проверки моего кластера с помощью пользовательского интерфейса я понял, что все сообщения всех тем идут в один и тот же раздел (раздел № 2).
Сначала я подумал, что это может быть связано с тем, что для сообщений не был установлен какой-либо ключ раздела, поэтому я изменил свой сценарий, добавив ключ раздела к каждому сообщению (комбинация первых двух букв темы и идентификационного номера). из твита, имеет ли смысл этот формат ключа раздела?), но проблема не устранена.
Это код (он получает твиты от Twitter API v2 и отправляет сообщения продюсеру):
from dotenv import load_dotenv
import os
import json
import tweepy
from pykafka import KafkaClient
# Getting credentials:
BEARER_TOKEN=os.getenv("BEARER_TOKEN")
# Setting up pykafka:
def get_kafka_client():
return KafkaClient(hosts='localhost:9092,localhost:9093,localhost:9094')
def send_message(data, name_topic, id):
client = get_kafka_client()
topic = client.topics[name_topic]
producer = topic.get_sync_producer()
producer.produce(data, partition_key=f"{name_topic[:2].upper()}{id}".encode())
# Creating a Twitter stream listener:
class Listener(tweepy.StreamingClient):
def on_data(self, data):
print(data)
message = json.loads(data)
for rule in message['matching_rules']:
send_message(data, rule['tag'], message['data']['id'])
return True
def on_error(self, status):
print(status)
# Start streaming:
Listener(BEARER_TOKEN).filter(tweet_fields=['created_at'])
Я думал, что без какого-либо заданного ключа он начнет случайным образом отправлять сообщения в три раздела, но этого тоже не произошло. Я не знаю, где может быть проблема.
В случае, если это может быть актуально, все 5 тем были созданы в docker compose с использованием этого формата:
docker-compose exec kafka1 kafka-topics --bootstrap-server kafka1:19092 --create --replication-factor 3 --partitions 3 --topic NoFlyZone
Он должен отправляет в несколько разделов, если ключ не указан. Если вы дадите ключ, вы рискуете вычислить один и тот же хэш раздела, даже если у вас разные ключи.
Вы можете протестировать другие библиотеки, такие как kafka-python
или confluent-kafka-python
, поскольку PyKafka
больше не поддерживается.
Переписывание моего скрипта с использованием
confluent-kafka-python
вместоPyKafka
решило проблему.