Начать слушать kafka в параллельном потоке при запуске проекта django

Я хочу запустить файл, который слушает kafka в параллельном потоке с проектом django. Мой файл manage.py

import asyncio
import os
import sys
import multiprocessing as mt

from kafka.run_kafka import run_kafka


def main():
    """Run administrative tasks."""
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
    try:
        from django.core.management import execute_from_command_line
    except ImportError as exc:
        raise ImportError(
            "Couldn't import Django. Are you sure it's installed and "
            "available on your PYTHONPATH environment variable? Did you "
            "forget to activate a virtual environment?"
        ) from exc
    execute_from_command_line(sys.argv)


if __name__ == '__main__':
    kafka_process = mt.Process(target=asyncio.run(run_kafka()))
    django_process = mt.Process(target=main())

    kafka_process.start()
    django_process.start()

    kafka_process.join()
    django_process.join()

В моем файле run_kafka.py используется Confluent Kafka Python.

import os
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
django.setup()

import asyncio

from business_logic.settings import KAFKA_CONF, TOPIC_PROD
from kafka.kafka_consuners import KafkaConsumerBL

async def run_kafka():
    """
    Запуск прослушивания Kafka на всех топиках на все ответы
    """

    consumer = KafkaConsumerBL(KAFKA_CONF)
    consumer.indicate_topic([TOPIC_PROD])
    await consumer.run_consumer(consumer)

if __name__ == '__main__':
    asyncio.run(run_kafka())

Я попытался решить проблему, используя библиотеки потоков и многопроцессорности. После использования любой из библиотек запускается либо проект django, либо kafka.

При использовании многопроцессорной библиотеки запускается один процесс, а не оба управлять.py

...
if __name__ == '__main__':
    kafka_process = mt.Process(target=asyncio.run(run_kafka()))
    django_process = mt.Process(target=main())

    kafka_process.start()
    django_process.start()

    kafka_process.join()
    django_process.join()

При использовании библиотеки потоков повторно запускается только один процесс управлять.py

...
if __name__ == '__main__':
    threading.Thread(target=asyncio.run(run_kafka())).start()
    threading.Thread(target=main()).start()

Можете ли вы сказать мне, где я сделал ошибку, я неправильно использовал библиотеку или мне нужно использовать другой метод вообще?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
129
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Решил проблему следующим образом:

if __name__ == "__main__":
    process = subprocess.Popen(['python3', 'kafka_run.py'], stdout=subprocess.PIPE)
    uvicorn.run(app=application, host='0.0.0.0', port=8000)

Другие вопросы по теме