1 changed files with 655 additions and 0 deletions
@ -0,0 +1,655 @@ |
|||
// host-services.js
|
|||
// Copyright (C) 2021 Digital Telepresence, LLC
|
|||
// License: Apache-2.0
|
|||
|
|||
'use strict'; |
|||
|
|||
const os = require('os'); |
|||
const diskusage = require('diskusage-ng'); |
|||
|
|||
const path = require('path'); |
|||
const fs = require('fs'); |
|||
const si = require('systeminformation'); |
|||
|
|||
require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') }); |
|||
|
|||
const dgram = require('dgram'); |
|||
const numeral = require('numeral'); |
|||
|
|||
const { |
|||
SitePlatform, |
|||
SiteAsync, |
|||
SiteLog, |
|||
SiteError, |
|||
} = require(path.join(__dirname, '..', '..', 'lib', 'site-lib')); |
|||
|
|||
const { CronJob } = require('cron'); |
|||
|
|||
// Time zone name handed to every CronJob this worker schedules.
const CRON_TIMEZONE = 'America/New_York';

// Component identity; root is the directory two levels above this file.
module.config = {
  componentName: 'host-services',
  root: path.resolve(__dirname, '..', '..'),
};

// Package manifest found at the component root.
module.pkg = require(path.join(module.config.root, 'package.json'));

// Logger bound to this module and labelled with the component name.
module.log = new SiteLog(module, module.config.componentName);
|||
|
|||
/**
 * Running counters for the host cache.
 *
 * itemCount and dataSize are running totals that are never reset here. The
 * expire, hit and miss counters are per-interval: report() returns them and
 * then zeroes them.
 *
 * Every *DataSize value is accumulated as (size / 1024.0), so callers pass
 * sizes in bytes and the report carries kibibytes.
 */
class CacheStats {

  constructor ( ) {
    this.itemCount = 0;
    this.dataSize = 0;
    this.resetCounters();
  }

  /**
   * Records that an object was added to the cache.
   * @param {Number} size Object size in bytes.
   */
  add (size) {
    this.itemCount += 1;
    this.dataSize += (size / 1024.0);
  }

  /**
   * Records that an object was removed from the cache.
   * @param {Number} size Object size in bytes.
   */
  remove (size) {
    this.itemCount -= 1;
    this.dataSize -= (size / 1024.0);
  }

  /**
   * Records a request that was served from the cache.
   * @param {Number} size Object size in bytes.
   */
  hit (size) {
    this.hitCount += 1;
    this.hitDataSize += (size / 1024.0);
  }

  /**
   * Records a request that was not found in the cache.
   * @param {Number} size Object size in bytes.
   */
  miss (size) {
    this.missCount += 1;
    this.missDataSize += (size / 1024.0);
  }

  /**
   * Records that an object was expired from the cache.
   * @param {Number} size Object size in bytes.
   */
  expire (size) {
    this.expireCount += 1;
    // Scaled like every other size counter; this previously accumulated raw
    // bytes, which made expireDataSize incomparable with the other fields.
    this.expireDataSize += (size / 1024.0);
  }

  /**
   * Snapshots all counters, then resets the per-interval ones.
   * @returns {Object} Plain object holding the counter values at call time.
   */
  report ( ) {
    const report = {
      itemCount: this.itemCount,
      dataSize: this.dataSize,

      expireCount: this.expireCount,
      expireDataSize: this.expireDataSize,

      hitCount: this.hitCount,
      hitDataSize: this.hitDataSize,

      missCount: this.missCount,
      missDataSize: this.missDataSize,
    };

    this.resetCounters();

    return report;
  }

  /**
   * Zeroes the expire, hit and miss counters; the running totals are kept.
   */
  resetCounters ( ) {
    this.expireCount = 0;
    this.expireDataSize = 0;

    this.hitCount = 0;
    this.hitDataSize = 0;

    this.missCount = 0;
    this.missDataSize = 0;
  }
}
|||
|
|||
/**
 * One request received on the host cache UDP socket, together with the
 * sender information needed to reply to it.
 *
 * `message` is the parsed JSON datagram ({ tid, cmd, params }) and `rinfo` is
 * the remote-info object delivered with the datagram. Both come from the
 * network and must be treated as untrusted.
 */
