// host-services.js // Copyright (C) 2024 DTP Technologies, LLC // All Rights Reserved 'use strict'; import 'dotenv/config'; import path, { dirname } from 'path'; import fs from 'node:fs'; import readline from 'node:readline'; import { SiteRuntime } from '../../lib/site-lib.js'; import { CronJob } from 'cron'; const CRON_TIMEZONE = 'America/New_York'; import { Readable, pipeline } from 'node:stream'; import { promisify } from 'node:util'; const streamPipeline = promisify(pipeline); class ChatLinksService extends SiteRuntime { static get name ( ) { return 'ChatLinksWorker'; } static get slug ( ) { return 'chatLinks'; } static get BLOCKLIST_URL ( ) { return 'https://raw.githubusercontent.com/StevenBlack/hosts/master/alternates/porn/hosts'; } constructor (rootPath) { super(ChatLinksService, rootPath); } async start ( ) { await super.start(); const mongoose = await import('mongoose'); this.Link = mongoose.model('Link'); this.viewModel = { }; await this.populateViewModel(this.viewModel); this.blacklist = { porn: path.join(this.config.root, 'data', 'blacklist', 'porn'), }; /* * Bull Queue job processors */ this.log.info('registering link-ingest job processor', { config: this.config.jobQueues.links }); this.linksProcessingQueue = this.services.jobQueue.getJobQueue('links', this.config.jobQueues.links); this.linksProcessingQueue.process('link-ingest', 1, this.ingestLink.bind(this)); /* * Cron jobs */ const cronBlacklistUpdate = '0 0 3 * * *'; // Ever day at 3:00 a.m. this.log.info('created URL blacklist update cron', { cronBlacklistUpdate }); this.updateBlacklistJob = new CronJob( cronBlacklistUpdate, this.updateUrlBlacklist.bind(this), null, true, CRON_TIMEZONE, ); } async shutdown ( ) { this.log.alert('ChatLinksWorker shutting down'); await super.shutdown(); } async ingestLink (job) { const { link: linkService, user: userService } = this.services; this.log.info('received link ingest job', { data: job.data }); try { if (!job.data.submitterId) { this.log.error('link ingest submitted without submitterId'); return; } job.data.submitter = await userService.getUserAccount(job.data.submitterId); if (!job.data.submitter) { this.log.error('link submitted with invalid User', { submitterId: job.data.submitterId }); return; } /* * Is the submitter blocked from sharing links? */ if (!job.data.submitter.permissions.canShareLinks) { this.log.alert('Submitter is not permitted to share links', { submitter: { _id: job.data.submitter._id, username: job.data.submitter.username, }, }); return; } this.log.info('fetching link from database'); job.data.link = await linkService.getById(job.data.linkId); if (!job.data.link) { this.log.error('link not found in database', { linkId: job.data.linkId }); return; } /* * Is the domain or URL already known to be blocked? */ if (job.data.link.flags && job.data.link.flags.isBlocked) { this.log.alert('aborting ingest of blocked link', { submitter: { _id: job.data.submitter._id, username: job.data.submitter.username, }, domain: job.data.link.domain, url: job.data.link.url, }); return; } /* * Is the domain currently blocked? */ const isDomainBlocked = await linkService.isDomainBlocked(job.data.link.domain); if (isDomainBlocked) { /* * Make sure the flag is set on the Link */ await this.Link.updateOne( { _id: job.data.link._id }, { $set: { 'flags.isBlocked': true, }, }, ); /* * Log the rejection */ this.log.alert('prohibiting link from blocked domain', { submitter: { _id: job.data.submitter._id, username: job.data.submitter.username, }, domain: job.data.link.domain, url: job.data.link.url, }); return; // bye! } this.log.info('fetching link preview', { domain: job.data.link.domain, url: job.data.link.url, }); job.data.preview = await linkService.generatePagePreview(job.data.link.url); if (!job.data.preview) { throw new Error('failed to load link preview'); } this.log.info('updating link record in Mongo', { link: job.data.link._id, preview: job.data.preview, }); job.data.link = await this.Link.findOneAndUpdate( { _id: job.data.link._id }, { $set: { lastPreviewFetched: job.data.preview.fetched, title: job.data.preview.title, siteName: job.data.preview.siteName, description: job.data.preview.description, tags: job.data.preview.tags, mediaType: job.data.preview.mediaType, contentType: job.data.preview.contentType, images: job.data.preview.images, videos: job.data.preview.videos, audios: job.data.preview.audios, favicons: job.data.preview.favicons, oembed: job.data.preview.oembed, 'flags.havePreview': true, }, }, { new: true }, ); job.data.link = await this.Link.populate(job.data.link, linkService.populateLink); this.log.info('link ingest complete', { submitter: { _id: job.data.submitter._id, username: job.data.submitter.username, }, link: job.data.link, }); if (job.data?.options?.channelId) { const viewModel = Object.assign({ link: job.data.link }, this.viewModel); const displayList = linkService.createDisplayList('replace-preview'); displayList.replaceElement( `.link-container[data-link-id="${job.data.link._id}"]`, await linkService.renderPreview(viewModel), ); this.emitter.to(job.data.options.channelId).emit('chat-control', { displayList }); } } catch (error) { await this.log.error('failed to ingest link', { domain: job.data.link.domain, url: job.data.link.url, error }); throw error; } } async updateUrlBlacklist ( ) { try { /* * Fetch latest to local file */ this.log.info('fetching updated domain blacklist'); const response = await fetch(ChatLinksService.BLOCKLIST_URL); if (!response.ok) { throw new Error(`unexpected response ${response.statusText}`); } await streamPipeline(Readable.fromWeb(response.body), fs.createWriteStream(this.blacklist.porn)); /* * Read local file line-by-line with filtering and comment removal to insert * to Redis set of blocked domains */ const fileStream = fs.createReadStream(this.blacklist.porn); const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity, }); for await (let line of rl) { line = line.trim(); if (line[0] === '#') { continue; } const tokens = line.split(' '); if (tokens[0] !== '0.0.0.0' || tokens[1] === '0.0.0.0') { continue; } const r = await this.redis.sadd(ChatLinksService.DOMAIN_BLACKLIST_KEY, tokens[1]); if (r > 0) { this.log.info('added domain to Redis blocklist', { domain: tokens[1] }); } } } catch (error) { this.log.error('failed to update domain blacklist', { error }); // fall through } finally { this.log.info('domain block list updated'); } } } (async ( ) => { try { const { fileURLToPath } = await import('node:url'); const __dirname = dirname(fileURLToPath(import.meta.url)); // jshint ignore:line const worker = new ChatLinksService(path.resolve(__dirname, '..', '..')); await worker.start(); } catch (error) { console.error('failed to start Host Cache worker', { error }); process.exit(-1); } })();