Потребители Python Kafka, работающие в параллельных потоках

Я новичок в питоне и кафке. У меня есть сценарий, который должен запускать трех потребителей кафки, ждать сообщений от этих потребителей и делать другие вещи. На данный момент я даже не знаю, иду ли я в правильном направлении, поэтому любая помощь будет оценена по достоинству.

class MainClass():
    def do_something_before(self):
        # something is done here

    def start_consumer(self):
        consumer1_thread = threading.Thread(target=self.cons1, args=())
        consumer2_thread = threading.Thread(target=self.cons2, args=())
        consumer1_thread.daemon = True
        consumer2_thread.daemon = True
        consumer1_thread.start()
        consumer2_thread.start()

    def cons1(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my-topic'])
        for message in consumer:
            print(message.value)

    def cons2(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my2-topic'])
        for message in consumer:
            print(message.value)

    def keep_working(self):
        # something is done here

if __name__ == 'main':
    g = MainClass()
    g.do_something_before()
    g.keep_working()

Не могли бы вы пояснить, в чем вы просите нас помочь, пожалуйста. Вы получаете сообщение об ошибке? Код ведет себя не так, как вы ожидаете? Если да, то чего вы ожидаете и что наблюдаете?

Robin Moffatt 12.06.2018 10:40
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
1
4 968
2

Ответы 2

Я добавил пример python-kafka с двумя потребителями (в основном двумя процессами python), вы можете найти его по ссылке github здесь https://github.com/Shubhamgorde/kafka-python-app.

Не могу опубликовать все файлы Python, его немного больше.

from multiprocessing import Process

def consumeData(topic):
    try:
         consumer = KafkaConsumer(topic, value_deserializer=lambda v: 
           binascii.unhexlify(v).decode('utf-8'))
    except:
         print("Error!!")

    for msg in consumer:
        msg=ast.literal_eval(msg.value)
        if (msg[2] == 'C'):
            performCreditOperation(msg)
        elif (msg[2] == 'D'):
              performDebitOperation(msg)

t1 = Process(target=consumeData, args=('Credit_transac',))
t2 = Process(target=consumeData, args=('Debit_transac',))
t1.start()
t2.start()

Это моя реализация. Надеюсь, вы найдете ее полезной.

class ConsumerThread:
    def __init__(self, config, topics):
        self.config = config
        self.topics = topics

    def readData(self):
        consumer = Consumer(self.config)
        consumer.subscribe(self.topics)
        self.run(consumer)

    def process_msg(self, msg):
        print('Received message.')
        print('Key: {}, Val: {}'.format(msg.key(), msg.value()))
        print('Partition: {}, Offset: {}'.format(msg.partition(), msg.offset()))

    def run(self, consumer):
        try:
            while True:
                msg = consumer.poll(0.1)
                if not msg:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        print('End of partition reached {0}/{1}'
                            .format(msg.topic(), msg.partition()))
                    else:
                        raise KafkaException(msg.error())
                else:
                    self.process_msg(msg)

        except KeyboardInterrupt:
            print("Detected Keyboard Interrupt. Cancelling.")
            pass

        finally:
            consumer.close()

Другие вопросы по теме