class HostCacheTransaction {

  get tid ( ) { return this.message.tid; }
  get cmd ( ) { return this.message.cmd; }
  get params ( ) { return this.message.params; }

  get address ( ) { return this.rinfo.address; }
  get port ( ) { return this.rinfo.port; }
  get size ( ) { return this.rinfo.size; }

  /**
   * @param {Object} message Parsed request ({ tid, cmd, params }).
   * @param {Object} rinfo Sender information ({ address, port, size }).
   */
  constructor (message, rinfo) {
    this.created = Date.now(); // timestamp, not Date instance

    this.message = message;
    this.rinfo = rinfo;

    this.flags = {
      isFetched: false,
      isCached: false,
      isResolved: false,
      isError: false,
    };
  }

  /**
   * Maps params.bucket/params.key to a path under DTP_HOST_CACHE_PATH.
   *
   * @returns {String|null} The cache file path, or null when bucket/key are
   * missing, empty, not strings, or resolve to a location outside the cache
   * root (for example through ".." segments).
   */
  resolveCachePath ( ) {
    const params = this.params;
    if (!params || (typeof params.bucket !== 'string') || (typeof params.key !== 'string')) {
      return null;
    }
    if (!params.bucket || !params.key) {
      return null;
    }

    const cacheRoot = path.resolve(process.env.DTP_HOST_CACHE_PATH);
    const filePath = path.join(process.env.DTP_HOST_CACHE_PATH, params.bucket, params.key);

    // The request comes straight from the network: refuse anything that does
    // not stay strictly inside the cache root.
    if (!path.resolve(filePath).startsWith(cacheRoot + path.sep)) {
      return null;
    }
    return filePath;
  }

  /**
   * Marks the transaction failed and resolves it with an error response.
   *
   * @param {Object} res Response object being built for the client.
   * @param {Number} statusCode Status code reported to the client.
   * @param {String} message Message reported to the client.
   * @param {Error} error Error retained on the transaction.
   * @returns {Promise} Result of resolving the transaction.
   */
  fail (res, statusCode, message, error) {
    res.success = false;
    res.statusCode = statusCode;
    res.message = message;
    this.error = error;
    this.flags.isError = true;
    return module.manager.resolveTransaction(this, res);
  }

