// site-ioserver.js
// Copyright (C) 2024 DTP Technologies, LLC
// All Rights Reserved

'use strict';

import { createAdapter } from '@socket.io/redis-adapter';

import mongoose from 'mongoose';
const { ObjectId } = mongoose.Types;

const ConnectToken = mongoose.model('ConnectToken');
const Image = mongoose.model('Image'); // jshint ignore:line

import { SiteLog } from './site-log.js';
import { SiteCommon } from './site-common.js';

/**
 * Socket.IO server for the site. Authenticates each incoming socket with a
 * one-time ConnectToken, tracks a per-socket session object in
 * `this.sessions` (keyed by socket id), and handles the client's `join` and
 * `leave` channel requests.
 */
export class SiteIoServer extends SiteCommon {

  static get name ( ) { return 'SiteIoServer'; }
  static get slug ( ) { return 'ioServer'; }

  /**
   * @param {object} dtp Platform context; this class reads `dtp.redis` and
   * `dtp.services` from it.
   */
  constructor (dtp) {
    super(dtp);

    this.dtp = dtp;
    this.log = new SiteLog(dtp, SiteIoServer);

    // socket.id -> session record created in onSocketConnect
    this.sessions = { };
  }

  /**
   * Creates the channel-join rate limiter, the Redis adapter (using two
   * duplicated Redis clients) and the Socket.IO server bound to httpServer,
   * then starts listening for connections.
   *
   * @param {object} httpServer HTTP server instance handed to Socket.IO.
   */
  async start (httpServer) {
    const { Server } = await import('socket.io');
    const { RateLimiterRedis } = await import('rate-limiter-flexible');

    // 30 points per 60-second window; exceeding it blocks the key for an hour.
    this.channelJoinLimiter = new RateLimiterRedis({
      storeClient: this.dtp.redis,
      points: 30,
      duration: 60,
      blockDuration: 60 * 60,
      execEvenly: false,
      keyPrefix: 'iorl:joinchannel:',
    });

    const transports = [
      'websocket',
      // 'polling',
    ];

    this.log.info('creating Socket.io <-> Redis adapter');
    this.adapterPubClient = this.dtp.redis.duplicate();
    this.adapterSubClient = this.dtp.redis.duplicate();
    this.adapter = createAdapter(this.adapterPubClient, this.adapterSubClient);

    this.io = new Server(httpServer, { adapter: this.adapter, transports });
    this.io.adapter(this.adapter);

    this.io.on('connection', this.onSocketConnect.bind(this));
  }

  /**
   * Logs a Redis error.
   *
   * NOTE(review): nothing in this class registers this handler on the adapter
   * pub/sub clients - confirm whether a caller is expected to attach it.
   *
   * @param {Error} error The error to log.
   */
  async onRedisError (error) {
    this.log.error('Redis error', { error });
  }

  /**
   * Disconnects every tracked socket.
   */
  async shutdown ( ) {
    this.log.alert('Socket.io server shutting down');

    // Iterate over a snapshot: each disconnect runs onSocketDisconnect, which
    // deletes the session from this.sessions while we are looping.
    for (const [socketId, session] of Object.entries(this.sessions)) {
      this.log.debug('closing socket connection', { socketId });
      await session.socket.disconnect(true);
    }
  }

