// site-ioserver.js
// Copyright (C) 2024 DTP Technologies, LLC
// All Rights Reserved

'use strict';

import { createAdapter } from '@socket.io/redis-adapter';

import mongoose from 'mongoose';
const ConnectToken = mongoose.model('ConnectToken');
const Image = mongoose.model('Image'); // jshint ignore:line
// const ChatRoom = mongoose.model('ChatRoom');
const ChatMessage = mongoose.model('ChatMessage');

import dayjs from 'dayjs';
import striptags from 'striptags';
import { Renderer, parse } from 'marked';
import numeral from 'numeral';
// import pug from 'pug';

import { SiteLog } from './site-log.js';
import { SiteCommon } from './site-common.js';

/**
 * Escapes regular-expression metacharacters so that `text` matches literally
 * when interpolated into a RegExp source string.
 *
 * @param {string} text Raw text to be embedded in a pattern.
 * @returns {string} The text with every metacharacter backslash-escaped.
 */
function escapeRegExp (text) {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

/**
 * Splits a client-supplied channel identifier on ':' and builds an ObjectId
 * from its id component. When the first component is the literal 'broadcast',
 * the id is taken from the second component; otherwise from the first.
 *
 * @param {string} channelId Channel identifier received from the client.
 * @returns {{ components: string[], objectId: mongoose.Types.ObjectId }}
 * @throws {TypeError} When channelId is not a string.
 * @throws {Error} When the id component is not a valid ObjectId (thrown by
 * the ObjectId constructor).
 */
function parseChannelId (channelId) {
  if (typeof channelId !== 'string') {
    throw new TypeError('channelId must be a string');
  }
  const components = channelId.split(':');
  const idText = (components[0] === 'broadcast') ? components[1] : components[0];
  // `new` is required: ObjectId is an ES class in current Mongoose/bson
  // releases and throws when invoked as a plain function.
  return { components, objectId: new mongoose.Types.ObjectId(idText) };
}

/**
 * Socket.io server for the site. Authenticates sockets with one-time
 * ConnectToken records, manages channel/room/thread membership per socket,
 * and processes 'user-chat' messages (filtering, rate limiting, slash
 * commands, persistence and fan-out).
 */
export class SiteIoServer extends SiteCommon {

  static get name ( ) { return 'SiteIoServer'; }
  static get slug ( ) { return 'ioServer'; }

  /**
   * @param {object} dtp Platform context. This class reads `dtp.redis`,
   * `dtp.services` and `dtp.emitter` from it.
   */
  constructor (dtp) {
    super(dtp);
    this.dtp = dtp;
    this.log = new SiteLog(dtp, SiteIoServer);
  }

  /**
   * Configures Markdown rendering and the Redis-backed rate limiters, then
   * creates the Socket.io server on `httpServer` with the Redis adapter and
   * starts accepting connections.
   *
   * @param {object} httpServer HTTP server instance handed to Socket.io.
   */
  async start (httpServer) {
    const { Server } = await import('socket.io');

    /*
     * Markdown processing
     */

    // Links and images are reduced to their text; no anchor or <img> markup
    // is produced for chat content.
    function confirmedLinkRenderer (href, title, text) {
      return `${text}`;
    }

    this.markedRenderer = new Renderer();
    this.markedRenderer.link = confirmedLinkRenderer;
    this.markedRenderer.image = (href, title, text) => { return text; };

    // A dynamic import() resolves to the module namespace object; the
    // highlight.js API lives on its default export. Fall back to the namespace
    // itself in case a build exposes the API directly.
    const hljsModule = await import('highlight.js');
    const hljs = hljsModule.default || hljsModule;

    this.markedConfig = {
      renderer: this.markedRenderer,
      highlight: function(code, lang) {
        const language = hljs.getLanguage(lang) ? lang : 'plaintext';
        return hljs.highlight(code, { language }).value;
      },
      langPrefix: 'hljs language-', // highlight.js css expects a top-level 'hljs' class.
      pedantic: false,
      gfm: true,
      breaks: false,
      sanitize: false,
      smartLists: true,
      smartypants: false,
      xhtml: false,
    };

    /*
     * The chat message rate limiter uses Redis to provide accurate atomic
     * accounting regardless of which host is currently hosting a user's chat
     * connection and session.
     */

    const { RateLimiterRedis } = await import('rate-limiter-flexible');

    // Channel joins: consumed once per 'join' event, keyed by socket id.
    this.channelJoinLimiter = new RateLimiterRedis({
      storeClient: this.dtp.redis,
      points: 30,
      duration: 60,
      blockDuration: 60 * 60,
      execEvenly: false,
      keyPrefix: 'iorl:chatjoin:',
    });

    // Chat messages for users with a non-free membership plan.
    this.chatMessageLimiter = new RateLimiterRedis({
      storeClient: this.dtp.redis,
      points: 2,
      duration: 5,
      blockDuration: 60 * 60,
      execEvenly: false,
      keyPrefix: 'iorl:chatmsg:',
    });

    // Chat messages for users with no membership, no plan, or the 'free' tier.
    this.slowChatLimiter = new RateLimiterRedis({
      storeClient: this.dtp.redis,
      points: 2,
      duration: 30,
      blockDuration: 60 * 60,
      execEvenly: false,
      keyPrefix: 'iorl:slowchat',
    });

    const transports = [
      'websocket',
      // 'polling',
    ];

    this.log.info('creating Socket.io <-> Redis adapter');
    this.adapterPubClient = this.dtp.redis.duplicate();
    this.adapterSubClient = this.dtp.redis.duplicate();

    // Route connection errors from the dedicated adapter clients to the log
    // instead of leaving their 'error' events without a listener.
    const onRedisError = this.onRedisError.bind(this);
    this.adapterPubClient.on('error', onRedisError);
    this.adapterSubClient.on('error', onRedisError);

    this.adapter = createAdapter(this.adapterPubClient, this.adapterSubClient);

    this.io = new Server(httpServer, { adapter: this.adapter, transports });
    this.io.adapter(this.adapter);

    this.io.on('connection', this.onSocketConnect.bind(this));
  }

  /**
   * Logs an error reported by one of the Redis adapter clients.
   *
   * @param {Error} error The error emitted by the Redis client.
   */
  async onRedisError (error) {
    this.log.error('Redis error', { error });
  }

  /**
   * Logs that the Socket.io server is stopping.
   *
   * NOTE(review): this does not close `this.io` or the duplicated Redis
   * clients created in start(); confirm whether shutdown is handled elsewhere.
   */
  async stop ( ) {
    this.log.info('stopping Socket.io server');
  }

  /**
   * Handles a new socket: validates and claims the one-time connect token
   * supplied in the handshake, builds the per-socket session, registers the
   * session's event handlers, and emits 'user-authenticated' or
   * 'widget-authenticated' depending on the token's consumer type.
   *
   * Sockets presenting a missing, unknown, already-claimed or consumer-less
   * token are disconnected.
   *
   * @param {object} socket The connecting Socket.io socket.
   */
  async onSocketConnect (socket) {
    const { channel: channelService } = this.dtp.services;
    this.log.debug('socket connection', { sid: socket.id });
    try {
      // The handshake auth payload is client-controlled JSON. Only accept a
      // plain string so a query-operator object can never reach findOne().
      const auth = socket.handshake.auth || { };
      if ((typeof auth.token !== 'string') || (auth.token.length === 0)) {
        this.log.alert('rejecting invalid socket token', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      const token = await ConnectToken
        .findOne({ token: auth.token })
        .populate([
          {
            path: 'consumer',
            select: '+membership +flags +flags.isCloaked',
            populate: [
              {
                path: 'membership',
                model: 'Membership',
                select: 'status granted expires started plan',
                strictPopulate: false,
                populate: [
                  {
                    path: 'plan',
                    model: 'MembershipPlan',
                    select: 'product.name tier'
                  },
                ],
              },
            ],
          },
        ])
        .lean();
      if (!token) {
        this.log.alert('rejecting invalid socket token', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      if (token.claimed) {
        this.log.alert('rejecting use of claimed connect token', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      await ConnectToken.updateOne(
        { _id: token._id },
        { $set: { claimed: new Date() } },
      );

      // A token whose consumer failed to populate cannot produce a session;
      // reject it rather than leaving a connected socket with no handlers.
      if (!token.consumer) {
        this.log.alert('rejecting connect token with no consumer', {
          sid: socket.id,
          consumerType: token.consumerType,
        });
        socket.disconnect(true);
        return;
      }

      // Never let credential material travel with the session.
      if (token.consumer.password) {
        delete token.consumer.password;
      }
      if (token.consumer.passwordSalt) {
        delete token.consumer.passwordSalt;
      }

      this.log.debug('token claimed', {
        sid: socket.id,
        token: auth.token,
        consumerType: token.consumerType,
        consumerId: token.consumer._id,
      });

      const session = {
        socket,
        joinedChannels: new Set(),
        joinedRooms: new Map(),
      };

      // Bound handlers are kept on the session so onSocketDisconnect can
      // remove exactly the listeners registered here.
      session.onSocketDisconnect = this.onSocketDisconnect.bind(this, session);
      socket.on('disconnect', session.onSocketDisconnect);

      session.onJoinChannel = this.onJoinChannel.bind(this, session);
      socket.on('join', session.onJoinChannel);

      session.onLeaveChannel = this.onLeaveChannel.bind(this, session);
      socket.on('leave', session.onLeaveChannel);

      switch (token.consumerType) {
        case 'User':
          session.user = {
            _id: token.consumer._id,
            created: token.consumer.created,
            username: token.consumer.username,
            username_lc: token.consumer.username_lc,
            displayName: token.consumer.displayName,
            flags: token.consumer.flags,
            membership: token.consumer.membership,
            balances: token.consumer.balances,
          };
          if (token.consumer.picture && token.consumer.picture.small) {
            session.user.picture = {
              small: await Image.findOne({ _id: token.consumer.picture.small }),
            };
          }

          session.onUserChat = this.onUserChat.bind(this, session);
          socket.on('user-chat', session.onUserChat);

          socket.emit('user-authenticated', {
            message: 'token verified',
            user: session.user,
          });
          break;

        case 'Channel':
          session.channel = await channelService.getChannelById(token.consumer._id);
          // Keys are stripped before the channel object is sent to the client.
          if (session.channel.widgetKey) {
            delete session.channel.widgetKey;
            session.isWidget = true;
          }
          if (session.channel.streamKey) {
            delete session.channel.streamKey;
          }
          this.log.info('starting Channel session', { channel: session.channel.name });
          socket.emit('widget-authenticated', {
            message: 'token verified',
            channel: session.channel,
          });
          break;
      }
    } catch (error) {
      this.log.error('failed to process a socket connection', { error });
    }
  }

  /**
   * Handles a socket disconnect: checks the user out of every chat room the
   * session joined and removes the session's event listeners.
   *
   * @param {object} session Per-socket session created in onSocketConnect.
   * @param {string} reason Disconnect reason reported by Socket.io.
   */
  async onSocketDisconnect (session, reason) {
    const { chat: chatService } = this.dtp.services;

    // A session may have neither a user nor a channel (e.g. an unrecognized
    // consumer type), so the consumer must not be dereferenced blindly.
    const consumer = session.user || session.channel;
    this.log.debug('socket disconnect', {
      sid: session.socket.id,
      consumerId: consumer ? consumer._id : undefined,
      reason,
    });

    if (session.user && session.joinedRooms.size > 0) {
      for (const room of session.joinedRooms.values()) {
        try {
          await chatService.chatRoomCheckOut(room, session.user);
        } catch (error) {
          // Keep going: one failed check-out must not block the remaining
          // rooms or the listener cleanup below.
          this.log.error('failed to check out of chat room', { error });
        }
      }
    }

    if (session.onSocketDisconnect) {
      session.socket.off('disconnect', session.onSocketDisconnect);
      delete session.onSocketDisconnect;
    }
    if (session.onJoinChannel) {
      session.socket.off('join', session.onJoinChannel);
      delete session.onJoinChannel;
    }
    if (session.onLeaveChannel) {
      session.socket.off('leave', session.onLeaveChannel);
      delete session.onLeaveChannel;
    }
    if (session.onUserChat) {
      session.socket.off('user-chat', session.onUserChat);
      delete session.onUserChat;
    }
  }

  /**
   * Handles a 'join' event: applies the join rate limit, then dispatches to
   * the handler for the requested channel type. Unknown channel types are
   * ignored.
   *
   * @param {object} session Per-socket session.
   * @param {object} message Client message carrying `channelType` and
   * `channelId`.
   */
  async onJoinChannel (session, message) {
    try {
      await this.channelJoinLimiter.consume(session.socket.id, 1);
    } catch (error) {
      session.socket.disconnect(true);
      this.log.alert('banned client IP due to channel join limit', { sip: session.socket.handshake.address });
      return;
    }

    if (!message || (typeof message !== 'object')) {
      return; // malformed client message
    }
    const { channelType } = message;

    this.log.info('onJoinChannel', {
      sid: session.socket.id,
      channelType: message.channelType,
      channelId: message.channelId,
    });

    try {
      // `return await` is deliberate: without the await, a rejection from the
      // handler would bypass this try/catch and surface as an unhandled
      // promise rejection.
      switch (channelType) {
        case 'User':
          return await this.onJoinUserChannel(session, message);
        case 'Channel':
          return await this.onJoinStreamChannel(session, message);
        case 'ChatRoom':
          return await this.onJoinChatRoom(session, message);
        case 'Thread':
          return await this.onJoinThread(session, message);
        default:
          break;
      }
    } catch (error) {
      this.log.error('failed to join channel', { error });
    }
  }

  /**
   * Joins a 'User' channel. When the channel id resolves to the session
   * user's own id the socket joins it as a personal channel; otherwise the
   * request is handled as a stream channel join.
   *
   * @param {object} session Per-socket session.
   * @param {object} message Client message carrying `channelId`.
   */
  async onJoinUserChannel (session, message) {
    if (!session.user || !session.user._id) {
      return session.socket.emit('join-result', { authResult: 'access-denied' });
    }

    const channelId = message.channelId;
    const { objectId: parsedChannelId } = parseChannelId(channelId);

    if (parsedChannelId.equals(session.user._id)) {
      this.log.debug('user joins their own channel', {
        sid: session.socket.id,
        consumerId: parsedChannelId,
        userId: session.user._id,
        channelId: channelId,
      });
      session.joinedChannels.add(channelId);
      session.socket.join(channelId.toString());
      return session.socket.emit('join-result', {
        socketId: session.socket.id,
        channelId: channelId,
        isPersonal: true,
      });
    }

    return this.onJoinStreamChannel(session, message);
  }

  /**
   * Joins a stream channel after enforcing the channel's login requirement,
   * its show passcode (if one is set), and, for ':moderator' channels, the
   * moderator authorization rules.
   *
   * @param {object} session Per-socket session.
   * @param {object} message Client message carrying `channelId` and an
   * optional `passcode`.
   */
  async onJoinStreamChannel (session, message) {
    const { channel: channelService } = this.dtp.services;
    const { channelId } = message;
    const { components: channelComponents, objectId: parsedChannelId } = parseChannelId(channelId);

    session.channel = await channelService.getChannelById(parsedChannelId, { withPasscode: true });
    if (!session.channel) {
      return;
    }

    if (session.channel.flags.isLoginRequired.forChat) {
      if (!session.user) {
        this.log.alert('rejecting channel join from anonymous viewer');
        session.socket.emit('join-result', { authResult: 'access-denied' });
        return;
      }
    }

    const authResult = 'access-granted';
    if (session.channel && session.channel.show.passcode && (session.channel.show.passcode.length)) {
      const { passcode } = session.channel.show;
      delete session.channel.show.passcode; // safety 1st
      session.channel.show.isPrivate = true;
      if (message.passcode !== passcode) {
        session.socket.emit('join-result', { authResult: 'access-denied' });
        return;
      }
    }

    const classifier = channelComponents[1];
    if (classifier === 'moderator') {
      if (!session.user) {
        this.log.alert('rejecting moderator channel join from session with no user');
        return;
      }

      const isChannelOwner = session.channel.owner._id.equals(session.user._id);
      const isChannelModerator = !!(session.channel.moderators || [ ]).find((moderator) => moderator._id.equals(session.user._id));
      const isPlatformAdmin = (session.user.flags.isAdmin || session.user.flags.isModerator);

      /*
       * If the session's user isn't the channel owner, one of the channel's
       * moderators, or a platform-level administrator or moderator? They're not
       * joining this channel to receive moderation events.
       *
       * NOTE(review): the owner is only admitted when the channel's
       * isChatAnnounce flag is also set; confirm that this is intended.
       */
      if (!(isChannelOwner && session.channel.flags.isChatAnnounce) && !isChannelModerator && !isPlatformAdmin) {
        this.log.alert('rejecting unauthorized request to join moderator channel', {
          channel: session.channel.name,
          user: {
            _id: session.user._id,
            username: session.user.username_lc,
          },
        });
        return;
      }
    }

    this.log.debug('socket joins stream channel', {
      sid: session.socket.id,
      sip: session.socket.handshake.address,
      consumerId: (session.user || session.channel)._id,
      channelId,
    });

    session.joinedChannels.add(channelId);
    session.socket.join(channelId);
    session.socket.emit('join-result', { channelId, authResult });
  }

  /**
   * Joins a chat room: checks the user in through the chat service and
   * records the membership on the session.
   *
   * @param {object} session Per-socket session; must carry a user.
   * @param {object} message Client message whose `channelId` is the room id.
   */
  async onJoinChatRoom (session, message) {
    const { chat: chatService } = this.dtp.services;

    if (!session.user) {
      session.socket.emit('join-result', { authResult: 'access-denied' });
      return;
    }

    try {
      const roomId = new mongoose.Types.ObjectId(message.channelId);
      const room = await chatService.getRoomById(roomId);
      if (!room) {
        session.socket.emit('join-result', {
          authResult: 'room-invalid',
          message: 'The chat room does not exist',
        });
        return;
      }

      await chatService.chatRoomCheckIn(room, session.user);

      this.log.debug('user joins a chat room', {
        sid: session.socket.id,
        consumerId: (session.user || session.channel)._id,
        userId: session.user._id,
        roomId: room._id,
      });

      // Membership is keyed by the id's string form. Set/Map compare objects
      // by identity, so an ObjectId key would never match the string channel
      // id that clients send on 'leave' and 'user-chat'.
      const roomKey = room._id.toString();
      session.joinedChannels.add(roomKey);
      session.joinedRooms.set(roomKey, room);
      session.socket.join(roomKey);

      session.socket.emit('join-result', {
        socketId: session.socket.id,
        roomId: roomKey,
      });
    } catch (error) {
      this.log.error('failed to join chat room', { error });
    }
  }

  /**
   * Joins a private message thread.
   *
   * @param {object} session Per-socket session; must carry a user.
   * @param {object} message Client message whose `channelId` is the thread id.
   */
  async onJoinThread (session, message) {
    const { chat: chatService } = this.dtp.services;

    if (!session.user) {
      session.socket.emit('join-result', { authResult: 'access-denied' });
      return;
    }

    try {
      const threadId = new mongoose.Types.ObjectId(message.channelId);
      const thread = await chatService.getPrivateMessageById(threadId);
      if (!thread) {
        session.socket.emit('join-result', {
          authResult: 'room-invalid',
          message: 'The chat room does not exist',
        });
        return;
      }

      this.log.debug('user joins a private message thread', {
        sid: session.socket.id,
        consumerId: (session.user || session.channel)._id,
        userId: session.user._id,
        threadId: thread._id,
      });

      // String key, for the same identity-comparison reason as chat rooms.
      const threadKey = thread._id.toString();
      session.joinedChannels.add(threadKey);
      session.socket.join(threadKey);

      session.socket.emit('join-result', {
        socketId: session.socket.id,
        threadId: threadKey,
      });
    } catch (error) {
      this.log.error('failed to join private message thread', { error });
    }
  }

  /**
   * Handles a 'leave' event: removes the socket from a previously joined
   * channel and, when that channel is a chat room, checks the user out.
   *
   * @param {object} session Per-socket session.
   * @param {object} message Client message carrying `channelId`.
   */
  async onLeaveChannel (session, message) {
    const { chat: chatService } = this.dtp.services;

    if (!message || (typeof message !== 'object')) {
      return; // malformed client message
    }

    const { channelId } = message;
    if (!session.joinedChannels.has(channelId)) {
      return;
    }

    const consumer = session.user || session.channel;
    this.log.debug('socket leaves channel', {
      sid: session.socket.id,
      user: consumer ? consumer._id : undefined,
      channelId,
    });

    session.socket.leave(channelId);
    session.joinedChannels.delete(channelId);

    if (session.joinedRooms.has(channelId)) {
      try {
        await chatService.chatRoomCheckOut(session.joinedRooms.get(channelId), session.user);
        session.joinedRooms.delete(channelId);
      } catch (error) {
        // The room stays in joinedRooms so the disconnect handler retries.
        this.log.error('failed to check out of chat room', { error });
      }
    }
  }

  /**
   * Handles a 'user-chat' event. The message is dropped unless the sender is
   * joined to the channel, is not banned from it, passes content filtering
   * and rate limiting, and holds chat permission. Accepted messages have
   * sticker tokens extracted, slash commands processed, are persisted as a
   * ChatMessage, rendered from Markdown, and emitted to the channel.
   *
   * @param {object} session Per-socket session; carries the sending user.
   * @param {object} message Client message carrying `channelId` and `content`.
   */
  async onUserChat (session, message) {
    const {
      action: actionService,
      channel: channelService,
      chat: chatService,
      sticker: stickerService,
      user: userService,
    } = this.dtp.services;

    this.log.debug('onUserChat', { message });

    if (!message || (typeof message !== 'object')) {
      return; // malformed client message
    }

    const { channelId } = message;
    if (!session.joinedChannels.has(channelId)) {
      this.log.debug('message rejected: user is not joined to chat channel', { channel: channelId, user: session.user._id });
      return;
    }

    // Content is client-controlled; anything other than a non-empty string
    // would break the string operations below.
    if ((typeof message.content !== 'string') || (message.content.length === 0)) {
      this.log.debug('message rejected: chat message is empty', { channel: channelId, user: session.user._id });
      return;
    }

    const channel = await channelService.getChannelById(channelId);
    if (!channel) {
      this.log.debug('message rejected: channel unknown/undefined', { channel: channelId, user: session.user._id });
      return;
    }

    const bannedUser = (channel.bannedUsers || [ ]).find((memberId) => memberId.equals(session.user._id));
    if (bannedUser) {
      this.log.debug('message rejected: user is banned from channel', { channel: channelId, user: session.user._id });
      return; // sender is blocked by the channel
    }

    // Pull the :sticker: tokens out of the text. Slugs come from user input,
    // so they are escaped before being compiled into a pattern.
    const stickerSlugs = this.findStickers(message.content);
    for (const slug of stickerSlugs) {
      const re = new RegExp(`:${escapeRegExp(slug)}:`, 'gi');
      message.content = message.content.replace(re, '').trim();
    }

    try {
      message.content = chatService.filterText(message.content);
      if (message.content.length > 500) {
        this.log.warn('message rejected: too long', { channel: channelId, user: session.user._id });
        return; // message is too long
      }
      await chatService.analyzeContent(session.user, message.content);
    } catch (error) {
      this.log.error('chat message rejected', { error });
      return; // message fails the sniff tests
    }

    try {
      const userKey = `${channelId.toString()}:${session.user._id.toString()}`;
      if (!session.user.membership || !session.user.membership.plan || (session.user.membership.plan.tier === 'free')) {
        await this.slowChatLimiter.consume(userKey, 1);
      } else {
        await this.chatMessageLimiter.consume(userKey, 1);
      }
    } catch (rateLimiter) {
      // consume() rejects with an Error when the store itself fails. That is
      // not a rate-limit verdict, so nobody gets told the user was muted.
      if (rateLimiter instanceof Error) {
        this.log.error('chat rate limiter failure', { error: rateLimiter });
        return; // fail closed, send nothing
      }

      const NOW = new Date();
      if (!session.notifySpamMuzzle) {
        this.log.alert('preventing chat spam', { userId: session.user._id, rateLimiter });
        session.socket.to(channelId).emit('system-message', {
          created: NOW,
          content: `${session.user.displayName || session.user.username} has been muted for a while.`,
        });
        session.notifySpamMuzzle = true;
      }
      session.socket.emit('system-message', {
        created: NOW,
        content: `You are rate limited for ${numeral(rateLimiter.msBeforeNext / 1000.0).format('0,0.0')} seconds.`,
        rateLimiter,
      });
      return; // rate limited, send nothing
    }

    // Re-read the account so permission changes made since the socket
    // connected are honored.
    const userCheck = await userService.getUserAccount(session.user._id);
    if (!userCheck || !userCheck.permissions || !userCheck.permissions.canChat) {
      session.socket.emit('system-message', {
        created: new Date(),
        content: 'You are banned from Chat.',
      });
      this.log.warn('message rejected: user does not have permission to chat', { channel: channelId, user: session.user._id });
      return; // permission denied
    }

    if (!userCheck.flags.isEmailVerified) {
      session.socket.emit('system-message', {
        created: new Date(),
        content: 'Please verify your email address in Account settings before chatting.',
      });
      return; // permission denied
    }

    if (userCheck.flags.isCloaked) {
      const displayList = this.createDisplayList('cloaked-notification');
      displayList.showNotification('You are currently cloaked and must not send chat messages.', 'danger', 'bottom-center', 5000);
      session.socket.emit('system-message', { displayList });
      this.log.warn('message rejected: user is cloaked/ghost', { channel: channelId, user: session.user._id });
      return; // permission denied (you can NOT chat while cloaked)
    }

    const stickers = await stickerService.resolveStickerSlugs(stickerSlugs);

    try {
      await this.processSlashCommand(message);
    } catch (error) {
      const displayList = this.createDisplayList('slash-error');
      displayList.showNotification(`Command error: ${error.message}.`, 'danger', 'bottom-center', 5000);
      session.socket.emit('system-message', { displayList });
      return; // slash command failed, send nothing
    }

    /*
     * If a message specifies an Action ID, pull the action and verify that we
     * can find the action and that this is the author of that action.
     */
    if (message.action) {
      try {
        message.action = await actionService.create(userCheck, message);
      } catch (error) {
        this.log.error('failed to process chat user action', { error });
        const displayList = this.createDisplayList('invalid-action');
        displayList.showNotification(
          `Failed to process action: ${error.message}`,
          'danger',
          'bottom-center',
          5000,
        );
        session.socket.emit('system-message', { displayList });
        return;
      }
    }

    /*
     * Channel chat messages receive a fixed 10-day expiration. StreamRay has
     * always been this way. Channel operators can manually clear the channel.
     * They can also set an option to auto-clear when their stream ends.
     *
     * It would be easy enough to add a policy.retentionDays value to Channel.
     * If people want it, it's trivial to add.
     */
    const NOW = new Date();
    const chatMessage = await ChatMessage.create({
      created: NOW,
      expires: dayjs(NOW).add(10, 'days').toDate(),
      channel: channelId,
      author: session.user._id,
      content: message.content,
    });
    this.log.debug('chat message persisted', { _id: chatMessage._id });

    const renderedContent = parse(
      this.dtpparse(message.content),
      this.markedConfig,
    );

    const payload = {
      _id: chatMessage._id,
      user: userService.filterUserObject(session.user),
      content: renderedContent,
      stickers,
    };

    if (message.action) {
      const actionDefinition = actionService.actions.find((action) => action.verb === message.action.verb);
      payload.action = message.action;
      payload.action.author = userCheck;
      // The definition lookup can miss; only decorate the payload when the
      // verb maps to a known action.
      if (actionDefinition) {
        payload.action.overlay = actionDefinition.overlay;
        payload.action.name = actionDefinition.name;
        payload.action.price = actionDefinition.price;
      }
    }

    this.dtp.emitter.to(channelId).emit('user-chat', payload);
    session.notifySpamMuzzle = false;
  }

  /**
   * Finds sticker requests in chat text. A request is a space-delimited
   * token that starts and ends with ':' and whose interior contains no '/',
   * ':' or space.
   *
   * @param {string} content Chat message text.
   * @returns {string[]} Up to four unique, lower-cased sticker slugs in order
   * of first appearance.
   */
  findStickers (content) {
    const tokens = content.split(' ');
    const stickers = [ ];
    tokens.forEach((token) => {
      if ((token[0] !== ':') || (token[token.length - 1] !== ':')) {
        return;
      }
      token = token.slice(1, token.length - 1).toLowerCase();
      if (token.includes('/') || token.includes(':') || token.includes(' ')) {
        return; // trimmed token includes invalid characters
      }
      this.log.debug('found sticker request', { token });

      // De-duplicate on the value that is actually stored, and ignore tokens
      // that are empty once markup has been stripped.
      const slug = striptags(token);
      if ((slug.length > 0) && !stickers.includes(slug)) {
        stickers.push(slug);
      }
    });
    return stickers.slice(0, 4);
  }

  /**
   * Processes a leading slash command in `message.content`, if there is one.
   * Messages that do not start with '/' are left untouched. The only command
   * is 'action' (aliases 'a' and 'ac'): it rewrites `message.content` to
   * "<username>: <remaining text>" and sets `message.action` to the resolved
   * verb and first target.
   *
   * @param {object} message Chat message; mutated in place.
   * @throws {Error} 'Unknown command', 'Invalid verb' or 'Invalid username(s)'.
   */
  async processSlashCommand (message) {
    const { action: actionService, user: userService } = this.dtp.services;

    if (!message || !message.content || (typeof message.content !== 'string') || (message.content.length === 0)) {
      return;
    }

    /*
     * Split the message and verify that we got tokens
     */
    const tokens = message.content.split(' ');
    if (!Array.isArray(tokens) || (tokens.length === 0)) {
      return;
    }

    /*
     * If the first token does not begin with a slash, abort.
     */
    if (tokens[0][0] !== '/') {
      return;
    }

    /*
     * Determine what command is being run
     */
    const COMMAND_LIST = [
      'action',
    ];
    const COMMAND_MAP = {
      'a': 'action',
      'ac': 'action',
    };

    let command = tokens.shift().slice(1);
    this.log.debug('running command', { command });
    if (!COMMAND_LIST.includes(command)) {
      command = COMMAND_MAP[command];
      if (!command) {
        throw new Error('Unknown command');
      }
    }

    switch (command) {
      case 'action': {
        // A command with no verb or no target is reported with the same
        // errors as an unrecognized verb or username.
        const verbToken = tokens.shift();
        if (typeof verbToken !== 'string') {
          throw new Error('Invalid verb');
        }
        let verb = verbToken.trim().toLowerCase();
        if (!actionService.VERB_LIST.includes(verb)) {
          verb = actionService.SHORT_VERB_MAP[verb];
          if (!verb) {
            throw new Error('Invalid verb');
          }
        }

        const targetToken = tokens.shift();
        if ((typeof targetToken !== 'string') || (targetToken.length === 0)) {
          throw new Error('Invalid username(s)');
        }
        const targets = await userService.resolveUsernames(targetToken.split(','));
        if (!Array.isArray(targets) || (targets.length === 0)) {
          throw new Error('Invalid username(s)');
        }

        this.log.info('processing action command', { command, verb, targets: targets.map((target) => target._id) });
        message.content = `${targets[0].username}: ${tokens.join(' ')}`;
        message.action = { verb, target: targets[0]._id };
        break;
      }

      default:
        this.log.error('invalid slash command', { command });
        throw new Error('Unknown command');
    }
  }
}