DTP Social Engine
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

659 lines
18 KiB

// host-services.js
// Copyright (C) 2022 DTP Technologies, LLC
// License: Apache-2.0
'use strict';
const os = require('os');
const diskusage = require('diskusage-ng');
const path = require('path');
const fs = require('fs');
const si = require('systeminformation');
require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') });
const dgram = require('dgram');
const numeral = require('numeral');
const {
SiteAsync,
SiteLog,
SiteError,
SiteWorker,
} = require(path.join(__dirname, '..', '..', 'lib', 'site-lib'));
const { CronJob } = require('cron');
// Timezone handed to every CronJob this worker creates.
const CRON_TIMEZONE = 'America/New_York';
// Project root: two directories above the directory holding this script.
module.rootPath = path.resolve(__dirname, '..', '..');
// The project's package.json; its name and version are logged at startup.
module.pkg = require(path.resolve(__dirname, '..', '..', 'package.json'));
// Runtime configuration attached to this script's `module` object. The same
// object is later passed to the worker as its `dtp` context, so the order of
// these assignments matters: `config` reads `rootPath`, and the logger below
// is constructed from the populated module.
module.config = {
environment: process.env.NODE_ENV,
root: module.rootPath,
component: { name: 'hostServicesWorker', slug: 'host-services-worker' },
site: require(path.join(module.rootPath, 'config', 'site')),
http: require(path.join(module.rootPath, 'config', 'http')),
};
// Process-level logger, used by the bootstrap code at the bottom of the file.
module.log = new SiteLog(module, module.config.component);
/**
 * Running counters for the on-disk host cache.
 *
 * Callers pass sizes in bytes; every `*DataSize` counter is stored in KiB
 * (bytes / 1024). `itemCount` and `dataSize` describe the current contents of
 * the cache and persist across reports. The hit, miss and expire counters are
 * per-interval values that `report()` resets after each snapshot.
 */
class CacheStats {

  constructor ( ) {
    this.itemCount = 0;
    this.dataSize = 0;
    this.expireCount = 0;
    this.expireDataSize = 0;
    this.hitCount = 0;
    this.hitDataSize = 0;
    this.missCount = 0;
    this.missDataSize = 0;
  }

  /**
   * Records a file being added to the cache.
   * @param {Number} size File size in bytes.
   */
  add (size) {
    this.itemCount += 1;
    this.dataSize += (size / 1024.0);
  }

  /**
   * Records a file being removed from the cache.
   * @param {Number} size File size in bytes.
   */
  remove (size) {
    this.itemCount -= 1;
    this.dataSize -= (size / 1024.0);
  }

  /**
   * Records a request served from the cache.
   * @param {Number} size File size in bytes.
   */
  hit (size) {
    this.hitCount += 1;
    this.hitDataSize += (size / 1024.0);
  }

  /**
   * Records a request that was not in the cache.
   * @param {Number} size File size in bytes.
   */
  miss (size) {
    this.missCount += 1;
    this.missDataSize += (size / 1024.0);
  }

  /**
   * Records a file being expired from the cache.
   * @param {Number} size File size in bytes.
   */
  expire (size) {
    this.expireCount += 1;
    // Stored in KiB like every other size counter. This previously added the
    // raw byte count, so expireDataSize was 1024x larger than its siblings.
    this.expireDataSize += (size / 1024.0);
  }

  /**
   * Snapshots all counters, then resets the per-interval ones.
   * @returns {Object} Plain object holding the counter values at call time.
   */
  report ( ) {
    const report = {
      itemCount: this.itemCount,
      dataSize: this.dataSize,
      expireCount: this.expireCount,
      expireDataSize: this.expireDataSize,
      hitCount: this.hitCount,
      hitDataSize: this.hitDataSize,
      missCount: this.missCount,
      missDataSize: this.missDataSize,
    };
    this.resetCounters();
    return report;
  }

