Невозможно запретить нескольким потребителям обрабатывать одну и ту же запись в очереди
Используя библиотеку System.Threading.Channels, когда писатель ставит в очередь некоторую модель, один из нескольких потребителей начинает ее обрабатывать. при этом писатель обращается к базе данных и читает ту же запись (поскольку она все еще обрабатывается, а статус не обновляется), в результате чего другой потребитель начинает обрабатывать ту же модель. Я реализовал ConcurrentDictionary, чтобы предотвратить проблему, но это не может помочь. Любые идеи о том, как решить эту проблему?
Вот код:
public sealed class SendImagesBackgroundService : BackgroundTask
{
private readonly ILogger<SendImagesBackgroundService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IApiService _apiService;
private readonly Channel<InspectionFileModel> _channel;
private readonly SendImagesBackgroundServiceOptions _options;
private static readonly ConcurrentDictionary<string, bool>
s_concurrentDictionary = new();
public SendImagesBackgroundService(
ILogger<SendImagesBackgroundService> logger,
IServiceProvider serviceProvider,
IApiService apiService,
IOptions<SendImagesBackgroundServiceOptions> options,
IBackgroundTaskLockProvider lockProvider) : base(logger)
{
UseExclusiveLock(lockProvider);
_options = options.Value;
_logger = logger;
_serviceProvider = serviceProvider;
_apiService = apiService;
_channel = Channel.CreateBounded<InspectionFileModel>(
new BoundedChannelOptions(_options.ChannelQueueSize));
}
protected override Task ExecuteAsync(CancellationToken cancellationToken)
{
Task.Run(() => FetchImages(cancellationToken), cancellationToken);
Task.Run(() => StartProcessingInspectionImages(cancellationToken),
cancellationToken);
return Task.CompletedTask;
}
private async Task FetchImages(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
using var serviceScope = _serviceProvider.CreateScope();
List<InspectionFileModel>? inspectionImages = default;
do
{
try
{
var mediatr = serviceScope.ServiceProvider
.GetRequiredService<IMediator>();
inspectionImages = await mediatr.Send(
new DownloadInspectionFilesQueryNew(),cancellationToken);
if (inspectionImages is not {Count: > 0})
{
continue;
}
var nonProcessingImages = inspectionImages.Where(
x => !s_concurrentDictionary.ContainsKey(x.Id));
foreach (var image in nonProcessingImages)
{
s_concurrentDictionary.TryAdd(image.Id, true);
while (!(cancellationToken.IsCancellationRequested &&
await _channel.Writer.WaitToWriteAsync(
cancellationToken)))
{
if (!_channel.Writer.TryWrite(image))
{
continue;
}
break;
}
}
}
catch (Exception ex) when (!(cancellationToken
.IsCancellationRequested && ex is OperationCanceledException))
{
_logger.LogError(ex, "Fetching inspection images Failed");
}
} while (!cancellationToken.IsCancellationRequested
&& inspectionImages is {Count: > 0});
await Task.Delay( _options.DelayBetweenFetchBatchMs, cancellationToken);
}
_logger.LogInformation("End fetching inspection images");
}
private async Task StartProcessingInspectionImages(
CancellationToken cancellationToken)
{
var parallelProcesses = new List<Task>();
for (int i = 0; i < _options.NumberOfParallelTasks; i++)
{
var task = Task.Run(() => ProcessInspectionImages(cancellationToken),
cancellationToken);
parallelProcesses.Add(task);
}
await Task.WhenAll(parallelProcesses);
}
private async Task ProcessInspectionImages(CancellationToken cancellationToken)
{
while (!(cancellationToken.IsCancellationRequested &&
await _channel.Reader.WaitToReadAsync(cancellationToken)))
{
while (!cancellationToken.IsCancellationRequested &&
_channel.Reader.TryRead(out var inspectionImage ))
{
try
{
await SendInspectionImageToLivo(inspectionImage,
cancellationToken);
}
catch (Exception ex) when (!(cancellationToken
.IsCancellationRequested && ex is OperationCanceledException))
{
//handle
}
}
}
}
private async Task SendInspectionImageToLivo(InspectionFileModel image,
CancellationToken cancellationToken)
{
try
{
//send data over the network
}
catch (ApiException ex)
{
//handle
}
finally
{
s_concurrentDictionary.TryRemove(image.Id, out bool _);
}
}
public override object? GetTelemetry() => null;
}
Статус обновляется в методе SendInspectionImageToLivo. Если код состояния равен 200, поле состояния становится успешным. В случае 4 ** он установлен как неудавшийся, и последующие запросы к БД не будут включать их в результат на основе этих статусов.
Пользовательский @TheodorZoulias, реализует IHostedService
Можно ли решить эту проблему на уровне базы данных, добавив логическое поле Processing в базу данных? Потребитель должен начать с обновления этого поля до true (только если в данный момент оно равно false), а в случае recordsAffected = 1 продолжить обработку записи, в противном случае пропустить ее. UPDATE Records SET Processing = 1 WHERE Id = @Id AND Processing = 0;
@TheodorZoulias да, это допустимый вариант, но я хотел бы управлять им на уровне кода
Проблема с опорой на код заключается в том, что даже если вы правильно написали код, в тот момент, когда вы случайно запустите два экземпляра своего приложения, все ставки сняты. Делать это на уровне базы данных безопаснее.
Об этом позаботится метод UseExclusiveLock(lockProvider), процесс будет запущен только в одном экземпляре.
«потому что он все еще обрабатывается, а статус не обновляется» — в какой момент конвейера обновляется статус? Это происходит внутри метода SendInspectionImageToLivo?
Да, если код состояния равен 200, поле состояния становится успешным, в случае 4 ** для него устанавливается значение «Ошибка», и последующие запросы к базе данных не будут включать их в результат на основе этих статусов.
На какую платформу .NET вы ориентируетесь? .NET 7?
Нет, я нацеливаюсь на сеть 6
Чтобы было ясно, здесь нет ничего плохого в каналах. Проблема в том, что производитель помещает один и тот же элемент в канал более одного раза. Таким образом, вам нужно либо пометить запись базы данных как обрабатываемую (как было предложено выше) (примечание: есть сложности с потерянными записями), либо изменить канал на долговременную очередь и использовать шаблон Исходящие (т.е. как только сообщение добавляется в очередь, запись БД удаляется или помечается как обработанная).
@tabsandze, вы путаете разные «одновременные» доступы. Канал не позволяет нескольким потребителям получать одно и то же сообщение. Никаких если или но. Если только ваше собственное приложение не помещает туда один и тот же элемент несколько раз. Вот почему люди говорят о сообщениях, а не об объектах или предметах. Если вы хотите изменить запись, сообщение должно быть операцией Update Record 123 with X, а не самим объектом.
Что касается writer goes to the database and reads the same record, это означает лучший способ определить, что нужно читать. Не зная, какие данные загружаются, исходя из каких критериев, можно только догадываться. Один из вариантов — использовать предложение UPDATE ... OUTPUT в SQL Server или RETURNING в MariaDB/PostgreSQL, чтобы пометить строку и вернуть ее идентификатор. Другой вариант — использовать Отслеживание изменений в SQL Server для получения новых/измененных строк.
Я думаю, что ключом к решению вашей проблемы является создание моментального снимка обрабатываемых в данный момент изображений перед получением inspectionImages с помощью команды DownloadInspectionFilesQueryNew. Затем фильтруйте изображения на основе снимка, а не на основе текущего состояния словаря. В противном случае можно начать выборку изображений, пока изображения загружаются, изображение X обрабатывается и удаляется из словаря, изображение X находится среди выбранных изображений, и, в конце концов, изображение X не фильтруется и обрабатывается снова. Использование моментального снимка делает этот сценарий невозможным при условии, что запросы к базе данных запрашиваются напрямую или, по крайней мере, запросы распространяются в базу данных в правильном порядке FIFO.
Ниже приведен альтернативный способ реализации вашего сервиса, который может вас заинтересовать. Он основан на перегрузке Parallel.ForEachAsync , которая принимает IAsyncEnumerable<T> как source. Источником является метод итератора, который извлекает необработанные изображения и возвращает их. Я думаю, что это проще и читабельнее, чем реализация на основе Channel<T>.
protected override Task ExecuteAsync(CancellationToken cancellationToken)
{
ParallelOptions parallelOptions = new()
{
MaxDegreeOfParallelism = _options.NumberOfParallelTasks,
CancellationToken = cancellationToken,
};
HashSet<string> processing = new();
async IAsyncEnumerable<InspectionFileModel> Producer(
[EnumeratorCancellation] CancellationToken ct = default)
{
while (true)
{
HashSet<string> processingSnapshot;
lock (processing) processingSnapshot = new(processing);
var inspectionImages = await mediatr.Send(
new DownloadInspectionFilesQueryNew(), ct);
var nonProcessingImages = inspectionImages
.Where(x => !processingSnapshot.Contains(x.Id));
int yieldedCount = 0;
foreach (InspectionFileModel image in nonProcessingImages)
{
lock (processing) processing.Add(image.Id);
yield return image; yieldedCount++;
}
if (yieldedCount == 0)
await Task.Delay(1000, ct); // Take a small break.
}
}
return Parallel.ForEachAsync(Producer(), parallelOptions, async (image, ct) =>
{
try
{
await SendInspectionImageToLivo(image, ct);
}
finally { lock (processing) processing.Remove(image.Id); }
});
}
Вы можете обнаружить, что он ведет себя лучше и в случае необработанных исключений.
Если вы не уверены на 100 %, что база данных будет корректно обрабатывать одновременные команды, позволяя выполнять две команды A-B в порядке B-A, вы можете пойти по более безопасному, но более медленному пути сериализации всего доступа к базе данных, предотвращая любой доступ к базе данных. параллелизм вообще. Для этой цели вы можете использовать SemaphoreSlim в качестве мьютекса:
SemaphoreSlim mutex = new(1, 1);
//...
await mutex.WaitAsync();
try { /* DB command and dictionary modifications */ } finally { mutex.Release(); }
При таком подходе вам не нужно было бы делать снимки словаря, потому что весь доступ к словарю также был бы сериализован. Вам также не понадобится ConcurrentDictionary. Обычного Dictionary или HashSet будет достаточно.
public sealed class SendImagesBackgroundService : BackgroundTask
-- Что такоеBackgroundTask
? Он встроенный или настраиваемый?