Я хочу запустить файл, который слушает kafka в параллельном потоке с проектом django. Мой файл manage.py
import asyncio
import os
import sys
import multiprocessing as mt
from kafka.run_kafka import run_kafka
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
kafka_process = mt.Process(target=asyncio.run(run_kafka()))
django_process = mt.Process(target=main())
kafka_process.start()
django_process.start()
kafka_process.join()
django_process.join()
В моем файле run_kafka.py используется Confluent Kafka Python.
import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
django.setup()
import asyncio
from business_logic.settings import KAFKA_CONF, TOPIC_PROD
from kafka.kafka_consuners import KafkaConsumerBL
async def run_kafka():
"""
Запуск прослушивания Kafka на всех топиках на все ответы
"""
consumer = KafkaConsumerBL(KAFKA_CONF)
consumer.indicate_topic([TOPIC_PROD])
await consumer.run_consumer(consumer)
if __name__ == '__main__':
asyncio.run(run_kafka())
Я попытался решить проблему, используя библиотеки потоков и многопроцессорности. После использования любой из библиотек запускается либо проект django, либо kafka.
При использовании многопроцессорной библиотеки запускается один процесс, а не оба управлять.py
...
if __name__ == '__main__':
kafka_process = mt.Process(target=asyncio.run(run_kafka()))
django_process = mt.Process(target=main())
kafka_process.start()
django_process.start()
kafka_process.join()
django_process.join()
При использовании библиотеки потоков повторно запускается только один процесс управлять.py
...
if __name__ == '__main__':
threading.Thread(target=asyncio.run(run_kafka())).start()
threading.Thread(target=main()).start()
Можете ли вы сказать мне, где я сделал ошибку, я неправильно использовал библиотеку или мне нужно использовать другой метод вообще?
Решил проблему следующим образом:
if __name__ == "__main__":
process = subprocess.Popen(['python3', 'kafka_run.py'], stdout=subprocess.PIPE)
uvicorn.run(app=application, host='0.0.0.0', port=8000)