
refactored to new SiteWorker base class

Branch: pull/1/head
Rob Colbert committed 3 years ago
Commit: 46fe794cc1
4 changed files:

  605  app/workers/host-services.js
   93  app/workers/reeeper.js
    4  app/workers/sample-worker.js
    4  lib/site-service.js
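Note: the SiteWorker base class itself is not part of this diff; only its call sites are. From how the workers use it below (a constructor taking (dtp, component), this.dtp and this.log available on subclasses, super.start() / super.stop() hooks, and the per-worker process and signal handlers this commit deletes), the contract it appears to provide might look roughly like the sketch that follows. The file name, the require path, and the exact shutdown behavior are assumptions, not something shown in this commit.

// lib/site-worker.js -- hypothetical sketch, not the actual implementation
'use strict';

const path = require('path');

// Assumed to come from the same site-lib module the workers require.
const { SitePlatform, SiteLog } = require(path.join(__dirname, 'site-lib'));

class SiteWorker {

  constructor (dtp, component) {
    this.dtp = dtp;             // the worker's module object, passed in by its entry point
    this.component = component; // { name, slug } used for logging
    this.log = new SiteLog(dtp, component);
  }

  async start ( ) {
    // Presumably centralizes the process-level handlers that the individual
    // workers used to register themselves (removed from host-services.js and
    // reeeper.js in this commit).
    process.on('unhandledRejection', (error, p) => {
      this.log.error('Unhandled rejection', { error, promise: p, stack: error.stack });
    });
    process.on('warning', (error) => {
      this.log.alert('warning', { error });
    });
    process.once('SIGINT', async ( ) => {
      this.log.info('SIGINT received; requesting shutdown...');
      await this.stop();
      const exitCode = await SitePlatform.shutdown();
      process.nextTick(( ) => process.exit(exitCode));
    });

    // Platform startup also moves out of the individual workers.
    await SitePlatform.startPlatform(this.dtp);
  }

  async stop ( ) {
    // Subclasses stop their own jobs and timers, then call super.stop().
  }
}

module.exports.SiteWorker = SiteWorker;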

app/workers/host-services.js  (605)

@@ -17,10 +17,10 @@ const dgram = require('dgram');
 const numeral = require('numeral');

 const {
-  SitePlatform,
   SiteAsync,
   SiteLog,
   SiteError,
+  SiteWorker,
 } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib'));

 const { CronJob } = require('cron');
@@ -122,8 +122,11 @@ class HostCacheTransaction {
   get port ( ) { return this.rinfo.port; }
   get size ( ) { return this.rinfo.size; }

-  constructor (message, rinfo) {
+  constructor (dtp, message, rinfo) {
+    this.dtp = dtp;
     this.created = Date.now(); // timestamp, not Date instance
+    this.component = { name: 'Host Cache Transaction', slug: 'host-cache-transaction' };
+    this.log = new SiteLog(dtp, this.component);
     this.message = message;
     this.rinfo = rinfo;
@@ -137,7 +140,7 @@ class HostCacheTransaction {
   }

   async getFile ( ) {
-    const { minio: minioService } = module.services;
+    const { minio: minioService } = this.dtp.services;
     const filePath = path.join(
       process.env.DTP_HOST_CACHE_PATH,
       this.params.bucket,
@@ -160,17 +163,17 @@
       }
       res.file.path = filePath;
       this.flags.isCached = true;
-      module.cacheStats.hit(res.file.stats.size);
-      return module.manager.resolveTransaction(this, res);
+      this.dtp.cacheStats.hit(res.file.stats.size);
+      return this.dtp.manager.resolveTransaction(this, res);
     } catch (error) {
       if (error.code !== 'ENOENT') {
-        module.log.error('failed to stat requested object', { transaction: this, error });
+        this.log.error('failed to stat requested object', { transaction: this, error });
         res.success = false;
         res.statusCode = 500;
         res.message = error.message;
         this.error = error;
         this.flags.isError = true;
-        return module.manager.resolveTransaction(this, res);
+        return this.dtp.manager.resolveTransaction(this, res);
       }
       // fall through to MinIO fetch since file not found in cache
     }
@@ -187,18 +190,18 @@ class HostCacheTransaction {
         throw new SiteError(500, 'invalid object requested');
       }
       this.flags.isFetched = true;
-      module.cacheStats.add(res.file.stats.size);
-      module.cacheStats.miss(res.file.stats.size);
-      return module.manager.resolveTransaction(this, res);
+      this.dtp.cacheStats.add(res.file.stats.size);
+      this.dtp.cacheStats.miss(res.file.stats.size);
+      return this.dtp.manager.resolveTransaction(this, res);
     } catch (error) {
       if (error.code !== 'NotFound') {
-        module.log.error('failed to fetch requested object from MinIO', { transaction: this, error });
+        this.log.error('failed to fetch requested object from MinIO', { transaction: this, error });
         res.success = false;
         res.statusCode = 500;
         res.message = error.message;
         this.error = error;
         this.flags.isError = true;
-        return module.manager.resolveTransaction(this, res);
+        return this.dtp.manager.resolveTransaction(this, res);
       }
     }
@@ -207,7 +210,7 @@ class HostCacheTransaction {
     res.message = 'Not Found';
     this.error = new SiteError(404, 'Not Found');
     this.flags.isError = true;
-    return module.manager.resolveTransaction(this, res);
+    return this.dtp.manager.resolveTransaction(this, res);
   }

   async cancel (reason) {
@@ -216,7 +219,7 @@ class HostCacheTransaction {
       success: false,
       message: `Operation canceled: ${reason}`,
     };
-    return module.manager.resolveTransaction(this, res);
+    return this.dtp.manager.resolveTransaction(this, res);
   }

   async sendResponse (res) {
@@ -228,12 +231,15 @@ class HostCacheTransaction {
     const payload = { tid: this.tid, res, flags, duration };
     const reply = Buffer.from(JSON.stringify(payload));

-    module.server.send(reply, this.port, this.address);
+    this.dtp.server.send(reply, this.port, this.address);
   }
 }

 class TransactionManager {
-  constructor ( ) {
+  constructor (dtp) {
+    this.dtp = dtp;
+    this.component = { name: 'Transaction Manager', slug: 'transaction-manager' };
+    this.log = new SiteLog(dtp, this.component);
     this.transactions = { };
   }
@@ -260,7 +266,7 @@ class TransactionManager {
         port: transaction.port,
       },
     });
-    await module.manager.cancelTransaction(transaction, 'Rejected');
+    await this.dtp.manager.cancelTransaction(transaction, 'Rejected');
   }

   hasPendingRequest (transaction) {
@@ -301,363 +307,348 @@ class TransactionManager {
       const transaction = this.transactions[key];
       const age = NOW - transaction.created;
       if (age > (1000 * 30)) {
-        module.log.alert('expiring transaction', { transaction });
+        this.log.alert('expiring transaction', { transaction });
         await this.cancelTransaction(transaction, 'expired');
         ++expired;
       }
     }, 8);
-    module.log.info('transaction watchdog', { expired });
+    this.log.info('transaction watchdog', { expired });
   }
 }

-module.onHostCacheMessage = async (message, rinfo) => {
-  try {
-    message = message.toString('utf8');
-    message = JSON.parse(message);
-    const transaction = new HostCacheTransaction(message, rinfo);
-    module.manager.addTransaction(transaction);
-  } catch (error) {
-    module.log.error('failed to receive UDP message', { message, error });
-  }
-};
-
-module.startHostCache = async (basePath) => {
-  basePath = basePath || process.env.DTP_HOST_CACHE_PATH;
-
-  const NOW = Date.now();
-  const dir = await fs.promises.opendir(basePath);
-
-  for await (const dirent of dir) {
-    if (dirent.isDirectory()) {
-      module.log.debug('cache start descend into directory', { name: dirent.name });
-      await module.startHostCache(path.join(basePath, dirent.name));
-    }
-    if (dirent.isFile()) {
-      const filePath = path.join(basePath, dirent.name);
-      const stats = await fs.promises.stat(filePath);
-      const age = (NOW - stats.atime.valueOf()) / 1000.0; // seconds
-      module.log.debug('examining file', { filePath, age: numeral(age).format('hh:mm:ss') });
-      if ((age / 60.0) > 60.0) {
-        module.log.info('expiring file', { filePath, age: numeral(age).format('hh:mm:ss') });
-        await fs.promises.rm(filePath, { force: true });
-      } else {
-        module.cacheStats.add(stats.size);
-      }
-    }
-  }
-
-  // await dir.close();
-  module.log.info('cache startup cleanup complete', { basePath });
-};
-
-/**
- * When a file is accessed for read or otherwise, it's atime is updated. If the
- * atime of a file exceeds the configured max file idle time, the file is
- * removed.
- * @param {String} basePath
- */
-module.cleanHostCache = async (basePath) => {
-  const NOW = Date.now(); // timestamp, not Date instance
-  basePath = basePath || process.env.DTP_HOST_CACHE_PATH;
-  module.log.info('cache directory cleanup', { path: basePath });
-
-  const dir = await fs.promises.opendir(basePath);
-  for await (const dirent of dir) {
-    if (dirent.isDirectory()) {
-      module.log.debug('cache clean descend into directory', { name: dirent.name });
-      await module.cleanHostCache(path.join(basePath, dirent.name));
-    }
-    if (dirent.isFile()) {
-      const filePath = path.join(basePath, dirent.name);
-      const stats = await fs.promises.stat(filePath);
-      const age = NOW - stats.atime.valueOf();
-      module.log.debug('examining file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') });
-      if ((age / 1000.0 / 60.0) > 60.0) {
-        module.log.info('expiring file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') });
-        await fs.promises.rm(filePath, { force: true });
-        module.cacheStats.remove(stats.size);
-        module.cacheStats.expire(stats.size);
-      }
-    }
-  }
-
-  module.log.info('cache directory cleanup complete', { basePath });
-};
-
-module.expireTransactions = async ( ) => {
-  await module.manager.expireTransactions();
-};
-
-module.registerHost = async ( ) => {
-  const NOW = new Date();
-
-  const mongoose = require('mongoose');
-  const NetHost = mongoose.model('NetHost');
-  const memory = await si.mem();
-
-  module.host = new NetHost();
-  module.host.created = NOW;
-  module.host.status = 'starting';
-  module.host.hostname = os.hostname();
-
-  module.host.arch = os.arch();
-  module.host.cpus = os.cpus().map((cpu) => {
-    return {
-      model: cpu.model,
-      speed: cpu.speed,
-    };
-  });
-
-  module.host.totalmem = memory.total;
-  module.host.freemem = memory.available;
-
-  module.host.platform = os.platform();
-  module.host.release = os.release();
-  module.host.version = os.version();
-  module.host.network = (await si.networkInterfaces()).map((iface) => {
-    return {
-      iface: iface.iface,
-      speed: iface.speed,
-      mac: iface.mac,
-      ip4: iface.ip4,
-      ip4subnet: iface.ip4subnet,
-      ip6: iface.ip6,
-      ip6subnet: iface.ip6subnet,
-      flags: {
-        internal: iface.internal,
-        virtual: iface.virtual,
-      },
-    };
-  });
-  module.log.info('host registration: network', { network: module.host.network });
-
-  await module.host.save();
-  module.host = module.host.toObject();
-  module.log.info('registered host with platform', { host: module.host._id });
-  return module.host;
-};
-
-module.reportHostStats = async ( ) => {
-  const NOW = new Date();
-
-  const mongoose = require('mongoose');
-  const NetHost = mongoose.model('NetHost');
-  const NetHostStats = mongoose.model('NetHostStats');
-  const memory = await si.mem();
-
-  const network = (await si.networkStats('*')).map((iface) => {
-    const record = { iface: iface.iface };
-    record.rxDropped = iface.rx_dropped;
-    record.rxErrors = iface.rx_errors;
-    record.txDropped = iface.tx_dropped;
-    record.txErrors = iface.tx_errors;
-    if (iface.ms !== 0) {
-      record.rxPerSecond = iface.rx_sec / (iface.ms / 1000.0);
-      record.txPerSecond = iface.tx_sec / (iface.ms / 1000.0);
-    } else {
-      record.rxPerSecond = 0;
-      record.txPerSecond = 0;
-    }
-    return record;
-  });
-
-  const cpus = os.cpus().map((cpu) => {
-    return {
-      user: cpu.times.user,
-      nice: cpu.times.nice,
-      sys: cpu.times.sys,
-      idle: cpu.times.idle,
-      irq: cpu.times.irq,
-    };
-  });
-  const load = os.loadavg();
-  const cache = module.cacheStats.report();
-  const disk = {
-    cache: await module.getDiskUsage(process.env.DTP_HOST_CACHE_PATH),
-  };
-
-  const cpuDeltas = [ ];
-  if (module.oldCpuStats) {
-    for (let idx = 0; idx < cpus.length; ++idx) {
-      cpuDeltas.push({
-        user: cpus[idx].user - module.oldCpuStats[idx].user,
-        nice: cpus[idx].nice - module.oldCpuStats[idx].nice,
-        sys: cpus[idx].sys - module.oldCpuStats[idx].sys,
-        idle: cpus[idx].idle - module.oldCpuStats[idx].idle,
-        irq: cpus[idx].irq - module.oldCpuStats[idx].irq,
-      });
-    }
-  } else {
-    cpus.forEach(( ) => {
-      cpuDeltas.push({
-        user: 0,
-        nice: 0,
-        sys: 0,
-        idle: 0,
-        irq: 0,
-      });
-    });
-  }
-  module.oldCpuStats = cpus;
-
-  await NetHostStats.create({
-    created: NOW,
-    host: module.host._id,
-    load,
-    cpus: cpuDeltas,
-    memory, cache, disk, network,
-  });
-
-  await NetHost.updateOne(
-    { _id: module.host._id },
-    {
-      updated: NOW,
-      freemem: memory.available,
-    },
-  );
-  module.log.info('platform host report', { host: module.host._id, });
-};
-
-module.getDiskUsage = (pathname) => {
-  return new Promise((resolve, reject) => {
-    diskusage(pathname, (err, usage) => {
-      if (err) {
-        return reject(err);
-      }
-      usage.pctUsed = (usage.used / usage.total) * 100.0;
-      return resolve(usage);
-    });
-  });
-};
-
-module.setHostStatus = async (status) => {
-  if (!module.host) {
-    return;
-  }
-
-  const NOW = new Date();
-  const mongoose = require('mongoose');
-  const NetHost = mongoose.model('NetHost');
-
-  await NetHost.updateOne(
-    { _id: module.host._id },
-    {
-      $set: {
-        updated: NOW,
-        status,
-      },
-    },
-  );
-};
-
-module.expireNetHosts = async ( ) => {
-  const NOW = new Date();
-  const OLDEST = new Date(Date.now() - 1000 * 60 * 2);
-
-  const mongoose = require('mongoose');
-  const NetHost = mongoose.model('NetHost');
-
-  const hosts = await NetHost.find({
-    status: { $nin: ['inactive', 'crashed'] },
-    updated: { $lt: OLDEST }
-  });
-  module.log.info('expired host cleanup', { hostCount: hosts.length });
-
-  await SiteAsync.each(hosts, async (host) => {
-    try {
-      await NetHost.updateOne(
-        { _id: host._id },
-        {
-          $set: {
-            updated: NOW,
-            status: 'crashed',
-          },
-        },
-      );
-    } catch (error) {
-      module.log.error('failed to clean expired host', { host, error });
-    }
-  }, 4);
-};
-
-(async ( ) => {
-  try {
-    process.on('unhandledRejection', (error, p) => {
-      module.log.error('Unhandled rejection', {
-        error: error,
-        promise: p,
-        stack: error.stack
-      });
-    });
-
-    process.on('warning', (error) => {
-      module.log.alert('warning', { error });
-    });
-
-    process.once('SIGINT', async ( ) => {
-      module.log.info('SIGINT received');
-      module.log.info('requesting shutdown...');
-      await module.setHostStatus('shutdown');
-      const exitCode = await SitePlatform.shutdown();
-      await module.setHostStatus('inactive');
-      process.nextTick(( ) => {
-        process.exit(exitCode);
-      });
-    });
-
-    module.log.info('ensuring host-services path exists', { path: process.env.DTP_HOST_CACHE_PATH });
-    await fs.promises.mkdir(process.env.DTP_HOST_CACHE_PATH, { recursive: true });
-
-    module.networkStats = await si.networkStats('*');
-
-    module.log.info('starting cache service', { path: process.env.DTP_HOST_CACHE_PATH });
-    module.cacheStats = new CacheStats();
-    await module.startHostCache(process.env.DTP_HOST_CACHE_PATH);
-    module.log.info('cache stats at start', { stats: module.cacheStats });
-
-    /*
-     * Host Cache server socket setup
-     */
-
-    const HOST_PORT = parseInt(process.env.DTP_HOST_CACHE_PORT || '8000', 10);
-
-    module.log.info('creating server UDP socket');
-    module.server = dgram.createSocket('udp4', module.onHostCacheMessage);
-
-    module.log.info('binding server UDP socket', { port: HOST_PORT });
-    module.server.bind(HOST_PORT);
-
-    /*
-     * Site Platform startup
-     */
-    await SitePlatform.startPlatform(module);
-
-    module.log.info('starting transaction manager');
-    module.manager = new TransactionManager();
-    module.expireJob = new CronJob('*/5 * * * * *', module.expireTransactions, null, true, CRON_TIMEZONE);
-
-    /*
-     * Worker startup
-     */
-
-    const cleanCronJob = process.env.DTP_HOST_CACHE_CLEAN_CRON || '*/30 * * * * *';
-    module.log.info('starting host cache clean cron', { cleanCronJob });
-    module.cleanupJob = new CronJob(cleanCronJob, module.cleanHostCache, null, true, CRON_TIMEZONE);
-
-    module.log.info('starting stats report job');
-    module.statsReportJob = new CronJob('*/5 * * * * *', module.reportHostStats, null, true, CRON_TIMEZONE);
-
-    module.log.info('starting host expiration job');
-    module.expireHostsJob = new CronJob('*/20 * * * * *', module.expireNetHosts, null, true, CRON_TIMEZONE);
-
-    module.log.info('registering host with DTP Sites platform', { });
-    await module.registerHost();
-    await module.setHostStatus('active');
+class HostServicesWorker extends SiteWorker {
+
+  constructor (dtp) {
+    super(dtp, dtp.config.component);
+  }
+
+  async onHostCacheMessage (message, rinfo) {
+    try {
+      message = message.toString('utf8');
+      message = JSON.parse(message);
+      const transaction = new HostCacheTransaction(this.dtp, message, rinfo);
+      this.dtp.manager.addTransaction(transaction);
+    } catch (error) {
+      this.log.error('failed to receive UDP message', { message, error });
+    }
+  }
+
+  async startHostCache (basePath) {
+    basePath = basePath || process.env.DTP_HOST_CACHE_PATH;
+
+    const NOW = Date.now();
+    const dir = await fs.promises.opendir(basePath);
+
+    for await (const dirent of dir) {
+      if (dirent.isDirectory()) {
+        this.log.debug('cache start descend into directory', { name: dirent.name });
+        await this.startHostCache(path.join(basePath, dirent.name));
+      }
+      if (dirent.isFile()) {
+        const filePath = path.join(basePath, dirent.name);
+        const stats = await fs.promises.stat(filePath);
+        const age = (NOW - stats.atime.valueOf()) / 1000.0; // seconds
+        this.log.debug('examining file', { filePath, age: numeral(age).format('hh:mm:ss') });
+        if ((age / 60.0) > 60.0) {
+          this.log.info('expiring file', { filePath, age: numeral(age).format('hh:mm:ss') });
+          await fs.promises.rm(filePath, { force: true });
+        } else {
+          this.dtp.cacheStats.add(stats.size);
+        }
+      }
+    }
+
+    // await dir.close();
+    this.log.info('cache startup cleanup complete', { basePath });
+  }
+
+  /**
+   * When a file is accessed for read or otherwise, it's atime is updated. If the
+   * atime of a file exceeds the configured max file idle time, the file is
+   * removed.
+   * @param {String} basePath The root of the cache store on disk.
+   */
+  async cleanHostCache (basePath) {
+    const NOW = Date.now(); // timestamp, not Date instance
+    basePath = basePath || process.env.DTP_HOST_CACHE_PATH;
+    this.log.info('cache directory cleanup', { path: basePath });
+
+    const dir = await fs.promises.opendir(basePath);
+    for await (const dirent of dir) {
+      if (dirent.isDirectory()) {
+        this.log.debug('cache clean descend into directory', { name: dirent.name });
+        await this.cleanHostCache(path.join(basePath, dirent.name));
+      }
+      if (dirent.isFile()) {
+        const filePath = path.join(basePath, dirent.name);
+        const stats = await fs.promises.stat(filePath);
+        const age = NOW - stats.atime.valueOf();
+        this.log.debug('examining file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') });
+        if ((age / 1000.0 / 60.0) > 60.0) {
+          this.log.info('expiring file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') });
+          await fs.promises.rm(filePath, { force: true });
+          this.dtp.cacheStats.remove(stats.size);
+          this.dtp.cacheStats.expire(stats.size);
+        }
+      }
+    }
+
+    this.log.info('cache directory cleanup complete', { basePath });
+  }
+
+  async expireTransactions ( ) {
+    await this.dtp.manager.expireTransactions();
+  }
+
+  async registerHost ( ) {
+    const NOW = new Date();
+
+    const mongoose = require('mongoose');
+    const NetHost = mongoose.model('NetHost');
+    const memory = await si.mem();
+
+    this.host = new NetHost();
+    this.host.created = NOW;
+    this.host.status = 'starting';
+    this.host.hostname = os.hostname();
+
+    this.host.arch = os.arch();
+    this.host.cpus = os.cpus().map((cpu) => {
+      return {
+        model: cpu.model,
+        speed: cpu.speed,
+      };
+    });
+
+    this.host.totalmem = memory.total;
+    this.host.freemem = memory.available;
+
+    this.host.platform = os.platform();
+    this.host.release = os.release();
+    this.host.version = os.version();
+    this.host.network = (await si.networkInterfaces()).map((iface) => {
+      return {
+        iface: iface.iface,
+        speed: iface.speed,
+        mac: iface.mac,
+        ip4: iface.ip4,
+        ip4subnet: iface.ip4subnet,
+        ip6: iface.ip6,
+        ip6subnet: iface.ip6subnet,
+        flags: {
+          internal: iface.internal,
+          virtual: iface.virtual,
+        },
+      };
+    });
+    this.log.info('host registration: network', { network: this.host.network });
+
+    await this.host.save();
+    this.host = this.host.toObject();
+    this.log.info('registered host with platform', { host: this.host._id });
+    return this.host;
+  }
+
+  async reportHostStats ( ) {
+    const NOW = new Date();
+
+    const mongoose = require('mongoose');
+    const NetHost = mongoose.model('NetHost');
+    const NetHostStats = mongoose.model('NetHostStats');
+    const memory = await si.mem();
+
+    const network = (await si.networkStats('*')).map((iface) => {
+      const record = { iface: iface.iface };
+      record.rxDropped = iface.rx_dropped;
+      record.rxErrors = iface.rx_errors;
+      record.txDropped = iface.tx_dropped;
+      record.txErrors = iface.tx_errors;
+      if (iface.ms !== 0) {
+        record.rxPerSecond = iface.rx_sec / (iface.ms / 1000.0);
+        record.txPerSecond = iface.tx_sec / (iface.ms / 1000.0);
+      } else {
+        record.rxPerSecond = 0;
+        record.txPerSecond = 0;
+      }
+      return record;
+    });
+
+    const cpus = os.cpus().map((cpu) => {
+      return {
+        user: cpu.times.user,
+        nice: cpu.times.nice,
+        sys: cpu.times.sys,
+        idle: cpu.times.idle,
+        irq: cpu.times.irq,
+      };
+    });
+    const load = os.loadavg();
+    const cache = this.dtp.cacheStats.report();
+    const disk = {
+      cache: await this.getDiskUsage(process.env.DTP_HOST_CACHE_PATH),
+    };
+
+    const cpuDeltas = [ ];
+    if (this.oldCpuStats) {
+      for (let idx = 0; idx < cpus.length; ++idx) {
+        cpuDeltas.push({
+          user: cpus[idx].user - this.oldCpuStats[idx].user,
+          nice: cpus[idx].nice - this.oldCpuStats[idx].nice,
+          sys: cpus[idx].sys - this.oldCpuStats[idx].sys,
+          idle: cpus[idx].idle - this.oldCpuStats[idx].idle,
+          irq: cpus[idx].irq - this.oldCpuStats[idx].irq,
+        });
+      }
+    } else {
+      cpus.forEach(( ) => {
+        cpuDeltas.push({
+          user: 0,
+          nice: 0,
+          sys: 0,
+          idle: 0,
+          irq: 0,
+        });
+      });
+    }
+    this.oldCpuStats = cpus;
+
+    await NetHostStats.create({
+      created: NOW,
+      host: this.host._id,
+      load,
+      cpus: cpuDeltas,
+      memory, cache, disk, network,
+    });
+
+    await NetHost.updateOne(
+      { _id: this.host._id },
+      {
+        updated: NOW,
+        freemem: memory.available,
+      },
+    );
+    this.log.info('platform host report', { host: this.host._id, });
+  }
+
+  getDiskUsage (pathname) {
+    return new Promise((resolve, reject) => {
+      diskusage(pathname, (err, usage) => {
+        if (err) {
+          return reject(err);
+        }
+        usage.pctUsed = (usage.used / usage.total) * 100.0;
+        return resolve(usage);
+      });
+    });
+  }
+
+  async setHostStatus (status) {
+    if (!this.host) {
+      return;
+    }
+
+    const NOW = new Date();
+    const mongoose = require('mongoose');
+    const NetHost = mongoose.model('NetHost');
+
+    await NetHost.updateOne(
+      { _id: this.host._id },
+      {
+        $set: {
+          updated: NOW,
+          status,
+        },
+      },
+    );
+  }
+
+  async expireNetHosts ( ) {
+    const NOW = new Date();
+    const OLDEST = new Date(Date.now() - 1000 * 60 * 2);
+
+    const mongoose = require('mongoose');
+    const NetHost = mongoose.model('NetHost');
+
+    const hosts = await NetHost.find({
+      status: { $nin: ['inactive', 'crashed'] },
+      updated: { $lt: OLDEST }
+    });
+    this.log.info('expired host cleanup', { hostCount: hosts.length });
+
+    await SiteAsync.each(hosts, async (host) => {
+      try {
+        await NetHost.updateOne(
+          { _id: host._id },
+          {
+            $set: {
+              updated: NOW,
+              status: 'crashed',
+            },
+          },
+        );
+      } catch (error) {
+        this.log.error('failed to clean expired host', { host, error });
+      }
+    }, 4);
+  }
+
+  async start ( ) {
+    await super.start();
+
+    this.networkStats = await si.networkStats('*');
+
+    this.log.info('starting cache service', { path: process.env.DTP_HOST_CACHE_PATH });
+    this.dtp.cacheStats = new CacheStats();
+    await this.startHostCache(process.env.DTP_HOST_CACHE_PATH);
+    this.log.info('cache stats at start', { stats: this.dtp.cacheStats });
+
+    /*
+     * Host Cache server socket setup
+     */
+
+    const HOST_PORT = parseInt(process.env.DTP_HOST_CACHE_PORT || '8000', 10);
+
+    this.log.info('creating server UDP socket');
+    this.dtp.server = dgram.createSocket('udp4', this.onHostCacheMessage.bind(this));
+
+    this.log.info('binding server UDP socket', { port: HOST_PORT });
+    this.dtp.server.bind(HOST_PORT);
+
+    this.log.info('starting transaction manager');
+    this.dtp.manager = new TransactionManager(this.dtp);
+    this.expireJob = new CronJob('*/5 * * * * *', this.expireTransactions.bind(this), null, true, CRON_TIMEZONE);
+
+    /*
+     * Worker startup
+     */
+
+    const cleanCronJob = process.env.DTP_HOST_CACHE_CLEAN_CRON || '*/30 * * * * *';
+    this.log.info('starting host cache clean cron', { cleanCronJob });
+    this.cleanupJob = new CronJob(cleanCronJob, this.cleanHostCache.bind(this), null, true, CRON_TIMEZONE);
+
+    this.log.info('starting stats report job');
+    this.statsReportJob = new CronJob('*/5 * * * * *', this.reportHostStats.bind(this), null, true, CRON_TIMEZONE);
+
+    this.log.info('starting host expiration job');
+    this.expireHostsJob = new CronJob('*/20 * * * * *', this.expireNetHosts.bind(this), null, true, CRON_TIMEZONE);
+
+    this.log.info('registering host with DTP Sites platform', { });
+    await this.registerHost();
+    await this.setHostStatus('active');
+  }
+}
+
+(async ( ) => {
+  try {
+    module.log.info('ensuring host-services path exists', { path: process.env.DTP_HOST_CACHE_PATH });
+    await fs.promises.mkdir(process.env.DTP_HOST_CACHE_PATH, { recursive: true });
+
+    module.worker = new HostServicesWorker(module);
+    await module.worker.start();

     module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.component.name} started`);
   } catch (error) {
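One behavioral detail worth noting in the cache code above: the cleanHostCache doc comment speaks of a "configured max file idle time", but both the startup sweep (startHostCache) and the periodic clean (cleanHostCache) hard-code a 60-minute atime threshold in this diff. As a standalone illustration of that rule (the helper name and constant are hypothetical, not part of the commit):

// Hypothetical helper mirroring the expiry check used in host-services.js:
// a cached file is considered expired once its atime is more than 60 minutes old.
const IDLE_LIMIT_MINUTES = 60; // hard-coded in the diff; not yet read from configuration

function isCacheFileExpired (stats, now = Date.now()) {
  const ageMinutes = (now - stats.atime.valueOf()) / 1000.0 / 60.0;
  return ageMinutes > IDLE_LIMIT_MINUTES;
}

module.exports = { isCacheFileExpired };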

app/workers/reeeper.js  (93)

@@ -11,8 +11,8 @@ require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') });
 const mongoose = require('mongoose');

 const {
-  SitePlatform,
   SiteLog,
+  SiteWorker,
 } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib'));

 const { CronJob } = require('cron');
@@ -31,61 +31,52 @@ module.config = {
 module.config.site = require(path.join(module.rootPath, 'config', 'site'));
 module.config.http = require(path.join(module.rootPath, 'config', 'http'));

-module.log = new SiteLog(module, module.config.component);
-
-module.expireCrashedHosts = async ( ) => {
-  const NetHost = mongoose.model('NetHost');
-  try {
-    await NetHost
-      .find({ status: 'crashed' })
-      .select('_id hostname')
-      .lean()
-      .cursor()
-      .eachAsync(async (host) => {
-        module.log.info('deactivating crashed host', { hostname: host.hostname });
-        await NetHost.updateOne({ _id: host._id }, { $set: { status: 'inactive' } });
-      });
-  } catch (error) {
-    module.log.error('failed to expire crashed hosts', { error });
-  }
-};
+class ReeeperWorker extends SiteWorker {
+
+  constructor (dtp) {
+    super(dtp, dtp.config.component);
+  }
+
+  async start ( ) {
+    await super.start();
+
+    await this.expireCrashedHosts(); // first-run the expirations
+    this.expireJob = new CronJob('*/5 * * * * *', this.expireCrashedHosts.bind(this), null, true, CRON_TIMEZONE);
+  }
+
+  async stop ( ) {
+    if (this.expireJob) {
+      this.log.info('stopping host expire job');
+      this.expireJob.stop();
+      delete this.expireJob;
+    }
+    await super.stop();
+  }
+
+  async expireCrashedHosts ( ) {
+    const NetHost = mongoose.model('NetHost');
+    try {
+      await NetHost
+        .find({ status: 'crashed' })
+        .select('_id hostname')
+        .lean()
+        .cursor()
+        .eachAsync(async (host) => {
+          this.log.info('deactivating crashed host', { hostname: host.hostname });
+          await NetHost.updateOne({ _id: host._id }, { $set: { status: 'inactive' } });
+        });
+    } catch (error) {
+      this.log.error('failed to expire crashed hosts', { error });
+    }
+  }
+}

 (async ( ) => {
   try {
-    process.on('unhandledRejection', (error, p) => {
-      module.log.error('Unhandled rejection', {
-        error: error,
-        promise: p,
-        stack: error.stack
-      });
-    });
-
-    process.on('warning', (error) => {
-      module.log.alert('warning', { error });
-    });
-
-    process.once('SIGINT', async ( ) => {
-      module.log.info('SIGINT received');
-      module.log.info('requesting shutdown...');
-      const exitCode = await SitePlatform.shutdown();
-      process.nextTick(( ) => {
-        process.exit(exitCode);
-      });
-    });
-
-    /*
-     * Site Platform startup
-     */
-    await SitePlatform.startPlatform(module);
-
-    await module.expireCrashedHosts(); // first-run the expirations
-    module.expireJob = new CronJob(
-      '*/5 * * * * *',
-      module.expireCrashedHosts,
-      null, true, CRON_TIMEZONE,
-    );
+    module.log = new SiteLog(module, module.config.component);
+
+    module.worker = new ReeeperWorker(module);
+    await module.worker.start();

     module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.component.name} started`);
   } catch (error) {

app/workers/sample-worker.js  (4)

@@ -52,9 +52,11 @@ class SampleWorker extends SiteWorker {
   }

   async stop ( ) {
-    this.log.info('stopping worker job');
+    this.log.info('stopping sample worker job');
     this.job.stop();
     delete this.job;
+    await super.stop();
   }

   async runJob ( ) {

lib/site-service.js  (4)

@@ -15,11 +15,11 @@ class SiteService extends SiteCommon {
   }

   async start ( ) {
-    this.log.debug(`starting ${this.name} service`);
+    this.log.debug(`starting ${this.component.name} service`);
   }

   async stop ( ) {
-    this.log.debug(`stopping ${this.name} service`);
+    this.log.debug(`stopping ${this.component.name} service`);
   }
 }
