Запустите команду django в течение установленного времени внутри теста

Обновление 21.04.23

Ниже приведен рабочий тест благодаря ответу Пола Гилмартина (создатель django-pgpubsub). Я переименовал свою модель уведомлений при импорте в EmailNotification, чтобы избежать путаницы, поскольку это то же имя, что и у модели уведомлений в django-pgpubsub:

@pytest.mark.django_db(transaction=True)  # needed to work
def test_shipment_status_notificiations(testuser, pg_connection):
    user = testuser
    # should already be made by our signal
    notification_settings = NotificationSettings.objects.get(user=user)
    notification_settings.failed_delivery_attempt = True
    notification_settings.available_for_pickup = True
    notification_settings.delivery_issue = True
    notification_settings.save()
    cust_email = "[email protected]"
    customer = Customer.objects.create(email=cust_email)
    order = Order.objects.create(customer=customer)
    shipment = Shipment.objects.create(user=user, order=order)
    pg_connection.poll()
    shipment.status = Shipment.FAILED_ATTEMPT
    shipment.save()
    shipment.status = Shipment.HELD
    shipment.save()
    shipment.status = Shipment.ISSUE
    shipment.save()
    assert 3 == len(pg_connection.notifies)

    # process_notifications() is needed for the listener to run and EmailNotifications to be created
    process_notifications(pg_connection)
    print(f"all emailnotifications: {EmailNotification.objects.all()}")
    existing_notifications = EmailNotification.objects.filter(
        shipment=shipment,
        email=cust_email,
    )
    assert existing_notifications.count() == 3

Оригинальные разделы сообщений ниже (теперь устарели)

Проблема

У меня есть тест, который запускает команду django-admin в потоке с тайм-аутом. Тест проходит, но зависает и приходится вручную останавливать его выполнение. Я не знаком с многопоточностью, но я думаю, что поток продолжает работать, что приводит к «зависанию» теста после его прохождения. Я ищу решение, которое запускает команду, поскольку это скорее интеграционный тест, а не модульный тест с макетами. Любое решение, которое запускает команду в течение заданного времени, должно работать, пока есть надлежащий разрыв теста и это не вызывает проблем для других тестов во время CI.

Фон

Django-pgpubsub прослушивает триггеры PostgresSQL, запуская python manage.py listen. Я использую это как облегченную альтернативу сигналам Django и celery для отправки электронных писем, когда поле состояния экземпляра модели отгрузки изменяется на определенные значения.

Тест

# transaction=True is needed listen command to work for django-pgpubsub
@pytest.mark.django_db(transaction=True)
def test_shipment_status_notifications_with_listen_command(testuser):
    user = testuser
    notification_settings = NotificationSettings.objects.get(user=user)
    notification_settings.failed_attempt = True
    notification_settings.save()
    cust_email = "[email protected]"
    customer = Customer.objects.create(email=cust_email)
    order = Order.objects.create(customer=customer)

    def run_listen_command():
        call_command("listen")

    listen_thread = threading.Thread(target=run_listen_command, daemon=True)
    listen_thread.start()
    # Change Shipment status to trigger the notification
    shipment = Shipment.objects.create(user=user, order=order)
    shipment.status = Shipment.FAILED_ATTEMPT
    shipment.save()
    sleep(1)  # required to allow listen command to process the notification
    # Ensure the function that sends the notification is called and the notification is created
    existing_notifications = Notification.objects.filter(
        type=Notification.Type.FAILED_DELIVERY_ATTEMPT,
        shipment=shipment,
        email=cust_email,
    )
    assert existing_notifications.count() == 1
    # Wait for the listen_thread to complete, with a timeout
    # todo: this doesn't end the test and it hangs
    listen_thread.join(timeout=3)

Другие попытки

Я не могу настроить команду Django, потому что она из внешней библиотеки, django-pgpubsub. Я не уверен, что многопоточность является лучшим решением, поскольку я нашел это предложение с использованием GPT-4 и не смог найти существующие примеры. Другая попытка включала ThreadPoolExecutor с try/except/finally с TimeoutError. Это тоже прошло, но зависло. Другой связан с использованием модуля подпроцесса. Эта версия дает сбой, так как уведомления не отправляются, или она зависает, если subprocess.PIPE передается в параметры stdout и stderr subprocess.Popen() для устранения неполадок. Вот код для этой версии теста:

