DTP Base provides a scalable and secure Node.js application development harness ready for production service.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

809 lines
26 KiB

// site-ioserver.js
// Copyright (C) 2024 DTP Technologies, LLC
// All Rights Reserved
'use strict';
import { createAdapter } from '@socket.io/redis-adapter';
import mongoose from 'mongoose';
const { ObjectId } = mongoose.Types;
const ConnectToken = mongoose.model('ConnectToken');
const Image = mongoose.model('Image'); // jshint ignore:line
// const ChatRoom = mongoose.model('ChatRoom');
const ChatMessage = mongoose.model('ChatMessage');
import dayjs from 'dayjs';
import striptags from 'striptags';
import { Renderer, parse } from 'marked';
import numeral from 'numeral';
// import pug from 'pug';
import { SiteLog } from './site-log.js';
import { SiteCommon } from './site-common.js';
/**
 * Socket.io server for the site.
 *
 * Authenticates each socket with a single-use ConnectToken, keeps one session
 * record per connected socket (keyed by socket id in `this.sessions`), and
 * handles the `join`, `leave` and `user-chat` socket events.
 *
 * All socket event handlers are async functions invoked directly by Socket.io,
 * so nothing awaits them. Each handler therefore validates its payload and
 * catches its own failures instead of letting the returned promise reject.
 */
export class SiteIoServer extends SiteCommon {

  static get name ( ) { return 'SiteIoServer'; }
  static get slug ( ) { return 'ioServer'; }

