// host-services.js
// Copyright (C) 2024 DTP Technologies, LLC
// All Rights Reserved

'use strict';

import 'dotenv/config';

import os from 'node:os';
import path, { dirname } from 'path';
import fs from 'node:fs';

import diskusage from 'diskusage-ng';
import sysinfo from 'systeminformation';
import si from 'systeminformation';
import dgram from 'node:dgram';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

import { SiteRuntime, SiteAsync, SiteError } from '../../lib/site-lib.js';
import { CronJob } from 'cron';

import { createRequire } from 'module';
const require = createRequire(import.meta.url); // jshint ignore:line

const CRON_TIMEZONE = 'America/New_York';

/** Pending transactions older than this are canceled by the expire job. */
const TRANSACTION_MAX_AGE_MS = 1000 * 30;

/** Cached files whose atime is older than this many minutes are removed. */
const CACHE_MAX_IDLE_MINUTES = 60.0;

/**
 * Builds the response skeleton shared by the file-producing commands.
 * @param {String} cmd The command name echoed back to the requester.
 * @returns {Object} A fresh, successful-by-default response object.
 */
function createFileResponse (cmd) {
  return {
    cmd,
    success: true,
    message: undefined,
    file: {
      stats: undefined,
      path: undefined,
    },
  };
}

/**
 * Tests whether childPath is located strictly below parentPath.
 * @param {String} parentPath Directory that must contain the child.
 * @param {String} childPath Path being tested.
 * @returns {Boolean} true only when childPath is a descendant of parentPath.
 */
function isPathInside (parentPath, childPath) {
  const relative = path.relative(parentPath, childPath);
  if ((relative === '') || path.isAbsolute(relative)) {
    return false;
  }
  return (relative !== '..') && !relative.startsWith(`..${path.sep}`);
}

/**
 * Maps a bucket/key pair onto a path below HOST_CACHE_PATH. The values arrive
 * in a UDP message, so they are rejected when they are not strings or when
 * they would resolve outside of the bucket directory (e.g. "../" segments).
 * @param {String} bucket Bucket name from the request.
 * @param {String} key Object key from the request.
 * @returns {String} The same path path.join(HOST_CACHE_PATH, bucket, key) yields.
 * @throws {SiteError} When the bucket or key is not acceptable.
 */
function resolveCachePath (bucket, key) {
  if ((typeof bucket !== 'string') || (typeof key !== 'string')) {
    throw new SiteError(400, 'invalid object requested');
  }

  const rootPath = process.env.HOST_CACHE_PATH;
  const bucketPath = path.join(rootPath, bucket);
  const filePath = path.join(bucketPath, key);
  if (!isPathInside(rootPath, bucketPath) || !isPathInside(bucketPath, filePath)) {
    throw new SiteError(400, 'invalid object requested');
  }

  return filePath;
}

/**
 * Decides whether two transactions ask for the same thing, and can therefore
 * share one response. The command and every addressing parameter used by the
 * known commands (bucket/key for getFile, url for fetchUrl) must agree.
 * @param {HostCacheTransaction} a
 * @param {HostCacheTransaction} b
 * @returns {Boolean}
 */
function isSameRequest (a, b) {
  if (a.cmd !== b.cmd) {
    return false;
  }
  const left = a.params || { };
  const right = b.params || { };
  return (left.bucket === right.bucket) &&
    (left.key === right.key) &&
    (left.url === right.url);
}

/**
 * Running counters describing cache activity. All data sizes are accepted in
 * bytes and accumulated in KiB (bytes / 1024).
 */
class CacheStats {

  constructor ( ) {
    this.itemCount = 0;
    this.dataSize = 0;

    this.expireCount = 0;
    this.expireDataSize = 0;

    this.hitCount = 0;
    this.hitDataSize = 0;

    this.missCount = 0;
    this.missDataSize = 0;
  }

  /** Records a new item of `size` bytes entering the cache. */
  add (size) {
    this.itemCount += 1;
    this.dataSize += (size / 1024.0);
  }

