Улучшите запрос SQL Server, чтобы он был «потокобезопасным»

У меня есть приложение, которое обрабатывает входящие сообщения, но в зависимости от типа сообщения я хочу обрабатывать их одноэлементным подходом, что означает, что два сообщения одного типа не могут обрабатываться одновременно.

Итак, в моей базе данных есть таблица MessageInformation, содержащая сообщения и ExecuteStatus. Где 0 = Необработано, 1 = Обработка и 6 = При удержании.

Теперь я хочу выяснить, следует ли пометить поступившее новое сообщение для обработки или приостановить его.

Следующий оператор SQL делает это довольно легко:

IF EXISTS (SELECT 1 FROM [dbo].[MessageInformations] WHERE MessageType = @MessageType AND ExecuteStatus IN (1,6))
BEGIN
    UPDATE [dbo].[MessageInformations]
    SET ExecuteStatus = 6
    OUTPUT Inserted.ExecuteStatus
    WHERE [Identifier] = @MessageIdentifier
END
ELSE
BEGIN
    UPDATE [dbo].[MessageInformations]
    SET ExecuteStatus = 1
    OUTPUT Inserted.ExecuteStatus
    WHERE [Identifier] = @MessageIdentifier
END

Проблема здесь в том, что это не является «потокобезопасным», то есть, если одновременно приходят 2 сообщения, они оба могут быть помечены как Processing, тогда как им должно быть только 1. Я мог бы поместить это в транзакцию и некоторую блокировку таблицы MessageInformation, но я не хочу блокировать всю таблицу, потому что могут быть другие сообщения для других типов, которые не хотят ждать блокировки Закончив, я запускаю этот запрос только для сообщений этого конкретного типа. Другие типы сообщений будут просто установлены на Processing.

А еще в таблице МНОГО сообщений, их будет стабильно около 40-50 миллионов (очистка старых сообщений).

Поэтому мне нужен либо один оператор UPDATE, который может сделать это за меня, либо способ заблокировать только те строки в таблице, которые соответствуют MessageType (я мог бы добавить к нему диапазон дат, чтобы получить меньше совпадающих записей, потому что если сообщение находится в режиме ожидания более пары часов, что-то не так).

Чтобы было немного понятнее. Вот несколько шагов.

  • Сначала есть шаг, на котором сообщение создается в базе данных и помечается как «Необработано».

  • Затем есть несколько процессоров, которым присваивается идентификатор сообщения, чтобы попытаться обработать эти сообщения. Таким образом, в основном процессор имеет идентификатор сообщения и должен проверить, можно ли обработать это сообщение или он должен перевести его в режим ожидания. Если он отмечен как «Обработка» = 1, то процессор может продолжать обработку сообщения, но если он отмечен как «OnHold» = 6, процессор должен остановиться и завершить обработку. И это то, что я хочу, чтобы мой SQL-запрос определил.

  • Когда процессор завершит обработку «одиночного» сообщения, он пометит его как «Обработано» = 2 и сообщит «Менеджеру», что можно взять первое (на основе времени создания) «задержанное» сообщение и обработать его, и так что это будет продолжаться до тех пор, пока не исчезнут сообщения об удержании.

  • Любые новые сообщения, которые поступают, когда в базе данных для этого типа сообщений есть сообщения «задержаны» или «обрабатываются», всегда должны переходить в статус «задержаны», поэтому сообщения задержки обрабатываются в порядке их поступления.

Я мог бы создать отдельную таблицу и поместить в нее сообщение об удержании, но если я смогу решить эту проблему, просто изменив статус в таблице MessageInformation, это будет намного проще, поскольку от этой таблицы зависит множество другой логики. .

Возможно, поможет разделение таблиц на ExecuteStatus? Нравится этот пример?

3N1GM4 25.09.2023 13:43

Вы упомянули «приложение». Вы имеете в виду, что существует один экземпляр приложения или оно потенциально масштабируется? Если у вас один экземпляр, не могли бы вы создать очередь FIFO для каждого типа сообщений в приложении (относительно дешево) вместо использования блокировки базы данных (относительно дорого)? Если обработка сообщений должна контролироваться через базу данных, вы можете сделать что-то эквивалентное, создав таблицу CurrentlyProcessingTypes, мощность которой равна просто количеству типов (а не общему количеству сообщений), и выбрав строку типа с помощью rowlock, updlock,

