You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
657 lines
19 KiB
657 lines
19 KiB
// core-node.js
|
|
// Copyright (C) 2022 DTP Technologies, LLC
|
|
// License: Apache-2.0
|
|
|
|
'use strict';
|
|
|
|
const uuidv4 = require('uuid').v4;
|
|
const fetch = require('node-fetch'); // jshint ignore:line
|
|
|
|
const mongoose = require('mongoose');
|
|
|
|
const CoreNode = mongoose.model('CoreNode');
|
|
const CoreUser = mongoose.model('CoreUser');
|
|
|
|
const CoreNodeConnect = mongoose.model('CoreNodeConnect');
|
|
const CoreNodeRequest = mongoose.model('CoreNodeRequest');
|
|
|
|
const passport = require('passport');
|
|
const OAuth2Strategy = require('passport-oauth2');
|
|
|
|
const striptags = require('striptags');
|
|
|
|
const { SiteService, SiteError } = require('../../lib/site-lib');
|
|
|
|
class CoreAddress {
|
|
|
|
constructor (host, port) {
|
|
this.host = host;
|
|
this.port = port;
|
|
}
|
|
|
|
parse (host) {
|
|
const tokens = host.split(':');
|
|
this.host = tokens[0];
|
|
if (tokens[1]) {
|
|
this.port = parseInt(tokens[1], 10);
|
|
} else {
|
|
this.port = 443;
|
|
}
|
|
return this;
|
|
}
|
|
}
|
|
|
|
class CoreNodeService extends SiteService {
|
|
|
|
constructor (dtp) {
|
|
super(dtp, module.exports);
|
|
this.populateCoreUser = [
|
|
{
|
|
path: 'core'
|
|
},
|
|
];
|
|
this.populateCoreNodeRequest = [
|
|
{
|
|
path: 'core',
|
|
},
|
|
];
|
|
}
|
|
|
|
async start ( ) {
|
|
const cores = await this.getConnectedCores(null, true);
|
|
this.log.info('Core Node service starting', { connectedCoreCount: cores.length });
|
|
cores.forEach((core) => this.registerPassportCoreOAuth2(core));
|
|
}
|
|
|
|
async attachExpressRoutes (router) {
|
|
const cores = await this.getConnectedCores(null, true);
|
|
cores.forEach((core) => {
|
|
const coreAuthStrategyName = this.getCoreAuthStrategyName(core);
|
|
const coreAuthUri = `/core/${core._id}`;
|
|
const coreAuthCallbackUri = `/core/${core._id}/callback`;
|
|
|
|
this.log.info('attach Core Auth route', {
|
|
coreId: core._id,
|
|
name: core.meta.name,
|
|
strategy: coreAuthStrategyName,
|
|
auth: coreAuthUri,
|
|
callback: coreAuthCallbackUri,
|
|
});
|
|
router.get(
|
|
coreAuthUri,
|
|
(req, res, next) => {
|
|
this.log.debug('Core auth request', { coreAuthStrategyName, clientId: core.oauth.clientId });
|
|
return next();
|
|
},
|
|
passport.authenticate(coreAuthStrategyName),
|
|
);
|
|
|
|
router.get(
|
|
coreAuthCallbackUri,
|
|
(req, res, next) => {
|
|
this.log.debug('Core auth callback', { strategy: coreAuthStrategyName });
|
|
return next();
|
|
},
|
|
passport.authenticate(coreAuthStrategyName, { failureRedirect: '/' }),
|
|
async (req, res, next) => {
|
|
req.session.userType = 'Core';
|
|
req.session.coreId = core._id;
|
|
req.login(req.user, (error) => {
|
|
if (error) {
|
|
return next(error);
|
|
}
|
|
req.session.userType = 'Core';
|
|
req.session.coreId = core._id;
|
|
return res.redirect('/');
|
|
});
|
|
},
|
|
);
|
|
});
|
|
}
|
|
|
|
registerPassportCoreOAuth2 (core) {
|
|
const { coreNode: coreNodeService } = this.dtp.services;
|
|
const AUTH_SCHEME = coreNodeService.getCoreRequestScheme();
|
|
|
|
const coreAuthStrategyName = this.getCoreAuthStrategyName(core);
|
|
const authorizationHost = `${core.address.host}:${core.address.port}`;
|
|
const authorizationURL = `${AUTH_SCHEME}://${authorizationHost}/oauth2/authorize`;
|
|
const tokenURL = `${AUTH_SCHEME}://${authorizationHost}/oauth2/token`;
|
|
const callbackURL = `${AUTH_SCHEME}://${process.env.DTP_SITE_DOMAIN}/auth/core/${core._id}/callback`;
|
|
|
|
const coreAuthStrategy = new OAuth2Strategy(
|
|
{
|
|
authorizationURL,
|
|
tokenURL,
|
|
clientID: core.oauth.clientId.toString(),
|
|
clientSecret: core.oauth.clientSecret,
|
|
callbackURL,
|
|
},
|
|
async (accessToken, refreshToken, params, profile, cb) => {
|
|
const NOW = new Date();
|
|
try {
|
|
const coreUserId = mongoose.Types.ObjectId(params.coreUserId);
|
|
let user = await CoreUser.findOneAndUpdate(
|
|
{
|
|
core: core._id,
|
|
coreUserId,
|
|
},
|
|
{
|
|
$setOnInsert: {
|
|
created: NOW,
|
|
core: core._id,
|
|
coreUserId,
|
|
flags: {
|
|
isAdmin: false,
|
|
isModerator: false,
|
|
},
|
|
permissions: {
|
|
canLogin: true,
|
|
canChat: true,
|
|
canComment: true,
|
|
canReport: true,
|
|
},
|
|
optIn: {
|
|
system: true,
|
|
marketing: false,
|
|
},
|
|
theme: 'dtp-light',
|
|
stats: {
|
|
uniqueVisitCount: 0,
|
|
totalVisitCount: 0,
|
|
},
|
|
},
|
|
$set: {
|
|
updated: NOW,
|
|
username: params.username,
|
|
username_lc: params.username_lc,
|
|
displayName: params.displayName,
|
|
bio: params.bio,
|
|
},
|
|
},
|
|
{
|
|
upsert: true,
|
|
new: true,
|
|
},
|
|
);
|
|
user = user.toObject();
|
|
user.type = 'CoreUser';
|
|
return cb(null, user);
|
|
} catch (error) {
|
|
return cb(error);
|
|
}
|
|
},
|
|
);
|
|
|
|
this.log.info('registering Core auth strategy', {
|
|
name: coreAuthStrategyName,
|
|
host: core.address.host,
|
|
port: core.address.port,
|
|
clientID: core.oauth.clientId.toString(),
|
|
callbackURL,
|
|
});
|
|
passport.use(coreAuthStrategyName, coreAuthStrategy);
|
|
}
|
|
|
|
parseCoreAddress (host) {
|
|
const address = new CoreAddress();
|
|
return address.parse(host);
|
|
}
|
|
|
|
async getCoreById (coreNodeId) {
|
|
const core = await CoreNode
|
|
.findOne({ _id: coreNodeId })
|
|
.lean();
|
|
return core;
|
|
}
|
|
|
|
async getCoreByAddress (address) {
|
|
const core = await CoreNode
|
|
.findOne({
|
|
'address.host': address.host,
|
|
'address.port': address.port,
|
|
})
|
|
.lean();
|
|
return core;
|
|
}
|
|
|
|
/**
|
|
* First ensures that a record exists in the local database for the Core node.
|
|
* Then, calls the node's info services to resolve more metadata about the
|
|
* node, it's operation, policies, and available services.
|
|
*
|
|
* @param {String} host hostname and optional port number of Core node to be
|
|
* resolved.
|
|
* @returns CoreNode document describing the Core node.
|
|
*/
|
|
async resolveCore (host) {
|
|
const NOW = new Date();
|
|
this.log.info('resolving Core node', { host });
|
|
|
|
const address = this.parseCoreAddress(host);
|
|
let core = await this.getCoreByAddress(address);
|
|
if (!core) {
|
|
core = new CoreNode();
|
|
core.created = NOW;
|
|
core.address.host = address.host;
|
|
core.address.port = address.port;
|
|
await core.save();
|
|
core = core.toObject();
|
|
}
|
|
|
|
const txSite = await this.sendRequest(core, {
|
|
method: 'GET',
|
|
url: '/core/info/site',
|
|
});
|
|
const txPackage = await this.sendRequest(core, {
|
|
method: 'GET',
|
|
url: '/core/info/package',
|
|
});
|
|
|
|
await CoreNode.updateOne(
|
|
{ _id: core._id },
|
|
{
|
|
$set: {
|
|
updated: NOW,
|
|
'meta.name': txSite.response.site.name,
|
|
'meta.description': txSite.response.site.description,
|
|
'meta.domain': txSite.response.site.domain,
|
|
'meta.domainKey': txSite.response.site.domainKey,
|
|
'meta.version': txPackage.response.package.version,
|
|
'meta.admin': txSite.response.site.admin,
|
|
'meta.supportEmail': txSite.response.site.supportEmail,
|
|
},
|
|
},
|
|
);
|
|
|
|
core = await CoreNode.findOne({ _id: core._id }).lean();
|
|
this.log.info('resolved Core node', { core });
|
|
return { core, networkPolicy: txSite.response.site.networkPolicy };
|
|
}
|
|
|
|
async getConstellationStats ( ) {
|
|
const connectedCount = await CoreNode.countDocuments({ 'flags.isConnected': true });
|
|
const pendingCount = await CoreNode.countDocuments({ 'flags.isConnected': false });
|
|
const potentialReach = Math.round(Math.random() * 6000000);
|
|
return { connectedCount, pendingCount, potentialReach };
|
|
}
|
|
|
|
async sendKaleidoscopeEvent (event) {
|
|
const CORE_SCHEME = this.getCoreRequestScheme();
|
|
const { pkg } = this.dtp;
|
|
const { site } = this.dtp.config;
|
|
|
|
event.source = Object.assign({
|
|
pkg: { name: pkg.name, version: pkg.version },
|
|
site,
|
|
}, event.source);
|
|
|
|
if (event.emitter) {
|
|
let emitterUrl = event.emitter.coreUserId ? `/user/core/${event.emitter._id}` : `/user/${event.emitter._id}`;
|
|
event.source.emitter = {
|
|
emitterType: event.emitter.type,
|
|
emitterId: event.emitter._id.toString(),
|
|
displayName: event.emitter.displayName,
|
|
username: event.emitter.username,
|
|
href: `${CORE_SCHEME}://${site.domain}${emitterUrl}`,
|
|
};
|
|
}
|
|
|
|
const request = {
|
|
tokenized: false,
|
|
method: 'POST',
|
|
url: '/hive/kaleidoscope/event',
|
|
body: { event },
|
|
};
|
|
|
|
return this.broadcast(request);
|
|
}
|
|
|
|
async broadcast (request) {
|
|
const results = [ ];
|
|
await CoreNode
|
|
.find({
|
|
'flags.isConnected': true,
|
|
'flags.isBlocked': false,
|
|
})
|
|
.cursor()
|
|
.eachAsync(async (core) => {
|
|
try {
|
|
if (!core.kaleidoscope || !core.kaleidoscope.token) {
|
|
throw new Error('Core has not provided a Kaleidoscope token');
|
|
}
|
|
const response = await this.sendRequest(core, request);
|
|
results.push({ coreId: core._id, request, response });
|
|
} catch (error) {
|
|
this.log.error('failed to send Core Node request', { core: core._id, request: request.url, error });
|
|
}
|
|
}, 2);
|
|
return results;
|
|
}
|
|
|
|
getCoreAuthStrategyName (core) {
|
|
return `dtp:${core.meta.domainKey}`;
|
|
}
|
|
|
|
getCoreRequestScheme ( ) {
|
|
return process.env.DTP_CORE_AUTH_SCHEME || 'https';
|
|
}
|
|
|
|
getCoreRequestUrl (core, requestUrl) {
|
|
const coreScheme = this.getCoreRequestScheme();
|
|
return `${coreScheme}://${core.address.host}:${core.address.port}${requestUrl}`;
|
|
}
|
|
|
|
getLocalUrl (url) {
|
|
const CORE_SCHEME = this.getCoreRequestScheme();
|
|
const { site } = this.dtp.config;
|
|
return `${CORE_SCHEME}://${site.domain}${url}`;
|
|
}
|
|
|
|
async sendRequest (core, request) {
|
|
try {
|
|
const req = new CoreNodeRequest();
|
|
const options = {
|
|
headers: {
|
|
'Content-Type': 'application/json',
|
|
},
|
|
};
|
|
|
|
if (core.kaleidoscope && core.kaleidoscope.token) {
|
|
options.headers.Authorization = `Bearer ${core.kaleidoscope.token}`;
|
|
}
|
|
|
|
req.created = new Date();
|
|
req.core = core._id;
|
|
if (request.tokenized) {
|
|
req.token = {
|
|
value: uuidv4(),
|
|
claimed: false,
|
|
};
|
|
options.headers['X-DTP-Core-Token'] = req.token.value;
|
|
}
|
|
options.method = req.method = request.method || 'GET';
|
|
req.url = request.url;
|
|
await req.save();
|
|
|
|
if (request.body) {
|
|
options.body = JSON.stringify(request.body);
|
|
}
|
|
|
|
this.log.info('sending Core node request', { request: req });
|
|
const requestUrl = this.getCoreRequestUrl(core, request.url);
|
|
const response = await fetch(requestUrl, options);
|
|
if (!response.ok) {
|
|
let json;
|
|
try {
|
|
json = await response.json();
|
|
} catch (error) {
|
|
await this.setRequestResponse(req, response);
|
|
throw new SiteError(response.status, response.statusText);
|
|
}
|
|
this.log.debug('received failure response', { json });
|
|
await this.setRequestResponse(req, response, json);
|
|
throw new SiteError(response.status, json.message || response.statusText);
|
|
}
|
|
|
|
const json = await response.json();
|
|
|
|
await this.setRequestResponse(req, response, json);
|
|
|
|
this.log.info('Core node request complete', { request: req });
|
|
return { request: req.toObject(), response: json };
|
|
} catch (error) {
|
|
this.log.error('failed to send Core Node request', { core: core._id, request: request.url, error });
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
async setRequestResponse (request, response, json) {
|
|
const DONE = new Date();
|
|
const ELAPSED = DONE.valueOf() - request.created.valueOf();
|
|
const updateOp = {
|
|
$set: {
|
|
'response.received': DONE,
|
|
'response.elapsed': ELAPSED,
|
|
'response.statusCode': response.status,
|
|
},
|
|
};
|
|
if (json) {
|
|
updateOp.$set['response.success'] = json.success;
|
|
}
|
|
await CoreNodeRequest.updateOne({ _id: request._id }, updateOp);
|
|
}
|
|
|
|
async getCoreRequestHistory (core, pagination) {
|
|
const requests = await CoreNodeRequest
|
|
.find({ core: core._id })
|
|
.sort({ created: -1 })
|
|
.skip(pagination.skip)
|
|
.limit(pagination.cpp)
|
|
.lean();
|
|
for (const req of requests) {
|
|
req.core = core;
|
|
}
|
|
const totalRequestCount = await CoreNodeRequest.countDocuments({ core: core._id });
|
|
return { requests, totalRequestCount };
|
|
}
|
|
|
|
async connect (response) {
|
|
const request = await CoreNodeRequest
|
|
.findOne({ 'token.value': response.token })
|
|
.populate(this.populateCoreNodeRequest)
|
|
.lean();
|
|
if (!request || request.token.claimed) {
|
|
throw new SiteError(403, 'Unauthorized request');
|
|
}
|
|
|
|
this.log.info('enabling Core community', {
|
|
coreId: request.core._id,
|
|
domain: request.core.meta.domain,
|
|
});
|
|
await this.setCoreOAuth2Credentials(request.core, response.credentials);
|
|
await CoreNodeRequest.updateOne(
|
|
{ _id: request._id },
|
|
{
|
|
$set: {
|
|
'token.claimed': true,
|
|
},
|
|
},
|
|
);
|
|
}
|
|
|
|
async queueServiceNodeConnect (requestToken, appNode) {
|
|
const NOW = new Date();
|
|
|
|
appNode.site.domain = striptags(appNode.site.domain.trim().toLowerCase());
|
|
appNode.site.domainKey = striptags(appNode.site.domainKey.trim().toLowerCase());
|
|
appNode.site.name = striptags(appNode.site.name.trim());
|
|
appNode.site.description = striptags(appNode.site.description.trim());
|
|
appNode.site.company = striptags(appNode.site.company.trim());
|
|
appNode.site.coreAuth.scopes = appNode.site.coreAuth.scopes.map((scope) => scope.trim().toLowerCase());
|
|
appNode.site.coreAuth.callbackUrl = striptags(appNode.site.coreAuth.callbackUrl.trim());
|
|
|
|
let request = await CoreNodeConnect.findOne({
|
|
$or: [
|
|
{ domain: appNode.site.domain },
|
|
{ domainKey: appNode.site.domainKey },
|
|
],
|
|
});
|
|
if (request) {
|
|
throw new SiteError(406, 'You already have a pending Core Connect request');
|
|
}
|
|
|
|
request = new CoreNodeConnect();
|
|
request.created = NOW;
|
|
request.token = requestToken;
|
|
request.status = 'pending';
|
|
request.pkg = {
|
|
name: appNode.pkg.name,
|
|
version: appNode.pkg.version,
|
|
};
|
|
request.site = {
|
|
domain: appNode.site.domain,
|
|
domainKey: appNode.site.domainKey,
|
|
name: appNode.site.name,
|
|
description: appNode.site.description,
|
|
company: appNode.site.company,
|
|
coreAuth: {
|
|
scopes: appNode.site.coreAuth.scopes,
|
|
callbackUrl: appNode.site.coreAuth.callbackUrl,
|
|
},
|
|
};
|
|
|
|
await request.save();
|
|
|
|
return request.toObject();
|
|
}
|
|
|
|
async getServiceNodeQueue (pagination) {
|
|
const queue = await CoreNodeConnect
|
|
.find({ status: 'pending' })
|
|
.sort({ created: -1 })
|
|
.skip(pagination.skip)
|
|
.limit(pagination.cpp)
|
|
.lean();
|
|
return queue;
|
|
}
|
|
|
|
async getCoreConnectRequest (requestId) {
|
|
const request = await CoreNodeConnect
|
|
.findOne({ _id: requestId })
|
|
.lean();
|
|
return request;
|
|
}
|
|
|
|
|
|
async acceptServiceNode (requestToken, appNode) {
|
|
const { oauth2: oauth2Service } = this.dtp.services;
|
|
const response = { token: requestToken };
|
|
|
|
this.log.info('accepting app node', { requestToken, appNode });
|
|
response.client = await oauth2Service.createClient(appNode.site);
|
|
|
|
return response;
|
|
}
|
|
|
|
async setCoreOAuth2Credentials (core, credentials) {
|
|
const { client } = credentials;
|
|
this.log.info('updating Core Connect credentials', { core, client });
|
|
await CoreNode.updateOne(
|
|
{ _id: core._id },
|
|
{
|
|
$set: {
|
|
'flags.isConnected': true,
|
|
'oauth.clientId': client._id,
|
|
'oauth.clientSecret': client.secret,
|
|
'oauth.scopes': client.scopes,
|
|
'oauth.redirectUri': client.redirectUri,
|
|
'kaleidoscope.token': client.kaleidoscope.token,
|
|
},
|
|
},
|
|
);
|
|
}
|
|
|
|
async getConnectedCores (pagination, withOAuth = false) {
|
|
let q = CoreNode.find({ 'flags.isConnected': true });
|
|
if (!withOAuth) {
|
|
q = q.select('-oauth');
|
|
}
|
|
|
|
q = q.sort({ 'meta.domain': 1 });
|
|
if (pagination) {
|
|
q = q
|
|
.skip(pagination.skip)
|
|
.limit(pagination.cpp);
|
|
}
|
|
|
|
const cores = await q.lean();
|
|
return cores;
|
|
}
|
|
|
|
async searchUsers (search, pagination) {
|
|
const users = await CoreUser
|
|
.find(search)
|
|
.select('+flags +permissions +optIn')
|
|
.sort({ username_lc: 1 })
|
|
.skip(pagination.skip)
|
|
.limit(pagination.cpp)
|
|
.populate(this.populateCoreUser)
|
|
.lean();
|
|
|
|
return users.map((user) => {
|
|
user.type = 'CoreUser';
|
|
return user;
|
|
});
|
|
}
|
|
|
|
async getUserByLocalId (userId) {
|
|
const user = await CoreUser
|
|
.findOne({ _id: userId })
|
|
.select('+flags +permissions +optIn')
|
|
.populate(this.populateCoreUser)
|
|
.lean();
|
|
user.type = 'CoreUser';
|
|
return user;
|
|
}
|
|
|
|
async updateUserForAdmin (user, settings) {
|
|
const NOW = new Date();
|
|
|
|
if (!settings.username || !settings.username.length) {
|
|
throw new SiteError(406, 'Must include username');
|
|
}
|
|
settings.username = striptags(settings.username.trim());
|
|
settings.username_lc = settings.username.toLowerCase();
|
|
|
|
if (settings.badges) {
|
|
settings.badges = settings.badges.split(',').map((badge) => striptags(badge.trim()));
|
|
} else {
|
|
settings.badges = [ ];
|
|
}
|
|
|
|
await CoreUser.updateOne(
|
|
{ _id: user._id },
|
|
{
|
|
$set: {
|
|
updated: NOW,
|
|
|
|
username: settings.username,
|
|
username_lc: settings.username_lc,
|
|
displayName: striptags(settings.displayName.trim()),
|
|
bio: striptags(settings.bio.trim()),
|
|
badges: settings.badges,
|
|
|
|
'flags.isAdmin': settings.isAdmin === 'on',
|
|
'flags.isModerator': settings.isModerator === 'on',
|
|
'flags.isEmailVerified': settings.isEmailVerified === 'on',
|
|
|
|
'permissions.canLogin': settings.canLogin === 'on',
|
|
'permissions.canChat': settings.canChat === 'on',
|
|
'permissions.canComment': settings.canComment === 'on',
|
|
'permissions.canReport': settings.canReport === 'on',
|
|
|
|
'optIn.system': settings.optInSystem === 'on',
|
|
'optIn.marketing': settings.optInMarketing === 'on',
|
|
},
|
|
},
|
|
);
|
|
}
|
|
|
|
async updateUserSettings (user, settings) {
|
|
await CoreUser.updateOne(
|
|
{ _id: user._id },
|
|
{
|
|
$set: {
|
|
theme: settings.theme,
|
|
},
|
|
},
|
|
);
|
|
}
|
|
}
|
|
|
|
module.exports = {
|
|
slug: 'core-node',
|
|
name: 'coreNode',
|
|
create: (dtp) => { return new CoreNodeService(dtp); },
|
|
};
|