MassTransit StateMachineSagaTestHarness — двойное потребление сообщений

У меня проблема с InMemoryTestHarness и сагой о конечных автоматах. Похоже, каждое опубликованное сообщение потребляется сагой дважды. Если сообщение отправлено (не опубликовано), проблема не возникает. Проблема началась, когда я изменил CorrelationId на пользовательское поле - ProcessId. Ниже приведен упрощенный пример, раскрывающий проблему.

Определение саги:

    public class MySaga : MassTransitStateMachine<MySagaState>
    {
        public State InProgress { get; private set; }

        public Event<StartProcess> StartProcess { get; private set; }
        public Event<ProcessStageFinished> StageFinished { get; private set; }
        public Event<ProcessFinished> ProcessFinished { get; private set; }


        public MySaga()
        {
            InstanceState(x => x.CurrentState);

            Event(() => StartProcess, e => e
                .CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
                .SelectId(x => NewId.NextGuid()));

            Event(() => StageFinished, e => e
                .CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
                .SelectId(x => NewId.NextGuid()));

            Event(() => ProcessFinished, e => e
                .CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
                .SelectId(x => NewId.NextGuid()));

            Initially(
                When(StartProcess)
                    .Then(x => x.Instance.ProcessId = x.Data.ProcessId)
                    .TransitionTo(InProgress)
                );

            During(InProgress,
                When(StageFinished)
                    .Then(x => x.Instance.Stage++)
                );

            During(InProgress,
                When(ProcessFinished)
                    .Finalize()
                );
        }
    }

    public class MySagaState : SagaStateMachineInstance, ISagaVersion
    {
        public Guid CorrelationId { get; set; }
        public int Version { get; set; }
        public string ProcessId { get; set; }
        public string CurrentState { get; set; }
        public int Stage { get; set; }
    }

    public record StartProcess(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
    {
    }

    public record ProcessStageFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
    {
    }

    public record ProcessFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
    {
    }

И тест xUnit:

    public class MySagaTests
    {
        InMemoryTestHarness Harness { get; }
        IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }

        public MySagaTests()
        {
            var services = new ServiceCollection()
                .AddMassTransitInMemoryTestHarness(config =>
                {
                    config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
                        {
                            sagaConfig.UseConcurrentMessageLimit(1);
                            sagaConfig.UseInMemoryOutbox();
                        })
                       .InMemoryRepository();

                    config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
                });

            var serviceProvider = services.BuildServiceProvider(true);

            Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();
            Harness.OnConfigureInMemoryReceiveEndpoint += config =>
            {
                config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
            };

            SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
        }

        [Fact]
        public async Task TestMySaga()
        {
            string processId = "newProcessId";
            var correlationId = NewId.NextGuid();

            await Harness.Start();

            try
            {
                await Harness.Bus.Publish(new StartProcess(correlationId, processId));

                Assert.True(await Harness.Published.Any<StartProcess>());
                Assert.True(await Harness.Consumed.Any<StartProcess>());
                Assert.True(await SagaHarness.Consumed.Any<StartProcess>());
                Assert.Equal(1, SagaHarness.Sagas.Count());  // HERE should be only one saga created

                Assert.True(await SagaHarness.Created.Any(s => s.ProcessId == processId && s.CurrentState == "InProgress"));

                await Harness.Bus.Publish(new ProcessStageFinished(correlationId, processId));

                Assert.True(await Harness.Published.Any<ProcessStageFinished>());
                Assert.True(await Harness.Consumed.Any<ProcessStageFinished>());
                Assert.True(await SagaHarness.Consumed.Any<ProcessStageFinished>());
                var saga = SagaHarness.Sagas.Select(s => s.ProcessId == processId).FirstOrDefault()?.Saga;
                Assert.NotNull(saga);
                Assert.Equal(1, saga.Stage); // HERE stage should by 1

                await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));

                Assert.True(await Harness.Published.Any<ProcessFinished>());
                Assert.True(await Harness.Consumed.Any<ProcessFinished>());
                Assert.True(await SagaHarness.Consumed.Any<ProcessFinished>());

                Assert.True(await SagaHarness.Sagas.Any(s => s.ProcessId == processId && s.CurrentState == "Final"));
            }
            finally
            {
                await Harness.Stop();
            }

        }
    }

Я пробовал это на простой конфигурации шины в памяти и на RabbitMQ. На обеих конфигурациях работает нормально. Сообщения потребляются дважды только в пределах InMemoryTestHarness.

У вас есть предложения, что следует исправить? На первый взгляд это выглядит как неправильное поведение.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
524
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Да, удалите эту строку — вы настраиваете сагу на двух разных конечных точках.

Harness.OnConfigureInMemoryReceiveEndpoint += config =>
{
    config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
};

Большой. Это помогло. Но теперь, чтобы пройти тест, мне нужно добавить некоторые задержки после каждой операции публикации — вот так: await Task.Delay(100); Это правильно?

Piotr Jakóbczyk 22.12.2020 15:31

И вот еще. Как сейчас я могу отправить команду для тестирования саги? Harness.InputQueueSendEndpoint.Send(new StartProcess(correlationId, processId)) не работает.

Piotr Jakóbczyk 22.12.2020 15:53

Звонить Publish вместо Send? Кроме того, вы смотрели 2 сезон сериала MassTransit на YouTube? Все дело в конечных автоматах и ​​модульном тестировании. Образец обширен и показывает, как использовать Exists и т. д. для проверки прогресса саги. Task.Delay в модульном тесте не работает.

Chris Patterson 22.12.2020 15:54

Спасибо. Образец многое объясняет. Я реорганизовал тест, и теперь он выглядит и работает нормально.

Piotr Jakóbczyk 22.12.2020 16:22

После ответа [@Chris Patterson] я реорганизовал тест. Может кому поможет, реорганизовал тест ниже:

    public class MySagaTests
    {
        InMemoryTestHarness Harness { get; }
        IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }

        public MySagaTests()
        {
            var services = new ServiceCollection()
                .AddMassTransitInMemoryTestHarness(config =>
                {
                    config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
                        {
                            sagaConfig.UseConcurrentMessageLimit(1);
                            sagaConfig.UseInMemoryOutbox();
                        })
                       .InMemoryRepository();

                    config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
                });

            var serviceProvider = services.BuildServiceProvider(true);

            Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();

            SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
        }

        [Fact]
        public async Task TestMySaga()
        {
            string processId = "newProcessId";
            var correlationId = NewId.NextGuid();

            await Harness.Start();

            try
            {
                await Harness.Bus.Publish(new StartProcess(correlationId, processId));

                var sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId, x => x.InProgress)).Any();
                Assert.True(sagaExists, "Saga not exists");


                await Harness.Bus.Publish(new ProcessStageFinished(correlationId, processId));

                sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId && s.Stage == 1, x => x.InProgress)).Any();
                Assert.True(sagaExists, "Saga not exists");


                await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));

                sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId, x => x.Final)).Any();
                Assert.True(sagaExists, "Saga not exists");
            }
            finally
            {
                await Harness.Stop();
            }
        }
    }

Другие вопросы по теме