Browse Source

Core and Service Node connection management (wip)

- Integrated EventEmitter2 for async event processing
- Changed jshint reporter module URL to avoid use of SSH
- Core can disconnect a service node, generates many events
- Service Node can disconnect a Core, generates many events

Nothing yet processes those events. Many things need to be cleaned up
and removed based on a Core Node disconnection. The Core itself needs to
remove all Kaleidoscope events and other data received from the Service
Node, and the Service Node needs to remove all CoreUser records (and
everything they did while there).

That's going to take a minute to implement throughout all the systems.
pull/2/head
Rob Colbert 1 year ago
parent
commit
00ab250d59
  1. 19
      app/controllers/admin/core-node.js
  2. 12
      app/models/lib/user-types.js
  3. 6
      app/models/user-block.js
  4. 98
      app/services/core-node.js
  5. 31
      app/services/user.js
  6. 17
      app/views/admin/core-node/view.pug
  7. 2
      app/views/components/library.pug
  8. 2
      app/workers/media/job/sticker-ingest.js
  9. 21
      client/js/site-admin-app.js
  10. 34
      lib/site-common.js
  11. 6
      lib/site-platform.js
  12. 3
      package.json
  13. 11
      yarn.lock

19
app/controllers/admin/core-node.js

@ -33,6 +33,8 @@ class CoreNodeController extends SiteController {
router.get('/:coreNodeId', this.getCoreNodeView.bind(this));
router.get('/', this.getIndex.bind(this));
router.delete('/:coreNodeId', this.deleteCoreNode.bind(this));
return router;
}
@ -137,6 +139,23 @@ class CoreNodeController extends SiteController {
return next(error);
}
}
async deleteCoreNode (req, res) {
const { coreNode: coreNodeService } = this.dtp.services;
try {
await coreNodeService.disconnect(res.locals.coreNode);
const displayList = this.createDisplayList('core-disconnect');
displayList.navigateTo('/admin/core-node');
res.status(200).json({ success: true, displayList });
} catch (error) {
this.log.error('failed to disconnect from Core', { error });
return res.status(error.statusCode || 500).json({
success: false,
message: error.message,
});
}
}
}
module.exports = {

12
app/models/lib/user-types.js

@ -9,20 +9,26 @@ const Schema = mongoose.Schema;
module.exports.DTP_THEME_LIST = ['dtp-light', 'dtp-dark'];
module.exports.DTP_USER_TYPE_LIST = ['CoreUser', 'User'];
module.exports.DtpUserSchema = new Schema({
userType: { type: String, enum: module.exports.DTP_USER_TYPE_LIST, required: true },
user: { type: Schema.ObjectId, required: true, index: true, refPath: 'userType' },
}, { _id: false });
module.exports.UserFlagsSchema = new Schema({
isAdmin: { type: Boolean, default: false, required: true },
isModerator: { type: Boolean, default: false, required: true },
isEmailVerified: { type: Boolean, default: false, required: true },
});
}, { _id: false });
module.exports.UserPermissionsSchema = new Schema({
canLogin: { type: Boolean, default: true, required: true },
canChat: { type: Boolean, default: true, required: true },
canComment: { type: Boolean, default: true, required: true },
canReport: { type: Boolean, default: true, required: true },
});
}, { _id: false });
module.exports.UserOptInSchema = new Schema({
system: { type: Boolean, default: true, required: true },
marketing: { type: Boolean, default: true, required: true },
});
}, { _id: false });

6
app/models/user-block.js

@ -7,9 +7,11 @@
const mongoose = require('mongoose');
const Schema = mongoose.Schema;
const { DtpUserSchema } = require('./lib/user-types.js');
const UserBlockSchema = new Schema({
user: { type: Schema.ObjectId, required: true, index: true, ref: 'User' },
blockedUsers: { type: [Schema.ObjectId], ref: 'User' },
member: { type: DtpUserSchema, required: true },
blockedMembers: { type: [DtpUserSchema] },
});
module.exports = mongoose.model('UserBlock', UserBlockSchema);

