// job-queue.js
// Copyright (C) 2024 DTP Technologies, LLC
// All Rights Reserved

'use strict';

import Bull from 'bull';

import { SiteService } from '../../lib/site-lib.js';

/**
 * Service that hands out named Bull queues and keeps one instance per name
 * for the lifetime of the service.
 */
export default class JobQueueService extends SiteService {

  /** Short identifier for this service. */
  static get slug () { return 'jobQueue'; }

  /** Class name reported for this service. */
  static get name ( ) { return 'JobQueueService'; }

  /**
   * @param {object} dtp Value forwarded unchanged to the SiteService
   * constructor together with this class.
   */
  constructor (dtp) {
    super(dtp, JobQueueService);

    // Queues created so far, keyed by queue name.
    this.queues = { };
  }

  /**
   * Returns the queue registered under `name`, creating and registering it
   * on first request.
   *
   * When a queue with that name already exists it is returned as-is and
   * `defaultJobOptions` is not consulted.
   *
   * @param {string} name Name of the queue.
   * @param {object} [defaultJobOptions] Overrides merged on top of the
   * built-in job defaults when the queue is first created.
   * @returns {Bull.Queue} The cached or newly created queue.
   */
  getJobQueue (name, defaultJobOptions) {
    const existing = this.queues[name];
    if (existing) {
      return existing;
    }

    // Built-in defaults come first so caller-supplied values win.
    const jobOptions = {
      priority: 10,
      delay: 0,
      attempts: 1,
      removeOnComplete: true,
      removeOnFail: false,
      ...defaultJobOptions,
    };

    // Connection settings are read from the environment; the port falls
    // back to 6379 when REDIS_PORT is not set.
    const redis = {
      host: process.env.REDIS_HOST,
      port: Number.parseInt(process.env.REDIS_PORT || '6379', 10),
      password: process.env.REDIS_PASSWORD,
    };

    const created = new Bull(name, {
      prefix: process.env.REDIS_KEY_PREFIX || 'dtp',
      redis,
      defaultJobOptions: jobOptions,
    });
    created.setMaxListeners(64);

    this.queues[name] = created;
    return created;
  }

  /**
   * Looks up keys matching `pattern` through the cache service and returns
   * the second colon-separated segment of each key, sorted.
   *
   * NOTE(review): presumably the keys look like `<prefix>:<queue>:...`, so
   * the second segment is the queue name; confirm against the patterns
   * callers pass in.
   *
   * @param {string} pattern Key pattern handed to the cache service's
   * `getKeys`.
   * @returns {Promise<string[]>} Sorted second segments, one per matching
   * key.
   */
  async discoverJobQueues (pattern) {
    const matchingKeys = await this.dtp.services.cache.getKeys(pattern);

    const queueNames = matchingKeys.map((key) => {
      const segments = key.split(':');
      return segments[1];
    });
    queueNames.sort();

    return queueNames;
  }
}