Обновление 21.04.23
Ниже приведен рабочий тест благодаря ответу Пола Гилмартина (создатель django-pgpubsub). Я переименовал свою модель уведомлений при импорте в EmailNotification, чтобы избежать путаницы, поскольку это то же имя, что и у модели уведомлений в django-pgpubsub:
@pytest.mark.django_db(transaction=True) # needed to work
def test_shipment_status_notificiations(testuser, pg_connection):
user = testuser
# should already be made by our signal
notification_settings = NotificationSettings.objects.get(user=user)
notification_settings.failed_delivery_attempt = True
notification_settings.available_for_pickup = True
notification_settings.delivery_issue = True
notification_settings.save()
cust_email = "[email protected]"
customer = Customer.objects.create(email=cust_email)
order = Order.objects.create(customer=customer)
shipment = Shipment.objects.create(user=user, order=order)
pg_connection.poll()
shipment.status = Shipment.FAILED_ATTEMPT
shipment.save()
shipment.status = Shipment.HELD
shipment.save()
shipment.status = Shipment.ISSUE
shipment.save()
assert 3 == len(pg_connection.notifies)
# process_notifications() is needed for the listener to run and EmailNotifications to be created
process_notifications(pg_connection)
print(f"all emailnotifications: {EmailNotification.objects.all()}")
existing_notifications = EmailNotification.objects.filter(
shipment=shipment,
email=cust_email,
)
assert existing_notifications.count() == 3
Оригинальные разделы сообщений ниже (теперь устарели)
Проблема
У меня есть тест, который запускает команду django-admin в потоке с тайм-аутом. Тест проходит, но зависает и приходится вручную останавливать его выполнение. Я не знаком с многопоточностью, но я думаю, что поток продолжает работать, что приводит к «зависанию» теста после его прохождения. Я ищу решение, которое запускает команду, поскольку это скорее интеграционный тест, а не модульный тест с макетами. Любое решение, которое запускает команду в течение заданного времени, должно работать, пока есть надлежащий разрыв теста и это не вызывает проблем для других тестов во время CI.
Фон
Django-pgpubsub прослушивает триггеры PostgresSQL, запуская
python manage.py listen
. Я использую это как облегченную альтернативу сигналам Django и celery для отправки электронных писем, когда поле состояния экземпляра модели отгрузки изменяется на определенные значения.
Тест
# transaction=True is needed listen command to work for django-pgpubsub
@pytest.mark.django_db(transaction=True)
def test_shipment_status_notifications_with_listen_command(testuser):
user = testuser
notification_settings = NotificationSettings.objects.get(user=user)
notification_settings.failed_attempt = True
notification_settings.save()
cust_email = "[email protected]"
customer = Customer.objects.create(email=cust_email)
order = Order.objects.create(customer=customer)
def run_listen_command():
call_command("listen")
listen_thread = threading.Thread(target=run_listen_command, daemon=True)
listen_thread.start()
# Change Shipment status to trigger the notification
shipment = Shipment.objects.create(user=user, order=order)
shipment.status = Shipment.FAILED_ATTEMPT
shipment.save()
sleep(1) # required to allow listen command to process the notification
# Ensure the function that sends the notification is called and the notification is created
existing_notifications = Notification.objects.filter(
type=Notification.Type.FAILED_DELIVERY_ATTEMPT,
shipment=shipment,
email=cust_email,
)
assert existing_notifications.count() == 1
# Wait for the listen_thread to complete, with a timeout
# todo: this doesn't end the test and it hangs
listen_thread.join(timeout=3)
Другие попытки
Я не могу настроить команду Django, потому что она из внешней библиотеки, django-pgpubsub. Я не уверен, что многопоточность является лучшим решением, поскольку я нашел это предложение с использованием GPT-4 и не смог найти существующие примеры. Другая попытка включала ThreadPoolExecutor с try/except/finally с TimeoutError. Это тоже прошло, но зависло. Другой связан с использованием модуля подпроцесса. Эта версия дает сбой, так как уведомления не отправляются, или она зависает, если subprocess.PIPE передается в параметры stdout и stderr subprocess.Popen() для устранения неполадок. Вот код для этой версии теста:
@pytest.mark.django_db(transaction=True)
def test_shipment_status_notifications_with_listen_command_subprocess(testuser):
print("starting test")
user = testuser
notification_settings = NotificationSettings.objects.get(user=user)
notification_settings.failed_attempt = True
notification_settings.save()
cust_email = "[email protected]"
customer = Customer.objects.create(email=cust_email)
order = Order.objects.create(customer=customer)
print(f"settings.ROOT_DIR: {settings.ROOT_DIR}")
listen_command = [
"python",
str(settings.ROOT_DIR / "manage.py"),
"listen",
]
print("about to call listen command: ", listen_command)
# todo: this version doesn't hang the test, but no notifications are sent
# listen_process = subprocess.Popen(listen_command)
# todo: using this version with stdout and stderr hangs the test
listen_process = subprocess.Popen(listen_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, universal_newlines=True)
# Change Shipment status to trigger the notification
shipment = Shipment.objects.create(user=user, order=order)
shipment.status = Shipment.FAILED_ATTEMPT
shipment.save()
time.sleep(5)
# Stop the listen command
listen_process.send_signal(signal.SIGTERM)
listen_process.wait()
stdout, stderr = listen_process.communicate()
print("Output from listen command:")
print(stdout)
print("Error output from listen command:")
print(stderr)
existing_notifications = Notification.objects.filter(
type=Notification.Type.FAILED_DELIVERY_ATTEMPT,
shipment=shipment,
email=cust_email,
)
# todo: No existing notifications found when using listen_process = subprocess.Popen(listen_command)
print(f"existing_notifications: {existing_notifications}")
assert existing_notifications.count() == 1
Я автор django-pgpubsub. Приятно видеть Ваш интерес к библиотеке!
Это не совсем ответ, но у меня недостаточно кармы, чтобы оставить комментарий. Я не на 100% понимаю, что вы хотите сделать с тестом, но похоже, что вы хотите смоделировать то, что происходит, когда процесс listen
запущен, чтобы потреблять объекты уведомлений?
Вам не обязательно использовать здесь команду управления listen
или запускать ее в отдельном процессе, чтобы сделать это. Вместо этого вы можете использовать функцию pgpubsub.listen.listen
непосредственно в том же потоке, что и тест, а затем использовать функцию poll
в соединении с базой данных. https://github.com/Opus10/django-pgpubsub/blob/master/pgpubsub/tests/test_core.py есть несколько примеров тестов, делающих это, в частности
@pytest.mark.django_db(transaction=True)
def test_recover_notifications(pg_connection):
Author.objects.create(name='Billy')
Author.objects.create(name='Billy2')
pg_connection.poll()
assert 2 == len(pg_connection.notifies)
assert 2 == Notification.objects.count()
assert 0 == Post.objects.count()
# Simulate when the listener fails to
# receive notifications
pg_connection.notifies = []
pg_connection.poll()
assert 0 == len(pg_connection.notifies)
listen(recover=True, poll_count=1)
pg_connection.poll()
assert 0 == Notification.objects.count()
assert 2 == Post.objects.count()
Вам, вероятно, не нужно проходить recover=False
для вашего случая. В этом модуле также есть тесты с использованием process_notifications
, которая является функцией, используемой внутри listen
.
Может быть, мне следует добавить некоторые документы по модульному тестированию!
@NathanSmeltzer Добро пожаловать! Да, надеюсь, со временем появится больше пользователей, просто довольно сложно предать гласности такие вещи. Не стесняйтесь обращаться к нам, если у вас есть дополнительные вопросы или предложения по библиотеке.
В моем приложении есть команды управления, которые выполняются как фоновые задачи на сервере. Я позволяю им работать в течение 60 минут, затем они заканчивают работу и снова запускаются. Продолжительность в минутах является аргументом командной строки.
Для тестирования я добавил опцию --quick
, которая меняет продолжительность на секунды.
Скелет команды управления:
from django.core.management.base import BaseCommand
import datetime
class Command(BaseCommand):
def __init__(self, stdout=None, stderr=None, no_color=False, force_color=False):
super().__init__(stdout, stderr, no_color, force_color)
self.stop_at = datetime.datetime.now()
def add_arguments(self, parser):
parser.add_argument('duration', type=int,
choices = {1, 2, 5, 7, 10, 15, 20, 30, 45, 60})
parser.add_argument('--quick', action='store_true') # for testing
def _set_stop_time(self, **options):
# decide when to stop running
duration = options['duration']
if options['quick']:
self.stop_at += datetime.timedelta(seconds=duration)
else:
self.stop_at += datetime.timedelta(minutes=duration)
self.stdout.write('[INFO] Will run until %s' % str(self.stop_at))
def handle(self, *args, **options):
self._set_stop_time(**options)
# ...
Модульный тест с использованием опции --quick
:
from django.core import management
from django.test import TestCase
class Foo(TestCase):
def test_foo(self):
f1 = io.StringIO()
f2 = io.StringIO()
management.call_command('mgmt_cmd', '1', '--quick', stderr=f1, stdout=f2)
#self.assert...
Спасибо за помощь и за создание этой замечательной библиотеки. Меня поражает, почему пользователей не так много, учитывая, что это гораздо эффективнее и менее проблематично, чем запуск celery. Мне пришлось использовать process_notifications, чтобы запустить мою функцию слушателя (в listeners.py).