|
|
@ -61,6 +61,12 @@ class CoreNodeService extends SiteService { |
|
|
|
async start ( ) { |
|
|
|
await super.start(); |
|
|
|
|
|
|
|
const https = require('https'); |
|
|
|
this.httpsAgent = new https.Agent({ |
|
|
|
// read it out-loud: Reject unauthorized when not the 'local' environment.
|
|
|
|
rejectUnauthorized: (process.env.NODE_ENV !== 'local'), |
|
|
|
}); |
|
|
|
|
|
|
|
const cores = await this.getConnectedCores(null, true); |
|
|
|
|
|
|
|
this.log.info('Core Node service starting', { connectedCoreCount: cores.length }); |
|
|
@ -187,7 +193,7 @@ class CoreNodeService extends SiteService { |
|
|
|
url: '/core/info/package', |
|
|
|
}); |
|
|
|
|
|
|
|
await CoreNode.updateOne( |
|
|
|
core = await CoreNode.findOneAndUpdate( |
|
|
|
{ _id: core._id }, |
|
|
|
{ |
|
|
|
$set: { |
|
|
@ -201,10 +207,11 @@ class CoreNodeService extends SiteService { |
|
|
|
'meta.supportEmail': txSite.response.site.supportEmail, |
|
|
|
}, |
|
|
|
}, |
|
|
|
{ new: true }, |
|
|
|
); |
|
|
|
|
|
|
|
core = await CoreNode.findOne({ _id: core._id }).lean(); |
|
|
|
this.log.info('resolved Core node', { core }); |
|
|
|
this.emitDtpEvent('resolve', { core, host }); |
|
|
|
return { core, networkPolicy: txSite.response.site.networkPolicy }; |
|
|
|
} |
|
|
|
|
|
|
@ -257,6 +264,8 @@ class CoreNodeService extends SiteService { |
|
|
|
body: { event }, |
|
|
|
}; |
|
|
|
|
|
|
|
await this.emitDtpEvent('kaleidoscope.event', { event, recipients, request }); |
|
|
|
|
|
|
|
if (!recipients) { |
|
|
|
return this.broadcast(request); |
|
|
|
} |
|
|
@ -303,6 +312,7 @@ class CoreNodeService extends SiteService { |
|
|
|
|
|
|
|
async broadcast (request) { |
|
|
|
const results = [ ]; |
|
|
|
await this.emitDtpEvent('kaleidoscope.broadcast', { request }); |
|
|
|
await CoreNode |
|
|
|
.find({ |
|
|
|
'flags.isConnected': true, |
|
|
@ -327,6 +337,7 @@ class CoreNodeService extends SiteService { |
|
|
|
try { |
|
|
|
const req = new CoreNodeRequest(); |
|
|
|
const options = { |
|
|
|
agent: this.httpsAgent, |
|
|
|
headers: { |
|
|
|
'Content-Type': 'application/json', |
|
|
|
}, |
|
|
@ -353,8 +364,10 @@ class CoreNodeService extends SiteService { |
|
|
|
options.body = JSON.stringify(request.body); |
|
|
|
} |
|
|
|
|
|
|
|
this.log.info('sending Core node request', { request: req }); |
|
|
|
const requestUrl = this.getCoreRequestUrl(core, request.url); |
|
|
|
await this.emitDtpEvent('kaleidoscope.request', { core, request, requestUrl }); |
|
|
|
|
|
|
|
this.log.info('sending Core node request', { request: req }); |
|
|
|
const response = await fetch(requestUrl, options); |
|
|
|
if (!response.ok) { |
|
|
|
let json; |
|
|
@ -384,6 +397,10 @@ class CoreNodeService extends SiteService { |
|
|
|
async setRequestResponse (request, response, json) { |
|
|
|
const DONE = new Date(); |
|
|
|
const ELAPSED = DONE.valueOf() - request.created.valueOf(); |
|
|
|
|
|
|
|
/* |
|
|
|
* Build the default update operation |
|
|
|
*/ |
|
|
|
const updateOp = { |
|
|
|
$set: { |
|
|
|
'response.received': DONE, |
|
|
@ -391,9 +408,20 @@ class CoreNodeService extends SiteService { |
|
|
|
'response.statusCode': response.status, |
|
|
|
}, |
|
|
|
}; |
|
|
|
|
|
|
|
if (json) { |
|
|
|
updateOp.$set['response.success'] = json.success; |
|
|
|
} |
|
|
|
|
|
|
|
/* |
|
|
|
* Provide an opportunity for anything to alter the operation or cancel it. |
|
|
|
*/ |
|
|
|
await this.emitDtpEvent('kaleidoscope.response', { |
|
|
|
core: request.core, |
|
|
|
request, response, json, |
|
|
|
updateOp, |
|
|
|
}); |
|
|
|
|
|
|
|
await CoreNodeRequest.updateOne({ _id: request._id }, updateOp); |
|
|
|
} |
|
|
|
|
|
|
@ -433,6 +461,52 @@ class CoreNodeService extends SiteService { |
|
|
|
}, |
|
|
|
}, |
|
|
|
); |
|
|
|
|
|
|
|
await this.emitDtpEvent('connect', { core: request.core, request }); |
|
|
|
} |
|
|
|
|
|
|
|
async disconnect (core) { |
|
|
|
this.log.alert('disconnecting from Core', { |
|
|
|
name: core.meta.name, |
|
|
|
domain: core.meta.domain, |
|
|
|
}); |
|
|
|
|
|
|
|
// provides an abort point if any listener throws
|
|
|
|
await this.emitDtpEvent('disconnect-pre', { core }); |
|
|
|
|
|
|
|
const disconnect = await this.sendRequest(core, { |
|
|
|
method: 'DELETE', |
|
|
|
url: `/core/connect/node/${core.oauth.clientId}`, |
|
|
|
}); |
|
|
|
this.log.alert('Core disconnect request complete', { |
|
|
|
name: core.meta.name, |
|
|
|
domain: core.meta.domain, |
|
|
|
disconnect, |
|
|
|
}); |
|
|
|
|
|
|
|
try { |
|
|
|
await this.emitDtpEvent('disconnect-post', { core, disconnect }); |
|
|
|
} catch (error) { |
|
|
|
this.log.error('failed to emit dtp.core.disconnect-post', { error }); |
|
|
|
// keep going
|
|
|
|
} |
|
|
|
|
|
|
|
await CoreUser |
|
|
|
.find({ core: core._id }) |
|
|
|
.cursor() |
|
|
|
.eachAsync(this.removeUser.bind(this, core), 1); |
|
|
|
|
|
|
|
// await CoreNodeConnect.deleteMany({ 'site.domainKey': core.meta.domainKey });
|
|
|
|
// await CoreNodeRequest.deleteMany({ core: core._id });
|
|
|
|
|
|
|
|
try { |
|
|
|
await this.emitDtpEvent('disconnect', { core, disconnect }); |
|
|
|
} catch (error) { |
|
|
|
this.log.error('failed to emit dtp.core.disconnect', { error }); |
|
|
|
// keep going
|
|
|
|
} |
|
|
|
|
|
|
|
return disconnect; |
|
|
|
} |
|
|
|
|
|
|
|
async queueServiceNodeConnect (requestToken, appNode) { |
|
|
@ -478,7 +552,10 @@ class CoreNodeService extends SiteService { |
|
|
|
|
|
|
|
await request.save(); |
|
|
|
|
|
|
|
return request.toObject(); |
|
|
|
request = request.toObject(); |
|
|
|
await this.emitDtpEvent('service-node.connect', { request }); |
|
|
|
|
|
|
|
return request; |
|
|
|
} |
|
|
|
|
|
|
|
async getServiceNodeQueue (pagination) { |
|
|
@ -498,7 +575,6 @@ class CoreNodeService extends SiteService { |
|
|
|
return request; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
async acceptServiceNode (requestToken, appNode) { |
|
|
|
const { oauth2: oauth2Service } = this.dtp.services; |
|
|
|
const response = { token: requestToken }; |
|
|
@ -506,13 +582,15 @@ class CoreNodeService extends SiteService { |
|
|
|
this.log.info('accepting app node', { requestToken, appNode }); |
|
|
|
response.client = await oauth2Service.createClient(appNode.site); |
|
|
|
|
|
|
|
await this.emitDtpEvent('service-node.accept', { client: response.client }); |
|
|
|
|
|
|
|
return response; |
|
|
|
} |
|
|
|
|
|
|
|
async setCoreOAuth2Credentials (core, credentials) { |
|
|
|
const { client } = credentials; |
|
|
|
this.log.info('updating Core Connect credentials', { core, client }); |
|
|
|
await CoreNode.updateOne( |
|
|
|
core = await CoreNode.findOneAndUpdate( |
|
|
|
{ _id: core._id }, |
|
|
|
{ |
|
|
|
$set: { |
|
|
@ -524,7 +602,9 @@ class CoreNodeService extends SiteService { |
|
|
|
'kaleidoscope.token': client.kaleidoscope.token, |
|
|
|
}, |
|
|
|
}, |
|
|
|
{ new: true }, |
|
|
|
); |
|
|
|
await this.emitDtpEvent('set-oauth2-credentials', { core }); |
|
|
|
} |
|
|
|
|
|
|
|
registerPassportCoreOAuth2 (core) { |
|
|
@ -591,6 +671,7 @@ class CoreNodeService extends SiteService { |
|
|
|
); |
|
|
|
user = user.toObject(); |
|
|
|
user.type = 'CoreUser'; |
|
|
|
this.emitDtpEvent('user.login', { user }); |
|
|
|
return cb(null, user); |
|
|
|
} catch (error) { |
|
|
|
return cb(error); |
|
|
@ -704,6 +785,11 @@ class CoreNodeService extends SiteService { |
|
|
|
}, |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
async removeUser (core, user) { |
|
|
|
this.log.alert('remove Core user', { core: core.meta.name, user: user.username }); |
|
|
|
await this.emitDtpEvent('user.remove', { core, user }); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
module.exports = { |
|
|
|