Нужна помощь в предотвращении запуска нескольких cron/планировщиков в golang

Я работаю над микросервисом Go, где использую пакет robfig/cron для планирования заданий, изначально думал об использовании time.Ticker(), но пакет облегчил мне задачу. Мой вопрос заключается в том, как обеспечить регистрацию задания cron только один раз, пока работающий cron не завершится, и предотвратить одновременный запуск нескольких горутин.

В этом отличие от одноэлементного подхода: блок кода внутри RunProcess() не должен выполняться до тех пор, пока не завершится предыдущая функция RunProcess.

Я новичок в Голанге и его соглашении. :(

Вот упрощенная версия моей текущей реализации:

package main

import (
    "fmt"
    "sync"
    "time"

    "github.com/robfig/cron/v3"
)

var isProcessRunning = false
var mu sync.Mutex

func RunProcess() {
    mu.Lock()
    defer mu.Unlock()

    if isProcessRunning {
        fmt.Println("Already Running...")
        return
    }

    isProcessRunning = true
    fmt.Println("Running...")

    // Simulate work
    time.Sleep(15 * time.Second)
    isProcessRunning = false
}

func InitCron() {
    // Create a new cron scheduler
    c := cron.New(cron.WithSeconds())

    // Add the task to run every 10 seconds
    _, err := c.AddFunc("*/10 * * * * *", RunProcess)
    if err != nil {
        fmt.Println("Error adding cron job:", err)
        return
    }

    // Start the cron scheduler
    c.Start()

    // Block indefinitely to keep the cron jobs running
    select {}
}

func main() {
    InitCron()
}

Однако я заметил, что когда InitCron вызывается несколько раз, он потенциально может создать несколько заданий cron, что приведет к проблемам параллелизма и неожиданному поведению легковесного микросервиса.

Будем очень признательны за любые советы или примеры того, как правильно с этим справиться.

Запускаем cron в golang и пытаемся предотвратить выполнение cron до тех пор, пока не завершится первый cron.

«RunProcess() не должен запускаться…» использование вами мьютекса в RunProcess уже гарантирует, что он не будет выполняться одновременно, так что вы, похоже, уже достигли своей цели? Код сбивает с толку, потому что «Уже работает» никогда не будет напечатано (из-за заблокированного Mutex); моя ссылка на игровую площадку показывает, как решить эту проблему, но неясно, хотите ли вы этого. Ссылка на синглтон была ответом на ваш комментарий «заметил, что когда InitCron вызывается несколько раз...» (что, похоже, было вашим главным вопросом?).

Brits 03.07.2024 11:42

Спасибо @Brits за ответ. Я отредактировал свой вопрос. Я пытаюсь заблокировать выполнение с помощью переменной RunProcess, когда следующий цикл cron вызывает isProcessRunning, он должен прекратить выполнение того же кода. Я пытаюсь добиться удаления очереди сообщений в 1-м процессе, 2-й/3-й процесс не должен влиять на 1-й RunProcess, пока 1-й процесс не будет отмечен как завершенный. В своем коде вы использовали wg.done() как своего рода подтверждение, верно?

Harry 03.07.2024 12:35

«... вы использовали wg.done() как своего рода подтверждение права» — Не совсем; это просто используется для ожидания завершения горутин (в противном случае программа завершится, завершив все до завершения горутин). Я все еще не на 100% понимаю вашу цель, но опубликую ответ с предложениями, поскольку это сложно, в комментариях.

Brits 03.07.2024 22:22
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
1
4
86
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Я думаю, проблема в том, что вы пытаетесь создать универсальное решение для программирования на ходу, где у вас есть для него мощный набор (простых) инструментов.

Чтобы убедиться, что я правильно понял, вы хотите запускать функцию каждые X секунд, и, если она уже запущена, пропустить текущий триггер.

задание cron периодически запускает процесс, поэтому вместо того, чтобы пытаться изменить его внутреннее поведение, я бы рекомендовал просто использовать что-то более подходящее.

Я предлагаю создать канал, который постоянно прослушивает триггеры и обрабатывает их. Это означает, что по своей природе он будет обрабатывать только один за раз. если вы сохраните размер канала по умолчанию (размер 0), он будет заблокирован до завершения обработки, поэтому они не смогут работать параллельно.

единственная небольшая проблема, которую я вижу, заключается в том, что это означает, что он не будет «ожидать» дополнительное время между триггерами, если триггер вызывается во время обработки. если тебе не нравится такое поведение. просто в функции обработки добавьте таймер для ожидания вашего интервала.

// sendTriggers sends a trigger to the channel every 10 seconds.
func sendTriggers(triggerChan chan<- struct{}) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            triggerChan <- struct{}{}
        }
    }
}

