Проблемы с отложенной обработкой заданий в BullMQ

Я использую BullMQ для добавления заданий в очередь, обработка которых настроена на 15-секундную задержку. Итак, я делаю что-то вроде этого:


// REDIS CONFIG
const redis = new Redis(<REDIS_CONFIG>, {
    maxRetriesPerRequest: null,
})

// QUEUE DEFINITION
const myQueue = new Queue(<QUEUE_NAME>, {
    defaultJobOptions: { removeOnComplete: true },
    connection: redis,
})

// QUEUE HANDLER
const queueHandler = async (job: Job) => {
    const { id } = job.data
    console.info(`Processing job ${id} after delay:`, new Date())
}

// WORKER DEFINITION
const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, { concurrency: 1, connection: redis })

// ON COMPLETE EVENT
sendWorker.on('completed', async (job: Job) => {
    // SOME CODE HERE
})

// ON FAIL EVENT
sendWorker.on('failed', async (job: any, error: Error) => {
    // SOME CODE HERE
})

В другом файле я делаю что-то вроде этого:

const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
for (const object of some_objects):
    console.info('Adding job to queue:', object, ' at: ', new Date())
    myQueue.add(<JOB_NAME>, {id: object.id}, {delay: 15000}).catch(console.error)

Я использую опцию «задержка», чтобы установить задержку при добавлении заданий в очередь, как описано здесь. Однако это не работает так, как ожидалось. Задержка применяется только к первому заданию, а последующие задания обрабатываются последовательно без задержки. Консоль показывает следующий вывод:

Добавление задания в очередь: { id: '1' } по адресу: 2024-09-02T15:08:11.421Z

Добавление задания в очередь: { id: '2' } по адресу: 2024-09-02T15:08:11.426Z

Добавление задания в очередь: { id: '3' } по адресу: 2024-09-02T15:08:11.426Z

Обработка задания 1 после задержки: 2024-09-02T15:08:26.488Z // Примечание: прошло 15 секунд.

Обработка задания 2 после задержки: 2024-09-02T15:08:27.129Z // Ожидается: 15-секундная задержка перед обработкой.

Обработка задания 3 после задержки: 2024-09-02T15:08:27.425Z // Ожидается: 15-секундная задержка перед обработкой

Что я пробовал до сих пор:

1- Установите параметр «задержка» в параметрах задания по умолчанию в очереди:

const myQueue = new Queue(<QUEUE_NAME>, {
    defaultJobOptions: { removeOnComplete: true, delay: 15000 },
    connection: redis,
})

Результат: Никаких изменений не наблюдается.

2- Добавьте задания в очередь, используя «для каждого» вместо «для»:

const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
some_objects.forEach((object) => {
    console.info('Adding job to queue:', object, ' at: ', new Date())
    myQueue.add(<JOB_NAME>, {id: object.id}, {delay: 15000})
}).catch(console.error)

Результат: Никаких изменений не наблюдается.

3- Добавляйте задания одно за другим:

const some_objects = [{ id: '1' }, { id: '2' }, { id: '3' }]
myQueue.add('<JOB_NAME>', { id: some_objects[0].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[1].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[2].id }, { delay: 15000 })

Результат: Никаких изменений не наблюдается.

4- Используйте разные имена заданий при добавлении в очередь:

const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
for (const object of some_objects):
    console.info('Adding job to queue:', object, ' at: ', new Date())
    myQueue.add(`object_${object.id}`, {id: object.id}, {delay: 15000}).catch(console.error)

Результат: Никаких изменений не наблюдается.

5- Проверьте очередь на Redis:KEYS * Команда redis-cli показывает это

  1. "бык:<имя_очереди>:3"
  2. "бык:<имя_очереди>:1"
  3. "bull:<имя_очереди>:id"
  4. "bull:<имя_очереди>:events"
  5. "bull:<имя_очереди>:мета"
  6. "bull:<имя_очереди>:задержано"
  7. "бык:<имя_очереди>:2"

Затем я выполняю команду ZRANGE bull:<queue_name>:delayed 0 -1 для вывода списка всех членов отсортированного набора bull:<queue_name>:delayed и получаю:

  1. "1"
  2. "2"
  3. "3"

Результаты: Задания правильно сохраняются в наборе Bull:<queue_name>:delayed до тех пор, пока не истечет указанное время задержки, после чего их следует переместить в основную очередь для обработки. Однако отложенные задания не обрабатываются должным образом.

Активен только один исполнитель с уровнем параллелизма 1 (см. определение работника).

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
0
60
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Итак, недавно я копался в документации BullMQ и обнаружил вот это. Оказывается, желаемая функциональность была не достижима с конфигурацией delay, а с ограничением скорости. В своем работнике я просто добавил конфигурацию limiter и добился желаемого поведения: 15-секундной задержки между обработкой заданий.

const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, {
    concurrency: 1,
    connection: redis,
    limiter: {
        max: 1,
        duration: 15000,
    },
})

Как я сейчас понимаю, опция delay добавляет в набор все задания с задержкой, и по истечении времени задержки рабочий обрабатывает весь набор непрерывно. Однако при использовании опции limiter работник ожидает определенную продолжительность между обработкой каждого задания.

Надеюсь, этот ответ кому-нибудь поможет! Если это так, пожалуйста, проголосуйте за меня. 😁

Другие вопросы по теме