Избегайте обработки одной и той же записи потребителями

Невозможно запретить нескольким потребителям обрабатывать одну и ту же запись в очереди

Используя библиотеку System.Threading.Channels, когда писатель ставит в очередь некоторую модель, один из нескольких потребителей начинает ее обрабатывать. при этом писатель обращается к базе данных и читает ту же запись (поскольку она все еще обрабатывается, а статус не обновляется), в результате чего другой потребитель начинает обрабатывать ту же модель. Я реализовал ConcurrentDictionary, чтобы предотвратить проблему, но это не может помочь. Любые идеи о том, как решить эту проблему?

Вот код:

public sealed class SendImagesBackgroundService : BackgroundTask
{
    private readonly ILogger<SendImagesBackgroundService> _logger;
    private readonly IServiceProvider _serviceProvider;
    private readonly IApiService _apiService;
    private readonly Channel<InspectionFileModel> _channel;
    private readonly SendImagesBackgroundServiceOptions _options;
    private static readonly ConcurrentDictionary<string, bool>
        s_concurrentDictionary = new();

    public SendImagesBackgroundService(
        ILogger<SendImagesBackgroundService> logger,
        IServiceProvider serviceProvider,
        IApiService apiService,
        IOptions<SendImagesBackgroundServiceOptions> options,
        IBackgroundTaskLockProvider lockProvider) : base(logger)
    {
        UseExclusiveLock(lockProvider);

        _options = options.Value;
        _logger = logger;
        _serviceProvider = serviceProvider;
        _apiService = apiService;

        _channel = Channel.CreateBounded<InspectionFileModel>(
            new BoundedChannelOptions(_options.ChannelQueueSize));
    }

    protected override Task ExecuteAsync(CancellationToken cancellationToken)
    {
        Task.Run(() => FetchImages(cancellationToken), cancellationToken);
        Task.Run(() => StartProcessingInspectionImages(cancellationToken),
            cancellationToken);
        return Task.CompletedTask;
    }

    private async Task FetchImages(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            using var serviceScope = _serviceProvider.CreateScope();
            List<InspectionFileModel>? inspectionImages = default;
            do
            {
                try
                {
                     var mediatr = serviceScope.ServiceProvider
                         .GetRequiredService<IMediator>();
                     inspectionImages = await mediatr.Send(
                         new DownloadInspectionFilesQueryNew(),cancellationToken);
                     if (inspectionImages is not {Count: > 0})
                     {
                         continue;
                     }

                     var nonProcessingImages = inspectionImages.Where(
                         x => !s_concurrentDictionary.ContainsKey(x.Id));

                     foreach (var image in nonProcessingImages)
                     {
                         s_concurrentDictionary.TryAdd(image.Id, true);
                         while (!(cancellationToken.IsCancellationRequested &&
                             await _channel.Writer.WaitToWriteAsync(
                                 cancellationToken)))
                         {
                             if (!_channel.Writer.TryWrite(image))
                             {
                                 continue;
                             }

                             break;
                         }
                     }
                }
                catch (Exception ex) when (!(cancellationToken
                    .IsCancellationRequested && ex is OperationCanceledException))
                {
                    _logger.LogError(ex, "Fetching inspection images Failed");
                }

            } while (!cancellationToken.IsCancellationRequested
                && inspectionImages  is {Count: > 0});

            await Task.Delay( _options.DelayBetweenFetchBatchMs, cancellationToken);
        }

        _logger.LogInformation("End fetching inspection images");
    }

    private async Task StartProcessingInspectionImages(
        CancellationToken cancellationToken)
    {
        var parallelProcesses = new List<Task>();
        for (int i = 0; i <   _options.NumberOfParallelTasks; i++)
        {
            var task = Task.Run(() => ProcessInspectionImages(cancellationToken),
                cancellationToken);
            parallelProcesses.Add(task);
        }
        await Task.WhenAll(parallelProcesses);
    }

    private async Task ProcessInspectionImages(CancellationToken cancellationToken)
    {
        while (!(cancellationToken.IsCancellationRequested &&
                 await _channel.Reader.WaitToReadAsync(cancellationToken)))
        {
            while (!cancellationToken.IsCancellationRequested &&
                     _channel.Reader.TryRead(out var inspectionImage ))
            {
                try
                {
                    await SendInspectionImageToLivo(inspectionImage,
                        cancellationToken);
                }
                catch (Exception ex) when (!(cancellationToken
                    .IsCancellationRequested && ex is OperationCanceledException))
                {
                   //handle
                }
            }
        }
    }

    private async Task SendInspectionImageToLivo(InspectionFileModel image,
        CancellationToken cancellationToken)
    {
        try
        {
            //send data over the network
        }
        catch (ApiException ex)
        {
            //handle 
        }
        finally
        {
            s_concurrentDictionary.TryRemove(image.Id, out bool _);
        }
    }

    public override object? GetTelemetry() => null;
}

