// host-services.js
// Copyright (C) 2021 Digital Telepresence, LLC
// License: Apache-2.0
//
// Provenance: app/workers/host-services.js as added by commit
// d4588b1397f0d088c9c64e93398779aa2df36862
// ("added host-services worker from Soapbox").

'use strict';

const os = require('os');
const diskusage = require('diskusage-ng');

const path = require('path');
const fs = require('fs');
const si = require('systeminformation');

require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') });

const dgram = require('dgram');
const numeral = require('numeral');

const {
  SitePlatform,
  SiteAsync,
  SiteLog,
  SiteError,
} = require(path.join(__dirname, '..', '..', 'lib', 'site-lib'));

const { CronJob } = require('cron');

const CRON_TIMEZONE = 'America/New_York';

/** UDP port the host cache server listens on. */
const HOST_CACHE_PORT = 8000;

/** A transaction older than this (milliseconds) is canceled by the watchdog. */
const TRANSACTION_MAX_AGE_MS = 1000 * 30;

/** A cached file whose atime is older than this (milliseconds) is removed. */
const CACHE_MAX_IDLE_MS = 1000 * 60 * 60;

/** A host that has not updated its record for this long is marked crashed. */
const NET_HOST_MAX_SILENCE_MS = 1000 * 60 * 2;

module.pkg = require(path.resolve(__dirname, '..', '..', 'package.json'));
module.config = {
  componentName: 'host-services',
  root: path.resolve(__dirname, '..', '..'),
};

module.log = new SiteLog(module, module.config.componentName);

/**
 * Stats a path, treating "does not exist" as an ordinary result.
 * @param {String} filePath
 * @returns {Promise<fs.Stats|null>} The stats, or null when stat fails with ENOENT.
 */
async function statIfExists (filePath) {
  try {
    return await fs.promises.stat(filePath);
  } catch (error) {
    if (error.code === 'ENOENT') {
      return null;
    }
    throw error;
  }
}

/**
 * Tells whether two transactions ask for the same thing: same command, same
 * bucket and same key. Missing params compare as undefined values.
 * @param {HostCacheTransaction} a
 * @param {HostCacheTransaction} b
 * @returns {Boolean}
 */
function isSameRequest (a, b) {
  const paramsA = a.params || { };
  const paramsB = b.params || { };
  return (a.cmd === b.cmd) &&
    (paramsA.bucket === paramsB.bucket) &&
    (paramsA.key === paramsB.key);
}

/**
 * Counters describing the host cache. Sizes are passed in as byte counts and
 * accumulated divided by 1024. itemCount/dataSize are running totals; the
 * hit/miss/expire counters are reset every time report() is called.
 */
class CacheStats {

  constructor ( ) {
    this.itemCount = 0;
    this.dataSize = 0;
    this.resetCounters();
  }

  /** Records a file entering the cache. */
  add (size) {
    this.itemCount += 1;
    this.dataSize += (size / 1024.0);
  }

  /** Records a file leaving the cache. */
  remove (size) {
    this.itemCount -= 1;
    this.dataSize -= (size / 1024.0);
  }

  /** Records a request served from the cache. */
  hit (size) {
    this.hitCount += 1;
    this.hitDataSize += (size / 1024.0);
  }

  /** Records a request that had to be fetched into the cache. */
  miss (size) {
    this.missCount += 1;
    this.missDataSize += (size / 1024.0);
  }

  /** Records a file removed for being idle too long. */
  expire (size) {
    this.expireCount += 1;
    // Same unit as every other *DataSize counter (was raw bytes).
    this.expireDataSize += (size / 1024.0);
  }

  /**
   * Returns a snapshot of all counters, then resets the per-interval ones.
   * @returns {Object}
   */
  report ( ) {
    const report = {
      itemCount: this.itemCount,
      dataSize: this.dataSize,

      expireCount: this.expireCount,
      expireDataSize: this.expireDataSize,

      hitCount: this.hitCount,
      hitDataSize: this.hitDataSize,

      missCount: this.missCount,
      missDataSize: this.missDataSize,
    };

    this.resetCounters();

    return report;
  }