@pytest.mark.django_db(transaction=True)
def test_shipment_status_notifications_with_listen_command_subprocess(testuser):
    print("starting test")
    user = testuser
    notification_settings = NotificationSettings.objects.get(user=user)
    notification_settings.failed_attempt = True
    notification_settings.save()
    cust_email = "[email protected]"
    customer = Customer.objects.create(email=cust_email)
    order = Order.objects.create(customer=customer)

    print(f"settings.ROOT_DIR: {settings.ROOT_DIR}")
    listen_command = [
        "python",
        str(settings.ROOT_DIR / "manage.py"),
        "listen",
    ]
    print("about to call listen command: ", listen_command)

    # todo: this version doesn't hang the test, but no notifications are sent
    # listen_process = subprocess.Popen(listen_command)
    # todo: using this version with stdout and stderr hangs the test
    listen_process = subprocess.Popen(listen_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, universal_newlines=True)
    # Change Shipment status to trigger the notification
    shipment = Shipment.objects.create(user=user, order=order)
    shipment.status = Shipment.FAILED_ATTEMPT
    shipment.save()

    time.sleep(5)

    # Stop the listen command
    listen_process.send_signal(signal.SIGTERM)
    listen_process.wait()

    stdout, stderr = listen_process.communicate()

    print("Output from listen command:")
    print(stdout)
    print("Error output from listen command:")
    print(stderr)

    existing_notifications = Notification.objects.filter(
        type=Notification.Type.FAILED_DELIVERY_ATTEMPT,
        shipment=shipment,
        email=cust_email,
    )
    # todo: No existing notifications found when using listen_process = subprocess.Popen(listen_command)
    print(f"existing_notifications: {existing_notifications}")
    assert existing_notifications.count() == 1
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
60
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Я автор django-pgpubsub. Приятно видеть Ваш интерес к библиотеке!

Это не совсем ответ, но у меня недостаточно кармы, чтобы оставить комментарий. Я не на 100% понимаю, что вы хотите сделать с тестом, но похоже, что вы хотите смоделировать то, что происходит, когда процесс listen запущен, чтобы потреблять объекты уведомлений?

Вам не обязательно использовать здесь команду управления listen или запускать ее в отдельном процессе, чтобы сделать это. Вместо этого вы можете использовать функцию pgpubsub.listen.listen непосредственно в том же потоке, что и тест, а затем использовать функцию poll в соединении с базой данных. https://github.com/Opus10/django-pgpubsub/blob/master/pgpubsub/tests/test_core.py есть несколько примеров тестов, делающих это, в частности

@pytest.mark.django_db(transaction=True)
def test_recover_notifications(pg_connection):
    Author.objects.create(name='Billy')
    Author.objects.create(name='Billy2')
    pg_connection.poll()
    assert 2 == len(pg_connection.notifies)
    assert 2 == Notification.objects.count()
    assert 0 == Post.objects.count()
    # Simulate when the listener fails to
    # receive notifications
    pg_connection.notifies = []
    pg_connection.poll()
    assert 0 == len(pg_connection.notifies)
    listen(recover=True, poll_count=1)
    pg_connection.poll()
    assert 0 == Notification.objects.count()
    assert 2 == Post.objects.count()

Вам, вероятно, не нужно проходить recover=False для вашего случая. В этом модуле также есть тесты с использованием process_notifications, которая является функцией, используемой внутри listen.

Может быть, мне следует добавить некоторые документы по модульному тестированию!

Спасибо за помощь и за создание этой замечательной библиотеки. Меня поражает, почему пользователей не так много, учитывая, что это гораздо эффективнее и менее проблематично, чем запуск celery. Мне пришлось использовать process_notifications, чтобы запустить мою функцию слушателя (в listeners.py).

Nathan Smeltzer 21.04.2023 14:38

@NathanSmeltzer Добро пожаловать! Да, надеюсь, со временем появится больше пользователей, просто довольно сложно предать гласности такие вещи. Не стесняйтесь обращаться к нам, если у вас есть дополнительные вопросы или предложения по библиотеке.

Paul Gilmartin 22.04.2023 09:32

В моем приложении есть команды управления, которые выполняются как фоновые задачи на сервере. Я позволяю им работать в течение 60 минут, затем они заканчивают работу и снова запускаются. Продолжительность в минутах является аргументом командной строки.

Для тестирования я добавил опцию --quick, которая меняет продолжительность на секунды.

Скелет команды управления:

from django.core.management.base import BaseCommand
import datetime

class Command(BaseCommand):

    def __init__(self, stdout=None, stderr=None, no_color=False, force_color=False):
        super().__init__(stdout, stderr, no_color, force_color)
        self.stop_at = datetime.datetime.now()

    def add_arguments(self, parser):
        parser.add_argument('duration', type=int,
                            choices = {1, 2, 5, 7, 10, 15, 20, 30, 45, 60})
        parser.add_argument('--quick', action='store_true')     # for testing

    def _set_stop_time(self, **options):
        # decide when to stop running
        duration = options['duration']

        if options['quick']:
            self.stop_at += datetime.timedelta(seconds=duration)
        else:
            self.stop_at += datetime.timedelta(minutes=duration)

        self.stdout.write('[INFO] Will run until %s' % str(self.stop_at))

    def handle(self, *args, **options):
        self._set_stop_time(**options)
        # ...

Модульный тест с использованием опции --quick:

from django.core import management
from django.test import TestCase

class Foo(TestCase):

    def test_foo(self):
        f1 = io.StringIO()
        f2 = io.StringIO()
        management.call_command('mgmt_cmd', '1', '--quick', stderr=f1, stdout=f2)
        #self.assert...

Другие вопросы по теме