DTP Base provides a scalable and secure Node.js application development harness ready for production service.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

320 lines
9.2 KiB

// site-ioserver.js
// Copyright (C) 2024 DTP Technologies, LLC
// All Rights Reserved
'use strict';
import { createAdapter } from '@socket.io/redis-adapter';
import mongoose from 'mongoose';
// ObjectId is used below to parse channel identifiers received from clients.
const { ObjectId } = mongoose.Types;
// Models are looked up by name only (no schema is passed here), so they are
// presumably registered elsewhere before this module is loaded — TODO confirm.
// ConnectToken backs the socket handshake check in onSocketConnect.
const ConnectToken = mongoose.model('ConnectToken');
// Image is used to load the user's small profile picture for the session.
const Image = mongoose.model('Image'); // jshint ignore:line
import { SiteLog } from './site-log.js';
import { SiteCommon } from './site-common.js';
/**
 * Socket.io server for the site.
 *
 * Authenticates each incoming socket with a single-use ConnectToken, keeps a
 * per-socket session record in `this.sessions` (keyed by socket id), and
 * handles the client's `join` / `leave` channel requests.
 *
 * Every socket event handler in this class is an async function whose
 * returned promise is not observed by the event emitter, so each handler
 * catches its own failures instead of letting them surface as unhandled
 * promise rejections.
 */
export class SiteIoServer extends SiteCommon {

  static get name ( ) { return 'SiteIoServer'; }
  static get slug ( ) { return 'ioServer'; }

  /**
   * @param {object} dtp Platform context. This class reads `dtp.redis` and
   * `dtp.services` from it.
   */
  constructor (dtp) {
    super(dtp);
    this.dtp = dtp;
    this.log = new SiteLog(dtp, SiteIoServer);
    this.sessions = { }; // socket.id -> session record
  }

  /**
   * Creates the join rate limiter, the Redis adapter and the Socket.io
   * server, then starts accepting connections.
   *
   * @param {object} httpServer HTTP server the Socket.io server attaches to.
   */
  async start (httpServer) {
    const { Server } = await import('socket.io');
    const { RateLimiterRedis } = await import('rate-limiter-flexible');

    // 30 join requests per 60 seconds; exceeding that blocks the key for an hour.
    this.channelJoinLimiter = new RateLimiterRedis({
      storeClient: this.dtp.redis,
      points: 30,
      duration: 60,
      blockDuration: 60 * 60,
      execEvenly: false,
      keyPrefix: 'iorl:joinchannel:',
    });

    const transports = [
      'websocket',
      // 'polling',
    ];

    this.log.info('creating Socket.io <-> Redis adapter');
    this.adapterPubClient = this.dtp.redis.duplicate();
    this.adapterSubClient = this.dtp.redis.duplicate();

    // Route errors from the dedicated adapter connections to the logger.
    const onRedisError = this.onRedisError.bind(this);
    this.adapterPubClient.on('error', onRedisError);
    this.adapterSubClient.on('error', onRedisError);

    this.adapter = createAdapter(this.adapterPubClient, this.adapterSubClient);

    // The adapter is supplied once, through the constructor option. A second
    // io.adapter(...) call is not needed on top of it.
    this.io = new Server(httpServer, {
      adapter: this.adapter,
      transports,
      pingInterval: 5000,
      pingTimeout: 3000,
    });

    this.io.on('connection', this.onSocketConnect.bind(this));
  }

  /**
   * Logs an error reported by one of the Redis adapter connections.
   *
   * @param {Error} error The reported error.
   */
  async onRedisError (error) {
    this.log.error('Redis error', { error });
  }

  /**
   * Disconnects every socket that still has a session.
   */
  async shutdown ( ) {
    this.log.alert('Socket.io server shutting down');
    // Iterate over a snapshot: disconnecting a socket removes its session
    // from this.sessions via onSocketDisconnect.
    for (const [socketId, session] of Object.entries(this.sessions)) {
      this.log.debug('closing socket connection', { socketId });
      session.socket.disconnect(true);
    }
  }

