Я, наверное, уже в десятый раз внедряю что-то подобное, и я никогда не был на 100% доволен решениями, которые придумал.
Причина, по которой использование таблицы mysql вместо «правильной» системы обмена сообщениями является привлекательной, в первую очередь потому, что большинство приложений уже используют некоторую реляционную базу данных для других вещей (которые, как правило, являются mysql для большинства вещей, которые я делал), в то время как очень мало приложений использовать систему обмена сообщениями. Кроме того, реляционные базы данных имеют очень сильные свойства ACID, в то время как системы обмена сообщениями часто этого не делают.
Первая идея - использовать:
create table jobs( id auto_increment not null primary key, message text not null, process_id varbinary(255) null default null, key jobs_key(process_id) );
И тогда очередь выглядит так:
insert into jobs(message) values('blah blah');
А удаление из очереди выглядит так:
begin; select * from jobs where process_id is null order by id asc limit 1; update jobs set process_id = ? where id = ?; -- whatever i just got commit; -- return (id, message) to application, cleanup after done
Table и enqueue выглядят красиво, но dequeue меня немного беспокоит. Насколько вероятно откат? Или быть заблокированным? Какие ключи я должен использовать, чтобы сделать его O (1) -ish?
Или есть лучшее решение, чем то, что я делаю?
Ваш дизайн удаления из очереди страдает от состояния гонки. Вам либо нужно превратить ваш SELECT в «SELECT ... FOR UPDATE», либо вам нужно сначала выполнить UPDATE (как предлагает @pawstrong), поскольку UPDATE является атомарным.
В RabbitMQ задача будет повторяться, если соединение между воркером и брокером прервется. Как это сделать с MySQL, нужно ли добавить поле: ping и обновлять его во время выполнения задачи? Теоретически может быть максимальный тайм-аут ожидающей задачи, но тогда, если у вас есть длительные задачи (несколько часов), задачи не будут повторяться сразу.