  /** Records an item of `size` bytes leaving the cache. */
  remove (size) {
    this.itemCount -= 1;
    this.dataSize -= (size / 1024.0);
  }

  /** Records a request served from the cache. */
  hit (size) {
    this.hitCount += 1;
    this.hitDataSize += (size / 1024.0);
  }

  /** Records a request that had to be fetched from its origin. */
  miss (size) {
    this.missCount += 1;
    this.missDataSize += (size / 1024.0);
  }

  /** Records an item removed because it sat idle for too long. */
  expire (size) {
    this.expireCount += 1;
    // KiB, like every other size counter here (this one used to add raw bytes)
    this.expireDataSize += (size / 1024.0);
  }

  /**
   * Snapshots every counter, then resets the per-period ones.
   * @returns {Object} Counter values as they were before the reset.
   */
  report ( ) {
    const report = {
      itemCount: this.itemCount,
      dataSize: this.dataSize,

      expireCount: this.expireCount,
      expireDataSize: this.expireDataSize,

      hitCount: this.hitCount,
      hitDataSize: this.hitDataSize,

      missCount: this.missCount,
      missDataSize: this.missDataSize,
    };

    this.resetCounters();

    return report;
  }

  /** Zeroes the expire/hit/miss counters; itemCount and dataSize persist. */
  resetCounters ( ) {
    this.expireCount = 0;
    this.expireDataSize = 0;

    this.hitCount = 0;
    this.hitDataSize = 0;

    this.missCount = 0;
    this.missDataSize = 0;
  }
}

/**
 * One request received on the UDP socket, along with the state needed to
 * answer it.
 */
class HostCacheTransaction {

  get tid ( ) { return this.message.tid; }
  get cmd ( ) { return this.message.cmd; }
  get params ( ) { return this.message.params; }

  get address ( ) { return this.rinfo.address; }
  get port ( ) { return this.rinfo.port; }
  get size ( ) { return this.rinfo.size; }

  /**
   * @param {SiteHostServices} dtp Owning runtime (log, services, manager, server, cacheStats).
   * @param {Object} message Parsed request; tid, cmd and params are read from it.
   * @param {Object} rinfo Remote address information supplied by dgram.
   */
  constructor (dtp, message, rinfo) {
    this.dtp = dtp;
    this.created = Date.now(); // timestamp, not Date instance

    this.message = message;
    this.rinfo = rinfo;

    this.flags = {
      isFetched: false,
      isCached: false,
      isResolved: false,
      isError: false,
    };
  }

  /**
   * Marks the transaction as failed and resolves it with an error response.
   * @param {Object} res Response object to complete.
   * @param {Number} statusCode Status reported to the requester.
   * @param {Error} error Cause; its message is reported to the requester.
   * @returns {Promise} Settles when the response has been handed to the socket.
   */
  rejectWith (res, statusCode, error) {
    res.success = false;
    res.statusCode = statusCode;
    res.message = error.message;
    this.error = error;
    this.flags.isError = true;
    return this.dtp.manager.resolveTransaction(this, res);
  }

