// workers/newsroom.ts // Copyright (C) 2025 DTP Technologies, LLC // All Rights Reserved import assert from "node:assert"; import env from "../config/env.js"; import os from "node:os"; import { DtpWorker } from "../lib/core/worker.js"; import { CronJob } from "cron"; import { FetchNewsJob } from "./newsroom/fetch-news.js"; import { IDtpProcessEndpoint } from "../app/models/lib/process-endpoint.js"; import { DtpProcessStatus } from "../app/models/lib/process-status.js"; import ProcessService from "../app/services/process.js"; class NewsroomWorker extends DtpWorker { fetchNews?: FetchNewsJob; fetchNewsJob?: CronJob; static get name() { return "NewsroomWorker"; } static get slug() { return "newsroom-worker"; } constructor() { super(NewsroomWorker); } async start(): Promise { await super.startWorker("newsroom", env.jobQueues.newsroom); assert(this.jobQueue, "Job queue required"); /* * Register with DTP platform */ const processService = this.getService("process"); const endpoint: IDtpProcessEndpoint = { hostname: os.hostname(), ip: env.https.address, port: env.https.port, }; this.log.info("registering Web process with the platform", { endpoint }); this.workerProcess = await processService.createWorkerProcess(endpoint); /* * Attach to Bull job queue */ this.fetchNews = new FetchNewsJob(this, this.jobQueue); this.log.info("registering ingest-entry job processor"); this.jobQueue.process( "ingest-entry", 1, this.fetchNews.ingestEntry.bind(this.fetchNews) ); /* * Create cron job schedules */ this.log.info("registering cron", { job: this.fetchNews.slug }); await this.fetchNews.run(); this.fetchNewsJob = new CronJob( "0 0 * * * *", this.fetchNews.run.bind(this.fetchNews), null, true, env.timezone ); /* * Mark process as online (initialization complete) */ await processService.setStatus(this.workerProcess, DtpProcessStatus.Online); this.log.info(`${this.component.name}:${this.component.slug} online`); } async stop(): Promise { this.log.info("stopping worker jobs"); if (this.fetchNewsJob) { this.fetchNewsJob.stop(); delete this.fetchNewsJob; } this.log.debug("worker jobs stopped"); return super.stop(); } } (async () => { process.on("unhandledRejection", async (error: Error, p) => { console.error("Unhandled rejection", { error: error, promise: p, stack: error.stack, }); process.exit(-2); }); try { const worker = new NewsroomWorker(); await worker.start(); } catch (error) { console.log("worker error", (error as Error).message); process.exit(-1); } })();