Я бы предложил использовать Quartz.NET
У него есть поставщики для SQL Server, Oracle, MySql, SQLite и Firebird.
Hangfire.io - также отличная альтернатива.
Брайан Эйкер некоторое время назад говорил о машина очереди. Также были разговоры о синтаксисе SELECT table FROM DELETE.
Если вас не беспокоит пропускная способность, вы всегда можете использовать ВЫБРАТЬ GET_LOCK () в качестве мьютекса. Например:
SELECT GET_LOCK('READQUEUE');
SELECT * FROM jobs;
DELETE FROM JOBS WHERE ID = ?;
SELECT RELEASE_LOCK('READQUEUE');
А если вы хотите получить действительно что-то особенное, оберните это в хранимую процедуру.
Эта ветка содержит проектную информацию, которую следует отображать.
Цитировать:
Вот что я успешно использовал в прошлом:
Схема таблицы MsgQueue
Идентификатор MsgId - НЕ NULL
MsgTypeCode varchar (20) - НЕ NULL
SourceCode varchar (20) - процесс вставки сообщения - NULLable
State char (1) - 'N'ew, если в очереди,' A '(ctive), если обработка,' C'ompleted, по умолчанию 'N' - НЕ NULL
CreateTime datetime - по умолчанию GETDATE () - НЕ NULL
Msg varchar (255) - NULLable
Типы ваших сообщений - это то, что вы ожидаете - сообщения, которые соответствуют контракту между вставкой процесса (ов) и процессом (ами) чтения, структурированы с помощью XML или другого варианта представления (в некоторых случаях может пригодиться JSON для пример).
Затем могут быть вставлены процессы от 0 до n, а процессы от 0 до n могут читать и обрабатывать сообщения. Каждый процесс чтения обычно обрабатывает сообщения одного типа. Для балансировки нагрузки может выполняться несколько экземпляров типа процесса.
Читатель вытаскивает одно сообщение и меняет состояние на «А», пока работает с ним. Когда это будет сделано, он изменит состояние на «C» завершено. Он может удалить сообщение или нет, в зависимости от того, хотите ли вы вести контрольный журнал. Сообщения State = 'N' извлекаются в порядке MsgType / Timestamp, поэтому есть индекс MsgType + State + CreateTime.
Варианты:
Состояние ошибки "E" .
Столбец для кода процесса Reader.
Отметки времени для переходов между состояниями.
Это обеспечило хороший, масштабируемый, видимый и простой механизм для выполнения ряда вещей, которые вы описываете. Если у вас есть базовое представление о базах данных, это довольно надежно и расширяемо. Никогда не возникало проблем с откатом блокировок и т. д. Из-за транзакций перехода атомарного состояния.
Я построил несколько систем очередей сообщений, и я не уверен, какой тип сообщения вы имеете в виду, но в случае удаления из очереди (это слово?) Я сделал то же самое, что и вы. . Ваш метод выглядит простым, чистым и надежным. Не то чтобы моя работа была лучшей, но она оказалась очень эффективной для большого мониторинга многих сайтов. (регистрация ошибок, массовые маркетинговые кампании по электронной почте, уведомления в социальных сетях)
Мой голос: не беспокойтесь!
У этого метода есть проблемы с производительностью или параллелизмом. Вы можете это заметить, а можете и не заметить. Вы можете не осознавать, обрабатываются ли задания дважды время от времени, если, например, это сохраняет строки журнала. У вас также могут быть тупиковые ситуации с этим методом. Он использует оптимистическую блокировку, поэтому возвращается к использованию управления версиями и разрешения конфликтов. Два параллельных запроса могут получить одно и то же задание, тогда один блокируется или имеет конфликт при обновлении, также в зависимости от уровня согласованности. В качестве альтернативы process_id затирается. Он работает много времени и с некоторыми реализациями, но не всегда.
Ваша очередь может быть более лаконичной. Вместо того, чтобы полагаться на откат транзакции, вы можете сделать это одним атомарным оператором без явной транзакции:
UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;
Затем вы можете получить задания с помощью (квадратные скобки [] означают необязательные, в зависимости от ваших данных):
SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];
Отлично! Как я могу сделать то же самое с PostgreSQL?
Вот решение, которое я использовал, работая без process_id текущего потока или блокируя таблицу.
SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;
Получите результат в массиве $ row и выполните:
DELETE from jobs WHERE ID=$row['ID'];
Затем получите затронутые строки (mysql_affected_rows). Если есть затронутые строки, обработайте задание в массиве $ row. Если есть 0 затронутых строк, это означает, что какой-то другой процесс уже обрабатывает выбранное задание. Повторяйте вышеуказанные шаги, пока не останется рядов.
Я тестировал это с таблицей «заданий», содержащей 100 тыс. Строк, и порождал 20 параллельных процессов, которые делают то же самое. Никаких условий гонки не было. Вы можете изменить приведенные выше запросы, чтобы обновить строку с помощью флага обработки и удалить строку после ее фактической обработки:
while(time()-$startTime<$timeout)
{
SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1;
if (count($row)==0) break;
UPDATE jobs set processing=1 WHERE ID=$row['ID'];
if (mysql_affected_rows==0) continue;
//process your job here
DELETE from jobs WHERE ID=$row['ID'];
}
Излишне говорить, что вы должны использовать правильную очередь сообщений (ActiveMQ, RabbitMQ и т. д.) Для такого рода работы. Однако нам пришлось прибегнуть к этому решению, так как наш хост регулярно ломает что-то при обновлении программного обеспечения, поэтому чем меньше будет проблем, тем лучше.
У вас может быть промежуточная таблица для поддержания смещения очереди.
create table scan(
scan_id int primary key,
offset_id int
);
У вас также может быть несколько сканирований, следовательно, одно смещение на сканирование. Инициализировать offset_id = 0 в начале сканирования.
begin;
select * from jobs where order by id where id > (select offset_id from scan where scan_id = 0) asc limit 1;
update scan set offset_id = ? where scan_id = ?; -- whatever i just got
commit;
Все, что вам нужно сделать, это просто сохранить последнее смещение. Это также сэкономит вам много места (process_id на запись). Надеюсь, это звучит логично.
В MySQL 8 вы можете использовать новые ключевые слова NOWAIT и SKIP LOCKED, чтобы избежать сложности с помощью специальных механизмов блокировки:
START TRANSACTION;
SELECT id, message FROM jobs
WHERE process_id IS NULL
ORDER BY id ASC LIMIT 1
FOR UPDATE SKIP LOCKED;
UPDATE jobs
SET process_id = ?
WHERE id = ?;
COMMIT;
Традиционно этого было трудно достичь без хаков и необычных специальных таблиц или столбцов, ненадежных решений или потери параллелизма.
SKIP LOCKED может вызвать проблемы с производительностью у очень большого числа потребителей.
Однако это по-прежнему не обрабатывает автоматическую пометку задания как завершенного при откате транзакции. Для этого вам могут понадобиться очки сохранения. Однако это может не решить всех случаев. Вы действительно хотели бы установить действие, которое будет выполняться при сбое транзакции, но как часть транзакции!
В будущем, возможно, появятся дополнительные функции для оптимизации с такими случаями, как обновление, которое также может возвращать совпадающие строки. В журнале изменений важно быть в курсе новых функций и возможностей.
Я бы попытался сделать что-то вроде: UPDATE jobs SET process_id = id_arg WHERE id = (SELECT MIN (id) FROM jobs WHERE process_id IS NULL) <br> ВЫБРАТЬ поля из заданий WHERE process_id = id_arg