  /**
   * Zeroes the expire, hit and miss counters. `itemCount` and `dataSize` are
   * left alone because they describe the cache's current contents.
   */
  resetCounters ( ) {
    this.expireCount = 0;
    this.expireDataSize = 0;
    this.hitCount = 0;
    this.hitDataSize = 0;
    this.missCount = 0;
    this.missDataSize = 0;
  }
}
/**
 * One request received on the host cache UDP socket, together with the state
 * needed to answer it. `message` is the parsed JSON datagram (tid, cmd,
 * params) and `rinfo` is the sender information supplied by dgram.
 */
class HostCacheTransaction {

  get tid ( ) { return this.message.tid; }
  get cmd ( ) { return this.message.cmd; }
  get params ( ) { return this.message.params; }

  get address ( ) { return this.rinfo.address; }
  get port ( ) { return this.rinfo.port; }
  get size ( ) { return this.rinfo.size; }

  /**
   * @param {Object} dtp Shared worker context (services, manager, server,
   * cacheStats).
   * @param {Object} message Parsed request message.
   * @param {Object} rinfo Remote address information for the sender.
   */
  constructor (dtp, message, rinfo) {
    this.dtp = dtp;
    this.created = Date.now(); // timestamp, not Date instance

    this.component = { name: 'Host Cache Transaction', slug: 'host-cache-transaction' };
    this.log = new SiteLog(dtp, this.component);

    this.message = message;
    this.rinfo = rinfo;

    this.flags = {
      isFetched: false,
      isCached: false,
      isResolved: false,
      isError: false,
    };
  }