allmhuran 25.09.2023 14:38

очереди — довольно стандартная концепция. Это много текста, чтобы объяснить, что вы хотите. Кажется, этот вопрос спрашивает о том: «У меня есть приложение, которое обрабатывает входящие сообщения», но в своем комментарии ниже вы говорите: «Я больше помещаю его в очередь и отмечаю». Который из них?

Nick.Mc 26.09.2023 00:39

Ну, это и то, и другое. Приложение помещает его в очередь, а затем обрабатывает.

Magnus Gladh 28.09.2023 18:15
ReactJs | Supabase | Добавление данных в базу данных
ReactJs | Supabase | Добавление данных в базу данных
Это и есть ваш редактор таблиц в supabase.👇
Понимание Python и переход к SQL
Понимание Python и переход к SQL
Перед нами лабораторная работа по BloodOath:
2
4
72
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

Чтобы гарантировать, что сообщения одного и того же типа не обрабатываются одновременно, и избежать блокировки всей таблицы, вы можете использовать подсказку UPDLOCK с общим табличным выражением (CTE).

;WITH CTE AS (
    SELECT TOP 1 [Identifier], ExecuteStatus
    FROM [dbo].[MessageInformations] WITH (UPDLOCK, ROWLOCK)
    WHERE MessageType = @MessageType
    AND ExecuteStatus IN (0, 6) -- NotProcessed or OnHold
    ORDER BY <your_ordering_column> -- Specify an appropriate column for ordering
)
UPDATE CTE
SET ExecuteStatus = 1
OUTPUT Inserted.ExecuteStatus
WHERE [Identifier] = @MessageIdentifier;

Выглядит многообещающе, но, возможно, я немного неясно выразил свой вопрос. Я не пытаюсь получить сообщение для обработки из своей очереди, я скорее помещаю его в очередь и отмечаю как «ToBeProcessed» или «OnHold», чем фактически получаю сообщение, которое мне следует обработать.

Magnus Gladh 25.09.2023 14:51

Я добавил в исходное сообщение дополнительную информацию, которая, возможно, прояснит, как оно работает.

Magnus Gladh 25.09.2023 15:06

Вы используете несколько операторов для решения проблемы, которую можно решить с помощью одного (или двух). @amit mohanty прав в том, что вы можете использовать блокировку, чтобы принудительно решить эту проблему. Однако в целом вы также можете использовать более высокий уровень изоляции (сериализуемый) для достижения того же результата для более широкого класса запросов, если это необходимо.

Вы также можете подумать, нужно ли вам выполнять одну транзакцию для каждой строки или ваше приложение может обрабатывать несколько строк одновременно. Если возможно, это может еще больше упростить проблему за счет сокращения количества необходимых транзакций. Если вы сможете перейти к модели, в которой вы обновляете один раз для всей таблицы (или один раз для каждого типа перехода состояний), вы можете получить дополнительный прирост производительности и параллелизма.

Сообщение Амита помогло мне, но мне нужно было немного настроить SQL-запрос, чтобы он соответствовал моим требованиям.

;WITH CTE AS (
    SELECT *
        FROM [dbo].[MessageInformations] WITH (UPDLOCK, ROWLOCK)
        WHERE MessageType = @MessageType
            AND ExecuteStatus IN (0, 1, 6) -- NotProcessing, Processing or OnHold
)
UPDATE CTE
    SET ExecuteStatus = CASE WHEN (SELECT count(1) FROM CTE WHERE ExecuteStatus IN (1, 6) ) = 0 THEN 1 ELSE 6 END
    OUTPUT Inserted.ExecuteStatus
    WHERE [Identifier] = @MessageIdentifier
        AND ExecuteStatus != 1;

В «Плане выполнения» отображается только 1 обновление TSQL, поэтому я думаю, что многократное выполнение безопасно.

Другие вопросы по теме