  /**
   * Handles a new socket connection: validates and claims the connect token
   * from `socket.handshake.auth.token`, registers the session and its event
   * handlers, and emits `user-authenticated` or `widget-authenticated`
   * depending on the token's consumerType. Any failure disconnects the socket.
   *
   * @param {object} socket The connecting Socket.IO socket.
   */
  async onSocketConnect (socket) {
    const { channel: channelService } = this.dtp.services;

    this.log.debug('socket connection', { sid: socket.id });
    try {
      const token = await ConnectToken
        .findOne({ token: socket.handshake.auth.token })
        .populate([
          {
            path: 'consumer',
            select: '+membership +flags +flags.isCloaked',
            populate: [
              {
                path: 'membership',
                model: 'Membership',
                select: 'status granted expires started plan',
                strictPopulate: false,
                populate: [
                  {
                    path: 'plan',
                    model: 'MembershipPlan',
                    select: 'product.name tier'
                  },
                ],
              },
            ],
          },
        ])
        .lean();

      if (!token) {
        this.log.alert('rejecting invalid socket token', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      // Without a consumer there is nobody to build a session for; reject
      // before the token is claimed instead of failing further down.
      if (!token.consumer) {
        this.log.alert('rejecting connect token with no consumer', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      // Never carry credential material on the in-memory record.
      if (token.consumer.password) {
        delete token.consumer.password;
      }
      if (token.consumer.passwordSalt) {
        delete token.consumer.passwordSalt;
      }

      if (token.claimed) {
        this.log.alert('rejecting use of claimed connect token', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      await ConnectToken.updateOne(
        { _id: token._id },
        { $set: { claimed: new Date() } },
      );

      this.log.debug('token claimed', {
        sid: socket.id,
        token: socket.handshake.auth.token,
        consumerType: token.consumerType,
        consumerId: token.consumer._id,
      });

      const session = {
        socket,
        joinedChannels: new Set(),
        joinedRooms: new Map(),
      };

      this.sessions[socket.id] = session;

      // Bound handlers are kept on the session so onSocketDisconnect can
      // remove exactly these listeners.
      session.onSocketDisconnect = this.onSocketDisconnect.bind(this, session);
      socket.on('disconnect', session.onSocketDisconnect);

      session.onJoinChannel = this.onJoinChannel.bind(this, session);
      socket.on('join', session.onJoinChannel);

      session.onLeaveChannel = this.onLeaveChannel.bind(this, session);
      socket.on('leave', session.onLeaveChannel);

      switch (token.consumerType) {
        case 'User':
          session.user = {
            _id: token.consumer._id,
            created: token.consumer.created,
            username: token.consumer.username,
            username_lc: token.consumer.username_lc,
            displayName: token.consumer.displayName,
            flags: token.consumer.flags,
            membership: token.consumer.membership,
            balances: token.consumer.balances,
          };
          if (token.consumer.picture && token.consumer.picture.small) {
            session.user.picture = {
              small: await Image.findOne({ _id: token.consumer.picture.small }),
            };
          }
          socket.emit('user-authenticated', {
            message: 'token verified',
            user: session.user,
          });
          break;

        case 'Channel':
          session.channel = await channelService.getChannelById(token.consumer._id);
          // Keys are stripped before the channel is sent to the client; the
          // presence of a widgetKey marks the session as a widget.
          if (session.channel.widgetKey) {
            delete session.channel.widgetKey;
            session.isWidget = true;
          }
          if (session.channel.streamKey) {
            delete session.channel.streamKey;
          }
          this.log.info('starting Channel session', { channel: session.channel.name });
          socket.emit('widget-authenticated', {
            message: 'token verified',
            channel: session.channel,
          });
          break;
      }
    } catch (error) {
      this.log.error('failed to process a socket connection', { error });
      // Do not leave a socket whose setup failed connected; if a session was
      // already registered, the disconnect handler cleans it up.
      socket.disconnect(true);
    }
  }

  /**
   * Handles a socket disconnect: closes the user's task sessions (user
   * sessions only), removes the session's socket listeners, and drops the
   * session from `this.sessions`.
   *
   * @param {object} session Session record created in onSocketConnect.
   * @param {string} reason Disconnect reason supplied by Socket.IO.
   */
  async onSocketDisconnect (session, reason) {
    const { task: taskService } = this.dtp.services;

    // A session may carry a user, a channel, or (if setup failed part-way)
    // neither.
    const consumer = session.user || session.channel;

    this.log.debug('socket disconnect', {
      sid: session.socket.id,
      consumerId: consumer ? consumer._id : undefined,
      joinedRooms: session.joinedRooms.size,
      reason,
    });

    // Only user sessions have task sessions to close. Channel/widget sessions
    // have no session.user and must still reach the cleanup below.
    if (session.user) {
      try {
        this.log.info('closing open task sessions for user', {
          user: {
            _id: session.user._id,
            username: session.user.username,
          },
        });
        await taskService.closeTaskSessionForUser(session.user);
      } catch (error) {
        this.log.error('failed to close active task sessions for user', {
          user: {
            _id: session.user._id,
            username: session.user.username,
          },
          error,
        });
        // fall through
      }
    }

    if (session.onSocketDisconnect) {
      session.socket.off('disconnect', session.onSocketDisconnect);
      delete session.onSocketDisconnect;
    }

    if (session.onJoinChannel) {
      session.socket.off('join', session.onJoinChannel);
      delete session.onJoinChannel;
    }

    if (session.onLeaveChannel) {
      session.socket.off('leave', session.onLeaveChannel);
      delete session.onLeaveChannel;
    }

    delete this.sessions[session.socket.id];
  }

  /**
   * Handles a client `join` request. Consumes one point from the join rate
   * limiter (disconnecting the socket when that fails), then dispatches on
   * `message.channelType`. Only 'User' is handled here.
   *
   * @param {object} session Session record created in onSocketConnect.
   * @param {object} message Client-supplied payload; expected to carry
   * `channelType` and `channelId`.
   */
  async onJoinChannel (session, message) {
    // NOTE(review): the limiter is keyed by socket id although the log message
    // refers to the client IP - confirm which one is intended.
    try {
      await this.channelJoinLimiter.consume(session.socket.id, 1);
    } catch (error) {
      session.socket.disconnect(true);
      this.log.alert('banned client IP due to channel join limit', { sip: session.socket.handshake.address });
      return;
    }

    // The payload comes from the client and may be missing or malformed.
    const { channelType, channelId } = message || { };

    this.log.info('onJoinChannel', {
      sid: session.socket.id,
      channelType,
      channelId,
    });

    try {
      switch (channelType) {
        case 'User':
          // `return await` so a rejection is caught and logged below instead
          // of escaping as an unhandled rejection.
          return await this.onJoinUserChannel(session, message);

        default:
          break;
      }
    } catch (error) {
      this.log.error('failed to join channel', { error });
    }
  }

  /**
   * Joins a user session to a channel. `message.channelId` is either
   * `<hex id>` or `broadcast:<hex id>`. When the id equals the session user's
   * own id, the socket joins that room and receives a `join-result`;
   * otherwise the request is forwarded to `onJoinStreamChannel`.
   *
   * NOTE(review): `onJoinStreamChannel` is not defined in this class -
   * presumably provided by SiteCommon or a subclass; confirm.
   *
   * @param {object} session Session record created in onSocketConnect.
   * @param {object} message Client-supplied payload carrying `channelId`.
   * @throws When `channelId` is not a string or its id part is not valid hex
   * for an ObjectId.
   */
  async onJoinUserChannel (session, message) {
    if (!session.user || !session.user._id) {
      return session.socket.emit('join-result', { authResult: 'access-denied' });
    }

    const channelId = message.channelId;
    const channelComponents = channelId.split(':');
    const parsedChannelId = ObjectId.createFromHexString(
      (channelComponents[0] === 'broadcast')
        ? channelComponents[1]
        : channelComponents[0]
    );

    if (parsedChannelId.equals(session.user._id)) {
      this.log.debug('user joins their own channel', {
        sid: session.socket.id,
        consumerId: parsedChannelId,
        userId: session.user._id,
        channelId: channelId,
      });
      session.joinedChannels.add(channelId);
      session.socket.join(channelId.toString());
      return session.socket.emit('join-result', {
        socketId: session.socket.id,
        channelId: channelId,
        isPersonal: true,
      });
    }

    return this.onJoinStreamChannel(session, message);
  }

  /**
   * Handles a client `leave` request. Does nothing unless the session
   * previously joined `message.channelId`.
   *
   * @param {object} session Session record created in onSocketConnect.
   * @param {object} message Client-supplied payload carrying `channelId`.
   */
  async onLeaveChannel (session, message) {
    // The payload comes from the client and may be missing.
    const { channelId } = message || { };
    if (!session.joinedChannels.has(channelId)) {
      return;
    }

    const consumer = session.user || session.channel;
    this.log.debug('socket leaves channel', {
      sid: session.socket.id,
      user: consumer ? consumer._id : undefined,
      channelId,
    });

    session.socket.leave(channelId);
    session.joinedChannels.delete(channelId);
  }
}