У меня работа по таймеру
@Component
public class Worker {
@Scheduled(fixedDelay = 100)
public void processEnvironmentActions() {
Job job = pickJob();
}
public Job pickJob() {
Job job = jobRepository.findFirstByStatus(Status.NOT_PROCESSED);
job.setStatus(Status.PROCESSING);
jobRepository.save(job);
return job;
}
}
Теперь в большинстве ситуаций это должно дать мне правильный результат. Но что произойдет, если два экземпляра микросервиса будут выполнять этот фрагмент кода одновременно?
Как мне убедиться, что даже при наличии нескольких экземпляров службы репозиторий всегда должен отдавать одно задание только одному экземпляру, а не другим экземплярам.
Обновлено:
Я думаю, что люди запутались / сконцентрировались на @Transactional
, поэтому удалили его. Вопрос остается прежним.
Он должен работать, если метод public
, не так ли? Например, я написал его как метод private
, но если я сделаю его public
, как мне добиться синхронизации между несколькими экземплярами?
Я не знаю, как ответить на твой вопрос. Я просто заметил проблему в вашем коде.
Я думаю, ваш вопрос скорее таков: как мне реализовать распределенную рабочую очередь для нескольких экземпляров службы?
@talex прав. Transactional не работает с локальными вызовами, потому что не задействован прокси. Чтобы решить вашу проблему, вам необходимо просмотреть всю таблицу вакансий. Я добавлю ответ позже.
Я не знаком с spring -batch, но, по-видимому, spring-batch реализует оптимистичную блокировку, поэтому операция сохранения не удастся, если другой поток уже выбрал ту же работу.
Я согласен с ответом @ Путаница, но, не зная о политике очистки, вы должны использовать метод jobRepository.saveAndFlush(job)
, чтобы вы были уверены, что операторы sql помещаются в базу данных.
см. также Разница между save и saveAndFlush в данных Spring jpa
But what will happen if there are two instances of microservice executing this piece of code at the same time?
Как это часто бывает, ответ таков: это зависит от обстоятельств.
Все это предполагает, что ваш код работает внутри транзакции.
Оптимистическая блокировка.
Если ваш объект Job
имеет атрибут версии, то есть атрибут, помеченный аннотацией @Version
.
Оптимистическая блокировка включена.
Если процессы обращаются к одному и тому же заданию, можно заметить, что атрибут версий изменился при попытке сохранить измененный объект задания, и произойдет сбой с OptimisticLockingException
.
Все, что вам нужно сделать, это обработать это исключение, чтобы процесс не завершился, а снова попытался получить следующий Job
.
Нет блокировки (на уровне JPA).
Если объект Job
не имеет атрибута версии, JPA по умолчанию не применяет никаких блокировок.
Второй процесс, обращающийся к Job
, выдает обновление, которое по сути является NOOP, поскольку первый процесс уже обновил его.
Никто не заметит проблемы.
Вероятно, вы захотите этого избежать.
Пессимистическая блокировка
Блокировка pessimistic_write не позволит кому-либо прочитать объект до того, как вы закончите чтение и запись (по крайней мере, так я понимаю спецификацию JPA). Следовательно, это должно избежать того, чтобы второй процесс мог выбрать строку до того, как первый процесс завершит ее запись. Вероятно, это блокирует весь второй процесс. Поэтому убедитесь, что транзакция, удерживающая такую блокировку, короткая.
Чтобы получить такую блокировку, аннотируйте метод репозитория findFirstByStatus
с помощью @Lock(LockModeType.PESSIMISTIC_WRITE)
.
Конечно, могут быть библиотеки или фреймворки, которые обрабатывают такие детали за вас.
Ответ @Jens Schauder указывает мне правильное направление. Позвольте мне поделиться кодом, чтобы он помогал другим людям. Вот как я решил свою проблему, я изменил свой класс работы, как показано ниже
@Entity
public class Job {
@Version
private Long version = null;
// other fields omitted for bervity
}
Теперь давайте проследим следующий код
@Transactional
public Job pickJob() {
Job job = jobRepository.findFirstByStatus(Status.NOT_PROCESSED);
job.setStatus(Status.PROCESSING);
Job saved jobRepository.save(job);
return saved;
}
Note: Make sure you return the
saved
object and not thejob
object. If you return the job object, it'll fail for secondsave
operation as the version count that was forjob
will be behind than that forsaved
.
.
Service 1 Service 2 1. Read Object (version = 1) 1. Read Object (version = 1) 2. Change the object and save (changes the version) 3. Continues to process 2. Change the object and save (this operation fails as the version that was read was 1 but in the DB version is 2) 3. Skip the job processing
Таким образом, задание будет выполнено только одним процессом.
Вы знаете, что
@Transactional
не работает в случае вызова метода наthis
?