У меня проблема с InMemoryTestHarness
и сагой о конечных автоматах. Похоже, каждое опубликованное сообщение потребляется сагой дважды. Если сообщение отправлено (не опубликовано), проблема не возникает. Проблема началась, когда я изменил CorrelationId
на пользовательское поле - ProcessId
. Ниже приведен упрощенный пример, раскрывающий проблему.
Определение саги:
public class MySaga : MassTransitStateMachine<MySagaState>
{
public State InProgress { get; private set; }
public Event<StartProcess> StartProcess { get; private set; }
public Event<ProcessStageFinished> StageFinished { get; private set; }
public Event<ProcessFinished> ProcessFinished { get; private set; }
public MySaga()
{
InstanceState(x => x.CurrentState);
Event(() => StartProcess, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Event(() => StageFinished, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Event(() => ProcessFinished, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Initially(
When(StartProcess)
.Then(x => x.Instance.ProcessId = x.Data.ProcessId)
.TransitionTo(InProgress)
);
During(InProgress,
When(StageFinished)
.Then(x => x.Instance.Stage++)
);
During(InProgress,
When(ProcessFinished)
.Finalize()
);
}
}
public class MySagaState : SagaStateMachineInstance, ISagaVersion
{
public Guid CorrelationId { get; set; }
public int Version { get; set; }
public string ProcessId { get; set; }
public string CurrentState { get; set; }
public int Stage { get; set; }
}
public record StartProcess(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
public record ProcessStageFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
public record ProcessFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
И тест xUnit:
public class MySagaTests
{
InMemoryTestHarness Harness { get; }
IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }
public MySagaTests()
{
var services = new ServiceCollection()
.AddMassTransitInMemoryTestHarness(config =>
{
config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
{
sagaConfig.UseConcurrentMessageLimit(1);
sagaConfig.UseInMemoryOutbox();
})
.InMemoryRepository();
config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
});
var serviceProvider = services.BuildServiceProvider(true);
Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();
Harness.OnConfigureInMemoryReceiveEndpoint += config =>
{
config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
};
SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
}
[Fact]
public async Task TestMySaga()
{
string processId = "newProcessId";
var correlationId = NewId.NextGuid();
await Harness.Start();
try
{
await Harness.Bus.Publish(new StartProcess(correlationId, processId));
Assert.True(await Harness.Published.Any<StartProcess>());
Assert.True(await Harness.Consumed.Any<StartProcess>());
Assert.True(await SagaHarness.Consumed.Any<StartProcess>());
Assert.Equal(1, SagaHarness.Sagas.Count()); // HERE should be only one saga created
Assert.True(await SagaHarness.Created.Any(s => s.ProcessId == processId && s.CurrentState == "InProgress"));
await Harness.Bus.Publish(new ProcessStageFinished(correlationId, processId));
Assert.True(await Harness.Published.Any<ProcessStageFinished>());
Assert.True(await Harness.Consumed.Any<ProcessStageFinished>());
Assert.True(await SagaHarness.Consumed.Any<ProcessStageFinished>());
var saga = SagaHarness.Sagas.Select(s => s.ProcessId == processId).FirstOrDefault()?.Saga;
Assert.NotNull(saga);
Assert.Equal(1, saga.Stage); // HERE stage should by 1
await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));
Assert.True(await Harness.Published.Any<ProcessFinished>());
Assert.True(await Harness.Consumed.Any<ProcessFinished>());
Assert.True(await SagaHarness.Consumed.Any<ProcessFinished>());
Assert.True(await SagaHarness.Sagas.Any(s => s.ProcessId == processId && s.CurrentState == "Final"));
}
finally
{
await Harness.Stop();
}
}
}
Я пробовал это на простой конфигурации шины в памяти и на RabbitMQ. На обеих конфигурациях работает нормально. Сообщения потребляются дважды только в пределах InMemoryTestHarness
.
У вас есть предложения, что следует исправить? На первый взгляд это выглядит как неправильное поведение.
Да, удалите эту строку — вы настраиваете сагу на двух разных конечных точках.
Harness.OnConfigureInMemoryReceiveEndpoint += config =>
{
config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
};
И вот еще. Как сейчас я могу отправить команду для тестирования саги? Harness.InputQueueSendEndpoint.Send(new StartProcess(correlationId, processId))
не работает.
Звонить Publish
вместо Send
? Кроме того, вы смотрели 2 сезон сериала MassTransit на YouTube? Все дело в конечных автоматах и модульном тестировании. Образец обширен и показывает, как использовать Exists
и т. д. для проверки прогресса саги. Task.Delay в модульном тесте не работает.
Спасибо. Образец многое объясняет. Я реорганизовал тест, и теперь он выглядит и работает нормально.
После ответа [@Chris Patterson] я реорганизовал тест. Может кому поможет, реорганизовал тест ниже:
public class MySagaTests
{
InMemoryTestHarness Harness { get; }
IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }
public MySagaTests()
{
var services = new ServiceCollection()
.AddMassTransitInMemoryTestHarness(config =>
{
config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
{
sagaConfig.UseConcurrentMessageLimit(1);
sagaConfig.UseInMemoryOutbox();
})
.InMemoryRepository();
config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
});
var serviceProvider = services.BuildServiceProvider(true);
Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();
SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
}
[Fact]
public async Task TestMySaga()
{
string processId = "newProcessId";
var correlationId = NewId.NextGuid();
await Harness.Start();
try
{
await Harness.Bus.Publish(new StartProcess(correlationId, processId));
var sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId, x => x.InProgress)).Any();
Assert.True(sagaExists, "Saga not exists");
await Harness.Bus.Publish(new ProcessStageFinished(correlationId, processId));
sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId && s.Stage == 1, x => x.InProgress)).Any();
Assert.True(sagaExists, "Saga not exists");
await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));
sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId, x => x.Final)).Any();
Assert.True(sagaExists, "Saga not exists");
}
finally
{
await Harness.Stop();
}
}
}
Большой. Это помогло. Но теперь, чтобы пройти тест, мне нужно добавить некоторые задержки после каждой операции публикации — вот так:
await Task.Delay(100);
Это правильно?