Browse Source

add job to update feed after create or update

pull/2/head
Rob Colbert 1 year ago
parent
commit
dda92c8eec
  1. 2
      app/controllers/admin/newsroom.js
  2. 6
      app/services/feed.js
  3. 22
      app/workers/newsroom.js
  4. 20
      app/workers/newsroom/cron/update-feeds.js
  5. 56
      app/workers/newsroom/job/update-feed.js
  6. 4
      config/job-queues.js

2
app/controllers/admin/newsroom.js

@ -106,7 +106,7 @@ class NewsroomAdminController extends SiteController {
await feedService.update(res.locals.feed, req.body);
res.redirect('/admin/newsroom');
} catch (error) {
this.log.error('failed to create feed', { error });
this.log.error('failed to update feed', { error });
return next(error);
}
}

6
app/services/feed.js

@ -24,7 +24,7 @@ class FeedService extends SiteService {
}
async start ( ) {
this.jobQueue = this.getJobQueue('newsroom', this.dtp.config.jobQueues.newsroom);
this.jobQueue = await this.getJobQueue('newsroom', this.dtp.config.jobQueues.newsroom);
}
async create (feedDefinition) {
@ -44,6 +44,8 @@ class FeedService extends SiteService {
feed.published = feedContent.published;
await feed.save();
this.jobQueue.add('update-feed', { feedId: feed._id });
return feed.toObject();
}
@ -65,6 +67,8 @@ class FeedService extends SiteService {
updateOp.$set.generator = feedDefinition.generator || feedContent.generator;
await Feed.updateOne({ _id: feed._id }, updateOp);
this.jobQueue.add('update-feed', { feedId: feed._id });
}
async getFeeds (pagination, options) {

22
app/workers/newsroom.js

@ -8,7 +8,11 @@ const path = require('path');
require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') });
const mongoose = require('mongoose');
const { read: feedReader } = require('feed-reader');
const {
SiteAsync,
SiteLog,
SiteWorker,
} = require(path.join(__dirname, '..', '..', 'lib', 'site-lib'));
@ -35,6 +39,7 @@ class NewsroomWorker extends SiteWorker {
await super.start();
await this.loadProcessor(path.join(__dirname, 'newsroom', 'cron', 'update-feeds.js'));
await this.loadProcessor(path.join(__dirname, 'newsroom', 'job', 'update-feed.js'));
await this.startProcessors();
}
@ -42,6 +47,23 @@ class NewsroomWorker extends SiteWorker {
async stop ( ) {
await super.stop();
}
async updateFeed (feed) {
const Feed = mongoose.model('Feed');
const NOW = new Date();
const { feed: feedService } = this.dtp.services;
try {
this.log.info('loading latest feed data', { feedId: feed._id, title: feed.title });
const response = await feedReader(feed.url);
await SiteAsync.each(response.entries, async (entry) => {
await Feed.updateOne({ _id: feed._id }, { $set: { published: feed.published || NOW }});
await feedService.createEntry(feed, entry);
}, 4);
this.log.info('feed updated', { entries: response.entries.length });
} catch (error) {
this.log.error('failed to update feed', { feedId: feed._id, title: feed.title, error });
}
}
}
(async ( ) => {

20
app/workers/newsroom/cron/update-feeds.js

@ -10,8 +10,6 @@ const mongoose = require('mongoose');
const Feed = mongoose.model('Feed');
const { CronJob } = require('cron');
const { read: feedReader } = require('feed-reader');
const { SiteAsync } = require('../../../../lib/site-lib');
const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib'));
@ -58,28 +56,12 @@ class UpdateFeedsCron extends SiteWorkerProcess {
.lean()
.cursor()
.eachAsync(async (feed) => {
await this.updateFeed(feed);
await this.worker.updateFeed(feed);
}, 4);
} catch (error) {
this.log.error('failed to update feeds', { error });
}
}
async updateFeed (feed) {
const NOW = new Date();
const { feed: feedService } = this.dtp.services;
try {
this.log.info('loading latest feed data', { feedId: feed._id, title: feed.title });
const response = await feedReader(feed.url);
await SiteAsync.each(response.entries, async (entry) => {
await Feed.updateOne({ _id: feed._id }, { $set: { published: feed.published || NOW }});
await feedService.createEntry(feed, entry);
}, 4);
this.log.info('feed updated', { entries: response.entries.length });
} catch (error) {
this.log.error('failed to update feed', { feedId: feed._id, title: feed.title, error });
}
}
}
module.exports = UpdateFeedsCron;

56
app/workers/newsroom/job/update-feed.js

@ -0,0 +1,56 @@
// newsroom/job/update-feed.js
// Copyright (C) 2022 DTP Technologies, LLC
// License: Apache-2.0
'use strict';
const path = require('path');
const mongoose = require('mongoose');
const Feed = mongoose.model('Feed');
const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib'));
class UpdateFeedJob extends SiteWorkerProcess {
static get COMPONENT ( ) {
return {
name: 'newsroomUpdateFeedJob',
slug: 'newsroom-update-feed-job',
};
}
constructor (worker) {
super(worker, UpdateFeedJob.COMPONENT);
}
async start ( ) {
await super.start();
this.queue = await this.getJobQueue('newsroom', this.dtp.config.jobQueues.newsroom);
this.log.info('registering job processor', { queue: this.queue.name, name: 'update-feed' });
this.queue.process('update-feed', this.processUpdateFeed.bind(this));
}
async stop ( ) {
await super.stop();
}
async processUpdateFeed (job) {
const { feed: feedService } = this.dtp.services;
const { feedId } = job.data;
this.log.info('newsroom feed update job received', { id: job.id, feedId });
try {
const feed = await feedService.getById(feedId);
await this.worker.updateFeed(feed);
} catch (error) {
this.log.error('failed to update newsroom feed', { feedId, error });
throw error;
}
}
}
module.exports = UpdateFeedJob;

4
config/job-queues.js

@ -17,6 +17,10 @@ module.exports = {
attempts: 3,
removeOnComplete: true,
},
'newsroom': {
attempts: 3,
removeOnComplete: true,
},
'reeeper': {
attempts: 3,
removeOnComplete: true,

Loading…
Cancel
Save