  /**
   * Authenticates a new socket with its connect token and starts a session.
   *
   * The token is read from `socket.handshake.auth.token`. The socket is
   * disconnected when the token is missing, malformed, unknown, already
   * claimed, has no consumer, or when anything fails while the session is
   * being set up.
   *
   * @param {object} socket The newly connected Socket.io socket.
   */
  async onSocketConnect (socket) {
    this.log.debug('socket connection', { sid: socket.id });
    try {
      const { channel: channelService } = this.dtp.services;

      // The token comes from the client. Only a plain non-empty string may be
      // placed in the query filter; an object here would be interpreted as
      // query operators.
      const authToken = socket.handshake?.auth?.token;
      if ((typeof authToken !== 'string') || (authToken.length === 0)) {
        this.log.alert('rejecting invalid socket token', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      const token = await ConnectToken
        .findOne({ token: authToken })
        .populate([
          {
            path: 'consumer',
            select: '+membership +flags +flags.isCloaked',
            populate: [
              {
                path: 'membership',
                model: 'Membership',
                select: 'status granted expires started plan',
                strictPopulate: false,
                populate: [
                  {
                    path: 'plan',
                    model: 'MembershipPlan',
                    select: 'product.name tier'
                  },
                ],
              },
            ],
          },
        ])
        .lean();
      if (!token) {
        this.log.alert('rejecting invalid socket token', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      if (token.claimed) {
        this.log.alert('rejecting use of claimed connect token', {
          sid: socket.id,
          handshake: socket.handshake,
        });
        socket.disconnect(true);
        return;
      }

      // Everything below reads token.consumer, so a token whose consumer did
      // not populate cannot start a session.
      if (!token.consumer) {
        this.log.alert('rejecting connect token with no consumer', {
          sid: socket.id,
          consumerType: token.consumerType,
        });
        socket.disconnect(true);
        return;
      }

      // Never carry credential fields into the session.
      if (token.consumer.password) {
        delete token.consumer.password;
      }
      if (token.consumer.passwordSalt) {
        delete token.consumer.passwordSalt;
      }

      // NOTE(review): the lookup above and this update are two separate
      // operations, so two sockets presenting the same token at the same
      // moment could both pass the `claimed` check. Confirm whether a single
      // atomic claim is wanted here.
      await ConnectToken.updateOne(
        { _id: token._id },
        { $set: { claimed: new Date() } },
      );
      this.log.debug('token claimed', {
        sid: socket.id,
        token: authToken,
        consumerType: token.consumerType,
        consumerId: token.consumer._id,
      });

      // The socket may have gone away while the queries above were pending.
      // Its 'disconnect' event has then already fired, so a session
      // registered now would never be cleaned up.
      if (!socket.connected) {
        this.log.debug('socket disconnected before its session was created', { sid: socket.id });
        return;
      }

      const session = {
        socket,
        joinedChannels: new Set(),
        joinedRooms: new Map(),
      };
      this.sessions[socket.id] = session;

      // The bound handlers are kept on the session so that they can be
      // removed again in onSocketDisconnect.
      session.onSocketDisconnect = this.onSocketDisconnect.bind(this, session);
      socket.on('disconnect', session.onSocketDisconnect);

      session.onJoinChannel = this.onJoinChannel.bind(this, session);
      socket.on('join', session.onJoinChannel);

      session.onLeaveChannel = this.onLeaveChannel.bind(this, session);
      socket.on('leave', session.onLeaveChannel);

      switch (token.consumerType) {
        case 'User':
          session.user = {
            _id: token.consumer._id,
            created: token.consumer.created,
            username: token.consumer.username,
            username_lc: token.consumer.username_lc,
            displayName: token.consumer.displayName,
            flags: token.consumer.flags,
            membership: token.consumer.membership,
            balances: token.consumer.balances,
          };
          if (token.consumer.picture && token.consumer.picture.small) {
            session.user.picture = {
              small: await Image.findOne({ _id: token.consumer.picture.small }),
            };
          }
          socket.emit('user-authenticated', {
            message: 'token verified',
            user: session.user,
          });
          break;

        case 'Channel':
          session.channel = await channelService.getChannelById(token.consumer._id);
          // The keys are removed so they are not sent to the client; the
          // presence of a widget key marks the session as a widget session.
          if (session.channel.widgetKey) {
            delete session.channel.widgetKey;
            session.isWidget = true;
          }
          if (session.channel.streamKey) {
            delete session.channel.streamKey;
          }
          this.log.info('starting Channel session', { channel: session.channel.name });
          socket.emit('widget-authenticated', {
            message: 'token verified',
            channel: session.channel,
          });
          break;
      }
    } catch (error) {
      this.log.error('failed to process a socket connection', { error });
      // Do not leave a half-initialised socket connected. If a session was
      // already registered, its disconnect handler releases it.
      socket.disconnect(true);
    }
  }

  /**
   * Releases a session after its socket disconnected: closes the user's task
   * sessions (User sessions only), removes the socket listeners and drops the
   * session record.
   *
   * @param {object} session Session record created in onSocketConnect.
   * @param {string} reason Disconnect reason reported by Socket.io.
   */
  async onSocketDisconnect (session, reason) {
    // A session may hold a user, a channel, or neither when session setup
    // failed part-way through.
    const consumer = session.user || session.channel;
    this.log.debug('socket disconnect', {
      sid: session.socket.id,
      consumerId: consumer?._id,
      joinedRooms: session.joinedRooms.size,
      reason,
    });

    // Task sessions belong to users; Channel (widget) sessions have none.
    if (session.user) {
      const user = {
        _id: session.user._id,
        username: session.user.username,
      };
      try {
        const { task: taskService } = this.dtp.services;
        this.log.info('closing open task sessions for user', { user });
        await taskService.closeTaskSessionForUser(session.user);
      } catch (error) {
        this.log.error('failed to close active task sessions for user', { user, error });
        // fall through: the session must still be released below
      }
    }

    if (session.onSocketDisconnect) {
      session.socket.off('disconnect', session.onSocketDisconnect);
      delete session.onSocketDisconnect;
    }
    if (session.onJoinChannel) {
      session.socket.off('join', session.onJoinChannel);
      delete session.onJoinChannel;
    }
    if (session.onLeaveChannel) {
      session.socket.off('leave', session.onLeaveChannel);
      delete session.onLeaveChannel;
    }

    delete this.sessions[session.socket.id];
  }

  /**
   * Handles a client's `join` request: applies the join rate limit, then
   * dispatches on `message.channelType`. Only 'User' is handled here; any
   * other type is ignored.
   *
   * @param {object} session Session record of the requesting socket.
   * @param {object} message Client payload; `channelType` and `channelId` are read.
   */
  async onJoinChannel (session, message) {
    // NOTE(review): the limiter is keyed by socket id although the log
    // message below speaks of the client IP — confirm which is intended.
    try {
      await this.channelJoinLimiter.consume(session.socket.id, 1);
    } catch (error) {
      session.socket.disconnect(true);
      if (error instanceof Error) {
        // An Error instance means the limiter itself failed rather than the
        // client running out of points; the socket is still dropped.
        this.log.error('channel join rate limiter failed', { sid: session.socket.id, error });
      } else {
        this.log.alert('banned client IP due to channel join limit', { sip: session.socket.handshake.address });
      }
      return;
    }

    // The payload is client-controlled and may be missing entirely.
    if (!message || (typeof message !== 'object')) {
      this.log.alert('ignoring malformed join request', { sid: session.socket.id });
      return;
    }

    const { channelType, channelId } = message;
    this.log.info('onJoinChannel', {
      sid: session.socket.id,
      channelType,
      channelId,
    });

    try {
      switch (channelType) {
        case 'User':
          // Awaited so that a rejection is handled by the catch below.
          return await this.onJoinUserChannel(session, message);

        default:
          break;
      }
    } catch (error) {
      this.log.error('failed to join channel', { error });
    }
  }

  /**
   * Joins the session's socket to a user channel.
   *
   * `message.channelId` is either `<hex id>` or `broadcast:<hex id>`. A user
   * may join a channel whose id equals their own user id; every other id is
   * handed to `onJoinStreamChannel`.
   *
   * @param {object} session Session record of the requesting socket.
   * @param {object} message Client payload; `channelId` is read.
   * @throws {TypeError} When `channelId` is not a string. An id that is not
   * valid hex makes `ObjectId.createFromHexString` throw.
   */
  async onJoinUserChannel (session, message) {
    if (!session.user || !session.user._id) {
      return session.socket.emit('join-result', { authResult: 'access-denied' });
    }

    const { channelId } = message;
    if (typeof channelId !== 'string') {
      throw new TypeError('channelId must be a string');
    }

    const channelComponents = channelId.split(':');
    const channelHexId = (channelComponents[0] === 'broadcast')
      ? channelComponents[1]
      : channelComponents[0];
    const parsedChannelId = ObjectId.createFromHexString(channelHexId);

    if (parsedChannelId.equals(session.user._id)) {
      this.log.debug('user joins their own channel', {
        sid: session.socket.id,
        consumerId: parsedChannelId,
        userId: session.user._id,
        channelId,
      });
      // join() may return a promise depending on the adapter; the channel is
      // recorded and the result reported only after the join has completed.
      await session.socket.join(channelId);
      session.joinedChannels.add(channelId);
      return session.socket.emit('join-result', {
        socketId: session.socket.id,
        channelId,
        isPersonal: true,
      });
    }

    // onJoinStreamChannel is not defined in this class; it may be supplied by
    // the base class or a subclass. Without it the request is denied.
    if (typeof this.onJoinStreamChannel !== 'function') {
      this.log.error('no stream channel join handler is available', {
        sid: session.socket.id,
        channelId,
      });
      return session.socket.emit('join-result', { authResult: 'access-denied' });
    }
    return this.onJoinStreamChannel(session, message);
  }

  /**
   * Handles a client's `leave` request. Requests for channels this session
   * has not joined, including malformed requests, are ignored.
   *
   * @param {object} session Session record of the requesting socket.
   * @param {object} message Client payload; `channelId` is read.
   */
  async onLeaveChannel (session, message) {
    // The payload is client-controlled and may be missing entirely.
    const channelId = message?.channelId;
    if (!session.joinedChannels.has(channelId)) {
      return;
    }

    const consumer = session.user || session.channel;
    this.log.debug('socket leaves channel', { sid: session.socket.id, user: consumer?._id, channelId });
    try {
      await session.socket.leave(channelId);
      session.joinedChannels.delete(channelId);
    } catch (error) {
      this.log.error('failed to leave channel', { sid: session.socket.id, channelId, error });
    }
  }
}