98
app/services/core-node.js

@ -61,6 +61,12 @@ class CoreNodeService extends SiteService {
async start ( ) {
await super.start();
const https = require('https');
this.httpsAgent = new https.Agent({
// read it out-loud: Reject unauthorized when not the 'local' environment.
rejectUnauthorized: (process.env.NODE_ENV !== 'local'),
});
const cores = await this.getConnectedCores(null, true);
this.log.info('Core Node service starting', { connectedCoreCount: cores.length });
@ -187,7 +193,7 @@ class CoreNodeService extends SiteService {
url: '/core/info/package',
});
await CoreNode.updateOne(
core = await CoreNode.findOneAndUpdate(
{ _id: core._id },
{
$set: {
@ -201,10 +207,11 @@ class CoreNodeService extends SiteService {
'meta.supportEmail': txSite.response.site.supportEmail,
},
},
{ new: true },
);
core = await CoreNode.findOne({ _id: core._id }).lean();
this.log.info('resolved Core node', { core });
this.emitDtpEvent('resolve', { core, host });
return { core, networkPolicy: txSite.response.site.networkPolicy };
}
@ -257,6 +264,8 @@ class CoreNodeService extends SiteService {
body: { event },
};
await this.emitDtpEvent('kaleidoscope.event', { event, recipients, request });
if (!recipients) {
return this.broadcast(request);
}
@ -303,6 +312,7 @@ class CoreNodeService extends SiteService {
async broadcast (request) {
const results = [ ];
await this.emitDtpEvent('kaleidoscope.broadcast', { request });
await CoreNode
.find({
'flags.isConnected': true,
@ -327,6 +337,7 @@ class CoreNodeService extends SiteService {
try {
const req = new CoreNodeRequest();
const options = {
agent: this.httpsAgent,
headers: {
'Content-Type': 'application/json',
},
@ -353,8 +364,10 @@ class CoreNodeService extends SiteService {
options.body = JSON.stringify(request.body);
}
this.log.info('sending Core node request', { request: req });
const requestUrl = this.getCoreRequestUrl(core, request.url);
await this.emitDtpEvent('kaleidoscope.request', { core, request, requestUrl });
this.log.info('sending Core node request', { request: req });
const response = await fetch(requestUrl, options);
if (!response.ok) {
let json;
@ -384,6 +397,10 @@ class CoreNodeService extends SiteService {
async setRequestResponse (request, response, json) {
const DONE = new Date();
const ELAPSED = DONE.valueOf() - request.created.valueOf();
/*
* Build the default update operation
*/
const updateOp = {
$set: {
'response.received': DONE,
@ -391,9 +408,20 @@ class CoreNodeService extends SiteService {
'response.statusCode': response.status,
},
};
if (json) {
updateOp.$set['response.success'] = json.success;
}
/*
* Provide an opportunity for anything to alter the operation or cancel it.
*/
await this.emitDtpEvent('kaleidoscope.response', {
core: request.core,
request, response, json,
updateOp,
});
await CoreNodeRequest.updateOne({ _id: request._id }, updateOp);
}
@ -433,6 +461,52 @@ class CoreNodeService extends SiteService {
},
},
);
await this.emitDtpEvent('connect', { core: request.core, request });
}
async disconnect (core) {
this.log.alert('disconnecting from Core', {
name: core.meta.name,
domain: core.meta.domain,
});
// provides an abort point if any listener throws
await this.emitDtpEvent('disconnect-pre', { core });
const disconnect = await this.sendRequest(core, {
method: 'DELETE',
url: `/core/connect/node/${core.oauth.clientId}`,
});
this.log.alert('Core disconnect request complete', {
name: core.meta.name,
domain: core.meta.domain,
disconnect,
});
try {
await this.emitDtpEvent('disconnect-post', { core, disconnect });
} catch (error) {
this.log.error('failed to emit dtp.core.disconnect-post', { error });
// keep going
}
await CoreUser
.find({ core: core._id })
.cursor()
.eachAsync(this.removeUser.bind(this, core), 1);
// await CoreNodeConnect.deleteMany({ 'site.domainKey': core.meta.domainKey });
// await CoreNodeRequest.deleteMany({ core: core._id });
try {
await this.emitDtpEvent('disconnect', { core, disconnect });
} catch (error) {
this.log.error('failed to emit dtp.core.disconnect', { error });
// keep going
}
return disconnect;
}
async queueServiceNodeConnect (requestToken, appNode) {
@ -478,7 +552,10 @@ class CoreNodeService extends SiteService {
await request.save();
return request.toObject();
request = request.toObject();
await this.emitDtpEvent('service-node.connect', { request });
return request;
}
async getServiceNodeQueue (pagination) {
@ -498,7 +575,6 @@ class CoreNodeService extends SiteService {
return request;
}
async acceptServiceNode (requestToken, appNode) {
const { oauth2: oauth2Service } = this.dtp.services;
const response = { token: requestToken };
@ -506,13 +582,15 @@ class CoreNodeService extends SiteService {
this.log.info('accepting app node', { requestToken, appNode });
response.client = await oauth2Service.createClient(appNode.site);
await this.emitDtpEvent('service-node.accept', { client: response.client });
return response;
}
async setCoreOAuth2Credentials (core, credentials) {
const { client } = credentials;
this.log.info('updating Core Connect credentials', { core, client });
await CoreNode.updateOne(
core = await CoreNode.findOneAndUpdate(
{ _id: core._id },
{
$set: {
@ -524,7 +602,9 @@ class CoreNodeService extends SiteService {
'kaleidoscope.token': client.kaleidoscope.token,
},
},
{ new: true },
);
await this.emitDtpEvent('set-oauth2-credentials', { core });
}
registerPassportCoreOAuth2 (core) {
@ -591,6 +671,7 @@ class CoreNodeService extends SiteService {
);
user = user.toObject();
user.type = 'CoreUser';
this.emitDtpEvent('user.login', { user });
return cb(null, user);
} catch (error) {
return cb(error);
@ -704,6 +785,11 @@ class CoreNodeService extends SiteService {
},
);
}
async removeUser (core, user) {
this.log.alert('remove Core user', { core: core.meta.name, user: user.username });
await this.emitDtpEvent('user.remove', { core, user });
}
}
module.exports = {

31
app/services/user.js

@ -649,33 +649,38 @@ class UserService extends SiteService {
await User.updateOne({ _id: user._id }, { $unset: { 'picture': '' } });
}
async blockUser (userId, blockedUserId) {
userId = mongoose.Types.ObjectId(userId);
blockedUserId = mongoose.Types.ObjectId(blockedUserId);
if (userId.equals(blockedUserId)) {
async blockUser (user, blockedUser) {
if (user._id.equals(blockedUser._id)) {
throw new SiteError(406, "You can't block yourself");
}
await UserBlock.updateOne(
{ user: userId },
{ 'member.user': user._id },
{
$addToSet: { blockedUsers: blockedUserId },
$addToSet: {
blockedMembers: {
userType: blockedUser.type,
user: blockedUser._id,
},
},
},
{ upsert: true },
);
}
async unblockUser (userId, blockedUserId) {
userId = mongoose.Types.ObjectId(userId);
blockedUserId = mongoose.Types.ObjectId(blockedUserId);
if (userId.equals(blockedUserId)) {
async unblockUser (user, blockedUser) {
if (user._id.equals(blockedUser._id)) {
throw new SiteError(406, "You can't un-block yourself");
}
await UserBlock.updateOne(
{ user: userId },
{ 'member.user': user._id },
{
$removeFromSet: { blockedUsers: blockedUserId },
$removeFromSet: {
blockedUsers: {
userType: blockedUser.type,
user: blockedUser._id,
},
},
},
{ upsert: true },
);
}

17
app/views/admin/core-node/view.pug

@ -8,20 +8,33 @@ block content
div(uk-grid).uk-grid-small.uk-flex-middle
div(class="uk-width-1-1 uk-width-expand@m")
h1(style="line-height: 1em;") Core Node
div(class="uk-width-1-1 uk-width-auto@m")
.uk-width-auto
a(href=`mailto:${coreNode.meta.supportEmail}?subject=${encodeURIComponent(`Support request from ${site.name}`)}`)
span
i.fas.fa-envelope
span.uk-margin-small-left Email Support
div(class="uk-width-1-1 uk-width-auto@m")
.uk-width-auto
span.uk-label(style="line-height: 1.75em;", class={
'uk-label-success': coreNode.flags.isConnected,
'uk-label-warning': !coreNode.flags.isConnected && !coreNode.flags.isBlocked,
'uk-label-danger': coreNode.flags.isBlocked,
}).no-select= coreNode.flags.isConnected ? 'Connected' : 'Pending'
+renderCoreNodeListItem(coreNode)
.uk-margin
button(
type="button",
data-core={ _id: coreNode._id, name: coreNode.meta.name},
onclick="return dtp.adminApp.disconnectCore(event);",
).uk-button.dtp-button-danger.uk-border-rounded
span
i.fas.fa-window-close
span.uk-margin-small-left Disconnect
.uk-margin
table.uk-table.uk-table-small
thead

2
app/views/components/library.pug

@ -46,7 +46,7 @@ include section-title
}
mixin renderCell (label, value, className)
div(title=`${label}: ${numeral(value).format('0,0')}`).uk-tile.uk-tile-default.uk-padding-remove.no-select
div(title=`${label}: ${numeral(value).format('0,0')}`).no-select
div(class=className)!= value
.uk-text-muted.uk-text-small= label

2
app/workers/media/job/sticker-ingest.js

@ -128,7 +128,7 @@ class StickerIngestJob extends SiteWorkerProcess {
throw new Error(`unsupported sticker type: ${job.data.sticker.original.type}`);
}
this.jobLog(job, 'fetching original media', { // blah 62096adcb69874552a0f87bc
this.jobLog(job, 'fetching original media', {
stickerId: job.data.sticker._id,
slug: job.data.sticker.slug,
type: job.data.sticker.original.type,

21
client/js/site-admin-app.js

@ -393,7 +393,28 @@ export default class DtpSiteAdminHostStatsApp extends DtpApp {
return false;
}
async disconnectCore (event) {
event.preventDefault();
event.stopPropagation();
const target = event.currentTarget || event.target;
const core = JSON.parse(target.getAttribute('data-core'));
try {
await UIkit.modal.confirm(`Are you sure you want to disconnect from Core "${core.name}"?`);
} catch (error) {
// canceled
return false;
}
try {
const response = await fetch(`/admin/core-node/${core._id}`, { method: 'DELETE' });
await this.processResponse(response);
} catch (error) {
UIkit.modal.alert(`Failed to disconnect from Core: ${error.message}`);
}
return false;
}
}
dtp.DtpSiteAdminHostStatsApp = DtpSiteAdminHostStatsApp;

34
lib/site-common.js

@ -12,11 +12,29 @@ const striptags = require('striptags');
const { SiteLog } = require(path.join(__dirname, 'site-log'));
const { SiteAsync } = require(path.join(__dirname, 'site-async'));
const Events = require('events');
class SiteCommon extends Events {
const EventEmitter2 = require('eventemitter2');
class SiteCommon extends EventEmitter2 {
constructor (dtp, component, options) {
// ensure valid options
options = Object.assign({ }, options);
// provide DTP's default EventEmitter2 configuration
options.emitter = Object.assign({
wildcard: false,
delimiter: '.',
newListener: false,
removeListener: false,
maxListeners: 64,
verboseMemoryLeak: process.env.NODE_ENV === 'local',
ignoreErrors: false,
}, options);
// construct the EventEmitter2 instance via super()
super(options.emitter);
constructor (dtp, component) {
super();
// store options *after* super() because you can't alter `this` prior
this.options = options;
this.dtp = dtp;
this.component = component;
@ -43,6 +61,14 @@ class SiteCommon extends Events {
}, 1);
}
getEventName (name) {
return `dtp.${this.component.slug}.${name}`;
}
async emitDtpEvent (name, params) {
await this.emitAsync(this.getEventName(name), params);
}
async getJobQueue (name) {
if (this.jobQueues[name]) {
return this.jobQueues[name];

6
lib/site-platform.js

@ -184,6 +184,12 @@ module.exports.startPlatform = async (dtp) => {
try {
module.log = new SiteLog(module, dtp.config.component);
if (process.env.NODE_ENV === 'local') {
module.log.alert('allowing self-signed certificates for host-to-host communications', { env: process.env.NODE_ENV });
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
}
dtp.config.jobQueues = require(path.join(dtp.config.root, 'config', 'job-queues'));
await module.connectDatabase(dtp);

3
package.json

@ -28,10 +28,11 @@
"diskusage-ng": "^1.0.2",
"disposable-email-provider-domains": "^1.0.9",
"dotenv": "^16.0.0",
"dtp-jshint-reporter": "ssh://[email protected]:digital-telepresence/dtp-jshint-reporter.git#master",
"dtp-jshint-reporter": "git+https://git.digitaltelepresence.com/digital-telepresence/dtp-jshint-reporter.git#master",
"ein-validator": "^1.0.1",
"email-domain-check": "^1.1.4",
"email-validator": "^2.0.4",
"eventemitter2": "^6.4.9",
"execa": "^6.1.0",
"express": "^4.17.3",
"express-limiter": "^1.6.1",

11
yarn.lock

@ -3100,9 +3100,9 @@ drange@^1.0.2:
resolved "https://registry.yarnpkg.com/drange/-/drange-1.1.1.tgz#b2aecec2aab82fcef11dbbd7b9e32b83f8f6c0b8"
integrity sha512-pYxfDYpued//QpnLIm4Avk7rsNtAtQkUES2cwAYSvD/wd2pKD71gN2Ebj3e7klzXwjocvE8c5vx/1fxwpqmSxA==
"dtp-jshint-reporter@ssh://[email protected]:digital-telepresence/dtp-jshint-reporter.git#master":
version "1.0.2"
resolved "ssh://[email protected]:digital-telepresence/dtp-jshint-reporter.git#68b078b75cd6d048a9bf9bdc9b30ccc2a2145c4f"
"dtp-jshint-reporter@git+https://git.digitaltelepresence.com/digital-telepresence/dtp-jshint-reporter.git#master":
version "1.0.4"
resolved "git+https://git.digitaltelepresence.com/digital-telepresence/dtp-jshint-reporter.git#517c2f8055140b89cd3bbfff1cdf33669b416322"
dependencies:
chalk "^4.1.1"
@ -3471,6 +3471,11 @@ [email protected], etag@^1.8.1, etag@~1.8.1:
resolved "https://registry.yarnpkg.com/etag/-/etag-1.8.1.tgz#41ae2eeb65efa62268aebfea83ac7d79299b0887"
integrity sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc=
eventemitter2@^6.4.9:
version "6.4.9"
resolved "https://registry.yarnpkg.com/eventemitter2/-/eventemitter2-6.4.9.tgz#41f2750781b4230ed58827bc119d293471ecb125"
integrity sha512-JEPTiaOt9f04oa6NOkc4aH+nVp5I3wEjpHbIPqfgCdD5v5bUzy7xQqwcVO2aDQgOWhI28da57HksMrzK9HlRxg==
eventemitter3@^4.0.0:
version "4.0.7"
resolved "https://registry.yarnpkg.com/eventemitter3/-/eventemitter3-4.0.7.tgz#2de9b68f6528d5644ef5c59526a1b4a07306169f"

Loading…
Cancel
Save