  /** Zeroes the expire/hit/miss counters; the running totals are kept. */
  resetCounters ( ) {
    this.expireCount = 0;
    this.expireDataSize = 0;

    this.hitCount = 0;
    this.hitDataSize = 0;

    this.missCount = 0;
    this.missDataSize = 0;
  }
}

/**
 * One request received on the host cache UDP socket, together with the
 * address information needed to send the reply.
 */
class HostCacheTransaction {

  get tid ( ) { return this.message.tid; }
  get cmd ( ) { return this.message.cmd; }
  get params ( ) { return this.message.params; }

  get address ( ) { return this.rinfo.address; }
  get port ( ) { return this.rinfo.port; }
  get size ( ) { return this.rinfo.size; }

  /**
   * @param {Object} message Parsed JSON payload (tid, cmd, params).
   * @param {Object} rinfo Remote address info supplied by the dgram socket.
   */
  constructor (message, rinfo) {
    this.created = Date.now(); // timestamp, not Date instance

    this.message = message;
    this.rinfo = rinfo;

    this.flags = {
      isFetched: false,
      isCached: false,
      isResolved: false,
      isError: false,
    };
  }

  /**
   * Maps the requested bucket/key to a path below DTP_HOST_CACHE_PATH.
   * bucket and key arrive in a UDP datagram, so they are validated: both must
   * be non-empty strings and the result must stay inside the cache root
   * (rejects "../" traversal).
   * @returns {String|null} The cache file path, or null for an invalid request.
   */
  getCacheFilePath ( ) {
    const params = this.params;
    if (!params || (typeof params.bucket !== 'string') || (typeof params.key !== 'string')) {
      return null;
    }
    if (!params.bucket || !params.key) {
      return null;
    }

    const cachePath = process.env.DTP_HOST_CACHE_PATH;
    const filePath = path.join(cachePath, params.bucket, params.key);

    const root = path.resolve(cachePath);
    if (!path.resolve(filePath).startsWith(root + path.sep)) {
      return null;
    }
    return filePath;
  }

  /**
   * Marks the transaction and the response as failed.
   * @param {Object} res Response being built.
   * @param {Number} statusCode
   * @param {String} message
   * @param {Error} error
   * @returns {Object} res
   */
  failResponse (res, statusCode, message, error) {
    res.success = false;
    res.statusCode = statusCode;
    res.message = message;
    this.error = error;
    this.flags.isError = true;
    return res;
  }

  /**
   * Handles the getFile command: serve the file from the cache directory, or
   * download it from MinIO into the cache directory first. The outcome is
   * always sent to the requester (and to any duplicate requests waiting on
   * the same bucket/key) through the transaction manager.
   */
  async getFile ( ) {
    const res = {
      cmd: this.cmd,
      success: true,
      message: undefined,
      file: {
        stats: undefined,
        path: undefined,
      },
    };

    try {
      await this.fetchFile(res);
    } catch (error) {
      // Anything unexpected still produces a reply instead of a silent timeout.
      module.log.error('failed to process getFile request', { transaction: this, error });
      this.failResponse(res, 500, error.message, error);
    }

    return module.manager.resolveTransaction(this, res, { notifyWaiters: true });
  }

