Я новичок в питоне и кафке. У меня есть сценарий, который должен запускать трех потребителей кафки, ждать сообщений от этих потребителей и делать другие вещи. На данный момент я даже не знаю, иду ли я в правильном направлении, поэтому любая помощь будет оценена по достоинству.
class MainClass():
def do_something_before(self):
# something is done here
def start_consumer(self):
consumer1_thread = threading.Thread(target=self.cons1, args=())
consumer2_thread = threading.Thread(target=self.cons2, args=())
consumer1_thread.daemon = True
consumer2_thread.daemon = True
consumer1_thread.start()
consumer2_thread.start()
def cons1(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest')
consumer.subscribe(['my-topic'])
for message in consumer:
print(message.value)
def cons2(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest')
consumer.subscribe(['my2-topic'])
for message in consumer:
print(message.value)
def keep_working(self):
# something is done here
if __name__ == 'main':
g = MainClass()
g.do_something_before()
g.keep_working()






Я добавил пример python-kafka с двумя потребителями (в основном двумя процессами python), вы можете найти его по ссылке github здесь https://github.com/Shubhamgorde/kafka-python-app.
Не могу опубликовать все файлы Python, его немного больше.
from multiprocessing import Process
def consumeData(topic):
try:
consumer = KafkaConsumer(topic, value_deserializer=lambda v:
binascii.unhexlify(v).decode('utf-8'))
except:
print("Error!!")
for msg in consumer:
msg=ast.literal_eval(msg.value)
if (msg[2] == 'C'):
performCreditOperation(msg)
elif (msg[2] == 'D'):
performDebitOperation(msg)
t1 = Process(target=consumeData, args=('Credit_transac',))
t2 = Process(target=consumeData, args=('Debit_transac',))
t1.start()
t2.start()
Это моя реализация. Надеюсь, вы найдете ее полезной.
class ConsumerThread:
def __init__(self, config, topics):
self.config = config
self.topics = topics
def readData(self):
consumer = Consumer(self.config)
consumer.subscribe(self.topics)
self.run(consumer)
def process_msg(self, msg):
print('Received message.')
print('Key: {}, Val: {}'.format(msg.key(), msg.value()))
print('Partition: {}, Offset: {}'.format(msg.partition(), msg.offset()))
def run(self, consumer):
try:
while True:
msg = consumer.poll(0.1)
if not msg:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
else:
raise KafkaException(msg.error())
else:
self.process_msg(msg)
except KeyboardInterrupt:
print("Detected Keyboard Interrupt. Cancelling.")
pass
finally:
consumer.close()
Не могли бы вы пояснить, в чем вы просите нас помочь, пожалуйста. Вы получаете сообщение об ошибке? Код ведет себя не так, как вы ожидаете? Если да, то чего вы ожидаете и что наблюдаете?