Я работаю над микросервисом Go, где использую пакет robfig/cron
для планирования заданий, изначально думал об использовании time.Ticker(), но пакет облегчил мне задачу.
Мой вопрос заключается в том, как обеспечить регистрацию задания cron только один раз, пока работающий cron не завершится, и предотвратить одновременный запуск нескольких горутин.
В этом отличие от одноэлементного подхода: блок кода внутри
RunProcess()
не должен выполняться до тех пор, пока не завершится предыдущая функция RunProcess.
Я новичок в Голанге и его соглашении. :(
Вот упрощенная версия моей текущей реализации:
package main
import (
"fmt"
"sync"
"time"
"github.com/robfig/cron/v3"
)
var isProcessRunning = false
var mu sync.Mutex
func RunProcess() {
mu.Lock()
defer mu.Unlock()
if isProcessRunning {
fmt.Println("Already Running...")
return
}
isProcessRunning = true
fmt.Println("Running...")
// Simulate work
time.Sleep(15 * time.Second)
isProcessRunning = false
}
func InitCron() {
// Create a new cron scheduler
c := cron.New(cron.WithSeconds())
// Add the task to run every 10 seconds
_, err := c.AddFunc("*/10 * * * * *", RunProcess)
if err != nil {
fmt.Println("Error adding cron job:", err)
return
}
// Start the cron scheduler
c.Start()
// Block indefinitely to keep the cron jobs running
select {}
}
func main() {
InitCron()
}
Однако я заметил, что когда InitCron вызывается несколько раз, он потенциально может создать несколько заданий cron, что приведет к проблемам параллелизма и неожиданному поведению легковесного микросервиса.
Будем очень признательны за любые советы или примеры того, как правильно с этим справиться.
Запускаем cron в golang и пытаемся предотвратить выполнение cron до тех пор, пока не завершится первый cron.
«RunProcess() не должен запускаться…» использование вами мьютекса в RunProcess
уже гарантирует, что он не будет выполняться одновременно, так что вы, похоже, уже достигли своей цели? Код сбивает с толку, потому что «Уже работает» никогда не будет напечатано (из-за заблокированного Mutex
); моя ссылка на игровую площадку показывает, как решить эту проблему, но неясно, хотите ли вы этого. Ссылка на синглтон была ответом на ваш комментарий «заметил, что когда InitCron вызывается несколько раз...» (что, похоже, было вашим главным вопросом?).
Спасибо @Brits за ответ. Я отредактировал свой вопрос. Я пытаюсь заблокировать выполнение с помощью переменной RunProcess
, когда следующий цикл cron вызывает isProcessRunning
, он должен прекратить выполнение того же кода. Я пытаюсь добиться удаления очереди сообщений в 1-м процессе, 2-й/3-й процесс не должен влиять на 1-й RunProcess
, пока 1-й процесс не будет отмечен как завершенный. В своем коде вы использовали wg.done() как своего рода подтверждение, верно?
«... вы использовали wg.done() как своего рода подтверждение права» — Не совсем; это просто используется для ожидания завершения горутин (в противном случае программа завершится, завершив все до завершения горутин). Я все еще не на 100% понимаю вашу цель, но опубликую ответ с предложениями, поскольку это сложно, в комментариях.
Я думаю, проблема в том, что вы пытаетесь создать универсальное решение для программирования на ходу, где у вас есть для него мощный набор (простых) инструментов.
Чтобы убедиться, что я правильно понял, вы хотите запускать функцию каждые X секунд, и, если она уже запущена, пропустить текущий триггер.
задание cron периодически запускает процесс, поэтому вместо того, чтобы пытаться изменить его внутреннее поведение, я бы рекомендовал просто использовать что-то более подходящее.
Я предлагаю создать канал, который постоянно прослушивает триггеры и обрабатывает их. Это означает, что по своей природе он будет обрабатывать только один за раз. если вы сохраните размер канала по умолчанию (размер 0), он будет заблокирован до завершения обработки, поэтому они не смогут работать параллельно.
единственная небольшая проблема, которую я вижу, заключается в том, что это означает, что он не будет «ожидать» дополнительное время между триггерами, если триггер вызывается во время обработки. если тебе не нравится такое поведение. просто в функции обработки добавьте таймер для ожидания вашего интервала.
// sendTriggers sends a trigger to the channel every 10 seconds.
func sendTriggers(triggerChan chan<- struct{}) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
triggerChan <- struct{}{}
}
}
}
// processTriggers listens to the channel and processes each trigger one at a time.
func processTriggers(triggerChan <-chan struct{}) {
for {
select {
case <-triggerChan:
fmt.Println("Received trigger, processing...")
// Simulate work with sleep
time.Sleep(5 * time.Second)
fmt.Println("Processing complete.")
}
}
}
func main() {
triggerChan := make(chan struct{})
go sendTriggers(triggerChan)
go processTriggers(triggerChan)
// Prevent the main function from exiting
select {}
}
Ваш код уже предотвращает «одновременный запуск нескольких горутин», но, вероятно, не так, как вы ожидаете. Есть два способа справиться с этим:
time.Sleep
).В настоящее время ваш код использует первый подход; но fmt.Println("Already Running...")
указывает на то, что вы ожидаете, что он займет второе место.
Так почему же «Already Running…» никогда не выводится? Это связано с тем, как вы используете Mutex
. Когда вы звоните Блокировка:
Замок замки м. Если блокировка уже используется, вызывающая горутина блокируется до тех пор, пока мьютекс не станет доступен.
Это означает, что когда RunProcess
вызывается одновременно, выполнение будет блокироваться на mu.Lock()
до тех пор, пока не будет вызван mu.Unlock()
(в отдельной горутине). Вы откладываете вызов mu.Unlock()
, что означает, что Mutex
разблокируется «в тот момент, когда возвращается окружающая функция». Это может быть легче увидеть, если выполнять шаг за шагом:
RunProcess()
начинает, вызывает mu.Lock()
и переходит к time.Sleep(15 * time.Second)
.RunProcess()
, блокируется на mu.Lock()
.RunProcess()
, это означает, что выполняется отложенный mu.Unlock()
.RunProcess()
приобретает Mutex и переходит к time.Sleep(15 * time.Second)
.Я считаю, что вы хотите, чтобы второй RunProcess()
вывел «Уже работает...» и завершился; в этом случае ваш код должен быть больше похож на игровую площадку:
func RunProcess() {
mu.Lock() // Mutex protects isProcessRunning variable
if isProcessRunning {
fmt.Println("Already Running...")
mu.Unlock()
return
}
isProcessRunning = true
mu.Unlock() // isProcessRunning has been checked/set so release the Mutex (allowing other checks to proceed)
fmt.Println("Running...")
// Simulate work
time.Sleep(2 * time.Second)
fmt.Println("Done")
mu.Lock() // Need to access isProcessRunning so acquire Mutex
isProcessRunning = false
mu.Unlock()
}
Надеюсь, это ответ на первую часть вашего вопроса. Я считаю, что ссылка на вторую часть «InitCron вызывается несколько раз» является дубликатом (т.е. используйте sync.Once). Однако можно утверждать, что документирования того, что InitCron
нужно вызывать только один раз, может быть достаточно.
Переходим к тому, как можно улучшить ваше решение; Я предполагаю, что вы выбрали robfig/cron
по веским причинам (я использую его для планирования отчетов и т. д.). Предотвращение одновременного выполнения задач является общим требованием, поэтому robfig/cron
обеспечивает SkipIfStillRunning ; в документации есть пример.
Этот вопрос похож на: Синглтон в ходу . Если вы считаете, что это другое, отредактируйте вопрос, поясните, чем он отличается и/или как ответы на этот вопрос не помогают решить вашу проблему (поскольку ваш вопрос, похоже, заключается в том, как предотвратить многократное выполнение
InitCron
). Обратите внимание, чтоRunProcess
, вероятно, не делает то, что вы хотите детская площадка.