From dda92c8eec9a54f79e6e249135448e717ec3d167 Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 31 Oct 2022 04:34:21 -0400 Subject: [PATCH] add job to update feed after create or update --- app/controllers/admin/newsroom.js | 2 +- app/services/feed.js | 6 ++- app/workers/newsroom.js | 22 +++++++++ app/workers/newsroom/cron/update-feeds.js | 20 +------- app/workers/newsroom/job/update-feed.js | 56 +++++++++++++++++++++++ config/job-queues.js | 4 ++ 6 files changed, 89 insertions(+), 21 deletions(-) create mode 100644 app/workers/newsroom/job/update-feed.js diff --git a/app/controllers/admin/newsroom.js b/app/controllers/admin/newsroom.js index d8cb8ee..e2811e2 100644 --- a/app/controllers/admin/newsroom.js +++ b/app/controllers/admin/newsroom.js @@ -106,7 +106,7 @@ class NewsroomAdminController extends SiteController { await feedService.update(res.locals.feed, req.body); res.redirect('/admin/newsroom'); } catch (error) { - this.log.error('failed to create feed', { error }); + this.log.error('failed to update feed', { error }); return next(error); } } diff --git a/app/services/feed.js b/app/services/feed.js index 4f9a2e0..baefc5d 100644 --- a/app/services/feed.js +++ b/app/services/feed.js @@ -24,7 +24,7 @@ class FeedService extends SiteService { } async start ( ) { - this.jobQueue = this.getJobQueue('newsroom', this.dtp.config.jobQueues.newsroom); + this.jobQueue = await this.getJobQueue('newsroom', this.dtp.config.jobQueues.newsroom); } async create (feedDefinition) { @@ -44,6 +44,8 @@ class FeedService extends SiteService { feed.published = feedContent.published; await feed.save(); + this.jobQueue.add('update-feed', { feedId: feed._id }); + return feed.toObject(); } @@ -65,6 +67,8 @@ class FeedService extends SiteService { updateOp.$set.generator = feedDefinition.generator || feedContent.generator; await Feed.updateOne({ _id: feed._id }, updateOp); + + this.jobQueue.add('update-feed', { feedId: feed._id }); } async getFeeds (pagination, options) { diff --git a/app/workers/newsroom.js b/app/workers/newsroom.js index ad4dc02..6ca4fdc 100644 --- a/app/workers/newsroom.js +++ b/app/workers/newsroom.js @@ -8,7 +8,11 @@ const path = require('path'); require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') }); +const mongoose = require('mongoose'); +const { read: feedReader } = require('feed-reader'); + const { + SiteAsync, SiteLog, SiteWorker, } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib')); @@ -35,6 +39,7 @@ class NewsroomWorker extends SiteWorker { await super.start(); await this.loadProcessor(path.join(__dirname, 'newsroom', 'cron', 'update-feeds.js')); + await this.loadProcessor(path.join(__dirname, 'newsroom', 'job', 'update-feed.js')); await this.startProcessors(); } @@ -42,6 +47,23 @@ class NewsroomWorker extends SiteWorker { async stop ( ) { await super.stop(); } + + async updateFeed (feed) { + const Feed = mongoose.model('Feed'); + const NOW = new Date(); + const { feed: feedService } = this.dtp.services; + try { + this.log.info('loading latest feed data', { feedId: feed._id, title: feed.title }); + const response = await feedReader(feed.url); + await SiteAsync.each(response.entries, async (entry) => { + await Feed.updateOne({ _id: feed._id }, { $set: { published: feed.published || NOW }}); + await feedService.createEntry(feed, entry); + }, 4); + this.log.info('feed updated', { entries: response.entries.length }); + } catch (error) { + this.log.error('failed to update feed', { feedId: feed._id, title: feed.title, error }); + } + } } (async ( ) => { diff --git a/app/workers/newsroom/cron/update-feeds.js b/app/workers/newsroom/cron/update-feeds.js index e2278da..b4d6b5e 100644 --- a/app/workers/newsroom/cron/update-feeds.js +++ b/app/workers/newsroom/cron/update-feeds.js @@ -10,8 +10,6 @@ const mongoose = require('mongoose'); const Feed = mongoose.model('Feed'); const { CronJob } = require('cron'); -const { read: feedReader } = require('feed-reader'); -const { SiteAsync } = require('../../../../lib/site-lib'); const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib')); @@ -58,28 +56,12 @@ class UpdateFeedsCron extends SiteWorkerProcess { .lean() .cursor() .eachAsync(async (feed) => { - await this.updateFeed(feed); + await this.worker.updateFeed(feed); }, 4); } catch (error) { this.log.error('failed to update feeds', { error }); } } - - async updateFeed (feed) { - const NOW = new Date(); - const { feed: feedService } = this.dtp.services; - try { - this.log.info('loading latest feed data', { feedId: feed._id, title: feed.title }); - const response = await feedReader(feed.url); - await SiteAsync.each(response.entries, async (entry) => { - await Feed.updateOne({ _id: feed._id }, { $set: { published: feed.published || NOW }}); - await feedService.createEntry(feed, entry); - }, 4); - this.log.info('feed updated', { entries: response.entries.length }); - } catch (error) { - this.log.error('failed to update feed', { feedId: feed._id, title: feed.title, error }); - } - } } module.exports = UpdateFeedsCron; \ No newline at end of file diff --git a/app/workers/newsroom/job/update-feed.js b/app/workers/newsroom/job/update-feed.js new file mode 100644 index 0000000..8200527 --- /dev/null +++ b/app/workers/newsroom/job/update-feed.js @@ -0,0 +1,56 @@ +// newsroom/job/update-feed.js +// Copyright (C) 2022 DTP Technologies, LLC +// License: Apache-2.0 + +'use strict'; + +const path = require('path'); + +const mongoose = require('mongoose'); + +const Feed = mongoose.model('Feed'); + +const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib')); + +class UpdateFeedJob extends SiteWorkerProcess { + + static get COMPONENT ( ) { + return { + name: 'newsroomUpdateFeedJob', + slug: 'newsroom-update-feed-job', + }; + } + + constructor (worker) { + super(worker, UpdateFeedJob.COMPONENT); + } + + async start ( ) { + await super.start(); + + this.queue = await this.getJobQueue('newsroom', this.dtp.config.jobQueues.newsroom); + + this.log.info('registering job processor', { queue: this.queue.name, name: 'update-feed' }); + this.queue.process('update-feed', this.processUpdateFeed.bind(this)); + } + + async stop ( ) { + await super.stop(); + } + + async processUpdateFeed (job) { + const { feed: feedService } = this.dtp.services; + const { feedId } = job.data; + this.log.info('newsroom feed update job received', { id: job.id, feedId }); + + try { + const feed = await feedService.getById(feedId); + await this.worker.updateFeed(feed); + } catch (error) { + this.log.error('failed to update newsroom feed', { feedId, error }); + throw error; + } + } +} + +module.exports = UpdateFeedJob; \ No newline at end of file diff --git a/config/job-queues.js b/config/job-queues.js index 0c55228..c5077ae 100644 --- a/config/job-queues.js +++ b/config/job-queues.js @@ -17,6 +17,10 @@ module.exports = { attempts: 3, removeOnComplete: true, }, + 'newsroom': { + attempts: 3, + removeOnComplete: true, + }, 'reeeper': { attempts: 3, removeOnComplete: true,