The Digital Telepresence Platform core implementing user account management, authentication, search, global directory, and other platform-wide services.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

204 lines
6.3 KiB

// site-ioserver.js
// Copyright (C) 2022 DTP Technologies, LLC
// License: Apache-2.0
'use strict';
const DTP_COMPONENT = { name: 'I/O Server', slug: 'ioserver', prefix: 'srv' };
const path = require('path');
const Redis = require('ioredis');
const mongoose = require('mongoose');
const ConnectToken = mongoose.model('ConnectToken');
const marked = require('marked');
const { SiteLog } = require(path.join(__dirname, 'site-log'));
const Events = require('events');
class SiteIoServer extends Events {
constructor (dtp) {
super();
this.dtp = dtp;
this.log = new SiteLog(dtp, DTP_COMPONENT);
}
async start (httpServer) {
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
this.markedRenderer = new marked.Renderer();
this.markedRenderer.link = (href, title, text) => { return text; };
this.markedRenderer.image = (href, title, text) => { return text; };
this.markedRenderer.image = (href, title, text) => { return text; };
const hljs = require('highlight.js');
this.markedConfig = {
renderer: this.markedRenderer,
highlight: function(code, lang) {
const language = hljs.getLanguage(lang) ? lang : 'plaintext';
return hljs.highlight(code, { language }).value;
},
langPrefix: 'hljs language-', // highlight.js css expects a top-level 'hljs' class.
pedantic: false,
gfm: true,
breaks: false,
sanitize: false,
smartLists: true,
smartypants: false,
xhtml: false,
};
const pubClient = new Redis({
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT,
password: process.env.REDIS_PASSWORD,
key: process.env.REDIS_PREFIX,
});
pubClient.on('error', this.onRedisError.bind(this));
const subClient = pubClient.duplicate();
subClient.on('error', this.onRedisError.bind(this));
const transports = ['websocket'/*, 'polling'*/];
const adapter = createAdapter(pubClient, subClient);
this.io = new Server(httpServer, { adapter, transports });
this.io.on('connection', this.onSocketConnect.bind(this));
}
async onRedisError (error) {
this.log.error('Redis error', { error });
}
async stop ( ) {
}
async onSocketConnect (socket) {
this.log.debug('socket connection', { sid: socket.id });
const token = await ConnectToken.findOne({ token: socket.handshake.auth.token }).populate('user').lean();
if (!token) {
this.log.alert('rejecting invalid socket token', {
sid: socket.sid,
handshake: socket.handshake,
});
socket.close();
return;
}
if (token.claimed) {
this.log.alert('rejecting use of claimed connect token', {
sid: socket.id,
handshake: socket.handshake,
});
socket.close();
return;
}
await ConnectToken.updateOne(
{ _id: token._id },
{ $set: { claimed: new Date() } },
);
this.log.debug('token claimed', {
sid: socket.id,
token: socket.handshake.auth.token,
user: token.user._id,
});
const session = {
user: {
_id: token.user._id,
type: token.userType,
created: token.user.created,
username: token.user.username,
displayName: token.user.displayName,
picture: token.user.picture,
},
socket,
};
session.onSocketDisconnect = this.onSocketDisconnect.bind(this, session);
session.onJoinChannel = this.onJoinChannel.bind(this, session);
session.onLeaveChannel = this.onLeaveChannel.bind(this, session);
session.onUserChat = this.onUserChat.bind(this, session);
session.onUserReact = this.onUserReact.bind(this, session);
socket.on('disconnect', session.onSocketDisconnect);
socket.on('join', session.onJoinChannel);
socket.on('leave', session.onLeaveChannel);
socket.on('user-chat', session.onUserChat);
socket.on('user-react', session.onUserReact);
socket.emit('authenticated', {
message: 'token verified',
user: session.user,
});
}
async onSocketDisconnect (session, reason) {
this.log.debug('socket disconnect', { sid: session.socket.id, user: session.user._id, reason });
session.socket.off('disconnect', session.onSocketDisconnect);
session.socket.off('join', session.onJoinChannel);
session.socket.off('leave', session.onLeaveChannel);
}
async onJoinChannel (session, message) {
const { channelId } = message;
this.log.debug('socket joins channel', { sid: session.socket.id, user: session.user._id, channelId });
session.socket.join(channelId);
session.socket.emit('join-result', { channelId });
}
async onLeaveChannel (session, message) {
const { channelId } = message;
this.log.debug('socket leaves channel', { sid: session.socket.id, user: session.user._id, channelId });
session.socket.leave(channelId);
}
async onUserChat (session, messageDefinition) {
const { chat: chatService } = this.dtp.services;
if (!messageDefinition.content || (messageDefinition.content.length === 0)) {
return;
}
try {
const { message, payload } = await chatService.createMessage(session.user, messageDefinition);
message.author = session.user;
payload.html = await chatService.renderTemplate('chatMessage', { user: session.user, message });
await chatService.sendMessage(message.channel, 'user-chat', payload);
} catch (error) {
this.log.error('failed to process user chat message', { error });
await chatService.sendSystemMessage(`Failed to send chat: ${error.message}`, {
type: 'error',
userId: session.user._id.toString(),
});
return;
}
}
async onUserReact (session, message) {
const { chat: chatService } = this.dtp.services;
try {
const reaction = await chatService.createEmojiReaction(session.user, message);
reaction.user = session.user;
const payload = { reaction };
const channelId = reaction.subject.toString();
await chatService.sendMessage(channelId, 'user-react', payload);
} catch (error) {
this.log.error('failed to process reaction', { message, error });
session.socket.emit('system-message', {
created: new Date(),
content: `You are not allowed to chat on ${this.dtp.config.site.name}.`,
});
return;
}
}
}
module.exports.SiteIoServer = SiteIoServer;