  /**
   * Serves the requested object: from the local cache directory when it is
   * already there, otherwise by downloading it from MinIO into the cache
   * first. The transaction is always resolved through module.manager.
   *
   * Outcomes sent to the client:
   * - success with file.path and file.stats (flags.isCached or flags.isFetched)
   * - 400 when bucket/key are invalid or point outside the cache root
   * - 404 when the MinIO download fails with error code 'NotFound'
   * - 500 for any other stat or download failure
   *
   * @returns {Promise} Result of resolving the transaction.
   */
  async getFile ( ) {
    const { minio: minioService } = module.services;
    const res = {
      cmd: this.cmd,
      success: true,
      message: undefined,
      file: {
        stats: undefined,
        path: undefined,
      },
    };

    const filePath = this.resolveCachePath();
    if (!filePath) {
      const message = 'Invalid bucket or key';
      return this.fail(res, 400, message, new SiteError(400, message));
    }

    try {
      res.file.stats = await fs.promises.stat(filePath);
      if (!res.file.stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
      res.file.path = filePath;
      this.flags.isCached = true;
      module.cacheStats.hit(res.file.stats.size);
      return module.manager.resolveTransaction(this, res);
    } catch (error) {
      if (error.code !== 'ENOENT') {
        module.log.error('failed to stat requested object', { transaction: this, error });
        return this.fail(res, 500, error.message, error);
      }
      // fall through to MinIO fetch since file not found in cache
    }

    try {
      await minioService.downloadFile({
        bucket: this.params.bucket,
        key: this.params.key,
        filePath,
      });
      res.file.path = filePath;
      res.file.stats = await fs.promises.stat(filePath);
      if (!res.file.stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
      this.flags.isFetched = true;
      module.cacheStats.add(res.file.stats.size);
      module.cacheStats.miss(res.file.stats.size);
      return module.manager.resolveTransaction(this, res);
    } catch (error) {
      if (error.code !== 'NotFound') {
        module.log.error('failed to fetch requested object from MinIO', { transaction: this, error });
        return this.fail(res, 500, error.message, error);
      }
      // 'NotFound' falls through to the 404 response below
    }

    return this.fail(res, 404, 'Not Found', new SiteError(404, 'Not Found'));
  }

  /**
   * Resolves the transaction with a failure response.
   *
   * @param {String} reason Text appended to the cancellation message.
   * @returns {Promise} Result of resolving the transaction.
   */
  async cancel (reason) {
    const res = {
      // Same key getFile() responses use for the command name.
      cmd: this.cmd,
      // Original (misnamed) key, kept so existing clients keep working.
      res: this.cmd,
      success: false,
      message: `Operation canceled: ${reason}`,
    };
    return module.manager.resolveTransaction(this, res);
  }

  /**
   * Sends the response datagram back to the requester and marks the
   * transaction resolved. The payload is { tid, res, flags, duration }, where
   * duration is the transaction's age in seconds.
   *
   * @param {Object} res Response object to deliver.
   */
  async sendResponse (res) {
    const NOW = Date.now();
    const duration = this.duration = (NOW - this.created) / 1000.0;

    const { flags } = this;
    flags.isResolved = true;

    const payload = { tid: this.tid, res, flags, duration };
    const reply = Buffer.from(JSON.stringify(payload));
    module.server.send(reply, this.port, this.address);
  }
}
|||
|
|||
/**
 * Tracks in-flight host cache transactions, keyed by transaction id, and
 * dispatches their commands.
 */
class TransactionManager {

  constructor ( ) {
    this.transactions = { };
  }

  /**
   * Registers a transaction and, unless an equivalent request is already
   * pending, runs its command.
   *
   * NOTE(review): a duplicate of a pending request is only stored. Nothing in
   * this class answers it when the original completes, so it stays registered
   * until expireTransactions() cancels it. Confirm that is intended.
   *
   * @param {Object} transaction Transaction to register.
   * @returns {Promise} Result of the command, or undefined when the
   * transaction was queued as a duplicate or rejected as unknown.
   */
  async addTransaction (transaction) {
    const isDuplicate = this.hasPendingRequest(transaction);
    this.transactions[transaction.tid] = transaction;
    if (isDuplicate) {
      return; // queued behind the equivalent pending request
    }

    switch (transaction.cmd) {
      case 'getFile':
        return transaction.getFile();

      default:
        break; // unknown/undefined command
    }

    // The manager has no logger of its own; use the module logger.
    module.log.error('invalid host-services command', {
      cmd: transaction.cmd,
      params: transaction.params,
      from: {
        address: transaction.address,
        port: transaction.port,
      },
    });
    await this.cancelTransaction(transaction, 'Rejected');
  }

  /**
   * Tells whether a registered transaction has the same cmd, bucket and key
   * as the one given. Missing params are treated as having no bucket or key.
   *
   * @param {Object} transaction Transaction to compare against.
   * @returns {Boolean} true when an equivalent transaction is registered.
   */
  hasPendingRequest (transaction) {
    if (!transaction) { return false; }
    const wanted = transaction.params || { };
    return Object.keys(this.transactions).some((key) => {
      const pending = this.transactions[key];
      if (!pending) { return false; }
      if (pending.cmd !== transaction.cmd) { return false; }
      const params = pending.params || { };
      return (params.bucket === wanted.bucket) && (params.key === wanted.key);
    });
  }

  /**
   * Sends the response for a transaction, then forgets the transaction.
   *
   * @param {Object} transaction Transaction being resolved.
   * @param {Object} res Response object to deliver.
   */
  async resolveTransaction (transaction, res) {
    await transaction.sendResponse(res);
    this.removeTransaction(transaction, 'resolved');
  }

  /**
   * Cancels a transaction, passing the reason on to the client, then forgets
   * the transaction.
   *
   * @param {Object} transaction Transaction being canceled.
   * @param {String} reason Why the transaction is canceled.
   */
  async cancelTransaction (transaction, reason) {
    await transaction.cancel(reason);
    this.removeTransaction(transaction, reason);
  }

  /**
   * Drops a transaction from the registry. Extra arguments callers pass
   * (such as a reason) are ignored.
   *
   * @param {Object} transaction Transaction to remove.
   */
  removeTransaction (transaction) {
    if (this.transactions[transaction.tid]) {
      delete this.transactions[transaction.tid];
    }
  }

  /**
   * Cancels every registered transaction older than 30 seconds and logs how
   * many were expired.
   */
  async expireTransactions ( ) {
    const NOW = Date.now();
    const keys = Object.keys(this.transactions);
    let expired = 0;
    await SiteAsync.each(keys, async (key) => {
      const transaction = this.transactions[key];
      if (!transaction) {
        return; // removed while an earlier cancellation was being awaited
      }
      const age = NOW - transaction.created;
      if (age > (1000 * 30)) {
        module.log.alert('expiring transaction', { transaction });
        await this.cancelTransaction(transaction, 'expired');
        ++expired;
      }
    }, 8);
    module.log.info('transaction watchdog', { expired });
  }
}
|||
|
|||
/**
 * Handler for datagrams arriving on the host cache UDP socket. Decodes the
 * datagram as UTF-8 JSON, wraps it in a HostCacheTransaction and hands it to
 * the transaction manager.
 *
 * Every failure is logged here rather than propagated, including failures
 * raised while the transaction is being processed.
 *
 * @param {Buffer} message Raw datagram payload.
 * @param {Object} rinfo Sender information delivered with the datagram.
 */
module.onHostCacheMessage = async (message, rinfo) => {
  // Holds the most-decoded form reached so far, so the error log shows how
  // far decoding got: raw payload, then text, then the parsed object.
  let received = message;
  try {
    received = message.toString('utf8');
    received = JSON.parse(received);

    const transaction = new HostCacheTransaction(received, rinfo);
    // Awaited so that a rejection from processing lands in the catch below
    // instead of becoming an unhandled promise rejection.
    await module.manager.addTransaction(transaction);
  } catch (error) {
    module.log.error('failed to receive UDP message', { message: received, error });
  }
};
|||
|
|||
/**
 * Startup pass over the cache directory tree. Files whose atime is more than
 * 60 minutes old are deleted; every other file is counted into
 * module.cacheStats. Sub-directories are walked recursively.
 *
 * @param {String} basePath Directory to scan; falls back to
 * DTP_HOST_CACHE_PATH when falsy.
 */
module.startHostCache = async (basePath) => {
  const cachePath = basePath || process.env.DTP_HOST_CACHE_PATH;
  const startedAt = Date.now();

  // The directory handle is closed automatically once iteration finishes.
  const directory = await fs.promises.opendir(cachePath);
  for await (const entry of directory) {
    if (entry.isDirectory()) {
      module.log.debug('cache start descend into directory', { name: entry.name });
      await module.startHostCache(path.join(cachePath, entry.name));
    }

    if (!entry.isFile()) {
      continue;
    }

    const filePath = path.join(cachePath, entry.name);
    const stats = await fs.promises.stat(filePath);
    const idleSeconds = (startedAt - stats.atime.valueOf()) / 1000.0;
    const idleLabel = numeral(idleSeconds).format('hh:mm:ss');
    module.log.debug('examining file', { filePath, age: idleLabel });

    const idleMinutes = idleSeconds / 60.0;
    if (idleMinutes > 60.0) {
      module.log.info('expiring file', { filePath, age: idleLabel });
      await fs.promises.rm(filePath, { force: true });
      continue;
    }
    module.cacheStats.add(stats.size);
  }

  module.log.info('cache startup cleanup complete', { basePath: cachePath });
};
|||
|
|||
/**
 * Periodic cache sweep. Reading a file normally updates its atime, so a file
 * whose atime is more than 60 minutes old is treated as idle: it is removed
 * and recorded in module.cacheStats as both a removal and an expiration.
 * Sub-directories are swept recursively.
 *
 * The 60 minute limit is hard-coded in this function.
 *
 * @param {String} basePath Directory to sweep; falls back to
 * DTP_HOST_CACHE_PATH when falsy.
 */
module.cleanHostCache = async (basePath) => {
  const sweepStarted = Date.now(); // timestamp, not Date instance
  const cachePath = basePath || process.env.DTP_HOST_CACHE_PATH;
  module.log.info('cache directory cleanup', { path: cachePath });

  const directory = await fs.promises.opendir(cachePath);
  for await (const entry of directory) {
    if (entry.isDirectory()) {
      module.log.debug('cache clean descend into directory', { name: entry.name });
      await module.cleanHostCache(path.join(cachePath, entry.name));
    }

    if (!entry.isFile()) {
      continue;
    }

    const filePath = path.join(cachePath, entry.name);
    const stats = await fs.promises.stat(filePath);
    const idleMs = sweepStarted - stats.atime.valueOf();
    const idleLabel = numeral(idleMs / 1000.0).format('hh:mm:ss');
    module.log.debug('examining file', { filePath, age: idleLabel });

    const isIdle = (idleMs / 1000.0 / 60.0) > 60.0;
    if (!isIdle) {
      continue;
    }

    module.log.info('expiring file', { filePath, age: idleLabel });
    await fs.promises.rm(filePath, { force: true });
    module.cacheStats.remove(stats.size);
    module.cacheStats.expire(stats.size);
  }

  module.log.info('cache directory cleanup complete', { basePath: cachePath });
};
|||
|
|||
/**
 * Cron entry point: asks the transaction manager to expire stale
 * transactions. The manager is looked up at call time.
 */
module.expireTransactions = async ( ) => {
  const { manager } = module;
  await manager.expireTransactions();
};
|||
|
|||
/**
 * Creates and saves a NetHost document describing this machine (hostname,
 * architecture, CPUs, memory, OS and network interfaces) with status
 * 'starting', then keeps its plain-object form in module.host.
 *
 * @returns {Object} The saved host as a plain object.
 */
module.registerHost = async ( ) => {
  const registeredAt = new Date();
  const NetHost = require('mongoose').model('NetHost');

  const memory = await si.mem();

  const host = module.host = new NetHost();
  Object.assign(host, {
    created: registeredAt,
    status: 'starting',
    hostname: os.hostname(),

    arch: os.arch(),
    cpus: os.cpus().map(({ model, speed }) => ({ model, speed })),

    totalmem: memory.total,
    freemem: memory.available,

    platform: os.platform(),
    release: os.release(),
    version: os.version(),
  });

  const interfaces = await si.networkInterfaces();
  host.network = interfaces.map((entry) => {
    const { iface, speed, mac, ip4, ip4subnet, ip6, ip6subnet, internal, virtual } = entry;
    return {
      iface,
      speed,
      mac,
      ip4,
      ip4subnet,
      ip6,
      ip6subnet,
      flags: { internal, virtual },
    };
  });
  module.log.info('host registration: network', { network: module.host.network });

  await module.host.save();

  // From here on module.host is a plain object, not a model instance.
  module.host = module.host.toObject();
  module.log.info('registered host with platform', { host: module.host._id });
  return module.host;
};
|||
|
|||
/**
 * Cron entry point: samples memory, network, CPU, load, cache and disk
 * statistics, stores them as a NetHostStats document for this host, and
 * refreshes the host's `updated` and `freemem` fields.
 *
 * CPU values are stored as deltas against the previous sample (all zero for
 * the first sample). Calling module.cacheStats.report() also resets the cache
 * interval counters.
 */
module.reportHostStats = async ( ) => {
  if (!module.host) {
    // This job is scheduled before the host is registered; nothing to report
    // against yet. Cache counters are left to accumulate for the next run.
    return;
  }

  const NOW = new Date();

  const mongoose = require('mongoose');
  const NetHost = mongoose.model('NetHost');
  const NetHostStats = mongoose.model('NetHostStats');

  const memory = await si.mem();

  const network = (await si.networkStats('*')).map((iface) => {
    // rx_sec/tx_sec are already per-second rates computed over the `ms`
    // interval, so they must not be divided by the interval again. With no
    // interval yet (ms === 0) or no rate available, report 0.
    const hasInterval = iface.ms !== 0;
    return {
      iface: iface.iface,

      rxDropped: iface.rx_dropped,
      rxErrors: iface.rx_errors,

      txDropped: iface.tx_dropped,
      txErrors: iface.tx_errors,

      rxPerSecond: hasInterval ? (iface.rx_sec || 0) : 0,
      txPerSecond: hasInterval ? (iface.tx_sec || 0) : 0,
    };
  });

  const cpus = os.cpus().map((cpu) => {
    return {
      user: cpu.times.user,
      nice: cpu.times.nice,
      sys: cpu.times.sys,
      idle: cpu.times.idle,
      irq: cpu.times.irq,
    };
  });
  const load = os.loadavg();
  const cache = module.cacheStats.report();
  const disk = {
    cache: await module.getDiskUsage(process.env.DTP_HOST_CACHE_PATH),
  };

  const previousCpus = module.oldCpuStats || [ ];
  const cpuDeltas = cpus.map((cpu, idx) => {
    const previous = previousCpus[idx];
    if (!previous) {
      // First sample, or a CPU with no counterpart in the previous sample.
      return { user: 0, nice: 0, sys: 0, idle: 0, irq: 0 };
    }
    return {
      user: cpu.user - previous.user,
      nice: cpu.nice - previous.nice,
      sys: cpu.sys - previous.sys,
      idle: cpu.idle - previous.idle,
      irq: cpu.irq - previous.irq,
    };
  });
  module.oldCpuStats = cpus;

  await NetHostStats.create({
    created: NOW,
    host: module.host._id,
    load,
    cpus: cpuDeltas,
    memory, cache, disk, network,
  });
  await NetHost.updateOne(
    { _id: module.host._id },
    {
      $set: {
        updated: NOW,
        freemem: memory.available,
      },
    },
  );
  module.log.info('platform host report', { host: module.host._id, });
};
|||
|
|||
/**
 * Promise wrapper around the callback-style diskusage() call.
 *
 * @param {String} pathname Path whose disk usage is queried.
 * @returns {Promise<Object>} The usage object reported by diskusage, with an
 * added pctUsed field equal to (used / total) * 100.
 */
module.getDiskUsage = (pathname) => new Promise((resolve, reject) => {
  diskusage(pathname, (err, usage) => {
    if (err) {
      reject(err);
      return;
    }
    usage.pctUsed = (usage.used / usage.total) * 100.0;
    resolve(usage);
  });
});
|||
|
|||
/**
 * Writes a new status and a fresh `updated` time to this host's NetHost
 * record. Does nothing when no host has been registered yet.
 *
 * @param {String} status Status value to store.
 */
module.setHostStatus = async (status) => {
  const { host } = module;
  if (!host) {
    return;
  }

  const updatedAt = new Date();
  const NetHost = require('mongoose').model('NetHost');

  const filter = { _id: host._id };
  const update = {
    $set: {
      updated: updatedAt,
      status,
    },
  };
  await NetHost.updateOne(filter, update);
};
|||
|
|||
/**
 * Cron entry point: marks as 'crashed' every NetHost that is neither
 * 'inactive' nor 'crashed' and whose `updated` time is more than two minutes
 * old. A failure on one host is logged and does not stop the others.
 */
module.expireNetHosts = async ( ) => {
  const STALE_AFTER_MS = 1000 * 60 * 2;
  const markedAt = new Date();
  const cutoff = new Date(Date.now() - STALE_AFTER_MS);

  const NetHost = require('mongoose').model('NetHost');

  const staleHosts = await NetHost.find({
    status: { $nin: ['inactive', 'crashed'] },
    updated: { $lt: cutoff },
  });
  module.log.info('expired host cleanup', { hostCount: staleHosts.length });

  const markCrashed = async (host) => {
    try {
      const update = { $set: { updated: markedAt, status: 'crashed' } };
      await NetHost.updateOne({ _id: host._id }, update);
    } catch (error) {
      module.log.error('failed to clean expired host', { host, error });
    }
  };
  await SiteAsync.each(staleHosts, markCrashed, 4);
};
|||
|
|||
/*
 * Worker entry point: installs the SIGINT handler, prepares the cache
 * directory and its statistics, opens the UDP server socket, starts the site
 * platform, schedules the cron jobs and registers this host. Any startup
 * failure is logged and exits the process with code -1.
 */
(async ( ) => {
  try {
    process.once('SIGINT', async ( ) => {
      module.log.info('SIGINT received');
      module.log.info('requesting shutdown...');

      // A failed status write must not prevent the process from exiting.
      const markStatus = async (status) => {
        try {
          await module.setHostStatus(status);
        } catch (error) {
          module.log.error('failed to update host status during shutdown', { status, error });
        }
      };

      let exitCode = -1; // reported when the platform shutdown itself fails
      await markStatus('shutdown');
      try {
        exitCode = await SitePlatform.shutdown();
      } catch (error) {
        module.log.error('platform shutdown failed', { error });
      }
      // NOTE(review): this write happens after SitePlatform.shutdown(); if
      // shutdown closes the database connection it cannot succeed - confirm.
      await markStatus('inactive');

      process.nextTick(( ) => {
        process.exit(exitCode);
      });
    });

    module.log.info('ensuring host-services path exists', { path: process.env.DTP_HOST_CACHE_PATH });
    await fs.promises.mkdir(process.env.DTP_HOST_CACHE_PATH, { recursive: true });

    module.networkStats = await si.networkStats('*');

    module.log.info('starting cache service', { path: process.env.DTP_HOST_CACHE_PATH });
    module.cacheStats = new CacheStats();
    await module.startHostCache(process.env.DTP_HOST_CACHE_PATH);
    module.log.info('cache stats at start', { stats: module.cacheStats });

    /*
     * Host Cache server socket setup
     *
     * NOTE(review): the socket is bound before the platform is started and
     * before module.manager exists, so datagrams arriving during startup
     * cannot be served. Order kept as-is because startPlatform's dependencies
     * are not visible from this file.
     */
    module.log.info('creating server UDP socket');
    module.server = dgram.createSocket('udp4', module.onHostCacheMessage);
    module.log.info('binding server UDP socket');
    module.server.bind(8000);

    /*
     * Site Platform startup
     */
    await SitePlatform.startPlatform(module);
    module.domain = await module.services.domain.getByName(process.env.DTP_SITE_DOMAIN_KEY);
    if (!module.domain) {
      throw new Error(`Must define domain ${process.env.DTP_SITE_DOMAIN_KEY}`);
    }

    module.log.info('starting transaction manager');
    module.manager = new TransactionManager();
    module.expireJob = new CronJob('*/5 * * * * *', module.expireTransactions, null, true, CRON_TIMEZONE);

    /*
     * Worker startup
     */
    const cleanCronJob = process.env.DTP_HOST_CACHE_CLEAN_CRON || '*/30 * * * * *';
    module.log.info('starting host cache clean cron', { cleanCronJob });
    module.cleanupJob = new CronJob(cleanCronJob, module.cleanHostCache, null, true, CRON_TIMEZONE);

    module.log.info('starting stats report job');
    module.statsReportJob = new CronJob('*/5 * * * * *', module.reportHostStats, null, true, CRON_TIMEZONE);

    module.log.info('starting host expiration job');
    module.expireHostsJob = new CronJob('*/20 * * * * *', module.expireNetHosts, null, true, CRON_TIMEZONE);

    module.log.info('registering host with DTP Sites platform', { });
    await module.registerHost();
    await module.setHostStatus('active');

    module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.componentName} started`);
  } catch (error) {
    module.log.error('failed to start Host Cache worker', { error });
    process.exit(-1);
  }

})();
Loading…
Reference in new issue