From 46fe794cc1563c478d4d2acdf5b9ffb7f9d38cfb Mon Sep 17 00:00:00 2001 From: rob Date: Wed, 6 Jul 2022 11:06:18 -0400 Subject: [PATCH] refactored to new SiteWorker base class --- app/workers/host-services.js | 605 +++++++++++++++++------------------ app/workers/reeeper.js | 93 +++--- app/workers/sample-worker.js | 4 +- lib/site-service.js | 4 +- 4 files changed, 345 insertions(+), 361 deletions(-) diff --git a/app/workers/host-services.js b/app/workers/host-services.js index fb90a8f..ffedc5a 100644 --- a/app/workers/host-services.js +++ b/app/workers/host-services.js @@ -17,10 +17,10 @@ const dgram = require('dgram'); const numeral = require('numeral'); const { - SitePlatform, SiteAsync, SiteLog, SiteError, + SiteWorker, } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib')); const { CronJob } = require('cron'); @@ -122,8 +122,11 @@ class HostCacheTransaction { get port ( ) { return this.rinfo.port; } get size ( ) { return this.rinfo.size; } - constructor (message, rinfo) { + constructor (dtp, message, rinfo) { + this.dtp = dtp; this.created = Date.now(); // timestamp, not Date instance + this.component = { name: 'Host Cache Transaction', slug: 'host-cache-transaction' }; + this.log = new SiteLog(dtp, this.component); this.message = message; this.rinfo = rinfo; @@ -137,7 +140,7 @@ class HostCacheTransaction { } async getFile ( ) { - const { minio: minioService } = module.services; + const { minio: minioService } = this.dtp.services; const filePath = path.join( process.env.DTP_HOST_CACHE_PATH, this.params.bucket, @@ -160,17 +163,17 @@ class HostCacheTransaction { } res.file.path = filePath; this.flags.isCached = true; - module.cacheStats.hit(res.file.stats.size); - return module.manager.resolveTransaction(this, res); + this.dtp.cacheStats.hit(res.file.stats.size); + return this.dtp.manager.resolveTransaction(this, res); } catch (error) { if (error.code !== 'ENOENT') { - module.log.error('failed to stat requested object', { transaction: this, error }); + this.log.error('failed to stat requested object', { transaction: this, error }); res.success = false; res.statusCode = 500; res.message = error.message; this.error = error; this.flags.isError = true; - return module.manager.resolveTransaction(this, res); + return this.dtp.manager.resolveTransaction(this, res); } // fall through to MinIO fetch since file not found in cache } @@ -187,18 +190,18 @@ class HostCacheTransaction { throw new SiteError(500, 'invalid object requested'); } this.flags.isFetched = true; - module.cacheStats.add(res.file.stats.size); - module.cacheStats.miss(res.file.stats.size); - return module.manager.resolveTransaction(this, res); + this.dtp.cacheStats.add(res.file.stats.size); + this.dtp.cacheStats.miss(res.file.stats.size); + return this.dtp.manager.resolveTransaction(this, res); } catch (error) { if (error.code !== 'NotFound') { - module.log.error('failed to fetch requested object from MinIO', { transaction: this, error }); + this.log.error('failed to fetch requested object from MinIO', { transaction: this, error }); res.success = false; res.statusCode = 500; res.message = error.message; this.error = error; this.flags.isError = true; - return module.manager.resolveTransaction(this, res); + return this.dtp.manager.resolveTransaction(this, res); } } @@ -207,7 +210,7 @@ class HostCacheTransaction { res.message = 'Not Found'; this.error = new SiteError(404, 'Not Found'); this.flags.isError = true; - return module.manager.resolveTransaction(this, res); + return this.dtp.manager.resolveTransaction(this, res); } async cancel (reason) { @@ -216,7 +219,7 @@ class HostCacheTransaction { success: false, message: `Operation canceled: ${reason}`, }; - return module.manager.resolveTransaction(this, res); + return this.dtp.manager.resolveTransaction(this, res); } async sendResponse (res) { @@ -228,12 +231,15 @@ class HostCacheTransaction { const payload = { tid: this.tid, res, flags, duration }; const reply = Buffer.from(JSON.stringify(payload)); - module.server.send(reply, this.port, this.address); + this.dtp.server.send(reply, this.port, this.address); } } class TransactionManager { - constructor ( ) { + constructor (dtp) { + this.dtp = dtp; + this.component = { name: 'Transaction Manager', slug: 'transaction-manager' }; + this.log = new SiteLog(dtp, this.component); this.transactions = { }; } @@ -260,7 +266,7 @@ class TransactionManager { port: transaction.port, }, }); - await module.manager.cancelTransaction(transaction, 'Rejected'); + await this.dtp.manager.cancelTransaction(transaction, 'Rejected'); } hasPendingRequest (transaction) { @@ -301,363 +307,348 @@ class TransactionManager { const transaction = this.transactions[key]; const age = NOW - transaction.created; if (age > (1000 * 30)) { - module.log.alert('expiring transaction', { transaction }); + this.log.alert('expiring transaction', { transaction }); await this.cancelTransaction(transaction, 'expired'); ++expired; } }, 8); - module.log.info('transaction watchdog', { expired }); + this.log.info('transaction watchdog', { expired }); } } -module.onHostCacheMessage = async (message, rinfo) => { - try { - message = message.toString('utf8'); - message = JSON.parse(message); - - const transaction = new HostCacheTransaction(message, rinfo); - module.manager.addTransaction(transaction); - } catch (error) { - module.log.error('failed to receive UDP message', { message, error }); - } -}; +class HostServicesWorker extends SiteWorker { -module.startHostCache = async (basePath) => { - basePath = basePath || process.env.DTP_HOST_CACHE_PATH; + constructor (dtp) { + super(dtp, dtp.config.component); + } - const NOW = Date.now(); - const dir = await fs.promises.opendir(basePath); + async onHostCacheMessage (message, rinfo) { + try { + message = message.toString('utf8'); + message = JSON.parse(message); - for await (const dirent of dir) { - if (dirent.isDirectory()) { - module.log.debug('cache start descend into directory', { name: dirent.name }); - await module.startHostCache(path.join(basePath, dirent.name)); - } - if (dirent.isFile()) { - const filePath = path.join(basePath, dirent.name); - const stats = await fs.promises.stat(filePath); - const age = (NOW - stats.atime.valueOf()) / 1000.0; // seconds - module.log.debug('examining file', { filePath, age: numeral(age).format('hh:mm:ss') }); - if ((age / 60.0) > 60.0) { - module.log.info('expiring file', { filePath, age: numeral(age).format('hh:mm:ss') }); - await fs.promises.rm(filePath, { force: true }); - } else { - module.cacheStats.add(stats.size); - } + const transaction = new HostCacheTransaction(this.dtp, message, rinfo); + this.dtp.manager.addTransaction(transaction); + } catch (error) { + this.log.error('failed to receive UDP message', { message, error }); } } - // await dir.close(); - module.log.info('cache startup cleanup complete', { basePath }); -}; + async startHostCache (basePath) { + basePath = basePath || process.env.DTP_HOST_CACHE_PATH; -/** - * When a file is accessed for read or otherwise, it's atime is updated. If the - * atime of a file exceeds the configured max file idle time, the file is - * removed. - * @param {String} basePath - */ -module.cleanHostCache = async (basePath) => { - const NOW = Date.now(); // timestamp, not Date instance - basePath = basePath || process.env.DTP_HOST_CACHE_PATH; - module.log.info('cache directory cleanup', { path: basePath }); - - const dir = await fs.promises.opendir(basePath); - for await (const dirent of dir) { - if (dirent.isDirectory()) { - module.log.debug('cache clean descend into directory', { name: dirent.name }); - await module.cleanHostCache(path.join(basePath, dirent.name)); - } - if (dirent.isFile()) { - const filePath = path.join(basePath, dirent.name); - const stats = await fs.promises.stat(filePath); - const age = NOW - stats.atime.valueOf(); - module.log.debug('examining file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') }); - if ((age / 1000.0 / 60.0) > 60.0) { - module.log.info('expiring file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') }); - await fs.promises.rm(filePath, { force: true }); - module.cacheStats.remove(stats.size); - module.cacheStats.expire(stats.size); + const NOW = Date.now(); + const dir = await fs.promises.opendir(basePath); + + for await (const dirent of dir) { + if (dirent.isDirectory()) { + this.log.debug('cache start descend into directory', { name: dirent.name }); + await this.startHostCache(path.join(basePath, dirent.name)); + } + if (dirent.isFile()) { + const filePath = path.join(basePath, dirent.name); + const stats = await fs.promises.stat(filePath); + const age = (NOW - stats.atime.valueOf()) / 1000.0; // seconds + this.log.debug('examining file', { filePath, age: numeral(age).format('hh:mm:ss') }); + if ((age / 60.0) > 60.0) { + this.log.info('expiring file', { filePath, age: numeral(age).format('hh:mm:ss') }); + await fs.promises.rm(filePath, { force: true }); + } else { + this.dtp.cacheStats.add(stats.size); + } } } + + // await dir.close(); + this.log.info('cache startup cleanup complete', { basePath }); } - module.log.info('cache directory cleanup complete', { basePath }); -}; + /** + * When a file is accessed for read or otherwise, it's atime is updated. If the + * atime of a file exceeds the configured max file idle time, the file is + * removed. + * @param {String} basePath The root of the cache store on disk. + */ + async cleanHostCache (basePath) { + const NOW = Date.now(); // timestamp, not Date instance + basePath = basePath || process.env.DTP_HOST_CACHE_PATH; + this.log.info('cache directory cleanup', { path: basePath }); + + const dir = await fs.promises.opendir(basePath); + for await (const dirent of dir) { + if (dirent.isDirectory()) { + this.log.debug('cache clean descend into directory', { name: dirent.name }); + await this.cleanHostCache(path.join(basePath, dirent.name)); + } + if (dirent.isFile()) { + const filePath = path.join(basePath, dirent.name); + const stats = await fs.promises.stat(filePath); + const age = NOW - stats.atime.valueOf(); + this.log.debug('examining file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') }); + if ((age / 1000.0 / 60.0) > 60.0) { + this.log.info('expiring file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') }); + await fs.promises.rm(filePath, { force: true }); + this.dtp.cacheStats.remove(stats.size); + this.dtp.cacheStats.expire(stats.size); + } + } + } -module.expireTransactions = async ( ) => { - await module.manager.expireTransactions(); -}; + this.log.info('cache directory cleanup complete', { basePath }); + } -module.registerHost = async ( ) => { - const NOW = new Date(); + async expireTransactions ( ) { + await this.dtp.manager.expireTransactions(); + } - const mongoose = require('mongoose'); - const NetHost = mongoose.model('NetHost'); + async registerHost ( ) { + const NOW = new Date(); - const memory = await si.mem(); + const mongoose = require('mongoose'); + const NetHost = mongoose.model('NetHost'); - module.host = new NetHost(); - module.host.created = NOW; - module.host.status = 'starting'; - module.host.hostname = os.hostname(); + const memory = await si.mem(); - module.host.arch = os.arch(); - module.host.cpus = os.cpus().map((cpu) => { - return { - model: cpu.model, - speed: cpu.speed, - }; - }); - - module.host.totalmem = memory.total; - module.host.freemem = memory.available; - - module.host.platform = os.platform(); - module.host.release = os.release(); - module.host.version = os.version(); - - module.host.network = (await si.networkInterfaces()).map((iface) => { - return { - iface: iface.iface, - speed: iface.speed, - mac: iface.mac, - ip4: iface.ip4, - ip4subnet: iface.ip4subnet, - ip6: iface.ip6, - ip6subnet: iface.ip6subnet, - flags: { - internal: iface.internal, - virtual: iface.virtual, - }, - }; - }); - module.log.info('host registration: network', { network: module.host.network }); + this.host = new NetHost(); + this.host.created = NOW; + this.host.status = 'starting'; + this.host.hostname = os.hostname(); + + this.host.arch = os.arch(); + this.host.cpus = os.cpus().map((cpu) => { + return { + model: cpu.model, + speed: cpu.speed, + }; + }); - await module.host.save(); - module.host = module.host.toObject(); - module.log.info('registered host with platform', { host: module.host._id }); - return module.host; -}; + this.host.totalmem = memory.total; + this.host.freemem = memory.available; + + this.host.platform = os.platform(); + this.host.release = os.release(); + this.host.version = os.version(); + + this.host.network = (await si.networkInterfaces()).map((iface) => { + return { + iface: iface.iface, + speed: iface.speed, + mac: iface.mac, + ip4: iface.ip4, + ip4subnet: iface.ip4subnet, + ip6: iface.ip6, + ip6subnet: iface.ip6subnet, + flags: { + internal: iface.internal, + virtual: iface.virtual, + }, + }; + }); + this.log.info('host registration: network', { network: this.host.network }); -module.reportHostStats = async ( ) => { - const NOW = new Date(); + await this.host.save(); + this.host = this.host.toObject(); + this.log.info('registered host with platform', { host: this.host._id }); + return this.host; + } - const mongoose = require('mongoose'); - const NetHost = mongoose.model('NetHost'); - const NetHostStats = mongoose.model('NetHostStats'); + async reportHostStats ( ) { + const NOW = new Date(); - const memory = await si.mem(); + const mongoose = require('mongoose'); + const NetHost = mongoose.model('NetHost'); + const NetHostStats = mongoose.model('NetHostStats'); - const network = (await si.networkStats('*')).map((iface) => { - const record = { iface: iface.iface }; + const memory = await si.mem(); - record.rxDropped = iface.rx_dropped; - record.rxErrors = iface.rx_errors; + const network = (await si.networkStats('*')).map((iface) => { + const record = { iface: iface.iface }; - record.txDropped = iface.tx_dropped; - record.txErrors = iface.tx_errors; + record.rxDropped = iface.rx_dropped; + record.rxErrors = iface.rx_errors; - if (iface.ms !== 0) { - record.rxPerSecond = iface.rx_sec / (iface.ms / 1000.0); - record.txPerSecond = iface.tx_sec / (iface.ms / 1000.0); - } else { - record.rxPerSecond = 0; - record.txPerSecond = 0; - } + record.txDropped = iface.tx_dropped; + record.txErrors = iface.tx_errors; + + if (iface.ms !== 0) { + record.rxPerSecond = iface.rx_sec / (iface.ms / 1000.0); + record.txPerSecond = iface.tx_sec / (iface.ms / 1000.0); + } else { + record.rxPerSecond = 0; + record.txPerSecond = 0; + } - return record; - }); + return record; + }); - const cpus = os.cpus().map((cpu) => { - return { - user: cpu.times.user, - nice: cpu.times.nice, - sys: cpu.times.sys, - idle: cpu.times.idle, - irq: cpu.times.irq, + const cpus = os.cpus().map((cpu) => { + return { + user: cpu.times.user, + nice: cpu.times.nice, + sys: cpu.times.sys, + idle: cpu.times.idle, + irq: cpu.times.irq, + }; + }); + const load = os.loadavg(); + const cache = this.dtp.cacheStats.report(); + const disk = { + cache: await this.getDiskUsage(process.env.DTP_HOST_CACHE_PATH), }; - }); - const load = os.loadavg(); - const cache = module.cacheStats.report(); - const disk = { - cache: await module.getDiskUsage(process.env.DTP_HOST_CACHE_PATH), - }; - - const cpuDeltas = [ ]; - if (module.oldCpuStats) { - for (let idx = 0; idx < cpus.length; ++idx) { - cpuDeltas.push({ - user: cpus[idx].user - module.oldCpuStats[idx].user, - nice: cpus[idx].nice - module.oldCpuStats[idx].nice, - sys: cpus[idx].sys - module.oldCpuStats[idx].sys, - idle: cpus[idx].idle - module.oldCpuStats[idx].idle, - irq: cpus[idx].irq - module.oldCpuStats[idx].irq, + + const cpuDeltas = [ ]; + if (this.oldCpuStats) { + for (let idx = 0; idx < cpus.length; ++idx) { + cpuDeltas.push({ + user: cpus[idx].user - this.oldCpuStats[idx].user, + nice: cpus[idx].nice - this.oldCpuStats[idx].nice, + sys: cpus[idx].sys - this.oldCpuStats[idx].sys, + idle: cpus[idx].idle - this.oldCpuStats[idx].idle, + irq: cpus[idx].irq - this.oldCpuStats[idx].irq, + }); + } + } else { + cpus.forEach(( ) => { + cpuDeltas.push({ + user: 0, + nice: 0, + sys: 0, + idle: 0, + irq: 0, + }); }); } - } else { - cpus.forEach(( ) => { - cpuDeltas.push({ - user: 0, - nice: 0, - sys: 0, - idle: 0, - irq: 0, - }); + this.oldCpuStats = cpus; + + await NetHostStats.create({ + created: NOW, + host: this.host._id, + load, + cpus: cpuDeltas, + memory, cache, disk, network, }); + await NetHost.updateOne( + { _id: this.host._id }, + { + updated: NOW, + freemem: memory.available, + }, + ); + this.log.info('platform host report', { host: this.host._id, }); } - module.oldCpuStats = cpus; - - await NetHostStats.create({ - created: NOW, - host: module.host._id, - load, - cpus: cpuDeltas, - memory, cache, disk, network, - }); - await NetHost.updateOne( - { _id: module.host._id }, - { - updated: NOW, - freemem: memory.available, - }, - ); - module.log.info('platform host report', { host: module.host._id, }); -}; -module.getDiskUsage = (pathname) => { - return new Promise((resolve, reject) => { - diskusage(pathname, (err, usage) => { - if (err) { - return reject(err); - } - usage.pctUsed = (usage.used / usage.total) * 100.0; - return resolve(usage); + getDiskUsage (pathname) { + return new Promise((resolve, reject) => { + diskusage(pathname, (err, usage) => { + if (err) { + return reject(err); + } + usage.pctUsed = (usage.used / usage.total) * 100.0; + return resolve(usage); + }); }); - }); -}; - -module.setHostStatus = async (status) => { - if (!module.host) { - return; } - const NOW = new Date(); - const mongoose = require('mongoose'); - const NetHost = mongoose.model('NetHost'); - - await NetHost.updateOne( - { _id: module.host._id }, - { - $set: { - updated: NOW, - status, - }, - }, - ); -}; - -module.expireNetHosts = async ( ) => { - const NOW = new Date(); - const OLDEST = new Date(Date.now() - 1000 * 60 * 2); - - const mongoose = require('mongoose'); - const NetHost = mongoose.model('NetHost'); + async setHostStatus (status) { + if (!this.host) { + return; + } - const hosts = await NetHost.find({ - status: { $nin: ['inactive', 'crashed'] }, - updated: { $lt: OLDEST } - }); - module.log.info('expired host cleanup', { hostCount: hosts.length }); + const NOW = new Date(); + const mongoose = require('mongoose'); + const NetHost = mongoose.model('NetHost'); - await SiteAsync.each(hosts, async (host) => { - try { - await NetHost.updateOne( - { _id: host._id }, - { - $set: { - updated: NOW, - status: 'crashed', - }, + await NetHost.updateOne( + { _id: this.host._id }, + { + $set: { + updated: NOW, + status, }, - ); - } catch (error) { - module.log.error('failed to clean expired host', { host, error }); - } - }, 4); -}; - -(async ( ) => { - try { - process.on('unhandledRejection', (error, p) => { - module.log.error('Unhandled rejection', { - error: error, - promise: p, - stack: error.stack - }); - }); - - process.on('warning', (error) => { - module.log.alert('warning', { error }); - }); + }, + ); + } - process.once('SIGINT', async ( ) => { - module.log.info('SIGINT received'); - module.log.info('requesting shutdown...'); + async expireNetHosts ( ) { + const NOW = new Date(); + const OLDEST = new Date(Date.now() - 1000 * 60 * 2); - await module.setHostStatus('shutdown'); - const exitCode = await SitePlatform.shutdown(); - await module.setHostStatus('inactive'); + const mongoose = require('mongoose'); + const NetHost = mongoose.model('NetHost'); - process.nextTick(( ) => { - process.exit(exitCode); - }); + const hosts = await NetHost.find({ + status: { $nin: ['inactive', 'crashed'] }, + updated: { $lt: OLDEST } }); + this.log.info('expired host cleanup', { hostCount: hosts.length }); + + await SiteAsync.each(hosts, async (host) => { + try { + await NetHost.updateOne( + { _id: host._id }, + { + $set: { + updated: NOW, + status: 'crashed', + }, + }, + ); + } catch (error) { + this.log.error('failed to clean expired host', { host, error }); + } + }, 4); + } - module.log.info('ensuring host-services path exists', { path: process.env.DTP_HOST_CACHE_PATH }); - await fs.promises.mkdir(process.env.DTP_HOST_CACHE_PATH, { recursive: true }); + async start ( ) { + await super.start(); - module.networkStats = await si.networkStats('*'); + this.networkStats = await si.networkStats('*'); - module.log.info('starting cache service', { path: process.env.DTP_HOST_CACHE_PATH }); - module.cacheStats = new CacheStats(); - await module.startHostCache(process.env.DTP_HOST_CACHE_PATH); - module.log.info('cache stats at start', { stats: module.cacheStats }); + this.log.info('starting cache service', { path: process.env.DTP_HOST_CACHE_PATH }); + this.dtp.cacheStats = new CacheStats(); + await this.startHostCache(process.env.DTP_HOST_CACHE_PATH); + this.log.info('cache stats at start', { stats: this.dtp.cacheStats }); /* * Host Cache server socket setup */ const HOST_PORT = parseInt(process.env.DTP_HOST_CACHE_PORT || '8000', 10); - module.log.info('creating server UDP socket'); - module.server = dgram.createSocket('udp4', module.onHostCacheMessage); - module.log.info('binding server UDP socket', { port: HOST_PORT }); - module.server.bind(HOST_PORT); + this.log.info('creating server UDP socket'); + this.dtp.server = dgram.createSocket('udp4', this.onHostCacheMessage.bind(this)); + this.log.info('binding server UDP socket', { port: HOST_PORT }); + this.dtp.server.bind(HOST_PORT); - /* - * Site Platform startup - */ - await SitePlatform.startPlatform(module); - - module.log.info('starting transaction manager'); - module.manager = new TransactionManager(); - module.expireJob = new CronJob('*/5 * * * * *', module.expireTransactions, null, true, CRON_TIMEZONE); + this.log.info('starting transaction manager'); + this.dtp.manager = new TransactionManager(this.dtp); + this.expireJob = new CronJob('*/5 * * * * *', this.expireTransactions.bind(this), null, true, CRON_TIMEZONE); /* * Worker startup */ const cleanCronJob = process.env.DTP_HOST_CACHE_CLEAN_CRON || '*/30 * * * * *'; - module.log.info('starting host cache clean cron', { cleanCronJob }); - module.cleanupJob = new CronJob(cleanCronJob, module.cleanHostCache, null, true, CRON_TIMEZONE); + this.log.info('starting host cache clean cron', { cleanCronJob }); + this.cleanupJob = new CronJob(cleanCronJob, this.cleanHostCache.bind(this), null, true, CRON_TIMEZONE); + + this.log.info('starting stats report job'); + this.statsReportJob = new CronJob('*/5 * * * * *', this.reportHostStats.bind(this), null, true, CRON_TIMEZONE); - module.log.info('starting stats report job'); - module.statsReportJob = new CronJob('*/5 * * * * *', module.reportHostStats, null, true, CRON_TIMEZONE); + this.log.info('starting host expiration job'); + this.expireHostsJob = new CronJob('*/20 * * * * *', this.expireNetHosts.bind(this), null, true, CRON_TIMEZONE); + + this.log.info('registering host with DTP Sites platform', { }); + await this.registerHost(); + await this.setHostStatus('active'); + } +} - module.log.info('starting host expiration job'); - module.expireHostsJob = new CronJob('*/20 * * * * *', module.expireNetHosts, null, true, CRON_TIMEZONE); +(async ( ) => { + try { + + module.log.info('ensuring host-services path exists', { path: process.env.DTP_HOST_CACHE_PATH }); + await fs.promises.mkdir(process.env.DTP_HOST_CACHE_PATH, { recursive: true }); - module.log.info('registering host with DTP Sites platform', { }); - await module.registerHost(); - await module.setHostStatus('active'); + module.worker = new HostServicesWorker(module); + await module.worker.start(); module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.component.name} started`); } catch (error) { diff --git a/app/workers/reeeper.js b/app/workers/reeeper.js index 4dc1ca6..91f1890 100644 --- a/app/workers/reeeper.js +++ b/app/workers/reeeper.js @@ -11,8 +11,8 @@ require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') }); const mongoose = require('mongoose'); const { - SitePlatform, SiteLog, + SiteWorker, } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib')); const { CronJob } = require('cron'); @@ -31,61 +31,52 @@ module.config = { module.config.site = require(path.join(module.rootPath, 'config', 'site')); module.config.http = require(path.join(module.rootPath, 'config', 'http')); -module.log = new SiteLog(module, module.config.component); +class ReeeperWorker extends SiteWorker { -module.expireCrashedHosts = async ( ) => { - const NetHost = mongoose.model('NetHost'); - try { - await NetHost - .find({ status: 'crashed' }) - .select('_id hostname') - .lean() - .cursor() - .eachAsync(async (host) => { - module.log.info('deactivating crashed host', { hostname: host.hostname }); - await NetHost.updateOne({ _id: host._id }, { $set: { status: 'inactive' } }); - }); - } catch (error) { - module.log.error('failed to expire crashed hosts', { error }); + constructor (dtp) { + super(dtp, dtp.config.component); } -}; + + async start ( ) { + await super.start(); + await this.expireCrashedHosts(); // first-run the expirations + this.expireJob = new CronJob('*/5 * * * * *', this.expireCrashedHosts.bind(this), null, true, CRON_TIMEZONE); + } + + async stop ( ) { + if (this.expireJob) { + this.log.info('stopping host expire job'); + this.expireJob.stop(); + delete this.expireJob; + } + + await super.stop(); + } + + async expireCrashedHosts ( ) { + const NetHost = mongoose.model('NetHost'); + try { + await NetHost + .find({ status: 'crashed' }) + .select('_id hostname') + .lean() + .cursor() + .eachAsync(async (host) => { + this.log.info('deactivating crashed host', { hostname: host.hostname }); + await NetHost.updateOne({ _id: host._id }, { $set: { status: 'inactive' } }); + }); + } catch (error) { + this.log.error('failed to expire crashed hosts', { error }); + } + } +} (async ( ) => { try { - process.on('unhandledRejection', (error, p) => { - module.log.error('Unhandled rejection', { - error: error, - promise: p, - stack: error.stack - }); - }); - - process.on('warning', (error) => { - module.log.alert('warning', { error }); - }); - - process.once('SIGINT', async ( ) => { - module.log.info('SIGINT received'); - module.log.info('requesting shutdown...'); - const exitCode = await SitePlatform.shutdown(); - - process.nextTick(( ) => { - process.exit(exitCode); - }); - }); - - /* - * Site Platform startup - */ - await SitePlatform.startPlatform(module); - - await module.expireCrashedHosts(); // first-run the expirations - - module.expireJob = new CronJob( - '*/5 * * * * *', - module.expireCrashedHosts, - null, true, CRON_TIMEZONE, - ); + module.log = new SiteLog(module, module.config.component); + + module.worker = new ReeeperWorker(module); + await module.worker.start(); module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.component.name} started`); } catch (error) { diff --git a/app/workers/sample-worker.js b/app/workers/sample-worker.js index 7960069..b53b894 100644 --- a/app/workers/sample-worker.js +++ b/app/workers/sample-worker.js @@ -52,9 +52,11 @@ class SampleWorker extends SiteWorker { } async stop ( ) { - this.log.info('stopping worker job'); + this.log.info('stopping sample worker job'); this.job.stop(); delete this.job; + + await super.stop(); } async runJob ( ) { diff --git a/lib/site-service.js b/lib/site-service.js index 161cba6..c194df7 100644 --- a/lib/site-service.js +++ b/lib/site-service.js @@ -15,11 +15,11 @@ class SiteService extends SiteCommon { } async start ( ) { - this.log.debug(`starting ${this.name} service`); + this.log.debug(`starting ${this.component.name} service`); } async stop ( ) { - this.log.debug(`stopping ${this.name} service`); + this.log.debug(`stopping ${this.component.name} service`); } }