// processTriggers listens to the channel and processes each trigger one at a time.
func processTriggers(triggerChan <-chan struct{}) {
    for {
        select {
        case <-triggerChan:
            fmt.Println("Received trigger, processing...")
            // Simulate work with sleep
            time.Sleep(5 * time.Second)
            fmt.Println("Processing complete.")
        }
    }
}

func main() {
    triggerChan := make(chan struct{})
    
    go sendTriggers(triggerChan)
    go processTriggers(triggerChan)
    
    // Prevent the main function from exiting
    select {}
}
Ответ принят как подходящий

Ваш код уже предотвращает «одновременный запуск нескольких горутин», но, вероятно, не так, как вы ожидаете. Есть два способа справиться с этим:

  1. Параллельные вызовы блокируются до тех пор, пока не завершится предыдущий вызов (может образоваться очередь).
  2. Параллельные вызовы завершаются без выполнения задачи (т. е. time.Sleep).

В настоящее время ваш код использует первый подход; но fmt.Println("Already Running...") указывает на то, что вы ожидаете, что он займет второе место.

Так почему же «Already Running…» никогда не выводится? Это связано с тем, как вы используете Mutex. Когда вы звоните Блокировка:

Замок замки м. Если блокировка уже используется, вызывающая горутина блокируется до тех пор, пока мьютекс не станет доступен.

Это означает, что когда RunProcess вызывается одновременно, выполнение будет блокироваться на mu.Lock() до тех пор, пока не будет вызван mu.Unlock() (в отдельной горутине). Вы откладываете вызов mu.Unlock(), что означает, что Mutex разблокируется «в тот момент, когда возвращается окружающая функция». Это может быть легче увидеть, если выполнять шаг за шагом:

  1. Сначала RunProcess() начинает, вызывает mu.Lock() и переходит к time.Sleep(15 * time.Second).
  2. Начинается вторая RunProcess(), блокируется на mu.Lock().
  3. Сначала завершается RunProcess(), это означает, что выполняется отложенный mu.Unlock().
  4. Во-вторых, RunProcess() приобретает Mutex и переходит к time.Sleep(15 * time.Second).

Я считаю, что вы хотите, чтобы второй RunProcess() вывел «Уже работает...» и завершился; в этом случае ваш код должен быть больше похож на игровую площадку:

func RunProcess() {
    mu.Lock() // Mutex protects isProcessRunning variable    
    if isProcessRunning {
        fmt.Println("Already Running...")
        mu.Unlock()
        return
    }
    isProcessRunning = true
    mu.Unlock() // isProcessRunning has been checked/set so release the Mutex (allowing other checks to proceed)

    fmt.Println("Running...")

    // Simulate work
    time.Sleep(2 * time.Second)

    fmt.Println("Done")

    mu.Lock() // Need to access isProcessRunning so acquire Mutex
    isProcessRunning = false
    mu.Unlock()
}

Надеюсь, это ответ на первую часть вашего вопроса. Я считаю, что ссылка на вторую часть «InitCron вызывается несколько раз» является дубликатом (т.е. используйте sync.Once). Однако можно утверждать, что документирования того, что InitCron нужно вызывать только один раз, может быть достаточно.

Переходим к тому, как можно улучшить ваше решение; Я предполагаю, что вы выбрали robfig/cron по веским причинам (я использую его для планирования отчетов и т. д.). Предотвращение одновременного выполнения задач является общим требованием, поэтому robfig/cron обеспечивает SkipIfStillRunning ; в документации есть пример.

Другие вопросы по теме