Статус обновляется в методе SendInspectionImageToLivo. Если код состояния равен 200, поле состояния становится успешным. В случае 4 ** он установлен как неудавшийся, и последующие запросы к БД не будут включать их в результат на основе этих статусов.

public sealed class SendImagesBackgroundService : BackgroundTask -- Что такое BackgroundTask? Он встроенный или настраиваемый?
Theodor Zoulias 22.02.2023 10:23

Пользовательский @TheodorZoulias, реализует IHostedService

tabsandze 22.02.2023 10:29

Можно ли решить эту проблему на уровне базы данных, добавив логическое поле Processing в базу данных? Потребитель должен начать с обновления этого поля до true (только если в данный момент оно равно false), а в случае recordsAffected = 1 продолжить обработку записи, в противном случае пропустить ее. UPDATE Records SET Processing = 1 WHERE Id = @Id AND Processing = 0;

Theodor Zoulias 22.02.2023 10:54

@TheodorZoulias да, это допустимый вариант, но я хотел бы управлять им на уровне кода

tabsandze 22.02.2023 13:31

Проблема с опорой на код заключается в том, что даже если вы правильно написали код, в тот момент, когда вы случайно запустите два экземпляра своего приложения, все ставки сняты. Делать это на уровне базы данных безопаснее.

Theodor Zoulias 22.02.2023 13:59

Об этом позаботится метод UseExclusiveLock(lockProvider), процесс будет запущен только в одном экземпляре.

tabsandze 22.02.2023 14:03

«потому что он все еще обрабатывается, а статус не обновляется» — в какой момент конвейера обновляется статус? Это происходит внутри метода SendInspectionImageToLivo?

Theodor Zoulias 22.02.2023 14:09

Да, если код состояния равен 200, поле состояния становится успешным, в случае 4 ** для него устанавливается значение «Ошибка», и последующие запросы к базе данных не будут включать их в результат на основе этих статусов.

tabsandze 22.02.2023 14:15

На какую платформу .NET вы ориентируетесь? .NET 7?

Theodor Zoulias 22.02.2023 14:23

Нет, я нацеливаюсь на сеть 6

tabsandze 22.02.2023 14:31

Чтобы было ясно, здесь нет ничего плохого в каналах. Проблема в том, что производитель помещает один и тот же элемент в канал более одного раза. Таким образом, вам нужно либо пометить запись базы данных как обрабатываемую (как было предложено выше) (примечание: есть сложности с потерянными записями), либо изменить канал на долговременную очередь и использовать шаблон Исходящие (т.е. как только сообщение добавляется в очередь, запись БД удаляется или помечается как обработанная).

Stephen Cleary 22.02.2023 14:53

@tabsandze, вы путаете разные «одновременные» доступы. Канал не позволяет нескольким потребителям получать одно и то же сообщение. Никаких если или но. Если только ваше собственное приложение не помещает туда один и тот же элемент несколько раз. Вот почему люди говорят о сообщениях, а не об объектах или предметах. Если вы хотите изменить запись, сообщение должно быть операцией Update Record 123 with X, а не самим объектом.

Panagiotis Kanavos 22.02.2023 14:54

Что касается writer goes to the database and reads the same record, это означает лучший способ определить, что нужно читать. Не зная, какие данные загружаются, исходя из каких критериев, можно только догадываться. Один из вариантов — использовать предложение UPDATE ... OUTPUT в SQL Server или RETURNING в MariaDB/PostgreSQL, чтобы пометить строку и вернуть ее идентификатор. Другой вариант — использовать Отслеживание изменений в SQL Server для получения новых/измененных строк.