  /**
   * Serves params.bucket/params.key from the local cache directory,
   * downloading the object through the MinIO service first when it is not
   * present locally.
   * @returns {Promise} Settles when the transaction has been resolved.
   */
  async getFile ( ) {
    const { minio: minioService } = this.dtp.services;
    const { bucket, key } = this.params || { };
    const res = createFileResponse(this.cmd);

    let filePath;
    try {
      filePath = resolveCachePath(bucket, key);
    } catch (error) {
      this.dtp.log.error('rejecting invalid object request', { bucket, key, error });
      const statusCode = (error instanceof SiteError) ? (error.statusCode || 400) : 500;
      return this.rejectWith(res, statusCode, error);
    }

    try {
      res.file.stats = await fs.promises.stat(filePath);
      if (!res.file.stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
      res.file.path = filePath;
      this.flags.isCached = true;
      this.dtp.cacheStats.hit(res.file.stats.size);
      return this.dtp.manager.resolveTransaction(this, res);
    } catch (error) {
      if (error.code !== 'ENOENT') {
        this.dtp.log.error('failed to stat requested object', { transaction: this, error });
        return this.rejectWith(res, 500, error);
      }
      // fall through to MinIO fetch since file not found in cache
    }

    try {
      await minioService.downloadFile({ bucket, key, filePath });
      res.file.path = filePath;
      res.file.stats = await fs.promises.stat(filePath);
      if (!res.file.stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
      this.flags.isFetched = true;
      this.dtp.cacheStats.add(res.file.stats.size);
      this.dtp.cacheStats.miss(res.file.stats.size);
      return this.dtp.manager.resolveTransaction(this, res);
    } catch (error) {
      if (error.code !== 'NotFound') {
        this.dtp.log.error('failed to fetch requested object from MinIO', { transaction: this, error });
        return this.rejectWith(res, 500, error);
      }
      // fall through: the object does not exist at the origin either
    }

    res.success = false;
    res.statusCode = 404;
    res.message = 'Not Found';
    this.error = new SiteError(404, 'Not Found');
    this.flags.isError = true;
    return this.dtp.manager.resolveTransaction(this, res);
  }

  /**
   * Will fetch the contents of a URL to local storage with a separate JSON
   * metadata file to describe the data file.
   *
   * NOTE(review): params.url arrives in a UDP message and is fetched as
   * given. Anything able to reach the socket can make this host request
   * arbitrary URLs (SSRF); consider an allow-list or a filtering proxy.
   * @returns {Promise} Settles when the transaction has been resolved.
   */
  async fetchUrl ( ) {
    const { crypto: cryptoService } = this.dtp.services;
    const { url } = this.params || { };
    const res = createFileResponse(this.cmd);

    if ((typeof url !== 'string') || (url.length === 0)) {
      const error = new SiteError(400, 'invalid URL requested');
      this.dtp.log.error('rejecting invalid URL request', { url, error });
      return this.rejectWith(res, 400, error);
    }

    const urlHash = cryptoService.createHash(url, 'sha256');
    const basePath = path.join(process.env.HOST_CACHE_PATH, 'web-resource', urlHash.slice(0, 4));
    await fs.promises.mkdir(basePath, { recursive: true });

    const resourceFilename = path.join(basePath, `${urlHash}.dat`);
    const resourceMetaFilename = path.join(basePath, `${urlHash}.json`);

    /*
     * Try first to read from local storage. If not found, proceed to fetch
     * and store logic below.
     */
    try {
      res.file.stats = await fs.promises.stat(resourceFilename);
      if (!res.file.stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
      res.file.path = resourceFilename;
      // Read from disk on every hit: require() would keep serving its first
      // (possibly stale) copy, and reports a missing file with a code other
      // than ENOENT, which turned a recoverable miss into a 500.
      res.file.meta = JSON.parse(await fs.promises.readFile(resourceMetaFilename, 'utf8'));
      this.flags.isCached = true;
      this.dtp.cacheStats.hit(res.file.stats.size);
      return this.dtp.manager.resolveTransaction(this, res);
    } catch (error) {
      if (error.code !== 'ENOENT') {
        this.dtp.log.error('failed to stat requested object', { transaction: this, error });
        return this.rejectWith(res, 500, error);
      }
      // fall through to HTTP fetch since data or meta file not found in cache
      res.file.stats = undefined;
      res.file.path = undefined;
    }

    /*
     * HTTP fetch of URL to retrieve the resource from its origin source.
     *
     * It is commonly advised and good practice to operate these fetches through
     * an HTTP Proxy to prevent exposing your origin server IP.
     */
    try {
      const response = await fetch(url);
      if (!response.ok) {
        this.error = new Error('Failed to fetch URL');
        this.flags.isError = true;
        this.dtp.log.error(this.error.message, { transaction: this, status: response.status });

        res.success = false;
        res.statusCode = response.status;
        res.message = this.error.message;
        return this.dtp.manager.resolveTransaction(this, res);
      }

      /*
       * Set up to receive binary data stream as the resource file contents.
       */
      const contentType = response.headers.get('content-type');
      let contentSize = response.headers.get('content-length');
      if (contentSize) {
        contentSize = parseInt(contentSize, 10);
      }

      this.dtp.log.debug('writing initial meta file', { resourceMetaFilename });
      await fs.promises.writeFile(resourceMetaFilename, JSON.stringify({ contentType, contentSize }));

      this.dtp.log.info('writing web resource file', resourceFilename);
      // build the source first so a body-less response cannot leave an open file behind
      const source = Readable.fromWeb(response.body);
      const writeStream = fs.createWriteStream(resourceFilename, {
        autoClose: true,
        encoding: 'binary',
      });
      // pipeline() rejects on either stream failing, so errors land in the catch below
      await pipeline(source, writeStream);

      res.file.path = resourceFilename;
      res.file.stats = await fs.promises.stat(resourceFilename);
      if (!res.file.stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }

      // now that it's our own file, ensure that contentSize reflects our answer
      contentSize = res.file.stats.size;
      res.file.meta = { contentType, contentSize };

      this.dtp.log.debug('writing meta file', { resourceMetaFilename });
      await fs.promises.writeFile(resourceMetaFilename, JSON.stringify(res.file.meta));

      this.flags.isFetched = true;
      this.dtp.cacheStats.add(res.file.stats.size);
      this.dtp.cacheStats.miss(res.file.stats.size);
      return this.dtp.manager.resolveTransaction(this, res);
    } catch (error) {
      this.error = error;
      this.flags.isError = true;
      this.dtp.log.error(this.error.message, { transaction: this, error });

      // best effort: never leave a partial download behind to be served as a hit
      await Promise.allSettled([
        fs.promises.rm(resourceFilename, { force: true }),
        fs.promises.rm(resourceMetaFilename, { force: true }),
      ]);

      res.success = false;
      res.statusCode = error.statusCode || 500;
      res.message = this.error.message;
      return this.dtp.manager.resolveTransaction(this, res);
    }
  }

  /**
   * Resolves the transaction with a failure response naming the reason.
   * @param {String} reason Short explanation included in the response message.
   * @returns {Promise} Settles when the transaction has been resolved.
   */
  async cancel (reason) {
    const res = {
      cmd: this.cmd, // was emitted under the key "res", unlike every other response
      success: false,
      message: `Operation canceled: ${reason}`,
    };
    return this.dtp.manager.resolveTransaction(this, res);
  }

  /**
   * Sends the response datagram to the address the request came from and
   * marks the transaction resolved.
   * @param {Object} res Response body to deliver.
   */
  async sendResponse (res) {
    const NOW = Date.now();
    const duration = this.duration = (NOW - this.created) / 1000.0;

    const { flags } = this;
    flags.isResolved = true;

    const payload = { tid: this.tid, res, flags, duration };
    const reply = Buffer.from(JSON.stringify(payload));
    this.dtp.server.send(reply, this.port, this.address);
  }
}

/**
 * Tracks transactions that have not been answered yet and collapses
 * identical concurrent requests into a single unit of work.
 */
class TransactionManager {

  /**
   * @param {SiteHostServices} dtp Owning runtime.
   */
  constructor (dtp) {
    this.dtp = dtp;
    // Keyed by requester-supplied tid; a null prototype keeps a tid such as
    // "__proto__" an ordinary key.
    this.transactions = Object.create(null);
  }

  /**
   * Registers a transaction and starts its command, unless an identical
   * request is already in flight, in which case it simply waits for that
   * request's response. Unknown commands are canceled with "Rejected".
   * @param {HostCacheTransaction} transaction
   */
  async addTransaction (transaction) {
    const isDuplicate = this.hasPendingRequest(transaction);
    this.transactions[transaction.tid] = transaction;
    if (isDuplicate) {
      return; // queued; answered when the in-flight request resolves
    }

    switch (transaction.cmd) {
      case 'getFile':
        return transaction.getFile();
      case 'fetchUrl':
        return transaction.fetchUrl();
      default:
        break; // unknown/undefined command
    }

    this.dtp.log.error('invalid host-services command', {
      cmd: transaction.cmd,
      params: transaction.params,
      from: {
        address: transaction.address,
        port: transaction.port,
      },
    });
    await this.dtp.manager.cancelTransaction(transaction, 'Rejected');
  }

  /**
   * @param {HostCacheTransaction} transaction
   * @returns {Boolean} true when another pending transaction requests the same thing.
   */
  hasPendingRequest (transaction) {
    if (!transaction) {
      return false;
    }
    return Object
      .values(this.transactions)
      .some((pending) => pending && (pending !== transaction) && isSameRequest(pending, transaction));
  }

  /**
   * Answers a transaction, plus every pending transaction that asked for the
   * same thing, then forgets them all.
   * @param {HostCacheTransaction} transaction The transaction that did the work.
   * @param {Object} res Response delivered to each requester.
   */
  async resolveTransaction (transaction, res) {
    await transaction.sendResponse(res);
    this.removeTransaction(transaction, 'resolved');

    const answered = [ ];
    for (const key of Object.keys(this.transactions)) {
      const pending = this.transactions[key];
      if (!pending) {
        delete this.transactions[key];
        continue; // keep scanning; returning here stranded the remaining matches
      }
      if (!isSameRequest(transaction, pending)) {
        continue;
      }
      await pending.sendResponse(res);
      answered.push(pending);
    }

    for (const pending of answered) {
      this.removeTransaction(pending, 'resolved');
    }
  }

  /**
   * Cancels a transaction, telling the requester why.
   * @param {HostCacheTransaction} transaction
   * @param {String} reason Included in the response message.
   */
  async cancelTransaction (transaction, reason) {
    await transaction.cancel(reason);
    this.removeTransaction(transaction, reason);
  }

  /**
   * Forgets a transaction. Extra arguments passed by callers are ignored.
   * @param {HostCacheTransaction} transaction
   */
  removeTransaction (transaction) {
    if (this.transactions[transaction.tid]) {
      delete this.transactions[transaction.tid];
    }
  }

  /**
   * Cancels every pending transaction older than TRANSACTION_MAX_AGE_MS.
   * @returns {Promise<Number>} How many transactions were canceled directly.
   */
  async expireTransactions ( ) {
    const NOW = Date.now();
    let expired = 0;

    for (const transaction of Object.values(this.transactions)) {
      if (!transaction) {
        continue;
      }
      // already answered as a duplicate of one canceled earlier in this sweep
      if (this.transactions[transaction.tid] !== transaction) {
        continue;
      }
      if ((NOW - transaction.created) <= TRANSACTION_MAX_AGE_MS) {
        continue;
      }
      this.dtp.log.alert('expiring transaction', { transaction });
      await this.cancelTransaction(transaction, 'expired');
      ++expired;
    }

    return expired;
  }
}

/**
 * Worker runtime that serves the host cache over UDP, keeps the cache
 * directory trimmed, and reports host statistics through the NetHost and
 * NetHostStats models.
 */
class SiteHostServices extends SiteRuntime {

  static get name ( ) { return 'SiteHostServices'; }
  static get slug ( ) { return 'hostServices'; }

  constructor (rootPath) {
    super(SiteHostServices, rootPath);
  }

  /**
   * Prepares the cache directory, opens the UDP socket, schedules the
   * periodic jobs and registers this host.
   * @param {String} [basePath] Cache directory; defaults to HOST_CACHE_PATH.
   */
  async start (basePath) {
    await super.start();

    this.config.hostCache = {
      host: process.env.HOST_CACHE_HOST || 'localhost',
      port: parseInt(process.env.HOST_CACHE_PORT || '8000', 10),
    };

    basePath = basePath || process.env.HOST_CACHE_PATH;
    this.log.info('ensuring host-services path exists', { basePath });
    await fs.promises.mkdir(basePath, { recursive: true });

    // cleanHostCache() updates these counters, so they must exist before the
    // first sweep; previously any expired file at startup aborted the start.
    this.cacheStats = new CacheStats();
    await this.cleanHostCache(basePath);

    this.networkStats = await si.networkStats('*');

    this.log.info('starting cache service', { basePath });

    /*
     * Host Cache server socket setup
     */
    this.log.info('creating server UDP socket');
    this.server = dgram.createSocket('udp4', this.onHostCacheMessage.bind(this));

    this.log.info('binding server UDP socket', {
      port: this.config.hostCache.port,
      host: this.config.hostCache.host,
    });
    this.server.bind(this.config.hostCache.port, this.config.hostCache.host);

    this.manager = new TransactionManager(this);
    this.expireJob = new CronJob(
      '*/5 * * * * *',
      this.expireTransactions.bind(this),
      null,
      true,
      CRON_TIMEZONE,
    );

    const cleanCronJob = process.env.HOST_CACHE_CLEAN_CRON || '*/30 * * * * *';
    this.log.info('starting host cache clean cron', { cleanCronJob });
    this.cleanupJob = new CronJob(
      cleanCronJob,
      this.cleanHostCache.bind(this),
      null,
      true,
      CRON_TIMEZONE,
    );

    this.log.info('starting stats report job');
    this.statsReportJob = new CronJob(
      '*/5 * * * * *',
      this.reportHostStats.bind(this),
      null,
      true,
      CRON_TIMEZONE,
    );

    this.log.info('starting host expiration job');
    this.expireHostsJob = new CronJob(
      '*/20 * * * * *',
      this.expireNetHosts.bind(this),
      null,
      true,
      CRON_TIMEZONE,
    );

    this.log.info('registering host with Site platform');
    await this.registerHost();
    await this.setHostStatus('active');

    this.log.info(`${this.config.pkg.name} v${this.config.pkg.version} ${SiteHostServices.name} started`);
  }

  /** Marks this host as shut down in its NetHost record. */
  async shutdown ( ) {
    await this.setHostStatus('shutdown');
  }

  /**
   * Socket message handler: decodes one JSON request and hands it to the
   * transaction manager. Failures are logged, never thrown.
   * @param {Buffer} message Raw datagram.
   * @param {Object} rinfo Remote address information.
   */
  async onHostCacheMessage (message, rinfo) {
    try {
      message = message.toString('utf8');
      message = JSON.parse(message);
      if (!message || (typeof message !== 'object')) {
        throw new SiteError(400, 'invalid host-services message');
      }

      const transaction = new HostCacheTransaction(this, message, rinfo);
      // awaited so a failing command is logged below instead of surfacing as
      // an unhandled promise rejection
      await this.manager.addTransaction(transaction);
    } catch (error) {
      this.log.error('failed to receive UDP message', { message, error });
    }
  }

  /**
   * When a file is accessed for read or otherwise, its atime is updated. If
   * the time since a file's atime exceeds CACHE_MAX_IDLE_MINUTES, the file is
   * removed. Sub-directories are processed recursively.
   * @param {String} basePath Directory to sweep; defaults to HOST_CACHE_PATH.
   */
  async cleanHostCache (basePath) {
    const NOW = Date.now(); // timestamp, not Date instance
    const sweepPath = basePath || process.env.HOST_CACHE_PATH;

    const dir = await fs.promises.opendir(sweepPath);
    for await (const dirent of dir) {
      const entryPath = path.join(sweepPath, dirent.name);
      if (dirent.isDirectory()) {
        await this.cleanHostCache(entryPath);
        continue;
      }
      if (!dirent.isFile()) {
        continue;
      }

      let stats;
      try {
        stats = await fs.promises.stat(entryPath);
      } catch (error) {
        if (error.code === 'ENOENT') {
          continue; // removed by someone else since the directory was read
        }
        throw error;
      }

      const idleMinutes = (NOW - stats.atime.valueOf()) / 1000.0 / 60.0;
      if (idleMinutes <= CACHE_MAX_IDLE_MINUTES) {
        continue;
      }

      await fs.promises.rm(entryPath, { force: true });
      this.cacheStats.remove(stats.size);
      this.cacheStats.expire(stats.size);
    }
  }

  /** Cron entry point: cancels pending transactions that have aged out. */
  async expireTransactions ( ) {
    await this.manager.expireTransactions();
  }

  /**
   * Creates this host's NetHost record from operating system and network
   * interface information.
   * @returns {Promise<Object>} The saved record as a plain object (also kept on this.host).
   */
  async registerHost ( ) {
    const NOW = new Date();

    const mongoose = await import('mongoose');
    const NetHost = mongoose.model('NetHost');

    const memory = await si.mem();

    this.host = new NetHost();
    this.host.created = NOW;
    this.host.status = 'starting';
    this.host.hostname = os.hostname();

    this.host.arch = os.arch();
    this.host.cpus = os.cpus().map((cpu) => {
      return {
        model: cpu.model,
        speed: cpu.speed,
      };
    });

    this.host.totalmem = memory.total;
    this.host.freemem = memory.available;

    this.host.node = process.version;
    this.host.platform = os.platform();
    this.host.release = os.release();
    this.host.version = os.version();

    this.host.network = (await si.networkInterfaces()).map((iface) => {
      return {
        iface: iface.iface,
        speed: iface.speed,
        mac: iface.mac,
        ip4: iface.ip4,
        ip4subnet: iface.ip4subnet,
        ip6: iface.ip6,
        ip6subnet: iface.ip6subnet,
        flags: {
          internal: iface.internal,
          virtual: iface.virtual,
        },
      };
    });

    await this.host.save();

    this.host = this.host.toObject();
    return this.host;
  }

  /**
   * Cron entry point: stores one NetHostStats sample and refreshes the
   * host's updated/freemem fields.
   */
  async reportHostStats ( ) {
    // The job is scheduled before registerHost() has completed; skip until
    // there is a host to report for (same guard as setHostStatus).
    if (!this.host) {
      return;
    }

    const NOW = new Date();

    const mongoose = await import('mongoose');
    const NetHost = mongoose.model('NetHost');
    const NetHostStats = mongoose.model('NetHostStats');

    const memory = await this.reportMemoryInformation();
    const network = await this.reportNetworkInformation();

    const load = this.reportLoadAvg();
    const cache = this.cacheStats.report();
    const disk = {
      cache: await this.reportDiskUsage(process.env.HOST_CACHE_PATH),
    };

    const newCpuTimes = this.reportCpuTimes();
    const cpuDeltas = this.reportCpuTimeDeltas(this.oldCpuTimes, newCpuTimes);
    this.oldCpuTimes = newCpuTimes;

    await NetHostStats.create({
      created: NOW,
      host: this.host._id,
      load,
      cpus: cpuDeltas,
      memory,
      cache,
      disk,
      network,
    });
    await NetHost.updateOne(
      { _id: this.host._id },
      {
        updated: NOW,
        freemem: memory.available,
      },
    );
  }

  /**
   * Updates this host's status. Does nothing until the host is registered.
   * @param {String} status New status value.
   */
  async setHostStatus (status) {
    if (!this.host) {
      return;
    }

    const NOW = new Date();
    const mongoose = await import('mongoose');
    const NetHost = mongoose.model('NetHost');

    await NetHost.updateOne(
      { _id: this.host._id },
      {
        $set: {
          updated: NOW,
          status,
        },
      },
    );
  }

  /**
   * Cron entry point: marks as 'crashed' every host that is neither
   * 'inactive' nor 'crashed' and has not been updated in two minutes.
   */
  async expireNetHosts ( ) {
    const NOW = new Date();
    const OLDEST = new Date(Date.now() - 1000 * 60 * 2);

    const mongoose = await import('mongoose');
    const NetHost = mongoose.model('NetHost');

    const hosts = await NetHost.find({
      status: { $nin: ['inactive', 'crashed'] },
      updated: { $lt: OLDEST }
    });

    await SiteAsync.each(hosts, async (host) => {
      try {
        await NetHost.updateOne(
          { _id: host._id },
          {
            $set: {
              updated: NOW,
              status: 'crashed',
            },
          },
        );
      } catch (error) {
        this.log.error('failed to clean expired host', { host, error });
      }
    }, 4);
  }

  /** @returns {Promise<Object>} Memory information from systeminformation. */
  async reportMemoryInformation ( ) {
    return sysinfo.mem();
  }

  /**
   * @returns {Promise<Object[]>} One record per interface with drop/error
   * counters and rx/tx figures scaled by the sample interval.
   */
  async reportNetworkInformation ( ) {
    return (await sysinfo.networkStats('*')).map((iface) => {
      const record = { iface: iface.iface };

      record.rxDropped = iface.rx_dropped;
      record.rxErrors = iface.rx_errors;

      record.txDropped = iface.tx_dropped;
      record.txErrors = iface.tx_errors;

      if (iface.ms !== 0) {
        record.rxPerSecond = iface.rx_sec / (iface.ms / 1000.0);
        record.txPerSecond = iface.tx_sec / (iface.ms / 1000.0);
      } else {
        record.rxPerSecond = 0;
        record.txPerSecond = 0;
      }

      return record;
    });
  }

  /**
   * @param {String} pathname Path on the file system to measure.
   * @returns {Promise<Object>} The diskusage result with pctUsed added.
   */
  reportDiskUsage (pathname) {
    return new Promise((resolve, reject) => {
      diskusage(pathname, (err, usage) => {
        if (err) {
          return reject(err);
        }
        usage.pctUsed = (usage.used / usage.total) * 100.0;
        return resolve(usage);
      });
    });
  }

  /** @returns {Number[]} The operating system load averages. */
  reportLoadAvg ( ) {
    return os.loadavg();
  }

  /** @returns {Object[]} Cumulative time counters for each CPU. */
  reportCpuTimes ( ) {
    return os
      .cpus()
      .map((cpu) => {
        return {
          user: cpu.times.user,
          nice: cpu.times.nice,
          sys: cpu.times.sys,
          idle: cpu.times.idle,
          irq: cpu.times.irq,
        };
      });
  }

  /**
   * Subtracts the previous CPU time sample from the current one.
   * @param {Object[]} [oldCpuStats] Previous sample; absent on the first call.
   * @param {Object[]} newCpuStats Current sample.
   * @returns {Object[]} One delta per CPU in newCpuStats; all zeroes for a
   * CPU that has no previous sample.
   */
  reportCpuTimeDeltas (oldCpuStats, newCpuStats) {
    return newCpuStats.map((current, idx) => {
      const previous = oldCpuStats ? oldCpuStats[idx] : undefined;
      if (!previous) {
        return { user: 0, nice: 0, sys: 0, idle: 0, irq: 0 };
      }
      return {
        user: current.user - previous.user,
        nice: current.nice - previous.nice,
        sys: current.sys - previous.sys,
        idle: current.idle - previous.idle,
        irq: current.irq - previous.irq,
      };
    });
  }
}

(async ( ) => {

  try {
    const { fileURLToPath } = await import('node:url');
    const __dirname = dirname(fileURLToPath(import.meta.url)); // jshint ignore:line

    const worker = new SiteHostServices(path.resolve(__dirname, '..', '..'));
    await worker.start();
  } catch (error) {
    console.error('failed to start Host Cache worker', { error });
    process.exit(-1);
  }

})();