Я использую BullMQ для добавления заданий в очередь, обработка которых настроена на 15-секундную задержку. Итак, я делаю что-то вроде этого:
// REDIS CONFIG
const redis = new Redis(<REDIS_CONFIG>, {
maxRetriesPerRequest: null,
})
// QUEUE DEFINITION
const myQueue = new Queue(<QUEUE_NAME>, {
defaultJobOptions: { removeOnComplete: true },
connection: redis,
})
// QUEUE HANDLER
const queueHandler = async (job: Job) => {
const { id } = job.data
console.info(`Processing job ${id} after delay:`, new Date())
}
// WORKER DEFINITION
const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, { concurrency: 1, connection: redis })
// ON COMPLETE EVENT
sendWorker.on('completed', async (job: Job) => {
// SOME CODE HERE
})
// ON FAIL EVENT
sendWorker.on('failed', async (job: any, error: Error) => {
// SOME CODE HERE
})
В другом файле я делаю что-то вроде этого:
const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
for (const object of some_objects):
console.info('Adding job to queue:', object, ' at: ', new Date())
myQueue.add(<JOB_NAME>, {id: object.id}, {delay: 15000}).catch(console.error)
Я использую опцию «задержка», чтобы установить задержку при добавлении заданий в очередь, как описано здесь. Однако это не работает так, как ожидалось. Задержка применяется только к первому заданию, а последующие задания обрабатываются последовательно без задержки. Консоль показывает следующий вывод:
Добавление задания в очередь: { id: '1' } по адресу: 2024-09-02T15:08:11.421Z
Добавление задания в очередь: { id: '2' } по адресу: 2024-09-02T15:08:11.426Z
Добавление задания в очередь: { id: '3' } по адресу: 2024-09-02T15:08:11.426Z
Обработка задания 1 после задержки: 2024-09-02T15:08:26.488Z // Примечание: прошло 15 секунд.
Обработка задания 2 после задержки: 2024-09-02T15:08:27.129Z // Ожидается: 15-секундная задержка перед обработкой.
Обработка задания 3 после задержки: 2024-09-02T15:08:27.425Z // Ожидается: 15-секундная задержка перед обработкой
Что я пробовал до сих пор:
1- Установите параметр «задержка» в параметрах задания по умолчанию в очереди:
const myQueue = new Queue(<QUEUE_NAME>, {
defaultJobOptions: { removeOnComplete: true, delay: 15000 },
connection: redis,
})
Результат: Никаких изменений не наблюдается.
2- Добавьте задания в очередь, используя «для каждого» вместо «для»:
const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
some_objects.forEach((object) => {
console.info('Adding job to queue:', object, ' at: ', new Date())
myQueue.add(<JOB_NAME>, {id: object.id}, {delay: 15000})
}).catch(console.error)
Результат: Никаких изменений не наблюдается.
3- Добавляйте задания одно за другим:
const some_objects = [{ id: '1' }, { id: '2' }, { id: '3' }]
myQueue.add('<JOB_NAME>', { id: some_objects[0].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[1].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[2].id }, { delay: 15000 })
Результат: Никаких изменений не наблюдается.
4- Используйте разные имена заданий при добавлении в очередь:
const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
for (const object of some_objects):
console.info('Adding job to queue:', object, ' at: ', new Date())
myQueue.add(`object_${object.id}`, {id: object.id}, {delay: 15000}).catch(console.error)
Результат: Никаких изменений не наблюдается.
5- Проверьте очередь на Redis:KEYS *
Команда redis-cli показывает это
- "бык:<имя_очереди>:3"
- "бык:<имя_очереди>:1"
- "bull:<имя_очереди>:id"
- "bull:<имя_очереди>:events"
- "bull:<имя_очереди>:мета"
- "bull:<имя_очереди>:задержано"
- "бык:<имя_очереди>:2"
Затем я выполняю команду ZRANGE bull:<queue_name>:delayed 0 -1
для вывода списка всех членов отсортированного набора bull:<queue_name>:delayed
и получаю:
- "1"
- "2"
- "3"
Результаты: Задания правильно сохраняются в наборе Bull:<queue_name>:delayed до тех пор, пока не истечет указанное время задержки, после чего их следует переместить в основную очередь для обработки. Однако отложенные задания не обрабатываются должным образом.
Активен только один исполнитель с уровнем параллелизма 1 (см. определение работника).
Итак, недавно я копался в документации BullMQ и обнаружил вот это. Оказывается, желаемая функциональность была не достижима с конфигурацией delay
, а с ограничением скорости. В своем работнике я просто добавил конфигурацию limiter
и добился желаемого поведения: 15-секундной задержки между обработкой заданий.
const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, {
concurrency: 1,
connection: redis,
limiter: {
max: 1,
duration: 15000,
},
})
Как я сейчас понимаю, опция delay
добавляет в набор все задания с задержкой, и по истечении времени задержки рабочий обрабатывает весь набор непрерывно. Однако при использовании опции limiter
работник ожидает определенную продолжительность между обработкой каждого задания.
Надеюсь, этот ответ кому-нибудь поможет! Если это так, пожалуйста, проголосуйте за меня. 😁