Panagiotis Kanavos 22.02.2023 15:27
Запуск PHP на IIS без использования программы установки веб-платформы
Запуск PHP на IIS без использования программы установки веб-платформы
Установщик веб-платформы, предлагаемый компанией Microsoft, перестанет работать 31 декабря 2022 года. Его закрытие привело к тому, что мы не можем...
Оптимизация React Context шаг за шагом в 4 примерах
Оптимизация React Context шаг за шагом в 4 примерах
При использовании компонентов React в сочетании с Context вы можете оптимизировать рендеринг, обернув ваш компонент React в React.memo сразу после...
Библиотека для работы с мороженым
Библиотека для работы с мороженым
Лично я попрощался с операторами print() в python. Без шуток.
Настройка шаблона Metronic с помощью Webpack и Gulp
Настройка шаблона Metronic с помощью Webpack и Gulp
Я пишу эту статью, чтобы поделиться тем, как настроить макет Metronic с помощью Sass, поскольку Metronic предоставляет так много документации, и они...
Уроки CSS 6
Уроки CSS 6
Здравствуйте дорогие читатели, я Ферди Сефа Дюзгюн, сегодня мы продолжим с вами уроки css. Сегодня мы снова продолжим с так называемых классов.
Что такое Css? Для чего он используется?
Что такое Css? Для чего он используется?
CSS, или "Каскадные таблицы стилей", - это язык стилей, используемый в веб-страницах. CSS является одним из основных инструментов веб-разработки...
2
13
67
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я думаю, что ключом к решению вашей проблемы является создание моментального снимка обрабатываемых в данный момент изображений перед получением inspectionImages с помощью команды DownloadInspectionFilesQueryNew. Затем фильтруйте изображения на основе снимка, а не на основе текущего состояния словаря. В противном случае можно начать выборку изображений, пока изображения загружаются, изображение X обрабатывается и удаляется из словаря, изображение X находится среди выбранных изображений, и, в конце концов, изображение X не фильтруется и обрабатывается снова. Использование моментального снимка делает этот сценарий невозможным при условии, что запросы к базе данных запрашиваются напрямую или, по крайней мере, запросы распространяются в базу данных в правильном порядке FIFO.

Ниже приведен альтернативный способ реализации вашего сервиса, который может вас заинтересовать. Он основан на перегрузке Parallel.ForEachAsync , которая принимает IAsyncEnumerable<T> как source. Источником является метод итератора, который извлекает необработанные изображения и возвращает их. Я думаю, что это проще и читабельнее, чем реализация на основе Channel<T>.

protected override Task ExecuteAsync(CancellationToken cancellationToken)
{
    ParallelOptions parallelOptions = new()
    {
        MaxDegreeOfParallelism = _options.NumberOfParallelTasks,
        CancellationToken = cancellationToken,
    };

    HashSet<string> processing = new();

    async IAsyncEnumerable<InspectionFileModel> Producer(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        while (true)
        {
            HashSet<string> processingSnapshot;
            lock (processing) processingSnapshot = new(processing);
            var inspectionImages = await mediatr.Send(
                new DownloadInspectionFilesQueryNew(), ct);
            var nonProcessingImages = inspectionImages
                .Where(x => !processingSnapshot.Contains(x.Id));
            int yieldedCount = 0;
            foreach (InspectionFileModel image in nonProcessingImages)
            {
                lock (processing) processing.Add(image.Id);
                yield return image; yieldedCount++;
            }
            if (yieldedCount == 0)
                await Task.Delay(1000, ct); // Take a small break.
        }
    }

    return Parallel.ForEachAsync(Producer(), parallelOptions, async (image, ct) =>
    {
        try
        {
            await SendInspectionImageToLivo(image, ct);
        }
        finally { lock (processing) processing.Remove(image.Id); }
    });
}

Вы можете обнаружить, что он ведет себя лучше и в случае необработанных исключений.


Если вы не уверены на 100 %, что база данных будет корректно обрабатывать одновременные команды, позволяя выполнять две команды A-B в порядке B-A, вы можете пойти по более безопасному, но более медленному пути сериализации всего доступа к базе данных, предотвращая любой доступ к базе данных. параллелизм вообще. Для этой цели вы можете использовать SemaphoreSlim в качестве мьютекса:

SemaphoreSlim mutex = new(1, 1);
//...
await mutex.WaitAsync();
try { /* DB command and dictionary modifications */ } finally { mutex.Release(); }

При таком подходе вам не нужно было бы делать снимки словаря, потому что весь доступ к словарю также был бы сериализован. Вам также не понадобится ConcurrentDictionary. Обычного Dictionary или HashSet будет достаточно.

Другие вопросы по теме