  /**
   * Escapes a value for interpolation into a double-quoted HTML attribute.
   * Ampersands that already begin an entity (e.g. `&amp;`) are left alone so
   * values that were escaped upstream are not escaped a second time.
   *
   * @param {*} value Value to escape (converted with String()).
   * @returns {string} The escaped text.
   */
  static escapeHtmlAttribute (value) {
    const ENTITY_MAP = {
      '&': '&amp;',
      '<': '&lt;',
      '>': '&gt;',
      '"': '&quot;',
      "'": '&#39;',
    };
    return String(value).replace(/[<>"']|&(?!#?\w+;)/g, (character) => ENTITY_MAP[character]);
  }

  /**
   * @param {object} dtp Platform context; this class reads `dtp.redis`,
   * `dtp.services` and `dtp.emitter` from it.
   */
  constructor (dtp) {
    super(dtp);
    this.dtp = dtp;
    this.log = new SiteLog(dtp, SiteIoServer);
    this.sessions = { }; // socket.id -> session record
  }

  /**
   * Configures Markdown rendering and the Redis-backed rate limiters, then
   * creates the Socket.io server (WebSocket transport only) with a Redis
   * adapter and starts accepting connections.
   *
   * @param {object} httpServer HTTP server instance handed to Socket.io.
   */
  async start (httpServer) {
    const { Server } = await import('socket.io');

    /*
     * Markdown processing
     */

    this.markedRenderer = new Renderer();

    // Links are routed through the client's navigation confirmation. The href
    // and tooltip come from chat content, so both are attribute-escaped to
    // keep them from breaking out of their quotes.
    // NOTE(review): URL scheme policy (e.g. javascript:) is not enforced
    // here; presumably chatService.filterText or the client handles it.
    this.markedRenderer.link = (href, title, text) => {
      const safeHref = SiteIoServer.escapeHtmlAttribute(href || '');
      const safeTitle = SiteIoServer.escapeHtmlAttribute(title || 'Visit link...');
      return `<a href="${safeHref}" uk-tooltip="${safeTitle}" onclick="return dtp.app.confirmNavigation(event);">${text}</a>`;
    };

    // Images are never rendered in chat; only their text is kept.
    this.markedRenderer.image = (href, title, text) => { return text; };

    // A dynamic import() yields a module namespace object; the highlighter
    // itself is the default export when one is present.
    const hljsModule = await import('highlight.js');
    const hljs = hljsModule.default || hljsModule;

    this.markedConfig = {
      renderer: this.markedRenderer,
      highlight: function(code, lang) {
        const language = hljs.getLanguage(lang) ? lang : 'plaintext';
        return hljs.highlight(code, { language }).value;
      },
      langPrefix: 'hljs language-', // highlight.js css expects a top-level 'hljs' class.
      pedantic: false,
      gfm: true,
      breaks: false,
      sanitize: false,
      smartLists: true,
      smartypants: false,
      xhtml: false,
    };

    /*
     * The chat message rate limiter uses Redis to provide accurate atomic
     * accounting regardless of which host is currently hosting a user's chat
     * connection and session.
     */
    const { RateLimiterRedis } = await import('rate-limiter-flexible');

    this.channelJoinLimiter = new RateLimiterRedis({
      storeClient: this.dtp.redis,
      points: 30,
      duration: 60,
      blockDuration: 60 * 60,
      execEvenly: false,
      keyPrefix: 'iorl:chatjoin:',
    });

    this.chatMessageLimiter = new RateLimiterRedis({
      storeClient: this.dtp.redis,
      points: 2,
      duration: 5,
      blockDuration: 60 * 60,
      execEvenly: false,
      keyPrefix: 'iorl:chatmsg:',
    });

    this.slowChatLimiter = new RateLimiterRedis({
      storeClient: this.dtp.redis,
      points: 2,
      duration: 30,
      blockDuration: 60 * 60,
      execEvenly: false,
      keyPrefix: 'iorl:slowchat',
    });

    const transports = [
      'websocket',
      // 'polling',
    ];

    this.log.info('creating Socket.io <-> Redis adapter');
    this.adapterPubClient = this.dtp.redis.duplicate();
    this.adapterSubClient = this.dtp.redis.duplicate();

    // Route adapter connection errors to the log instead of leaving the
    // clients' 'error' events without a listener.
    const onRedisError = this.onRedisError.bind(this);
    this.adapterPubClient.on('error', onRedisError);
    this.adapterSubClient.on('error', onRedisError);

    this.adapter = createAdapter(this.adapterPubClient, this.adapterSubClient);

    this.io = new Server(httpServer, { adapter: this.adapter, transports });
    this.io.adapter(this.adapter);
    this.io.on('connection', this.onSocketConnect.bind(this));
  }

  /**
   * Logs an error reported by one of the adapter's Redis clients.
   *
   * @param {Error} error The reported error.
   */
  async onRedisError (error) {
    this.log.error('Redis error', { error });
  }

  /**
   * Checks every session out of the chat rooms it joined and disconnects its
   * socket. A failure in one session is logged and does not stop the rest.
   */
  async shutdown ( ) {
    const { chat: chatService } = this.dtp.services;
    this.log.alert('Socket.io server shutting down');

    // Iterate over a snapshot of the ids: disconnect handlers remove entries
    // from this.sessions while this loop is running.
    for (const socketId of Object.keys(this.sessions)) {
      const session = this.sessions[socketId];
      if (!session) {
        continue;
      }
      try {
        this.log.debug('leaving joined rooms', { size: session.joinedRooms.size });
        // joinedRooms maps room id -> room, so iterate the stored rooms.
        for (const room of session.joinedRooms.values()) {
          await chatService.chatRoomCheckOut(room, session.user);
        }
        session.joinedRooms.clear();

        this.log.debug('closing socket connection', { socketId });
        session.socket.disconnect(true);
      } catch (error) {
        this.log.error('failed to shut down socket session', { socketId, error });
      }
    }
  }

  /**
   * Authenticates a new socket with the ConnectToken supplied in its
   * handshake, claims the token, creates the session record, registers the
   * socket event handlers and notifies the client. The socket is disconnected
   * when the token is missing, malformed, unknown, already claimed, or has no
   * consumer, and when any step of the setup fails.
   *
   * @param {object} socket The connecting Socket.io socket.
   */
  async onSocketConnect (socket) {
    const { channel: channelService } = this.dtp.services;
    this.log.debug('socket connection', { sid: socket.id });
    try {
      // The token is client-supplied and ends up in a query filter, so only a
      // plain non-empty string is accepted (never an operator object).
      const auth = socket.handshake.auth;
      const connectToken = auth && auth.token;
      if ((typeof connectToken !== 'string') || (connectToken.length === 0)) {
        this.log.alert('rejecting socket connection with missing or malformed token', { sid: socket.id });
        socket.disconnect(true);
        return;
      }

      const token = await ConnectToken
        .findOne({ token: connectToken })
        .populate([
          {
            path: 'consumer',
            select: '+membership +flags +flags.isCloaked',
            populate: [
              {
                path: 'membership',
                model: 'Membership',
                select: 'status granted expires started plan',
                strictPopulate: false,
                populate: [
                  {
                    path: 'plan',
                    model: 'MembershipPlan',
                    select: 'product.name tier'
                  },
                ],
              },
            ],
          },
        ])
        .lean();
      if (!token) {
        this.log.alert('rejecting invalid socket token', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      if (!token.consumer) {
        this.log.alert('rejecting connect token with no consumer', {
          sid: socket.id,
          consumerType: token.consumerType,
        });
        socket.disconnect(true);
        return;
      }

      // Credentials must never travel further than this point.
      if (token.consumer.password) {
        delete token.consumer.password;
      }
      if (token.consumer.passwordSalt) {
        delete token.consumer.passwordSalt;
      }

      if (token.claimed) {
        this.log.alert('rejecting use of claimed connect token', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      // NOTE(review): the claimed check above and this update are separate
      // operations, so two simultaneous connections could both pass the
      // check. Confirm how an unclaimed token is stored before making the
      // claim conditional.
      await ConnectToken.updateOne(
        { _id: token._id },
        { $set: { claimed: new Date() } },
      );
      this.log.debug('token claimed', {
        sid: socket.id,
        token: connectToken,
        consumerType: token.consumerType,
        consumerId: token.consumer._id,
      });

      const session = {
        socket,
        joinedChannels: new Set(), // channel/room/thread ids, as strings
        joinedRooms: new Map(),    // room id string -> room
      };
      this.sessions[socket.id] = session;

      // Bound handlers are kept on the session so they can be removed again
      // when the socket disconnects.
      session.onSocketDisconnect = this.onSocketDisconnect.bind(this, session);
      socket.on('disconnect', session.onSocketDisconnect);

      session.onJoinChannel = this.onJoinChannel.bind(this, session);
      socket.on('join', session.onJoinChannel);

      session.onLeaveChannel = this.onLeaveChannel.bind(this, session);
      socket.on('leave', session.onLeaveChannel);

      switch (token.consumerType) {
        case 'User':
          session.user = {
            _id: token.consumer._id,
            created: token.consumer.created,
            username: token.consumer.username,
            username_lc: token.consumer.username_lc,
            displayName: token.consumer.displayName,
            flags: token.consumer.flags,
            membership: token.consumer.membership,
            balances: token.consumer.balances,
          };
          if (token.consumer.picture && token.consumer.picture.small) {
            session.user.picture = {
              small: await Image.findOne({ _id: token.consumer.picture.small }),
            };
          }

          // Only user sessions are allowed to send chat messages.
          session.onUserChat = this.onUserChat.bind(this, session);
          socket.on('user-chat', session.onUserChat);

          socket.emit('user-authenticated', {
            message: 'token verified',
            user: session.user,
          });
          break;

        case 'Channel':
          session.channel = await channelService.getChannelById(token.consumer._id);
          if (!session.channel) {
            throw new Error('connect token references an unknown channel');
          }
          // Keys are stripped before the channel is sent to the client.
          if (session.channel.widgetKey) {
            delete session.channel.widgetKey;
            session.isWidget = true;
          }
          if (session.channel.streamKey) {
            delete session.channel.streamKey;
          }
          this.log.info('starting Channel session', { channel: session.channel.name });
          socket.emit('widget-authenticated', {
            message: 'token verified',
            channel: session.channel,
          });
          break;
      }
    } catch (error) {
      this.log.error('failed to process a socket connection', { error });
      // Do not leave a half-initialized socket connected.
      socket.disconnect(true);
    }
  }

  /**
   * Cleans up after a disconnected socket: checks the user out of every
   * joined chat room, removes the socket event handlers and drops the session
   * record. Cleanup always completes, even when a check-out fails.
   *
   * @param {object} session Session record created by onSocketConnect.
   * @param {string} reason Disconnect reason reported by Socket.io.
   */
  async onSocketDisconnect (session, reason) {
    const { chat: chatService } = this.dtp.services;

    // A session that failed during setup may have neither user nor channel.
    const consumer = session.user || session.channel;
    this.log.debug('socket disconnect', {
      sid: session.socket.id,
      consumerId: consumer ? consumer._id : undefined,
      joinedRooms: session.joinedRooms.size,
      reason,
    });

    if (session.user && session.joinedRooms.size > 0) {
      for (const room of session.joinedRooms.values()) {
        try {
          await chatService.chatRoomCheckOut(room, session.user);
        } catch (error) {
          this.log.error('failed to check out of chat room', { sid: session.socket.id, error });
        }
      }
      session.joinedRooms.clear();
    }

    if (session.onSocketDisconnect) {
      session.socket.off('disconnect', session.onSocketDisconnect);
      delete session.onSocketDisconnect;
    }
    if (session.onJoinChannel) {
      session.socket.off('join', session.onJoinChannel);
      delete session.onJoinChannel;
    }
    if (session.onLeaveChannel) {
      session.socket.off('leave', session.onLeaveChannel);
      delete session.onLeaveChannel;
    }
    if (session.onUserChat) {
      session.socket.off('user-chat', session.onUserChat);
      delete session.onUserChat;
    }

    delete this.sessions[session.socket.id];
  }

  /**
   * Handles the `join` socket event: applies the join rate limit (keyed by
   * socket id), validates the request and dispatches on `channelType`.
   *
   * @param {object} session Session record for the socket.
   * @param {object} message Client payload; expects `channelType` and a string
   * `channelId`.
   */
  async onJoinChannel (session, message) {
    try {
      await this.channelJoinLimiter.consume(session.socket.id, 1);
    } catch (error) {
      session.socket.disconnect(true);
      this.log.alert('banned client IP due to channel join limit', { sip: session.socket.handshake.address });
      return;
    }

    // The payload is client-controlled and may be missing or malformed.
    if (!message || (typeof message !== 'object') || (typeof message.channelId !== 'string')) {
      this.log.warn('rejecting malformed join request', { sid: session.socket.id });
      return;
    }

    const { channelType } = message;
    this.log.info('onJoinChannel', {
      sid: session.socket.id,
      channelType: message.channelType,
      channelId: message.channelId,
    });

    try {
      // The awaits matter: without them a rejection from the join handler
      // would bypass this try/catch.
      switch (channelType) {
        case 'User':
          return await this.onJoinUserChannel(session, message);
        case 'Channel':
          return await this.onJoinStreamChannel(session, message);
        case 'ChatRoom':
          return await this.onJoinChatRoom(session, message);
        case 'Thread':
          return await this.onJoinThread(session, message);
        default:
          break;
      }
    } catch (error) {
      this.log.error('failed to join channel', { error });
    }
  }

  /**
   * Joins a user to their own personal channel when the requested id matches
   * the session user; any other id is treated as a stream channel join.
   *
   * @param {object} session Session record for the socket.
   * @param {object} message Join request; `channelId` is either `<id>...` or
   * `broadcast:<id>...` with colon-separated components.
   */
  async onJoinUserChannel (session, message) {
    if (!session.user || !session.user._id) {
      return session.socket.emit('join-result', { authResult: 'access-denied' });
    }

    const channelId = message.channelId;
    const channelComponents = channelId.split(':');
    const parsedChannelId = ObjectId.createFromHexString((channelComponents[0] === 'broadcast') ? channelComponents[1] : channelComponents[0]);

    if (parsedChannelId.equals(session.user._id)) {
      this.log.debug('user joins their own channel', {
        sid: session.socket.id,
        consumerId: parsedChannelId,
        userId: session.user._id,
        channelId: channelId,
      });
      session.joinedChannels.add(channelId);
      session.socket.join(channelId.toString());
      return session.socket.emit('join-result', {
        socketId: session.socket.id,
        channelId: channelId,
        isPersonal: true,
      });
    }

    return this.onJoinStreamChannel(session, message);
  }

  /**
   * Joins the socket to a stream channel after enforcing the channel's login
   * requirement and passcode and, for the `moderator` classifier, the
   * moderator access check.
   *
   * @param {object} session Session record; `session.channel` is replaced
   * with the channel that was looked up.
   * @param {object} message Join request with `channelId` and an optional
   * `passcode`.
   */
  async onJoinStreamChannel (session, message) {
    const { channel: channelService } = this.dtp.services;
    const { channelId } = message;

    const channelComponents = channelId.split(':');
    const parsedChannelId = ObjectId.createFromHexString((channelComponents[0] === 'broadcast') ? channelComponents[1] : channelComponents[0]);

    session.channel = await channelService.getChannelById(parsedChannelId, { withPasscode: true });
    if (!session.channel) { return; }

    if (session.channel.flags.isLoginRequired.forChat) {
      if (!session.user) {
        this.log.alert('rejecting channel join from anonymous viewer');
        session.socket.emit('join-result', { authResult: 'access-denied' });
        return;
      }
    }

    let authResult = 'access-granted';
    if (session.channel && session.channel.show.passcode && (session.channel.show.passcode.length)) {
      const { passcode } = session.channel.show;
      delete session.channel.show.passcode; // safety 1st
      session.channel.show.isPrivate = true;
      if (message.passcode !== passcode) {
        session.socket.emit('join-result', { authResult: 'access-denied' });
        return;
      }
    }

    const classifier = channelComponents[1];
    if (classifier === 'moderator') {
      if (!session.user) {
        this.log.alert('rejecting moderator channel join from session with no user');
        return;
      }

      const isChannelOwner = session.channel.owner._id.equals(session.user._id);
      const isChannelModerator = !!(session.channel.moderators || [ ]).find((moderator) => moderator._id.equals(session.user._id));
      const isPlatformAdmin = (session.user.flags.isAdmin || session.user.flags.isModerator);

      /*
       * If the session's user isn't the channel owner, one of the channel's
       * moderators, or a platform-level administrator or moderator? They're not
       * joining this channel to receive moderation events.
       */
      if (!(isChannelOwner && session.channel.flags.isChatAnnounce) &&
          !isChannelModerator &&
          !isPlatformAdmin) {
        this.log.alert('rejecting unauthorized request to join moderator channel', {
          channel: session.channel.name,
          user: {
            _id: session.user._id,
            username: session.user.username_lc,
          },
        });
        return;
      }
    }

    this.log.debug('socket joins stream channel', {
      sid: session.socket.id,
      sip: session.socket.handshake.address,
      consumerId: (session.user || session.channel)._id,
      channelId,
    });

    session.joinedChannels.add(channelId);
    session.socket.join(channelId);
    session.socket.emit('join-result', { channelId, authResult });
  }

  /**
   * Checks an authenticated user into a chat room and joins the socket to it.
   *
   * @param {object} session Session record for the socket.
   * @param {object} message Join request; `channelId` is the room id as a hex
   * string.
   */
  async onJoinChatRoom (session, message) {
    const { chat: chatService } = this.dtp.services;

    if (!session.user) {
      session.socket.emit('join-result', { authResult: 'access-denied' });
      return;
    }

    try {
      const roomId = ObjectId.createFromHexString(message.channelId);
      const room = await chatService.getRoomById(roomId);
      if (!room) {
        session.socket.emit('join-result', { authResult: 'room-invalid', message: 'The chat room does not exist' });
        return;
      }

      await chatService.chatRoomCheckIn(room, session.user);

      this.log.debug('user joins a chat room', {
        sid: session.socket.id,
        consumerId: (session.user || session.channel)._id,
        userId: session.user._id,
        roomId: room._id,
      });

      // Track the room under its string id: `leave` and `user-chat` look
      // sessions up with the string the client sends, which would never match
      // an ObjectId instance stored in a Set or Map.
      const roomKey = room._id.toString();
      session.joinedChannels.add(roomKey);
      session.joinedRooms.set(roomKey, room);
      session.socket.join(roomKey);
      session.socket.emit('join-result', {
        socketId: session.socket.id,
        roomId: roomKey,
      });
    } catch (error) {
      this.log.error('failed to join chat room', { error });
    }
  }

  /**
   * Joins an authenticated user to a private message thread.
   *
   * @param {object} session Session record for the socket.
   * @param {object} message Join request; `channelId` is the thread id as a
   * hex string.
   */
  async onJoinThread (session, message) {
    const { chat: chatService } = this.dtp.services;

    if (!session.user) {
      session.socket.emit('join-result', { authResult: 'access-denied' });
      return;
    }

    try {
      const threadId = ObjectId.createFromHexString(message.channelId);
      const thread = await chatService.getPrivateMessageById(threadId);
      if (!thread) {
        session.socket.emit('join-result', { authResult: 'room-invalid', message: 'The chat room does not exist' });
        return;
      }

      // NOTE(review): nothing here verifies that session.user participates in
      // the thread. Confirm getPrivateMessageById (or another layer) enforces
      // that before relying on this join for privacy.

      this.log.debug('user joins a private message thread', {
        sid: session.socket.id,
        consumerId: (session.user || session.channel)._id,
        userId: session.user._id,
        threadId: thread._id,
      });

      // Tracked by string id so the client's `leave` request can match it.
      const threadKey = thread._id.toString();
      session.joinedChannels.add(threadKey);
      session.socket.join(threadKey);
      session.socket.emit('join-result', {
        socketId: session.socket.id,
        threadId: threadKey,
      });
    } catch (error) {
      this.log.error('failed to join private message thread', { error });
    }
  }

  /**
   * Handles the `leave` socket event: leaves the Socket.io room and, when the
   * id belongs to a joined chat room, checks the user out of it. Requests for
   * channels the session has not joined are ignored.
   *
   * @param {object} session Session record for the socket.
   * @param {object} message Client payload; expects a string `channelId`.
   */
  async onLeaveChannel (session, message) {
    const { chat: chatService } = this.dtp.services;

    // The payload is client-controlled and may be missing or malformed.
    if (!message || (typeof message.channelId !== 'string')) {
      return;
    }

    const { channelId } = message;
    if (!session.joinedChannels.has(channelId)) {
      return;
    }

    const consumer = session.user || session.channel;
    this.log.debug('socket leaves channel', { sid: session.socket.id, user: consumer ? consumer._id : undefined, channelId });
    session.socket.leave(channelId);
    session.joinedChannels.delete(channelId);

    if (session.joinedRooms.has(channelId)) {
      const room = session.joinedRooms.get(channelId);
      session.joinedRooms.delete(channelId);
      try {
        await chatService.chatRoomCheckOut(room, session.user);
      } catch (error) {
        this.log.error('failed to check out of chat room', { sid: session.socket.id, channelId, error });
      }
    }
  }

  /**
   * Handles the `user-chat` socket event. The message is validated, stripped
   * of sticker tokens, filtered, rate limited and permission checked; slash
   * commands and actions are processed; then the message is persisted,
   * rendered from Markdown and broadcast to the channel.
   *
   * Registered for user sessions only, so `session.user` is always set.
   *
   * @param {object} session Session record for the socket.
   * @param {object} message Client payload; expects string `channelId` and
   * `content`, with an optional `action`.
   */
  async onUserChat (session, message) {
    const {
      action: actionService,
      channel: channelService,
      chat: chatService,
      sticker: stickerService,
      user: userService,
    } = this.dtp.services;

    try {
      this.log.debug('onUserChat', { message });

      // The payload is client-controlled and may be missing or malformed.
      if (!message || (typeof message !== 'object') || (typeof message.channelId !== 'string')) {
        this.log.debug('message rejected: malformed chat message', { user: session.user._id });
        return;
      }

      const { channelId } = message;
      if (!session.joinedChannels.has(channelId)) {
        this.log.debug('message rejected: user is not joined to chat channel', { channel: channelId, user: session.user._id });
        return;
      }

      if ((typeof message.content !== 'string') || (message.content.length === 0)) {
        this.log.debug('message rejected: chat message is empty', { channel: channelId, user: session.user._id });
        return;
      }

      const channel = await channelService.getChannelById(channelId);
      if (!channel) {
        this.log.debug('message rejected: channel unknown/unedefined', { channel: channelId, user: session.user._id });
        return;
      }

      const bannedUser = (channel.bannedUsers || [ ]).find((memberId) => memberId.equals(session.user._id));
      if (bannedUser) {
        this.log.debug('message rejected: user is banned from channel', { channel: channelId, user: session.user._id });
        return; // sender is blocked by the channel
      }

      // Stickers travel separately from the text, so their tokens are removed
      // from the content before it is filtered and measured.
      const stickerSlugs = this.findStickers(message.content);
      message.content = this.stripStickerTokens(message.content, stickerSlugs);

      try {
        message.content = chatService.filterText(message.content);
        if (message.content.length > 500) {
          this.log.warn('message rejected: too long', { channel: channelId, user: session.user._id });
          return; // message is too long
        }
        await chatService.analyzeContent(session.user, message.content);
      } catch (error) {
        this.log.error('chat message rejected', { error });
        return; // message fails the sniff tests
      }

      try {
        const userKey = `${channelId.toString()}:${session.user._id.toString()}`;
        // Users with no membership plan, or the free tier, get the slower
        // limiter.
        if (!session.user.membership ||
            !session.user.membership.plan ||
            (session.user.membership.plan.tier === 'free')) {
          await this.slowChatLimiter.consume(userKey, 1);
        } else {
          await this.chatMessageLimiter.consume(userKey, 1);
        }
      } catch (rateLimiter) {
        // consume() rejects with an Error when the limiter itself fails; that
        // rejection carries no timing data to report to the user.
        if (rateLimiter instanceof Error) {
          this.log.error('chat rate limiter failed', { userId: session.user._id, error: rateLimiter });
          return; // fail closed, send nothing
        }

        const NOW = new Date();
        // Tell the channel only once per muzzle, not for every blocked message.
        if (!session.notifySpamMuzzle) {
          this.log.alert('preventing chat spam', { userId: session.user._id, rateLimiter });
          session.socket.to(channelId).emit('system-message', {
            created: NOW,
            content: `${session.user.displayName || session.user.username} has been muted for a while.`,
          });
          session.notifySpamMuzzle = true;
        }
        session.socket.emit('system-message', {
          created: NOW,
          content: `You are rate limited for ${numeral(rateLimiter.msBeforeNext / 1000.0).format('0,0.0')} seconds.`,
          rateLimiter,
        });
        return; // rate limited, send nothing
      }

      // Permissions are re-read for every message rather than taken from the
      // session snapshot captured at connect time.
      const userCheck = await userService.getUserAccount(session.user._id);
      if (!userCheck || !userCheck.permissions || !userCheck.permissions.canChat) {
        session.socket.emit('system-message', {
          created: new Date(),
          content: 'You are banned from Chat.',
        });
        this.log.warn('message rejected: user does not have permission to chat', { channel: channelId, user: session.user._id });
        return; // permission denied
      }
      if (!userCheck.flags.isEmailVerified) {
        session.socket.emit('system-message', {
          created: new Date(),
          content: 'Please verify your email address in Account settings before chatting.',
        });
        return; // permission denied
      }
      if (userCheck.flags.isCloaked) {
        const displayList = this.createDisplayList('cloaked-notification');
        displayList.showNotification('You are currently cloaked and must not send chat messages.', 'danger', 'bottom-center', 5000);
        session.socket.emit('system-message', { displayList });
        this.log.warn('message rejected: user is cloaked/ghost', { channel: channelId, user: session.user._id });
        return; // permission denied (you can NOT chat while cloaked)
      }

      const stickers = await stickerService.resolveStickerSlugs(stickerSlugs);

      try {
        await this.processSlashCommand(message);
      } catch (error) {
        const displayList = this.createDisplayList('slash-error');
        displayList.showNotification(`Command error: ${error.message}.`, 'danger', 'bottom-center', 5000);
        session.socket.emit('system-message', { displayList });
        return; // the slash command was rejected
      }

      /*
       * If a message specifies an Action ID, pull the action and verify that we
       * can find the action and that this is the author of that action.
       */
      if (message.action) {
        try {
          message.action = await actionService.create(userCheck, message);
        } catch (error) {
          this.log.error('failed to process chat user action', { error });
          const displayList = this.createDisplayList('invalid-action');
          displayList.showNotification(
            `Failed to process action: ${error.message}`,
            'danger',
            'bottom-center',
            5000,
          );
          session.socket.emit('system-message', { displayList });
          return;
        }
      }

      /*
       * Channel chat messages receive a fixed 10-day expiration. StreamRay has
       * always been this way. Channel operators can manually clear the channel.
       * They can also set an option to auto-clear when their stream ends.
       *
       * It would be easy enough to add a policy.retentionDays value to Channel.
       * If people want it, it's trivial to add.
       */

      const NOW = new Date();
      const chatMessage = await ChatMessage.create({
        created: NOW,
        expires: dayjs(NOW).add(10, 'days').toDate(),
        channel: channelId,
        author: session.user._id,
        content: message.content,
      });
      this.log.debug('chat message persisted', { _id: chatMessage._id });

      const renderedContent = parse(
        this.dtpparse(message.content),
        this.markedConfig,
      );

      const payload = {
        _id: chatMessage._id,
        user: userService.filterUserObject(session.user),
        content: renderedContent,
        stickers,
      };

      if (message.action) {
        const actionDefinition = actionService.actions.find((action) => action.verb === message.action.verb);
        payload.action = message.action;
        // NOTE(review): userCheck is broadcast unfiltered here while
        // payload.user goes through filterUserObject; confirm the account
        // record holds nothing private.
        payload.action.author = userCheck;
        if (actionDefinition) {
          payload.action.overlay = actionDefinition.overlay;
          payload.action.name = actionDefinition.name;
          payload.action.price = actionDefinition.price;
        } else {
          this.log.warn('no action definition found for verb', { verb: message.action.verb });
        }
      }

      this.dtp.emitter.to(channelId).emit('user-chat', payload);
      session.notifySpamMuzzle = false;
    } catch (error) {
      // Nothing awaits this handler, so failures must be contained here.
      this.log.error('failed to process chat message', { error });
    }
  }

  /**
   * Extracts sticker slugs from chat content. A sticker is a space-delimited
   * token wrapped in colons (`:slug:`); slugs are lower-cased and
   * de-duplicated, and tokens containing '/', ':' or a space are ignored.
   *
   * @param {string} content Chat message text.
   * @returns {string[]} At most four unique, non-empty sticker slugs.
   */
  findStickers (content) {
    const tokens = content.split(' ');
    const stickers = [ ];
    tokens.forEach((token) => {
      // Needs both colons and at least one character between them; a bare
      // ':' or '::' would otherwise produce an empty slug.
      if (token.length < 3) {
        return;
      }
      if ((token[0] !== ':') || (token[token.length - 1] !== ':')) {
        return;
      }
      token = token.slice(1, token.length - 1).toLowerCase();
      if (token.includes('/') || token.includes(':') || token.includes(' ')) {
        return; // trimmed token includes invalid characters
      }
      this.log.debug('found sticker request', { token });
      // De-duplicate on the value that is actually stored.
      const slug = striptags(token);
      if ((slug.length > 0) && !stickers.includes(slug)) {
        stickers.push(slug);
      }
    });
    return stickers.slice(0, 4);
  }

  /**
   * Removes every `:slug:` token (case-insensitive) for the given slugs from
   * the content, trimming the result after each slug. Content is returned
   * unchanged when there are no slugs.
   *
   * @param {string} content Chat message text.
   * @param {string[]} stickerSlugs Slugs as returned by findStickers.
   * @returns {string} The content without the sticker tokens.
   */
  stripStickerTokens (content, stickerSlugs) {
    let stripped = content;
    for (const slug of stickerSlugs) {
      // Slugs come from user input; escape them so characters such as '('
      // cannot alter or break the pattern.
      const escapedSlug = slug.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
      stripped = stripped.replace(new RegExp(`:${escapedSlug}:`, 'gi'), '').trim();
    }
    return stripped;
  }

  /**
   * Processes a slash command at the start of a chat message. Only the
   * `action` command (aliases `a` and `ac`) exists, in the form
   * `/action <verb> <username[,username...]> <text...>`. On success the
   * message is rewritten in place: `content` becomes `<username>: <text>` and
   * `action` becomes `{ verb, target }` for the first resolved user.
   * Messages that do not start with a slash are left untouched.
   *
   * @param {object} message Chat message; `content` and `action` may be
   * modified.
   * @throws {Error} When the command, verb or username(s) are missing or
   * invalid.
   */
  async processSlashCommand (message) {
    const { action: actionService, user: userService } = this.dtp.services;
    if (!message || !message.content || (typeof message.content !== 'string') || (message.content.length === 0)) {
      return;
    }

    /*
     * Split the message and verify that we got tokens
     */
    const tokens = message.content.split(' ');
    if (!Array.isArray(tokens) || (tokens.length === 0)) {
      return;
    }

    /*
     * If the first token does not begin with a slash, abort.
     */
    if (tokens[0][0] !== '/') {
      return;
    }

    /*
     * Determine what command is being run
     */
    const COMMAND_LIST = [
      'action',
    ];
    const COMMAND_MAP = {
      'a': 'action',
      'ac': 'action',
    };

    // Lookups use own properties only so that user input such as
    // "constructor" cannot resolve to something inherited from Object.
    const hasOwn = (object, key) => Object.prototype.hasOwnProperty.call(object, key);

    let command = tokens.shift().slice(1);
    this.log.debug('running command', { command });
    if (!COMMAND_LIST.includes(command)) {
      if (!hasOwn(COMMAND_MAP, command)) {
        throw new Error('Unknown command');
      }
      command = COMMAND_MAP[command];
    }

    switch (command) {
      case 'action': {
        const verbToken = tokens.shift();
        if (!verbToken) {
          throw new Error('Missing verb');
        }
        let verb = verbToken.trim().toLowerCase();
        if (!actionService.VERB_LIST.includes(verb)) {
          if (!hasOwn(actionService.SHORT_VERB_MAP, verb)) {
            throw new Error('Invalid verb');
          }
          verb = actionService.SHORT_VERB_MAP[verb];
          if (!verb) {
            throw new Error('Invalid verb');
          }
        }

        const usernameList = tokens.shift();
        if (!usernameList) {
          throw new Error('Missing username(s)');
        }
        const targets = await userService.resolveUsernames(usernameList.split(','));
        if (!Array.isArray(targets) || (targets.length === 0)) {
          throw new Error('Invalid username(s)');
        }

        this.log.info('processing action command', { command, verb, targets: targets.map((target) => target._id) });
        // Only the first resolved user becomes the target of the action.
        message.content = `${targets[0].username}: ${tokens.join(' ')}`;
        message.action = { verb, target: targets[0]._id };
        break;
      }

      default:
        this.log.error('invalid slash command', { command });
        throw new Error('Unknown command');
    }
  }
}