  /**
   * Serves the requested object from the on-disk cache, downloading it from
   * MinIO first when it is not cached yet. Every outcome is reported through
   * the transaction manager's resolveTransaction().
   *
   * The bucket and key come from the network, so they are validated before
   * touching the file system: both must be strings, and the resulting path
   * must stay inside DTP_HOST_CACHE_PATH.
   *
   * @returns {Promise} Settles once the response has been handed to the
   * transaction manager.
   */
  async getFile ( ) {
    const { minio: minioService } = this.dtp.services;
    const res = {
      cmd: this.cmd,
      success: true,
      message: undefined,
      file: {
        stats: undefined,
        path: undefined,
      },
    };

    // A datagram without usable params used to throw inside path.join() and
    // leave the transaction unanswered; reject it explicitly instead.
    const { bucket, key } = this.params || { };
    if ((typeof bucket !== 'string') || (typeof key !== 'string')) {
      return this.failRequest(res, 400, 'invalid object requested', new SiteError(400, 'invalid object requested'));
    }

    const cacheRoot = process.env.DTP_HOST_CACHE_PATH;
    const filePath = path.join(cacheRoot, bucket, key);

    // Refuse any bucket/key combination whose '..' segments would resolve to
    // a location outside of the cache directory.
    const relativePath = path.relative(path.resolve(cacheRoot), path.resolve(filePath));
    const escapesCache = (relativePath === '..') ||
      relativePath.startsWith(`..${path.sep}`) ||
      path.isAbsolute(relativePath);
    if (escapesCache) {
      this.log.alert('rejecting object path outside of host cache', { bucket, key });
      return this.failRequest(res, 400, 'invalid object requested', new SiteError(400, 'invalid object requested'));
    }

    try {
      res.file.stats = await fs.promises.stat(filePath);
      if (!res.file.stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
      res.file.path = filePath;
      this.flags.isCached = true;
      this.dtp.cacheStats.hit(res.file.stats.size);
      return this.dtp.manager.resolveTransaction(this, res);
    } catch (error) {
      if (error.code !== 'ENOENT') {
        this.log.error('failed to stat requested object', { transaction: this, error });
        return this.failRequest(res, 500, error.message, error);
      }
      // fall through to MinIO fetch since file not found in cache
    }

    try {
      await minioService.downloadFile({ bucket, key, filePath });
      res.file.path = filePath;
      res.file.stats = await fs.promises.stat(filePath);
      if (!res.file.stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
      this.flags.isFetched = true;
      this.dtp.cacheStats.add(res.file.stats.size);
      this.dtp.cacheStats.miss(res.file.stats.size);
      return this.dtp.manager.resolveTransaction(this, res);
    } catch (error) {
      if (error.code !== 'NotFound') {
        this.log.error('failed to fetch requested object from MinIO', { transaction: this, error });
        return this.failRequest(res, 500, error.message, error);
      }
      // fall through: the object does not exist in MinIO either
    }

    return this.failRequest(res, 404, 'Not Found', new SiteError(404, 'Not Found'));
  }

  /**
   * Marks the transaction as failed and resolves it with an error response.
   * @param {Object} res Response object being built for the client.
   * @param {Number} statusCode Status code reported to the client.
   * @param {String} message Message reported to the client.
   * @param {Error} error Error recorded on the transaction.
   * @returns {*} Whatever the manager's resolveTransaction() returns.
   */
  failRequest (res, statusCode, message, error) {
    res.success = false;
    res.statusCode = statusCode;
    res.message = message;
    this.error = error;
    this.flags.isError = true;
    return this.dtp.manager.resolveTransaction(this, res);
  }

  /**
   * Resolves the transaction with a "canceled" response.
   * @param {String} reason Text appended to the cancellation message.
   */
  async cancel (reason) {
    const res = {
      cmd: this.cmd, // was keyed `res`, unlike the getFile() response
      success: false,
      message: `Operation canceled: ${reason}`,
    };
    return this.dtp.manager.resolveTransaction(this, res);
  }

  /**
   * Sends the response datagram back to the requester and marks the
   * transaction resolved. The payload carries the tid, the response, the
   * transaction flags and the elapsed time in seconds.
   * @param {Object} res Response object to send.
   */
  async sendResponse (res) {
    const NOW = Date.now();
    const duration = this.duration = (NOW - this.created) / 1000.0;

    const { flags } = this;
    flags.isResolved = true;

    const payload = { tid: this.tid, res, flags, duration };
    const reply = Buffer.from(JSON.stringify(payload));
    this.dtp.server.send(reply, this.port, this.address);
  }
}
/**
 * Tracks in-flight host cache transactions by tid, dispatches their commands
 * and cancels the ones that stay unresolved for too long.
 */
class TransactionManager {

  /**
   * @param {Object} dtp Shared worker context.
   */
  constructor (dtp) {
    this.dtp = dtp;
    this.component = { name: 'Transaction Manager', slug: 'transaction-manager' };
    this.log = new SiteLog(dtp, this.component);
    this.transactions = { };
  }

  /**
   * Registers a transaction and, unless an equivalent request is already in
   * flight, runs its command. Unknown commands are logged and canceled.
   *
   * NOTE(review): a duplicate request is only stored here. Nothing visible in
   * this class answers it when the original request resolves, so it appears
   * to wait for the expiry watchdog - confirm whether that is intended.
   *
   * @param {Object} transaction The transaction to register.
   */
  async addTransaction (transaction) {
    // Must be evaluated before the transaction is stored, or it would match
    // itself.
    const isDuplicate = this.hasPendingRequest(transaction);
    this.transactions[transaction.tid] = transaction;
    if (isDuplicate) {
      return; // queue it and be done
    }

    switch (transaction.cmd) {
      case 'getFile':
        return transaction.getFile();

      default:
        break; // unknown/undefined command
    }

    this.log.error('invalid host-services command', {
      cmd: transaction.cmd,
      params: transaction.params,
      from: {
        address: transaction.address,
        port: transaction.port,
      },
    });
    await this.dtp.manager.cancelTransaction(transaction, 'Rejected');
  }

  /**
   * Tests whether a tracked transaction has the same cmd, bucket and key as
   * the one given. Transactions without params never match, so a malformed
   * message can no longer make this method throw.
   * @param {Object} transaction The transaction to compare against.
   * @returns {Boolean} true when an equivalent request is already tracked.
   */
  hasPendingRequest (transaction) {
    if (!transaction || !transaction.params) {
      return false;
    }
    const { cmd, params } = transaction;
    return Object.keys(this.transactions).some((key) => {
      const cmp = this.transactions[key];
      if (!cmp || !cmp.params) { return false; }
      return (cmp.cmd === cmd) &&
        (cmp.params.bucket === params.bucket) &&
        (cmp.params.key === params.key);
    });
  }

  /**
   * Sends the response for a transaction and stops tracking it.
   * @param {Object} transaction The transaction being answered.
   * @param {Object} res Response to send to the requester.
   */
  async resolveTransaction (transaction, res) {
    await transaction.sendResponse(res);
    this.removeTransaction(transaction, 'resolved');
  }

  /**
   * Cancels a transaction and stops tracking it.
   * @param {Object} transaction The transaction to cancel.
   * @param {String} reason Passed to the transaction so the requester is told
   * why; it was previously dropped, yielding "Operation canceled: undefined".
   */
  async cancelTransaction (transaction, reason) {
    await transaction.cancel(reason);
    this.removeTransaction(transaction, reason);
  }

  /**
   * Stops tracking a transaction; a no-op when it is not tracked.
   * @param {Object} transaction The transaction to forget.
   */
  removeTransaction (transaction) {
    if (this.transactions[transaction.tid]) {
      delete this.transactions[transaction.tid];
    }
  }

  /**
   * Watchdog pass: cancels every tracked transaction older than 30 seconds.
   */
  async expireTransactions ( ) {
    const NOW = Date.now();
    const keys = Object.keys(this.transactions);
    let expired = 0;

    await SiteAsync.each(keys, async (key) => {
      const transaction = this.transactions[key];
      if (!transaction) {
        return; // resolved and removed while another iteration was awaiting
      }
      const age = NOW - transaction.created;
      if (age > (1000 * 30)) {
        this.log.alert('expiring transaction', { transaction });
        await this.cancelTransaction(transaction, 'expired');
        ++expired;
      }
    }, 8);

    this.log.info('transaction watchdog', { expired });
  }
}
/**
 * Worker that runs the host file cache (a UDP request server backed by the
 * local file system and MinIO), keeps that cache trimmed, and reports host
 * registration, status and statistics to the NetHost / NetHostStats models.
 */
class HostServicesWorker extends SiteWorker {

  /**
   * @param {Object} dtp Shared worker context; its config.component is
   * forwarded to the SiteWorker base class.
   */
  constructor (dtp) {
    super(dtp, dtp.config.component);
  }

  /**
   * dgram 'message' handler: parses the datagram as UTF-8 JSON, wraps it in a
   * HostCacheTransaction and hands it to the transaction manager.
   * @param {Buffer} message Raw datagram payload.
   * @param {Object} rinfo Remote address information for the sender.
   */
  async onHostCacheMessage (message, rinfo) {
    try {
      message = message.toString('utf8');
      message = JSON.parse(message);

      const transaction = new HostCacheTransaction(this.dtp, message, rinfo);
      // Awaited so a failure while processing the request is caught and
      // logged here rather than surfacing as an unhandled promise rejection.
      await this.dtp.manager.addTransaction(transaction);
    } catch (error) {
      this.log.error('failed to receive UDP message', { message, error });
    }
  }

  /**
   * Startup pass over the cache directory tree. Files whose atime is more
   * than 60 minutes old are deleted; every other file is counted in the
   * cache statistics.
   * @param {String} basePath Directory to scan; defaults to
   * DTP_HOST_CACHE_PATH.
   */
  async startHostCache (basePath) {
    basePath = basePath || process.env.DTP_HOST_CACHE_PATH;

    const NOW = Date.now();
    // The for-await loop closes the directory handle when iteration ends, so
    // no explicit dir.close() is needed.
    const dir = await fs.promises.opendir(basePath);
    for await (const dirent of dir) {
      if (dirent.isDirectory()) {
        this.log.debug('cache start descend into directory', { name: dirent.name });
        await this.startHostCache(path.join(basePath, dirent.name));
      }
      if (dirent.isFile()) {
        const filePath = path.join(basePath, dirent.name);
        const stats = await fs.promises.stat(filePath);
        const age = (NOW - stats.atime.valueOf()) / 1000.0; // seconds
        this.log.debug('examining file', { filePath, age: numeral(age).format('hh:mm:ss') });
        if ((age / 60.0) > 60.0) {
          this.log.info('expiring file', { filePath, age: numeral(age).format('hh:mm:ss') });
          await fs.promises.rm(filePath, { force: true });
        } else {
          this.dtp.cacheStats.add(stats.size);
        }
      }
    }

    this.log.info('cache startup cleanup complete', { basePath });
  }

  /**
   * Periodic cache cleanup. A file's atime is updated when it is accessed;
   * any file whose atime is more than 60 minutes old is removed and recorded
   * in the cache statistics as removed and expired.
   * @param {String} basePath The root of the cache store on disk; defaults to
   * DTP_HOST_CACHE_PATH.
   */
  async cleanHostCache (basePath) {
    const NOW = Date.now(); // timestamp, not Date instance
    basePath = basePath || process.env.DTP_HOST_CACHE_PATH;
    this.log.info('cache directory cleanup', { path: basePath });

    const dir = await fs.promises.opendir(basePath);
    for await (const dirent of dir) {
      if (dirent.isDirectory()) {
        this.log.debug('cache clean descend into directory', { name: dirent.name });
        await this.cleanHostCache(path.join(basePath, dirent.name));
      }
      if (dirent.isFile()) {
        const filePath = path.join(basePath, dirent.name);
        const stats = await fs.promises.stat(filePath);
        const age = NOW - stats.atime.valueOf(); // milliseconds
        this.log.debug('examining file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') });
        if ((age / 1000.0 / 60.0) > 60.0) {
          this.log.info('expiring file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') });
          await fs.promises.rm(filePath, { force: true });
          this.dtp.cacheStats.remove(stats.size);
          this.dtp.cacheStats.expire(stats.size);
        }
      }
    }

    this.log.info('cache directory cleanup complete', { basePath });
  }

  /**
   * Cron entry point that runs the transaction manager's expiry watchdog.
   */
  async expireTransactions ( ) {
    await this.dtp.manager.expireTransactions();
  }

  /**
   * Creates and saves a NetHost record describing this machine (hostname,
   * architecture, CPUs, memory, OS and network interfaces) with status
   * 'starting'.
   * @returns {Object} The saved host, converted to a plain object.
   */
  async registerHost ( ) {
    const NOW = new Date();

    const mongoose = require('mongoose');
    const NetHost = mongoose.model('NetHost');

    const memory = await si.mem();

    this.host = new NetHost();
    this.host.created = NOW;
    this.host.status = 'starting';
    this.host.hostname = os.hostname();

    this.host.arch = os.arch();
    this.host.cpus = os.cpus().map((cpu) => {
      return {
        model: cpu.model,
        speed: cpu.speed,
      };
    });

    this.host.totalmem = memory.total;
    this.host.freemem = memory.available;

    this.host.platform = os.platform();
    this.host.release = os.release();
    this.host.version = os.version();

    this.host.network = (await si.networkInterfaces()).map((iface) => {
      return {
        iface: iface.iface,
        speed: iface.speed,
        mac: iface.mac,
        ip4: iface.ip4,
        ip4subnet: iface.ip4subnet,
        ip6: iface.ip6,
        ip6subnet: iface.ip6subnet,
        flags: {
          internal: iface.internal,
          virtual: iface.virtual,
        },
      };
    });
    this.log.info('host registration: network', { network: this.host.network });

    await this.host.save();
    this.host = this.host.toObject();
    this.log.info('registered host with platform', { host: this.host._id });

    return this.host;
  }

  /**
   * Collects memory, network, CPU, load, cache and disk figures, stores them
   * as a NetHostStats record and refreshes the NetHost's `updated` and
   * `freemem` fields. CPU values are the change in os.cpus() times since the
   * previous report (all zero on the first report).
   */
  async reportHostStats ( ) {
    if (!this.host) {
      // The report cron job is started before registerHost() runs. Without a
      // host there is nothing to attribute the stats to, and continuing
      // would reset the interval cache counters and then throw on
      // this.host._id.
      return;
    }

    const NOW = new Date();

    const mongoose = require('mongoose');
    const NetHost = mongoose.model('NetHost');
    const NetHostStats = mongoose.model('NetHostStats');

    const memory = await si.mem();

    // NOTE(review): systeminformation appears to report rx_sec/tx_sec as
    // per-second values already - confirm that dividing by the sample
    // interval again is intended.
    const network = (await si.networkStats('*')).map((iface) => {
      const record = { iface: iface.iface };

      record.rxDropped = iface.rx_dropped;
      record.rxErrors = iface.rx_errors;

      record.txDropped = iface.tx_dropped;
      record.txErrors = iface.tx_errors;

      if (iface.ms !== 0) {
        record.rxPerSecond = iface.rx_sec / (iface.ms / 1000.0);
        record.txPerSecond = iface.tx_sec / (iface.ms / 1000.0);
      } else {
        record.rxPerSecond = 0;
        record.txPerSecond = 0;
      }

      return record;
    });

    const cpus = os.cpus().map((cpu) => {
      return {
        user: cpu.times.user,
        nice: cpu.times.nice,
        sys: cpu.times.sys,
        idle: cpu.times.idle,
        irq: cpu.times.irq,
      };
    });
    const load = os.loadavg();

    const cache = this.dtp.cacheStats.report();
    const disk = {
      cache: await this.getDiskUsage(process.env.DTP_HOST_CACHE_PATH),
    };

    const cpuDeltas = [ ];
    if (this.oldCpuStats) {
      for (let idx = 0; idx < cpus.length; ++idx) {
        cpuDeltas.push({
          user: cpus[idx].user - this.oldCpuStats[idx].user,
          nice: cpus[idx].nice - this.oldCpuStats[idx].nice,
          sys: cpus[idx].sys - this.oldCpuStats[idx].sys,
          idle: cpus[idx].idle - this.oldCpuStats[idx].idle,
          irq: cpus[idx].irq - this.oldCpuStats[idx].irq,
        });
      }
    } else {
      cpus.forEach(( ) => {
        cpuDeltas.push({
          user: 0,
          nice: 0,
          sys: 0,
          idle: 0,
          irq: 0,
        });
      });
    }
    this.oldCpuStats = cpus;

    await NetHostStats.create({
      created: NOW,
      host: this.host._id,
      load,
      cpus: cpuDeltas,
      memory, cache, disk, network,
    });
    await NetHost.updateOne(
      { _id: this.host._id },
      {
        updated: NOW,
        freemem: memory.available,
      },
    );
    this.log.info('platform host report', { host: this.host._id, });
  }

  /**
   * Promise wrapper around the callback-based diskusage module.
   * @param {String} pathname Path whose disk usage is measured.
   * @returns {Promise<Object>} The usage object, with `pctUsed` added as
   * used / total * 100.
   */
  getDiskUsage (pathname) {
    return new Promise((resolve, reject) => {
      diskusage(pathname, (err, usage) => {
        if (err) {
          return reject(err);
        }
        usage.pctUsed = (usage.used / usage.total) * 100.0;
        return resolve(usage);
      });
    });
  }

  /**
   * Updates the status and `updated` time of this host's NetHost record.
   * Does nothing until the host has been registered.
   * @param {String} status New status value.
   */
  async setHostStatus (status) {
    if (!this.host) {
      return;
    }

    const NOW = new Date();
    const mongoose = require('mongoose');
    const NetHost = mongoose.model('NetHost');

    await NetHost.updateOne(
      { _id: this.host._id },
      {
        $set: {
          updated: NOW,
          status,
        },
      },
    );
  }

  /**
   * Marks as 'crashed' every NetHost that is neither 'inactive' nor 'crashed'
   * and has not been updated in the last two minutes. A failure on one host
   * is logged and does not stop the others.
   */
  async expireNetHosts ( ) {
    const NOW = new Date();
    const OLDEST = new Date(Date.now() - 1000 * 60 * 2);

    const mongoose = require('mongoose');
    const NetHost = mongoose.model('NetHost');

    const hosts = await NetHost.find({
      status: { $nin: ['inactive', 'crashed'] },
      updated: { $lt: OLDEST }
    });

    this.log.info('expired host cleanup', { hostCount: hosts.length });

    await SiteAsync.each(hosts, async (host) => {
      try {
        await NetHost.updateOne(
          { _id: host._id },
          {
            $set: {
              updated: NOW,
              status: 'crashed',
            },
          },
        );
      } catch (error) {
        this.log.error('failed to clean expired host', { host, error });
      }
    }, 4);
  }

  /**
   * Starts the worker: scans the cache directory, opens the UDP server,
   * creates the transaction manager, starts the cron jobs, then registers
   * this host and marks it 'active'.
   */
  async start ( ) {
    await super.start();

    this.networkStats = await si.networkStats('*');

    this.log.info('starting cache service', { path: process.env.DTP_HOST_CACHE_PATH });
    this.dtp.cacheStats = new CacheStats();
    await this.startHostCache(process.env.DTP_HOST_CACHE_PATH);
    this.log.info('cache stats at start', { stats: this.dtp.cacheStats });

    /*
     * Host Cache server socket setup
     */

    const HOST_PORT = parseInt(process.env.DTP_HOST_CACHE_PORT || '8000', 10);

    this.log.info('creating server UDP socket');
    this.dtp.server = dgram.createSocket('udp4', this.onHostCacheMessage.bind(this));

    this.log.info('binding server UDP socket', { port: HOST_PORT });
    this.dtp.server.bind(HOST_PORT);

    this.log.info('starting transaction manager');
    this.dtp.manager = new TransactionManager(this.dtp);
    this.expireJob = new CronJob('*/5 * * * * *', this.expireTransactions.bind(this), null, true, CRON_TIMEZONE);

    /*
     * Worker startup
     */

    // The jobs below are created before the host is registered, which is why
    // reportHostStats() and setHostStatus() tolerate a missing this.host.
    const cleanCronJob = process.env.DTP_HOST_CACHE_CLEAN_CRON || '*/30 * * * * *';
    this.log.info('starting host cache clean cron', { cleanCronJob });
    this.cleanupJob = new CronJob(cleanCronJob, this.cleanHostCache.bind(this), null, true, CRON_TIMEZONE);

    this.log.info('starting stats report job');
    this.statsReportJob = new CronJob('*/5 * * * * *', this.reportHostStats.bind(this), null, true, CRON_TIMEZONE);

    this.log.info('starting host expiration job');
    this.expireHostsJob = new CronJob('*/20 * * * * *', this.expireNetHosts.bind(this), null, true, CRON_TIMEZONE);

    this.log.info('registering host with DTP Sites platform', { });
    await this.registerHost();
    await this.setHostStatus('active');
  }
}
/*
 * Process entry point: makes sure the cache directory exists, then creates
 * and starts the worker. Any startup failure is logged and ends the process
 * with exit code -1.
 */
async function startHostServicesWorker ( ) {
  const cachePath = process.env.DTP_HOST_CACHE_PATH;
  try {
    module.log.info('ensuring host-services path exists', { path: cachePath });
    await fs.promises.mkdir(cachePath, { recursive: true });

    const worker = new HostServicesWorker(module);
    module.worker = worker;
    await worker.start();

    const { pkg, config } = module;
    module.log.info(`${pkg.name} v${pkg.version} ${config.component.name} started`);
  } catch (error) {
    module.log.error('failed to start worker', { component: module.config.component, error });
    process.exit(-1);
  }
}

startHostServicesWorker();