A virtual newsroom powered by RSS and AI.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

113 lines
2.8 KiB

// workers/newsroom.ts
// Copyright (C) 2025 DTP Technologies, LLC
// All Rights Reserved
import assert from "node:assert";
import env from "../config/env.js";
import os from "node:os";
import { DtpWorker } from "../lib/core/worker.js";
import { CronJob } from "cron";
import { FetchNewsJob } from "./newsroom/fetch-news.js";
import { IDtpProcessEndpoint } from "../app/models/lib/process-endpoint.js";
import { DtpProcessStatus } from "../app/models/lib/process-status.js";
import ProcessService from "../app/services/process.js";
/**
 * Worker process for the newsroom job queue.
 *
 * On start it registers itself with the platform process service, attaches
 * the `ingest-entry` processor to the job queue, performs one news fetch
 * immediately, and then schedules the same fetch on a cron timer.
 */
class NewsroomWorker extends DtpWorker {
  /** Provides the `ingest-entry` queue handler and the `run` routine driven by cron. */
  fetchNews?: FetchNewsJob;

  /** Cron schedule that re-invokes `fetchNews.run`; present only while the worker is started. */
  fetchNewsJob?: CronJob;

  /** Component name; the class itself is handed to the base constructor. */
  static get name() {
    return "NewsroomWorker";
  }

  /** Component slug; the class itself is handed to the base constructor. */
  static get slug() {
    return "newsroom-worker";
  }

  constructor() {
    super(NewsroomWorker);
  }

  /**
   * Brings the worker online.
   *
   * Steps run strictly in this order: start the base worker on the newsroom
   * queue, register with the platform, attach the queue processor, run one
   * fetch, create the cron schedule, and finally mark the process as online.
   *
   * @throws AssertionError when no job queue is available after the base
   *   worker has started. Any rejection from the awaited calls propagates.
   */
  async start(): Promise<void> {
    await super.startWorker("newsroom", env.jobQueues.newsroom);

    const queue = this.jobQueue;
    assert(queue, "Job queue required");

    // --- Platform registration -------------------------------------------
    const processService = this.getService<ProcessService>("process");
    const { address, port } = env.https;
    const endpoint: IDtpProcessEndpoint = {
      hostname: os.hostname(),
      ip: address,
      port,
    };
    // NOTE(review): the message says "Web process" although this registers a
    // worker process — possibly copied from the web server; confirm wording.
    this.log.info("registering Web process with the platform", { endpoint });
    this.workerProcess = await processService.createWorkerProcess(endpoint);

    // --- Queue processor ---------------------------------------------------
    const fetchNews = new FetchNewsJob(this, queue);
    this.fetchNews = fetchNews;
    this.log.info("registering ingest-entry job processor");
    // Handlers are passed as bound methods (not wrapped in closures) so the
    // function the queue receives is exactly the job's own method.
    const ingestHandler = fetchNews.ingestEntry.bind(fetchNews);
    queue.process("ingest-entry", 1, ingestHandler);

    // --- Cron schedule -----------------------------------------------------
    this.log.info("registering cron", { job: fetchNews.slug });
    // One fetch is awaited up front, before the schedule is created.
    await fetchNews.run();
    const onTick = fetchNews.run.bind(fetchNews);
    // Six-field cron expression (seconds first): second 0, minute 0 of every
    // hour. `true` starts the schedule immediately, in the configured zone.
    this.fetchNewsJob = new CronJob("0 0 * * * *", onTick, null, true, env.timezone);

    // --- Initialization complete -------------------------------------------
    await processService.setStatus(this.workerProcess, DtpProcessStatus.Online);
    const { name, slug } = this.component;
    this.log.info(`${name}:${slug} online`);
  }

  /**
   * Stops the cron schedule (if one was created) and then delegates to the
   * base worker's `stop()`.
   *
   * @returns whatever the base worker's `stop()` resolves to.
   */
  async stop(): Promise<number> {
    this.log.info("stopping worker jobs");

    const schedule = this.fetchNewsJob;
    if (schedule) {
      schedule.stop();
      delete this.fetchNewsJob;
    }

    this.log.debug("worker jobs stopped");
    return super.stop();
  }
}
/*
 * Process entry point: installs a last-resort handler for unhandled promise
 * rejections, then constructs and starts the newsroom worker.
 *
 * Exit codes: -2 on an unhandled rejection, -1 when worker startup fails.
 */
(async () => {
  // A rejection reason can be any value (including undefined or a string),
  // so it is narrowed before `.stack` is read. The handler is synchronous:
  // an async handler that threw would itself produce a rejected promise.
  process.on("unhandledRejection", (reason: unknown, p: Promise<unknown>) => {
    console.error("Unhandled rejection", {
      error: reason,
      promise: p,
      stack: reason instanceof Error ? reason.stack : undefined,
    });
    process.exit(-2);
  });

  try {
    const worker = new NewsroomWorker();
    await worker.start();
  } catch (error) {
    // Thrown values are not guaranteed to be Error instances; fall back to
    // the value's string form so the cause is still reported.
    const message = error instanceof Error ? error.message : String(error);
    console.log("worker error", message);
    process.exit(-1);
  }
})();