  /**
   * Fills in res for a getFile request. Sets isCached on a cache hit and
   * isFetched after a successful MinIO download.
   * @param {Object} res Response being built.
   * @returns {Promise<Object>} res
   */
  async fetchFile (res) {
    const filePath = this.getCacheFilePath();
    if (!filePath) {
      const error = new SiteError(400, 'invalid object requested');
      module.log.error('rejected invalid object request', { transaction: this, error });
      return this.failResponse(res, 400, 'invalid object requested', error);
    }

    let cached;
    try {
      cached = await statIfExists(filePath);
      if (cached && !cached.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
    } catch (error) {
      module.log.error('failed to stat requested object', { transaction: this, error });
      return this.failResponse(res, 500, error.message, error);
    }

    if (cached) {
      res.file.stats = cached;
      res.file.path = filePath;
      this.flags.isCached = true;
      module.cacheStats.hit(cached.size);
      return res;
    }

    // not found in cache: fetch from MinIO
    try {
      const { minio: minioService } = module.services;
      await minioService.downloadFile({
        bucket: this.params.bucket,
        key: this.params.key,
        filePath,
      });
      const stats = await fs.promises.stat(filePath);
      if (!stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
      res.file.path = filePath;
      res.file.stats = stats;
      this.flags.isFetched = true;
      module.cacheStats.add(stats.size);
      module.cacheStats.miss(stats.size);
      return res;
    } catch (error) {
      if (error.code !== 'NotFound') {
        module.log.error('failed to fetch requested object from MinIO', { transaction: this, error });
        return this.failResponse(res, 500, error.message, error);
      }
    }

    return this.failResponse(res, 404, 'Not Found', new SiteError(404, 'Not Found'));
  }

  /**
   * Replies with a failure describing why the operation was canceled.
   * @param {String} reason
   */
  async cancel (reason) {
    const res = {
      cmd: this.cmd, // same key getFile responses use (was `res`)
      success: false,
      message: `Operation canceled: ${reason}`,
    };
    return module.manager.resolveTransaction(this, res);
  }

  /**
   * Sends the JSON reply ({ tid, res, flags, duration }) to the requester and
   * marks the transaction resolved. duration is in seconds.
   * @param {Object} res
   * @returns {Promise<void>} Rejects if the socket reports a send error.
   */
  async sendResponse (res) {
    const NOW = Date.now();
    const duration = this.duration = (NOW - this.created) / 1000.0;

    const { flags } = this;
    flags.isResolved = true;

    const payload = { tid: this.tid, res, flags, duration };
    const reply = Buffer.from(JSON.stringify(payload));
    await new Promise((resolve, reject) => {
      module.server.send(reply, this.port, this.address, (error) => {
        if (error) {
          return reject(error);
        }
        return resolve();
      });
    });
  }
}

/**
 * Tracks transactions that have been received but not yet answered, keyed by
 * transaction id.
 */
class TransactionManager {

  constructor ( ) {
    // tid comes from the network; a null-prototype object keeps keys such as
    // "__proto__" from touching the prototype chain.
    this.transactions = Object.create(null);
  }

  /**
   * Registers a transaction and runs its command. If an equivalent request is
   * already in flight, the transaction is only queued; it is answered when
   * that request resolves (see resolveTransaction).
   * @param {HostCacheTransaction} transaction
   */
  async addTransaction (transaction) {
    const isDuplicate = this.hasPendingRequest(transaction);
    this.transactions[transaction.tid] = transaction;
    if (isDuplicate) {
      return; // queued behind the in-flight request
    }

    switch (transaction.cmd) {
      case 'getFile':
        return transaction.getFile();

      default:
        break; // unknown/undefined command
    }

    module.log.error('invalid host-services command', {
      cmd: transaction.cmd,
      params: transaction.params,
      from: {
        address: transaction.address,
        port: transaction.port,
      },
    });
    await this.cancelTransaction(transaction, 'Rejected');
  }

  /**
   * @param {HostCacheTransaction} transaction
   * @returns {Boolean} true when a tracked transaction has the same cmd,
   * bucket and key.
   */
  hasPendingRequest (transaction) {
    if (!transaction) { return false; }
    return Object
      .values(this.transactions)
      .some((cmp) => !!cmp && isSameRequest(cmp, transaction));
  }

  /**
   * Sends res to the transaction and stops tracking it.
   * @param {HostCacheTransaction} transaction
   * @param {Object} res
   * @param {Object} [options]
   * @param {Boolean} [options.notifyWaiters] Also answer every tracked
   * transaction that requested the same cmd/bucket/key.
   */
  async resolveTransaction (transaction, res, options = { }) {
    await this.deliver(transaction, res);
    if (!options.notifyWaiters) {
      return;
    }

    const waiters = Object
      .values(this.transactions)
      .filter((cmp) => !!cmp && (cmp !== transaction) && isSameRequest(cmp, transaction));
    for (const waiter of waiters) {
      // the waiter did no work of its own; report how the shared result was produced
      waiter.flags.isFetched = transaction.flags.isFetched;
      waiter.flags.isCached = transaction.flags.isCached;
      waiter.flags.isError = transaction.flags.isError;
      await this.deliver(waiter, res);
    }
  }

  /**
   * Sends a response and removes the transaction whether or not the send
   * succeeded. Send failures are logged, not thrown.
   * @param {HostCacheTransaction} transaction
   * @param {Object} res
   */
  async deliver (transaction, res) {
    try {
      await transaction.sendResponse(res);
    } catch (error) {
      module.log.error('failed to send host-services response', {
        tid: transaction.tid,
        to: {
          address: transaction.address,
          port: transaction.port,
        },
        error,
      });
    } finally {
      this.removeTransaction(transaction);
    }
  }

  /**
   * Cancels a transaction, telling the requester why.
   * @param {HostCacheTransaction} transaction
   * @param {String} reason
   */
  async cancelTransaction (transaction, reason) {
    await transaction.cancel(reason);
    this.removeTransaction(transaction);
  }

  /** Stops tracking a transaction; a no-op if it is not tracked. */
  removeTransaction (transaction) {
    if (this.transactions[transaction.tid]) {
      delete this.transactions[transaction.tid];
    }
  }

  /**
   * Cancels every tracked transaction older than TRANSACTION_MAX_AGE_MS.
   */
  async expireTransactions ( ) {
    const NOW = Date.now();
    const keys = Object.keys(this.transactions);
    let expired = 0;
    await SiteAsync.each(keys, async (key) => {
      const transaction = this.transactions[key];
      if (!transaction) {
        return; // resolved while the watchdog was running
      }
      const age = NOW - transaction.created;
      if (age > TRANSACTION_MAX_AGE_MS) {
        module.log.alert('expiring transaction', { transaction });
        await this.cancelTransaction(transaction, 'expired');
        ++expired;
      }
    }, 8);
    module.log.info('transaction watchdog', { expired });
  }
}

/**
 * UDP 'message' listener: parses the datagram as JSON and hands it to the
 * transaction manager. Every failure is logged here so a bad datagram cannot
 * become an unhandled rejection.
 * @param {Buffer} message
 * @param {Object} rinfo
 */
module.onHostCacheMessage = async (message, rinfo) => {
  try {
    message = message.toString('utf8');
    message = JSON.parse(message);
    if (!message || (typeof message !== 'object')) {
      throw new SiteError(400, 'invalid host-services message');
    }

    const transaction = new HostCacheTransaction(message, rinfo);
    await module.manager.addTransaction(transaction);
  } catch (error) {
    module.log.error('failed to receive UDP message', { message, error });
  }
};

/**
 * Startup pass over the cache directory tree: files idle for longer than
 * CACHE_MAX_IDLE_MS are removed, all others are counted into module.cacheStats.
 * @param {String} [basePath] Defaults to DTP_HOST_CACHE_PATH.
 */
module.startHostCache = async (basePath) => {
  basePath = basePath || process.env.DTP_HOST_CACHE_PATH;

  const NOW = Date.now();
  const dir = await fs.promises.opendir(basePath);

  // async iteration closes the directory handle when the loop finishes
  for await (const dirent of dir) {
    if (dirent.isDirectory()) {
      module.log.debug('cache start descend into directory', { name: dirent.name });
      await module.startHostCache(path.join(basePath, dirent.name));
    }
    if (dirent.isFile()) {
      const filePath = path.join(basePath, dirent.name);
      const stats = await statIfExists(filePath);
      if (!stats) {
        continue; // removed since the directory entry was read
      }
      const age = NOW - stats.atime.valueOf();
      const ageLabel = numeral(age / 1000.0).format('hh:mm:ss');
      module.log.debug('examining file', { filePath, age: ageLabel });
      if (age > CACHE_MAX_IDLE_MS) {
        module.log.info('expiring file', { filePath, age: ageLabel });
        await fs.promises.rm(filePath, { force: true });
      } else {
        module.cacheStats.add(stats.size);
      }
    }
  }

  module.log.info('cache startup cleanup complete', { basePath });
};

/**
 * Periodic pass over the cache directory tree. A file whose atime is older
 * than CACHE_MAX_IDLE_MS (60 minutes) is removed and recorded in
 * module.cacheStats as removed and expired.
 * @param {String} [basePath] Defaults to DTP_HOST_CACHE_PATH.
 */
module.cleanHostCache = async (basePath) => {
  const NOW = Date.now(); // timestamp, not Date instance
  if ((typeof basePath !== 'string') || !basePath) {
    basePath = process.env.DTP_HOST_CACHE_PATH;
  }
  module.log.info('cache directory cleanup', { path: basePath });

  const dir = await fs.promises.opendir(basePath);
  for await (const dirent of dir) {
    if (dirent.isDirectory()) {
      module.log.debug('cache clean descend into directory', { name: dirent.name });
      await module.cleanHostCache(path.join(basePath, dirent.name));
    }
    if (dirent.isFile()) {
      const filePath = path.join(basePath, dirent.name);
      const stats = await statIfExists(filePath);
      if (!stats) {
        continue; // removed since the directory entry was read
      }
      const age = NOW - stats.atime.valueOf();
      const ageLabel = numeral(age / 1000.0).format('hh:mm:ss');
      module.log.debug('examining file', { filePath, age: ageLabel });
      if (age > CACHE_MAX_IDLE_MS) {
        module.log.info('expiring file', { filePath, age: ageLabel });
        await fs.promises.rm(filePath, { force: true });
        module.cacheStats.remove(stats.size);
        module.cacheStats.expire(stats.size);
      }
    }
  }

  module.log.info('cache directory cleanup complete', { basePath });
};

/** Cron entry point for the transaction watchdog. */
module.expireTransactions = async ( ) => {
  await module.manager.expireTransactions();
};

/**
 * Creates and saves a NetHost record describing this machine (hostname, CPUs,
 * memory, OS, network interfaces) and publishes it as module.host.
 * @returns {Promise<Object>} The saved host as a plain object.
 */
module.registerHost = async ( ) => {
  const NOW = new Date();

  const mongoose = require('mongoose');
  const NetHost = mongoose.model('NetHost');

  const memory = await si.mem();

  const host = new NetHost();
  host.created = NOW;
  host.status = 'starting';
  host.hostname = os.hostname();

  host.arch = os.arch();
  host.cpus = os.cpus().map((cpu) => {
    return {
      model: cpu.model,
      speed: cpu.speed,
    };
  });

  host.totalmem = memory.total;
  host.freemem = memory.available;

  host.platform = os.platform();
  host.release = os.release();
  host.version = os.version();

  host.network = (await si.networkInterfaces()).map((iface) => {
    return {
      iface: iface.iface,
      speed: iface.speed,
      mac: iface.mac,
      ip4: iface.ip4,
      ip4subnet: iface.ip4subnet,
      ip6: iface.ip6,
      ip6subnet: iface.ip6subnet,
      flags: {
        internal: iface.internal,
        virtual: iface.virtual,
      },
    };
  });
  module.log.info('host registration: network', { network: host.network });

  await host.save();
  // published only once saved, so the scheduled jobs never see a half-built host
  module.host = host.toObject();
  module.log.info('registered host with platform', { host: module.host._id });
  return module.host;
};

/**
 * Stores a NetHostStats sample (load, per-CPU time deltas since the previous
 * sample, memory, cache counters, cache disk usage, network rates) and
 * refreshes the NetHost record. Does nothing until the host is registered.
 */
module.reportHostStats = async ( ) => {
  if (!module.host) {
    return; // the job can tick before registerHost() has finished
  }

  const NOW = new Date();

  const mongoose = require('mongoose');
  const NetHost = mongoose.model('NetHost');
  const NetHostStats = mongoose.model('NetHostStats');

  const memory = await si.mem();

  const network = (await si.networkStats('*')).map((iface) => {
    // rx_sec/tx_sec are already per-second rates measured over iface.ms;
    // ms is 0 when no interval has been measured yet.
    const hasInterval = iface.ms !== 0;
    return {
      iface: iface.iface,

      rxDropped: iface.rx_dropped,
      rxErrors: iface.rx_errors,

      txDropped: iface.tx_dropped,
      txErrors: iface.tx_errors,

      rxPerSecond: hasInterval ? (iface.rx_sec || 0) : 0,
      txPerSecond: hasInterval ? (iface.tx_sec || 0) : 0,
    };
  });

  const cpus = os.cpus().map((cpu) => {
    return {
      user: cpu.times.user,
      nice: cpu.times.nice,
      sys: cpu.times.sys,
      idle: cpu.times.idle,
      irq: cpu.times.irq,
    };
  });
  const load = os.loadavg();
  const cache = module.cacheStats.report();
  const disk = {
    cache: await module.getDiskUsage(process.env.DTP_HOST_CACHE_PATH),
  };

  const oldCpuStats = module.oldCpuStats || [ ];
  const cpuDeltas = cpus.map((cpu, idx) => {
    const previous = oldCpuStats[idx];
    if (!previous) {
      // first sample, or a CPU with no earlier sample
      return { user: 0, nice: 0, sys: 0, idle: 0, irq: 0 };
    }
    return {
      user: cpu.user - previous.user,
      nice: cpu.nice - previous.nice,
      sys: cpu.sys - previous.sys,
      idle: cpu.idle - previous.idle,
      irq: cpu.irq - previous.irq,
    };
  });
  module.oldCpuStats = cpus;

  await NetHostStats.create({
    created: NOW,
    host: module.host._id,
    load,
    cpus: cpuDeltas,
    memory, cache, disk, network,
  });
  await NetHost.updateOne(
    { _id: module.host._id },
    {
      $set: {
        updated: NOW,
        freemem: memory.available,
      },
    },
  );
  module.log.info('platform host report', { host: module.host._id, });
};

/**
 * Promise wrapper around diskusage-ng.
 * @param {String} pathname
 * @returns {Promise<Object>} The usage object with pctUsed (percent) added.
 */
module.getDiskUsage = (pathname) => {
  return new Promise((resolve, reject) => {
    diskusage(pathname, (err, usage) => {
      if (err) {
        return reject(err);
      }
      usage.pctUsed = (usage.used / usage.total) * 100.0;
      return resolve(usage);
    });
  });
};

/**
 * Updates the status of this host's NetHost record. Does nothing until the
 * host is registered.
 * @param {String} status
 */
module.setHostStatus = async (status) => {
  if (!module.host) {
    return;
  }

  const NOW = new Date();
  const mongoose = require('mongoose');
  const NetHost = mongoose.model('NetHost');

  await NetHost.updateOne(
    { _id: module.host._id },
    {
      $set: {
        updated: NOW,
        status,
      },
    },
  );
};

/**
 * Marks as 'crashed' every host that is not already inactive/crashed and
 * whose record has not been updated for NET_HOST_MAX_SILENCE_MS.
 */
module.expireNetHosts = async ( ) => {
  const NOW = new Date();
  const OLDEST = new Date(Date.now() - NET_HOST_MAX_SILENCE_MS);

  const mongoose = require('mongoose');
  const NetHost = mongoose.model('NetHost');

  const hosts = await NetHost.find({
    status: { $nin: ['inactive', 'crashed'] },
    updated: { $lt: OLDEST }
  });
  module.log.info('expired host cleanup', { hostCount: hosts.length });

  await SiteAsync.each(hosts, async (host) => {
    try {
      await NetHost.updateOne(
        { _id: host._id },
        {
          $set: {
            updated: NOW,
            status: 'crashed',
          },
        },
      );
    } catch (error) {
      module.log.error('failed to clean expired host', { host, error });
    }
  }, 4);
};

/**
 * Starts a cron job whose handler failures are logged instead of surfacing
 * as unhandled promise rejections. The handler is called without arguments.
 * @param {String} name Job name used in the failure log.
 * @param {String} cronTime Cron expression.
 * @param {Function} handler Async job body.
 * @returns {CronJob} The started job.
 */
module.startJob = (name, cronTime, handler) => {
  const onTick = async ( ) => {
    try {
      await handler();
    } catch (error) {
      module.log.error('scheduled job failed', { job: name, error });
    }
  };
  return new CronJob(cronTime, onTick, null, true, CRON_TIMEZONE);
};

(async ( ) => {
  try {
    process.once('SIGINT', async ( ) => {
      module.log.info('SIGINT received');
      module.log.info('requesting shutdown...');

      let exitCode = -1;
      try {
        await module.setHostStatus('shutdown');
        exitCode = await SitePlatform.shutdown();
        await module.setHostStatus('inactive');
      } catch (error) {
        module.log.error('failed to shut down cleanly', { error });
      }

      process.nextTick(( ) => {
        process.exit(exitCode);
      });
    });

    module.log.info('ensuring host-services path exists', { path: process.env.DTP_HOST_CACHE_PATH });
    await fs.promises.mkdir(process.env.DTP_HOST_CACHE_PATH, { recursive: true });

    module.networkStats = await si.networkStats('*');

    module.log.info('starting cache service', { path: process.env.DTP_HOST_CACHE_PATH });
    module.cacheStats = new CacheStats();
    await module.startHostCache(process.env.DTP_HOST_CACHE_PATH);
    module.log.info('cache stats at start', { stats: module.cacheStats });

    /*
     * The manager must exist before the socket can deliver a message.
     */
    module.log.info('starting transaction manager');
    module.manager = new TransactionManager();

    /*
     * Host Cache server socket setup
     */
    module.log.info('creating server UDP socket');
    module.server = dgram.createSocket('udp4', module.onHostCacheMessage);
    module.log.info('binding server UDP socket');
    await new Promise((resolve, reject) => {
      // a bind failure aborts startup through the catch below
      module.server.once('error', reject);
      module.server.bind(HOST_CACHE_PORT, ( ) => {
        module.server.removeListener('error', reject);
        resolve();
      });
    });
    module.server.on('error', (error) => {
      module.log.error('host cache socket error', { error });
    });

    /*
     * Site Platform startup
     */
    await SitePlatform.startPlatform(module);
    module.domain = await module.services.domain.getByName(process.env.DTP_SITE_DOMAIN_KEY);
    if (!module.domain) {
      throw new Error(`Must define domain ${process.env.DTP_SITE_DOMAIN_KEY}`);
    }

    module.expireJob = module.startJob('expire-transactions', '*/5 * * * * *', module.expireTransactions);

    /*
     * Worker startup
     */
    const cleanCronJob = process.env.DTP_HOST_CACHE_CLEAN_CRON || '*/30 * * * * *';
    module.log.info('starting host cache clean cron', { cleanCronJob });
    module.cleanupJob = module.startJob('clean-host-cache', cleanCronJob, module.cleanHostCache);

    module.log.info('starting stats report job');
    module.statsReportJob = module.startJob('report-host-stats', '*/5 * * * * *', module.reportHostStats);

    module.log.info('starting host expiration job');
    module.expireHostsJob = module.startJob('expire-net-hosts', '*/20 * * * * *', module.expireNetHosts);

    module.log.info('registering host with DTP Sites platform', { });
    await module.registerHost();
    await module.setHostStatus('active');

    module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.componentName} started`);
  } catch (error) {
    module.log.error('failed to start Host Cache worker', { error });
    process.exit(-1);
  }

})();