From 893a345e9f9955704551023a7b9247d5bf9bb3df Mon Sep 17 00:00:00 2001 From: rob Date: Wed, 28 Sep 2022 12:55:59 -0400 Subject: [PATCH] fix for job queue inits all over to include configs --- app/controllers/email.js | 4 +--- app/services/attachment.js | 2 +- app/services/chat.js | 10 ++++++++-- app/services/sticker.js | 2 +- app/workers/chat/job/chat-room-clear.js | 2 +- app/workers/chat/job/chat-room-delete.js | 2 +- app/workers/media.js | 2 +- app/workers/media/job/attachment-delete.js | 2 +- app/workers/media/job/attachment-ingest.js | 2 +- app/workers/media/job/sticker-delete.js | 2 +- app/workers/media/job/sticker-ingest.js | 2 +- app/workers/newsletter/job/email-send.js | 2 +- app/workers/newsletter/job/transmit.js | 2 +- docs/samples/service.js | 2 +- 14 files changed, 21 insertions(+), 17 deletions(-) diff --git a/app/controllers/email.js b/app/controllers/email.js index 57e64dc..b6078fd 100644 --- a/app/controllers/email.js +++ b/app/controllers/email.js @@ -17,9 +17,7 @@ class EmailController extends SiteController { async start ( ) { const { jobQueue: jobQueueService, limiter: limiterService } = this.dtp.services; - this.emailJobQueue = jobQueueService.getJobQueue('email', { - attempts: 3 - }); + this.emailJobQueue = jobQueueService.getJobQueue('email', this.dtp.config.jobQueues.email); const router = express.Router(); this.dtp.app.use('/email', router); diff --git a/app/services/attachment.js b/app/services/attachment.js index a06fe9b..327938f 100644 --- a/app/services/attachment.js +++ b/app/services/attachment.js @@ -29,7 +29,7 @@ class AttachmentService extends SiteService { }, ]; - this.queue = this.getJobQueue('media'); + this.queue = this.getJobQueue('media', this.dtp.config.jobQueues.media); // this.template = this.loadViewTemplate('attachment/components/attachment-standalone.pug'); } diff --git a/app/services/chat.js b/app/services/chat.js index 5bdcc56..8a66e26 100644 --- a/app/services/chat.js +++ b/app/services/chat.js @@ -31,7 +31,12 @@ class ChatService extends SiteService { } async start ( ) { - const { user: userService, limiter: limiterService } = this.dtp.services; + const { + jobQueue: jobQueueService, + user: userService, + limiter: limiterService, + } = this.dtp.services; + await super.start(); this.populateChatMessage = [ @@ -122,7 +127,8 @@ class ChatService extends SiteService { this.emitter = ioEmitter(this.dtp.redis); this.queues = { - reeeper: await this.getJobQueue('reeeper'), + media: jobQueueService.getJobQueue('media', this.dtp.config.jobQueues.media), + reeeper: jobQueueService.getJobQueue('reeeper', this.dtp.config.jobQueues.reeeper), }; } diff --git a/app/services/sticker.js b/app/services/sticker.js index 1a35ad3..9f37bf7 100644 --- a/app/services/sticker.js +++ b/app/services/sticker.js @@ -33,7 +33,7 @@ class StickerService extends SiteService { }, ]; - this.queue = this.getJobQueue('media'); + this.queue = this.getJobQueue('media', this.dtp.config.jobQueues.media); this.stickerTemplate = this.loadViewTemplate('sticker/components/sticker-standalone.pug'); } diff --git a/app/workers/chat/job/chat-room-clear.js b/app/workers/chat/job/chat-room-clear.js index 1598c4e..922deaf 100644 --- a/app/workers/chat/job/chat-room-clear.js +++ b/app/workers/chat/job/chat-room-clear.js @@ -28,7 +28,7 @@ class ChatRoomClearJob extends SiteWorkerProcess { async start ( ) { await super.start(); - this.queue = await this.getJobQueue('chat'); + this.queue = await this.getJobQueue('chat', this.dtp.config.jobQueues.chat); this.log.info('registering job processor', { queue: this.queue.name, name: 'chat-room-clear' }); this.queue.process('chat-room-clear', this.processChatRoomClear.bind(this)); diff --git a/app/workers/chat/job/chat-room-delete.js b/app/workers/chat/job/chat-room-delete.js index 7c06a00..77998a2 100644 --- a/app/workers/chat/job/chat-room-delete.js +++ b/app/workers/chat/job/chat-room-delete.js @@ -37,7 +37,7 @@ class ChatRoomDeleteJob extends SiteWorkerProcess { async start ( ) { await super.start(); - this.queue = await this.getJobQueue('chat'); + this.queue = await this.getJobQueue('chat', this.dtp.config.jobQueues.chat); this.log.info('registering job processor', { queue: this.queue.name, name: 'chat-room-delete' }); this.queue.process('chat-room-delete', this.processChatRoomDelete.bind(this)); diff --git a/app/workers/media.js b/app/workers/media.js index a00e5d8..e56c728 100644 --- a/app/workers/media.js +++ b/app/workers/media.js @@ -48,7 +48,7 @@ class MediaWorker extends SiteWorker { const stickerId = mongoose.Types.ObjectId(process.argv[2]); this.log.info('creating sticker processing job', { stickerId }); - const queue = this.getJobQueue('media'); + const queue = this.getJobQueue('media', this.dtp.config.jobQueues.media); await queue.add('sticker-ingest', { stickerId }); } diff --git a/app/workers/media/job/attachment-delete.js b/app/workers/media/job/attachment-delete.js index 8791002..50e52e8 100644 --- a/app/workers/media/job/attachment-delete.js +++ b/app/workers/media/job/attachment-delete.js @@ -27,7 +27,7 @@ class AttachmentDeleteJob extends SiteWorkerProcess { async start ( ) { await super.start(); - this.queue = await this.getJobQueue('media'); + this.queue = await this.getJobQueue('media', this.dtp.config.jobQueues.media); this.log.info('registering job processor', { queue: this.queue.name, name: 'attachment-delete' }); this.queue.process('attachment-delete', 1, this.processAttachmentDelete.bind(this)); diff --git a/app/workers/media/job/attachment-ingest.js b/app/workers/media/job/attachment-ingest.js index 4b619c9..0751d5d 100644 --- a/app/workers/media/job/attachment-ingest.js +++ b/app/workers/media/job/attachment-ingest.js @@ -35,7 +35,7 @@ class AttachmentIngestJob extends SiteWorkerProcess { async start ( ) { await super.start(); - this.queue = await this.getJobQueue('media'); + this.queue = await this.getJobQueue('media', this.dtp.config.jobQueues.media); this.log.info('registering job processor', { queue: this.queue.name, name: 'attachment-ingest' }); this.queue.process('attachment-ingest', 1, this.processAttachmentIngest.bind(this)); diff --git a/app/workers/media/job/sticker-delete.js b/app/workers/media/job/sticker-delete.js index 85fff12..b4c642b 100644 --- a/app/workers/media/job/sticker-delete.js +++ b/app/workers/media/job/sticker-delete.js @@ -27,7 +27,7 @@ class StickerDeleteJob extends SiteWorkerProcess { async start ( ) { await super.start(); - this.queue = await this.getJobQueue('media'); + this.queue = await this.getJobQueue('media', this.dtp.config.jobQueues.media); this.log.info('registering job processor', { queue: this.queue.name, name: 'sticker-ingest' }); this.queue.process('sticker-delete', 1, this.processStickerDelete.bind(this)); diff --git a/app/workers/media/job/sticker-ingest.js b/app/workers/media/job/sticker-ingest.js index 42981d6..55dce5b 100644 --- a/app/workers/media/job/sticker-ingest.js +++ b/app/workers/media/job/sticker-ingest.js @@ -36,7 +36,7 @@ class StickerIngestJob extends SiteWorkerProcess { async start ( ) { await super.start(); - this.queue = await this.getJobQueue('media'); + this.queue = await this.getJobQueue('media', this.dtp.config.jobQueues.media); this.log.info('registering job processor', { queue: this.queue.name, name: 'sticker-ingest' }); this.queue.process('sticker-ingest', 1, this.processStickerIngest.bind(this)); diff --git a/app/workers/newsletter/job/email-send.js b/app/workers/newsletter/job/email-send.js index e88343a..d3f4072 100644 --- a/app/workers/newsletter/job/email-send.js +++ b/app/workers/newsletter/job/email-send.js @@ -24,7 +24,7 @@ class NewsletterEmailSendJob extends SiteWorkerProcess { async start ( ) { await super.start(); - this.queue = await this.getJobQueue('newsletter'); + this.queue = await this.getJobQueue('newsletter', this.dtp.config.jobQueues.newsletter); this.log.info('registering job processor', { queue: this.queue.name, name: 'email-send' }); this.queue.process('email-send', this.processEmailSend.bind(this)); diff --git a/app/workers/newsletter/job/transmit.js b/app/workers/newsletter/job/transmit.js index 3b171d7..4990127 100644 --- a/app/workers/newsletter/job/transmit.js +++ b/app/workers/newsletter/job/transmit.js @@ -29,7 +29,7 @@ class NewsletterTransmitJob extends SiteWorkerProcess { async start ( ) { await super.start(); - this.queue = await this.getJobQueue('newsletter'); + this.queue = await this.getJobQueue('newsletter', this.dtp.config.jobQueues.newsletter); this.log.info('registering job processor', { queue: this.queue.name, name: 'transmit' }); this.queue.process('transmit', this.processTransmit.bind(this)); diff --git a/docs/samples/service.js b/docs/samples/service.js index 0126acd..e7dc312 100644 --- a/docs/samples/service.js +++ b/docs/samples/service.js @@ -19,7 +19,7 @@ class SampleService extends SiteService { async start ( ) { await super.start(); - this.queue = this.getJobQueue('sample'); + this.queue = this.getJobQueue('sample', this.dtp.config.jobQueues.sample); } async stop ( ) {