From 00ab250d599620f4dc22547be92d551384dde98a Mon Sep 17 00:00:00 2001 From: rob Date: Fri, 4 Nov 2022 20:31:52 -0400 Subject: [PATCH] Core and Service Node connection management (wip) - Integrated EventEmitter2 for async event processing - Changed jshint reporter module URL to avoid use of SSH - Core can disconnect a service node, generates many events - Service Node can disconnect a Core, generates many events Nothing yet processes those events. Many things need to be cleaned up and removed based on a Core Node disconnection. The Core itself needs to remove all Kaleidoscope events and other data received from the Service Node, and the Service Node needs to remove all CoreUser records (and everything they did while there). That's going to take a minute to implement throughout all the systems. --- app/controllers/admin/core-node.js | 19 +++++ app/models/lib/user-types.js | 12 ++- app/models/user-block.js | 6 +- app/services/core-node.js | 98 +++++++++++++++++++++++-- app/services/user.js | 31 ++++---- app/views/admin/core-node/view.pug | 17 ++++- app/views/components/library.pug | 2 +- app/workers/media/job/sticker-ingest.js | 2 +- client/js/site-admin-app.js | 21 ++++++ lib/site-common.js | 34 ++++++++- lib/site-platform.js | 6 ++ package.json | 3 +- yarn.lock | 11 ++- 13 files changed, 226 insertions(+), 36 deletions(-) diff --git a/app/controllers/admin/core-node.js b/app/controllers/admin/core-node.js index 2ae9816..fd7cdc9 100644 --- a/app/controllers/admin/core-node.js +++ b/app/controllers/admin/core-node.js @@ -33,6 +33,8 @@ class CoreNodeController extends SiteController { router.get('/:coreNodeId', this.getCoreNodeView.bind(this)); router.get('/', this.getIndex.bind(this)); + router.delete('/:coreNodeId', this.deleteCoreNode.bind(this)); + return router; } @@ -137,6 +139,23 @@ class CoreNodeController extends SiteController { return next(error); } } + + async deleteCoreNode (req, res) { + const { coreNode: coreNodeService } = this.dtp.services; + try { + await coreNodeService.disconnect(res.locals.coreNode); + + const displayList = this.createDisplayList('core-disconnect'); + displayList.navigateTo('/admin/core-node'); + res.status(200).json({ success: true, displayList }); + } catch (error) { + this.log.error('failed to disconnect from Core', { error }); + return res.status(error.statusCode || 500).json({ + success: false, + message: error.message, + }); + } + } } module.exports = { diff --git a/app/models/lib/user-types.js b/app/models/lib/user-types.js index ffbbad8..a5fce6f 100644 --- a/app/models/lib/user-types.js +++ b/app/models/lib/user-types.js @@ -9,20 +9,26 @@ const Schema = mongoose.Schema; module.exports.DTP_THEME_LIST = ['dtp-light', 'dtp-dark']; +module.exports.DTP_USER_TYPE_LIST = ['CoreUser', 'User']; +module.exports.DtpUserSchema = new Schema({ + userType: { type: String, enum: module.exports.DTP_USER_TYPE_LIST, required: true }, + user: { type: Schema.ObjectId, required: true, index: true, refPath: 'userType' }, +}, { _id: false }); + module.exports.UserFlagsSchema = new Schema({ isAdmin: { type: Boolean, default: false, required: true }, isModerator: { type: Boolean, default: false, required: true }, isEmailVerified: { type: Boolean, default: false, required: true }, -}); +}, { _id: false }); module.exports.UserPermissionsSchema = new Schema({ canLogin: { type: Boolean, default: true, required: true }, canChat: { type: Boolean, default: true, required: true }, canComment: { type: Boolean, default: true, required: true }, canReport: { type: Boolean, default: true, required: true }, -}); +}, { _id: false }); module.exports.UserOptInSchema = new Schema({ system: { type: Boolean, default: true, required: true }, marketing: { type: Boolean, default: true, required: true }, -}); \ No newline at end of file +}, { _id: false }); \ No newline at end of file diff --git a/app/models/user-block.js b/app/models/user-block.js index 2046267..f6396e1 100644 --- a/app/models/user-block.js +++ b/app/models/user-block.js @@ -7,9 +7,11 @@ const mongoose = require('mongoose'); const Schema = mongoose.Schema; +const { DtpUserSchema } = require('./lib/user-types.js'); + const UserBlockSchema = new Schema({ - user: { type: Schema.ObjectId, required: true, index: true, ref: 'User' }, - blockedUsers: { type: [Schema.ObjectId], ref: 'User' }, + member: { type: DtpUserSchema, required: true }, + blockedMembers: { type: [DtpUserSchema] }, }); module.exports = mongoose.model('UserBlock', UserBlockSchema); \ No newline at end of file diff --git a/app/services/core-node.js b/app/services/core-node.js index b8cc039..8f545d5 100644 --- a/app/services/core-node.js +++ b/app/services/core-node.js @@ -61,6 +61,12 @@ class CoreNodeService extends SiteService { async start ( ) { await super.start(); + const https = require('https'); + this.httpsAgent = new https.Agent({ + // read it out-loud: Reject unauthorized when not the 'local' environment. + rejectUnauthorized: (process.env.NODE_ENV !== 'local'), + }); + const cores = await this.getConnectedCores(null, true); this.log.info('Core Node service starting', { connectedCoreCount: cores.length }); @@ -187,7 +193,7 @@ class CoreNodeService extends SiteService { url: '/core/info/package', }); - await CoreNode.updateOne( + core = await CoreNode.findOneAndUpdate( { _id: core._id }, { $set: { @@ -201,10 +207,11 @@ class CoreNodeService extends SiteService { 'meta.supportEmail': txSite.response.site.supportEmail, }, }, + { new: true }, ); - core = await CoreNode.findOne({ _id: core._id }).lean(); this.log.info('resolved Core node', { core }); + this.emitDtpEvent('resolve', { core, host }); return { core, networkPolicy: txSite.response.site.networkPolicy }; } @@ -257,6 +264,8 @@ class CoreNodeService extends SiteService { body: { event }, }; + await this.emitDtpEvent('kaleidoscope.event', { event, recipients, request }); + if (!recipients) { return this.broadcast(request); } @@ -303,6 +312,7 @@ class CoreNodeService extends SiteService { async broadcast (request) { const results = [ ]; + await this.emitDtpEvent('kaleidoscope.broadcast', { request }); await CoreNode .find({ 'flags.isConnected': true, @@ -327,6 +337,7 @@ class CoreNodeService extends SiteService { try { const req = new CoreNodeRequest(); const options = { + agent: this.httpsAgent, headers: { 'Content-Type': 'application/json', }, @@ -353,8 +364,10 @@ class CoreNodeService extends SiteService { options.body = JSON.stringify(request.body); } - this.log.info('sending Core node request', { request: req }); const requestUrl = this.getCoreRequestUrl(core, request.url); + await this.emitDtpEvent('kaleidoscope.request', { core, request, requestUrl }); + + this.log.info('sending Core node request', { request: req }); const response = await fetch(requestUrl, options); if (!response.ok) { let json; @@ -384,6 +397,10 @@ class CoreNodeService extends SiteService { async setRequestResponse (request, response, json) { const DONE = new Date(); const ELAPSED = DONE.valueOf() - request.created.valueOf(); + + /* + * Build the default update operation + */ const updateOp = { $set: { 'response.received': DONE, @@ -391,9 +408,20 @@ class CoreNodeService extends SiteService { 'response.statusCode': response.status, }, }; + if (json) { updateOp.$set['response.success'] = json.success; } + + /* + * Provide an opportunity for anything to alter the operation or cancel it. + */ + await this.emitDtpEvent('kaleidoscope.response', { + core: request.core, + request, response, json, + updateOp, + }); + await CoreNodeRequest.updateOne({ _id: request._id }, updateOp); } @@ -433,6 +461,52 @@ class CoreNodeService extends SiteService { }, }, ); + + await this.emitDtpEvent('connect', { core: request.core, request }); + } + + async disconnect (core) { + this.log.alert('disconnecting from Core', { + name: core.meta.name, + domain: core.meta.domain, + }); + + // provides an abort point if any listener throws + await this.emitDtpEvent('disconnect-pre', { core }); + + const disconnect = await this.sendRequest(core, { + method: 'DELETE', + url: `/core/connect/node/${core.oauth.clientId}`, + }); + this.log.alert('Core disconnect request complete', { + name: core.meta.name, + domain: core.meta.domain, + disconnect, + }); + + try { + await this.emitDtpEvent('disconnect-post', { core, disconnect }); + } catch (error) { + this.log.error('failed to emit dtp.core.disconnect-post', { error }); + // keep going + } + + await CoreUser + .find({ core: core._id }) + .cursor() + .eachAsync(this.removeUser.bind(this, core), 1); + + // await CoreNodeConnect.deleteMany({ 'site.domainKey': core.meta.domainKey }); + // await CoreNodeRequest.deleteMany({ core: core._id }); + + try { + await this.emitDtpEvent('disconnect', { core, disconnect }); + } catch (error) { + this.log.error('failed to emit dtp.core.disconnect', { error }); + // keep going + } + + return disconnect; } async queueServiceNodeConnect (requestToken, appNode) { @@ -478,7 +552,10 @@ class CoreNodeService extends SiteService { await request.save(); - return request.toObject(); + request = request.toObject(); + await this.emitDtpEvent('service-node.connect', { request }); + + return request; } async getServiceNodeQueue (pagination) { @@ -498,7 +575,6 @@ class CoreNodeService extends SiteService { return request; } - async acceptServiceNode (requestToken, appNode) { const { oauth2: oauth2Service } = this.dtp.services; const response = { token: requestToken }; @@ -506,13 +582,15 @@ class CoreNodeService extends SiteService { this.log.info('accepting app node', { requestToken, appNode }); response.client = await oauth2Service.createClient(appNode.site); + await this.emitDtpEvent('service-node.accept', { client: response.client }); + return response; } async setCoreOAuth2Credentials (core, credentials) { const { client } = credentials; this.log.info('updating Core Connect credentials', { core, client }); - await CoreNode.updateOne( + core = await CoreNode.findOneAndUpdate( { _id: core._id }, { $set: { @@ -524,7 +602,9 @@ class CoreNodeService extends SiteService { 'kaleidoscope.token': client.kaleidoscope.token, }, }, + { new: true }, ); + await this.emitDtpEvent('set-oauth2-credentials', { core }); } registerPassportCoreOAuth2 (core) { @@ -591,6 +671,7 @@ class CoreNodeService extends SiteService { ); user = user.toObject(); user.type = 'CoreUser'; + this.emitDtpEvent('user.login', { user }); return cb(null, user); } catch (error) { return cb(error); @@ -704,6 +785,11 @@ class CoreNodeService extends SiteService { }, ); } + + async removeUser (core, user) { + this.log.alert('remove Core user', { core: core.meta.name, user: user.username }); + await this.emitDtpEvent('user.remove', { core, user }); + } } module.exports = { diff --git a/app/services/user.js b/app/services/user.js index c41ba30..76d28a7 100644 --- a/app/services/user.js +++ b/app/services/user.js @@ -649,33 +649,38 @@ class UserService extends SiteService { await User.updateOne({ _id: user._id }, { $unset: { 'picture': '' } }); } - async blockUser (userId, blockedUserId) { - userId = mongoose.Types.ObjectId(userId); - blockedUserId = mongoose.Types.ObjectId(blockedUserId); - if (userId.equals(blockedUserId)) { + async blockUser (user, blockedUser) { + if (user._id.equals(blockedUser._id)) { throw new SiteError(406, "You can't block yourself"); } await UserBlock.updateOne( - { user: userId }, + { 'member.user': user._id }, { - $addToSet: { blockedUsers: blockedUserId }, + $addToSet: { + blockedMembers: { + userType: blockedUser.type, + user: blockedUser._id, + }, + }, }, { upsert: true }, ); } - async unblockUser (userId, blockedUserId) { - userId = mongoose.Types.ObjectId(userId); - blockedUserId = mongoose.Types.ObjectId(blockedUserId); - if (userId.equals(blockedUserId)) { + async unblockUser (user, blockedUser) { + if (user._id.equals(blockedUser._id)) { throw new SiteError(406, "You can't un-block yourself"); } await UserBlock.updateOne( - { user: userId }, + { 'member.user': user._id }, { - $removeFromSet: { blockedUsers: blockedUserId }, + $removeFromSet: { + blockedUsers: { + userType: blockedUser.type, + user: blockedUser._id, + }, + }, }, - { upsert: true }, ); } diff --git a/app/views/admin/core-node/view.pug b/app/views/admin/core-node/view.pug index a728eaa..414e279 100644 --- a/app/views/admin/core-node/view.pug +++ b/app/views/admin/core-node/view.pug @@ -8,20 +8,33 @@ block content div(uk-grid).uk-grid-small.uk-flex-middle div(class="uk-width-1-1 uk-width-expand@m") h1(style="line-height: 1em;") Core Node - div(class="uk-width-1-1 uk-width-auto@m") + + .uk-width-auto a(href=`mailto:${coreNode.meta.supportEmail}?subject=${encodeURIComponent(`Support request from ${site.name}`)}`) span i.fas.fa-envelope span.uk-margin-small-left Email Support - div(class="uk-width-1-1 uk-width-auto@m") + + .uk-width-auto span.uk-label(style="line-height: 1.75em;", class={ 'uk-label-success': coreNode.flags.isConnected, 'uk-label-warning': !coreNode.flags.isConnected && !coreNode.flags.isBlocked, 'uk-label-danger': coreNode.flags.isBlocked, }).no-select= coreNode.flags.isConnected ? 'Connected' : 'Pending' + +renderCoreNodeListItem(coreNode) + .uk-margin + button( + type="button", + data-core={ _id: coreNode._id, name: coreNode.meta.name}, + onclick="return dtp.adminApp.disconnectCore(event);", + ).uk-button.dtp-button-danger.uk-border-rounded + span + i.fas.fa-window-close + span.uk-margin-small-left Disconnect + .uk-margin table.uk-table.uk-table-small thead diff --git a/app/views/components/library.pug b/app/views/components/library.pug index e53b6f9..ac1c65c 100644 --- a/app/views/components/library.pug +++ b/app/views/components/library.pug @@ -46,7 +46,7 @@ include section-title } mixin renderCell (label, value, className) - div(title=`${label}: ${numeral(value).format('0,0')}`).uk-tile.uk-tile-default.uk-padding-remove.no-select + div(title=`${label}: ${numeral(value).format('0,0')}`).no-select div(class=className)!= value .uk-text-muted.uk-text-small= label diff --git a/app/workers/media/job/sticker-ingest.js b/app/workers/media/job/sticker-ingest.js index c4f6fcb..599f8c1 100644 --- a/app/workers/media/job/sticker-ingest.js +++ b/app/workers/media/job/sticker-ingest.js @@ -128,7 +128,7 @@ class StickerIngestJob extends SiteWorkerProcess { throw new Error(`unsupported sticker type: ${job.data.sticker.original.type}`); } - this.jobLog(job, 'fetching original media', { // blah 62096adcb69874552a0f87bc + this.jobLog(job, 'fetching original media', { stickerId: job.data.sticker._id, slug: job.data.sticker.slug, type: job.data.sticker.original.type, diff --git a/client/js/site-admin-app.js b/client/js/site-admin-app.js index a18aba2..39d62f5 100644 --- a/client/js/site-admin-app.js +++ b/client/js/site-admin-app.js @@ -393,7 +393,28 @@ export default class DtpSiteAdminHostStatsApp extends DtpApp { return false; } + async disconnectCore (event) { + event.preventDefault(); + event.stopPropagation(); + const target = event.currentTarget || event.target; + const core = JSON.parse(target.getAttribute('data-core')); + + try { + await UIkit.modal.confirm(`Are you sure you want to disconnect from Core "${core.name}"?`); + } catch (error) { + // canceled + return false; + } + + try { + const response = await fetch(`/admin/core-node/${core._id}`, { method: 'DELETE' }); + await this.processResponse(response); + } catch (error) { + UIkit.modal.alert(`Failed to disconnect from Core: ${error.message}`); + } + return false; + } } dtp.DtpSiteAdminHostStatsApp = DtpSiteAdminHostStatsApp; \ No newline at end of file diff --git a/lib/site-common.js b/lib/site-common.js index 8c00f31..78a1788 100644 --- a/lib/site-common.js +++ b/lib/site-common.js @@ -12,11 +12,29 @@ const striptags = require('striptags'); const { SiteLog } = require(path.join(__dirname, 'site-log')); const { SiteAsync } = require(path.join(__dirname, 'site-async')); -const Events = require('events'); -class SiteCommon extends Events { +const EventEmitter2 = require('eventemitter2'); +class SiteCommon extends EventEmitter2 { + + constructor (dtp, component, options) { + // ensure valid options + options = Object.assign({ }, options); + + // provide DTP's default EventEmitter2 configuration + options.emitter = Object.assign({ + wildcard: false, + delimiter: '.', + newListener: false, + removeListener: false, + maxListeners: 64, + verboseMemoryLeak: process.env.NODE_ENV === 'local', + ignoreErrors: false, + }, options); + + // construct the EventEmitter2 instance via super() + super(options.emitter); - constructor (dtp, component) { - super(); + // store options *after* super() because you can't alter `this` prior + this.options = options; this.dtp = dtp; this.component = component; @@ -43,6 +61,14 @@ class SiteCommon extends Events { }, 1); } + getEventName (name) { + return `dtp.${this.component.slug}.${name}`; + } + + async emitDtpEvent (name, params) { + await this.emitAsync(this.getEventName(name), params); + } + async getJobQueue (name) { if (this.jobQueues[name]) { return this.jobQueues[name]; diff --git a/lib/site-platform.js b/lib/site-platform.js index 2b108f7..7847d65 100644 --- a/lib/site-platform.js +++ b/lib/site-platform.js @@ -184,6 +184,12 @@ module.exports.startPlatform = async (dtp) => { try { module.log = new SiteLog(module, dtp.config.component); + + if (process.env.NODE_ENV === 'local') { + module.log.alert('allowing self-signed certificates for host-to-host communications', { env: process.env.NODE_ENV }); + process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; + } + dtp.config.jobQueues = require(path.join(dtp.config.root, 'config', 'job-queues')); await module.connectDatabase(dtp); diff --git a/package.json b/package.json index 24a66b2..8e0d5d1 100644 --- a/package.json +++ b/package.json @@ -28,10 +28,11 @@ "diskusage-ng": "^1.0.2", "disposable-email-provider-domains": "^1.0.9", "dotenv": "^16.0.0", - "dtp-jshint-reporter": "ssh://git@git.digitaltelepresence.com:digital-telepresence/dtp-jshint-reporter.git#master", + "dtp-jshint-reporter": "git+https://git.digitaltelepresence.com/digital-telepresence/dtp-jshint-reporter.git#master", "ein-validator": "^1.0.1", "email-domain-check": "^1.1.4", "email-validator": "^2.0.4", + "eventemitter2": "^6.4.9", "execa": "^6.1.0", "express": "^4.17.3", "express-limiter": "^1.6.1", diff --git a/yarn.lock b/yarn.lock index d3318d0..dd5d577 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3100,9 +3100,9 @@ drange@^1.0.2: resolved "https://registry.yarnpkg.com/drange/-/drange-1.1.1.tgz#b2aecec2aab82fcef11dbbd7b9e32b83f8f6c0b8" integrity sha512-pYxfDYpued//QpnLIm4Avk7rsNtAtQkUES2cwAYSvD/wd2pKD71gN2Ebj3e7klzXwjocvE8c5vx/1fxwpqmSxA== -"dtp-jshint-reporter@ssh://git@git.digitaltelepresence.com:digital-telepresence/dtp-jshint-reporter.git#master": - version "1.0.2" - resolved "ssh://git@git.digitaltelepresence.com:digital-telepresence/dtp-jshint-reporter.git#68b078b75cd6d048a9bf9bdc9b30ccc2a2145c4f" +"dtp-jshint-reporter@git+https://git.digitaltelepresence.com/digital-telepresence/dtp-jshint-reporter.git#master": + version "1.0.4" + resolved "git+https://git.digitaltelepresence.com/digital-telepresence/dtp-jshint-reporter.git#517c2f8055140b89cd3bbfff1cdf33669b416322" dependencies: chalk "^4.1.1" @@ -3471,6 +3471,11 @@ etag@1.8.1, etag@^1.8.1, etag@~1.8.1: resolved "https://registry.yarnpkg.com/etag/-/etag-1.8.1.tgz#41ae2eeb65efa62268aebfea83ac7d79299b0887" integrity sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc= +eventemitter2@^6.4.9: + version "6.4.9" + resolved "https://registry.yarnpkg.com/eventemitter2/-/eventemitter2-6.4.9.tgz#41f2750781b4230ed58827bc119d293471ecb125" + integrity sha512-JEPTiaOt9f04oa6NOkc4aH+nVp5I3wEjpHbIPqfgCdD5v5bUzy7xQqwcVO2aDQgOWhI28da57HksMrzK9HlRxg== + eventemitter3@^4.0.0: version "4.0.7" resolved "https://registry.yarnpkg.com/eventemitter3/-/eventemitter3-4.0.7.tgz#2de9b68f6528d5644ef5c59526a1b4a07306169f"