У меня есть следующий код, который считывает транзакцию из Kafka и обновляет баланс счета, чтобы показать эту транзакцию.
public class KafkaConsumerService : BackgroundService
{
private readonly IConsumer<string, Transaction> _kafkaConsumer;
private readonly IRepository _repository;
private readonly ICalculator _calculator;
public KafkaConsumerService(
IConsumer<string, Transaction> kafkaConsumer,
IRepository repository,
ICalculator calculator
)
{
_kafkaConsumer = kafkaConsumer;
_repository = repository;
_calculator = calculator;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var consumeResult = await Task.Run(() => _kafkaConsumer.Consume(stoppingToken), stoppingToken);
var transaction = consumeResult.Message.Value;
var account = await _repository.GetAccount(transaction.Account);
await _repository.UpdateAccount(_calculator.CalculateAccount(account, Normalize(transaction)));
}
private Transaction Normalize(Transaction transaction)
{
if (!transaction.IsCancellation)
{
return transaction;
}
return new Transaction(transaction)
{
Amount = transaction.Amount * -1,
IsCancellation = false
};
}
}
Затем я написал для этого следующий модульный тест, используя XUnit и Moq.
public class KafkaConsumerServiceTest
{
private readonly Mock<IConsumer<string, Transaction>> _kafka = new();
private readonly Mock<IRepository> _repository = new();
private readonly Mock<ICalculator> _calculator = new();
private readonly Fixture _fixture = new();
private readonly KafkaConsumerService _kafkaConsumerService;
public KafkaConsumerServiceTest()
{
_kafkaConsumerService = new KafkaConsumerService(_kafka.Object, _repository.Object, _calculator.Object);
}
[Fact]
public async Task KafkaConsumerService_ProcessesCancelationTransaction()
{
_fixture.Customize<Transaction>(composer => composer
.With(transaction => transaction.IsCancellation, true)
);
var transaction = _fixture.Create<Transaction>();
_kafka
.Setup(consumer => consumer.Consume(It.IsAny<CancellationToken>()))
.Returns(new ConsumeResult<string, Transaction>
{
Message = new Message<string, Transaction>
{
Value = transaction,
},
});
var result = _fixture.Create<Account>() with
{
AccountName = transaction.Account
};
_repository
.Setup(repository => repository.GetAccount(transaction.Account))
.ReturnsAsync(result);
_calculator
.Setup(calculator => calculator.CalculateAccount(It.IsAny<Account?>(), It.IsAny<Transaction>()))
.Returns(result);
await _kafkaConsumerService.StartAsync(CancellationToken.None);
_repository.Verify(repository =>
repository.GetAccount(transaction.Account)
);
_calculator.Verify(calculator =>
calculator.CalculateAccount(result, transaction)
);
_repository.Verify(repository => repository.UpdateAccount(result));
}
}
Однако затем я получаю следующую ошибку
Moq.MockException
Expected invocation on the mock at least once, but was never performed: repository => repository.GetAccount("Account73ccea18-e39c-493f-9533-7af7f983b8ab")
Performed invocations:
Mock<IRepository:1> (repository):
IRepository.GetAccount("Account73ccea18-e39c-493f-9533-7af7f983b8ab")
IRepository.UpdateAccount(Account { AccountName = Account73ccea18-e39c-493f-9533-7af7f983b8ab, Amount = 119 })
Как вы можете видеть, в нем говорится, что метод GetAccount("Account73ccea18-e39c-493f-9533-7af7f983b8ab")
никогда не вызывался, однако прямо под ним в разделе «Выполненные вызовы» говорится, что он был вызван.
Если у кого-то есть какие-либо идеи относительно того, что здесь происходит, я был бы признателен.
РЕДАКТИРОВАТЬ Добавление await Task.Delay(100) в модульные тесты, кажется, решает проблему, однако это не идеальное решение, и я до сих пор не понимаю, почему проблема возникает в первую очередь.
РЕДАКТИРОВАТЬ #2 Кажется, что удаление расширения BackgroundService (https://learn.microsoft.com/en-us/dotnet/api/microsoft.extensions.hosting.backgroundservice?view=dotnet-plat-ext-7.0) кажется исправьте тест, а также. Может ли это каким-то образом вызывать состояние гонки в моем коде?
Мой класс расширяет Microsoft.Extensions.Hosting.BackgroundService, который включает метод StartAsync. По сути, это просто метод, который вызывает абстрактную задачу ExecuteAsync, которую я переопределяю.
Я думаю, что виновником может быть это:
return new Transaction(transaction)
{
Amount = transaction.Amount * -1,
IsCancellation = false
};
Когда вы проверяете экземпляр, он выполняет проверку ссылок, поэтому он не может быть другим вновь созданным объектом.
Пытаться
_repository.Verify(repository =>
repository.GetAccount(It.IsAny<string>())
);
_repository.Verify(repository => repository.UpdateAccount(It.IsAny<Transaction>()));
Вы также можете использовать It.Is<Transaction>(t => t.AccountName == "account")
для проверки определенных значений в утверждении.
Спасибо за ответ, к сожалению, It.IsAny<string> тоже не работает. Я также не думаю, что это проблема, поскольку ошибка возникает до того, как мы вызовем этот метод нормализации. и удаление нормализовать все вместе тоже не исправляет. Я также заметил, что добавление await Task.Delay(100) в модульный тест устраняет проблему. Но это не идеально. Может ли здесь иметь место какое-то состояние гонки?
@ JDChris100 Пожалуйста, добавьте эту дополнительную информацию к вопросу.
Попробуйте переписать Task.Run(() => _kafkaConsumer.Consume(stoppingToken), stoppingToken)
. Не уверен, почему это должно быть в отдельной задаче.
@beautifulcoder Вероятно, это потому, что Consume()
— это длительный блокирующий вызов.
Да, Consume — это метод Kafka, который блокирует поток до тех пор, пока сообщение не будет доступно для потребления, поэтому мне пришлось написать его в Task.Run, чтобы он не блокировал поток.
Изучаем это дальше. Оказывается, BackgroundService.StartAsync
позвонит ExecuteAsync
, а потом return Task.CompletedTask
public virtual Task StartAsync(CancellationToken cancellationToken)
{
// Store the task we're executing
_executingTask = ExecuteAsync(_stoppingCts.Token);
// If the task is completed then return it,
// this will bubble cancellation and failure to the caller
if (_executingTask.IsCompleted)
{
return _executingTask;
}
// Otherwise it's running
return Task.CompletedTask;
}
Это означало, что мой код еще не завершил выполнение, поэтому мои утверждения moq не увенчались успехом. Затем я предполагаю, что время между этими двумя было очень близким, так что к тому времени, когда ошибка была сгенерирована, методы были вызваны, отсюда и запутанное сообщение об ошибке.
Я исправил эту проблему, просто дождавшись завершения выполненной задачи.
await _kafkaConsumerService.StartAsync(CancellationToken.None);
await _kafkaConsumerService.ExecuteTask;
Где определен метод
StartAsync()
? Вы показываете толькоExecuteAsync()
, но вы называетеawait _kafkaConsumerService.StartAsync()
в своем тесте. Что там происходит?