You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
253 lines
7.6 KiB
253 lines
7.6 KiB
// canvas-io-server.js
|
|
// Copyright (C) 2022 Rob Colbert @[email protected]
|
|
// License: Apache-2.0
|
|
|
|
'use strict';
|
|
|
|
import fs from 'fs';
|
|
import { dirname } from 'path';
|
|
import { fileURLToPath } from 'url';
|
|
const __dirname = dirname(fileURLToPath(import.meta.url)); // jshint ignore:line
|
|
|
|
import { Server } from 'socket.io';
|
|
import { createAdapter } from '@socket.io/redis-adapter';
|
|
|
|
import { RateLimiterRedis } from 'rate-limiter-flexible';
|
|
import { CronJob } from 'cron';
|
|
|
|
import CanvasImage from './lib/canvas-image.js';
|
|
|
|
export default class CanvasIoServer {
|
|
|
|
constructor (app) {
|
|
this.app = app;
|
|
|
|
this.limiters = {
|
|
getImageData: new RateLimiterRedis({ storeClient: this.app.redis, points: 10, duration: 60 }),
|
|
setPixel: new RateLimiterRedis({ storeClient: this.app.redis, points: 10, duration: 60 }),
|
|
};
|
|
|
|
this.images = { };
|
|
}
|
|
|
|
async start ( ) {
|
|
const images = await CanvasImage.getImages(this.app.redis);
|
|
for (const image of images) {
|
|
await this.loadImage(image);
|
|
}
|
|
|
|
const pubClient = this.app.redis.duplicate();
|
|
pubClient.on('error', this.onRedisError.bind(this));
|
|
|
|
const subClient = this.app.redis.duplicate();
|
|
subClient.on('error', this.onRedisError.bind(this));
|
|
|
|
const transports = ['websocket'/*, 'polling'*/];
|
|
const adapter = createAdapter(pubClient, subClient);
|
|
|
|
await subClient.subscribe('mpcvs:admin');
|
|
subClient.on('message', this.onPubSubMessage.bind(this));
|
|
|
|
this.io = new Server(this.app.httpServer, { adapter, transports });
|
|
this.io.on('connection', this.onSocketConnect.bind(this));
|
|
|
|
console.log('starting image backup cron');
|
|
this.backupCron = new CronJob(
|
|
'*/15 * * * * *',
|
|
this.onImageBackup.bind(this),
|
|
null,
|
|
true,
|
|
'America/New_York',
|
|
);
|
|
|
|
console.log('CanvasIoServer ready');
|
|
}
|
|
|
|
async onRedisError (error) {
|
|
console.log('Redis error', error);
|
|
}
|
|
|
|
async onSocketConnect (socket) {
|
|
const CONNECT_TOKEN = socket.handshake.auth.token;
|
|
console.log('socket connection', { sid: socket.id, token: CONNECT_TOKEN });
|
|
|
|
const TOKEN_KEY = `connect-token:${CONNECT_TOKEN}`;
|
|
|
|
const tokenData = await this.app.redis.get(TOKEN_KEY);
|
|
if (!tokenData) {
|
|
console.log('rejecting invalid socket token', {
|
|
sid: socket.sid,
|
|
handshake: socket.handshake,
|
|
});
|
|
socket.disconnect(true);
|
|
return;
|
|
}
|
|
|
|
await this.app.redis.del(TOKEN_KEY); // immediately remove
|
|
|
|
const token = JSON.parse(tokenData);
|
|
const IMAGE_ID = token.imageId;
|
|
|
|
const session = { CONNECT_TOKEN, IMAGE_ID, socket };
|
|
|
|
session.onSocketDisconnect = this.onSocketDisconnect.bind(this, session);
|
|
session.onJoinChannel = this.onJoinChannel.bind(this, session);
|
|
session.onLeaveChannel = this.onLeaveChannel.bind(this, session);
|
|
|
|
session.onGetImageData = this.onGetImageData.bind(this, session);
|
|
session.onSetPixel = this.onSetPixel.bind(this, session);
|
|
|
|
socket.on('disconnect', session.onSocketDisconnect);
|
|
socket.on('join', session.onJoinChannel);
|
|
socket.on('leave', session.onLeaveChannel);
|
|
|
|
socket.on('getimagedata', session.onGetImageData);
|
|
socket.on('setpixel', session.onSetPixel);
|
|
|
|
socket.emit('authenticated', { message: 'token verified' });
|
|
}
|
|
|
|
async onSocketDisconnect (session, reason) {
|
|
const { CONNECT_TOKEN, IMAGE_ID, socket } = session;
|
|
console.log('socket disconnect', { sid: socket.id, CONNECT_TOKEN, IMAGE_ID, reason });
|
|
session.socket.off('disconnect', session.onSocketDisconnect);
|
|
session.socket.off('join', session.onJoinChannel);
|
|
session.socket.off('leave', session.onLeaveChannel);
|
|
}
|
|
|
|
async onJoinChannel (session, message) {
|
|
const { channelId } = message;
|
|
console.log('socket joins channel', { sid: session.socket.id, token: session.CONNECT_TOKEN, channelId });
|
|
session.socket.join(channelId);
|
|
session.socket.emit('join-result', { channelId });
|
|
}
|
|
|
|
async onLeaveChannel (session, message) {
|
|
const { channelId } = message;
|
|
console.log('socket leaves channel', { sid: session.socket.id, token: session.CONNECT_TOKEN, channelId });
|
|
session.socket.leave(channelId);
|
|
}
|
|
|
|
async onGetImageData (session) {
|
|
const { CONNECT_TOKEN, IMAGE_ID, socket } = session;
|
|
console.log('onGetImageData', { sid: session.socket.id, CONNECT_TOKEN, IMAGE_ID });
|
|
|
|
try {
|
|
const limiterKey = `${IMAGE_ID}:${CONNECT_TOKEN}`;
|
|
await this.limiters.getImageData.consume(limiterKey, 1);
|
|
} catch (error) {
|
|
console.log('onGetImageData rate limit exceeded', { CONNECT_TOKEN, IMAGE_ID });
|
|
socket.emit('canvas-error', { command: 'getimagedata', message: error.message });
|
|
return;
|
|
}
|
|
|
|
const image = this.images[IMAGE_ID];
|
|
if (!image) {
|
|
socket.emit('canvas-error', { message: 'image not found' });
|
|
return;
|
|
}
|
|
|
|
session.socket.emit('canvas-image-data', {
|
|
width: image.image.bitmap.width,
|
|
height: image.image.bitmap.height,
|
|
data: Buffer.from(image.image.bitmap.data),
|
|
});
|
|
}
|
|
|
|
async onSetPixel (session, message) {
|
|
const { CONNECT_TOKEN, IMAGE_ID, socket } = session;
|
|
try {
|
|
const limiterKey = `${IMAGE_ID}:${CONNECT_TOKEN}`;
|
|
const limiter = await this.limiters.setPixel.consume(limiterKey, 1);
|
|
socket.emit('canvas-limiter-update', limiter);
|
|
} catch (error) {
|
|
console.log('failed to setpixel', error);
|
|
socket.emit('canvas-error', { command: 'setpixel', error });
|
|
return;
|
|
}
|
|
|
|
const image = this.images[IMAGE_ID];
|
|
if (!image) {
|
|
socket.emit('canvas-error', { command: 'setpixel', message: 'Image does not exist' });
|
|
return;
|
|
}
|
|
|
|
console.log('onSetPixel', IMAGE_ID, message);
|
|
image.setPixel(message);
|
|
|
|
this.io.in(IMAGE_ID).emit('canvas-setpixel', {
|
|
x: message.x,
|
|
y: message.y,
|
|
r: message.r,
|
|
g: message.g,
|
|
b: message.b,
|
|
});
|
|
}
|
|
|
|
async onImageBackup ( ) {
|
|
for (const imageId in this.images) {
|
|
const image = this.images[imageId];
|
|
if (image.dirty) {
|
|
await image.save();
|
|
}
|
|
}
|
|
}
|
|
|
|
async loadImage (imageMetadata) {
|
|
const { id, name, width, height } = imageMetadata;
|
|
console.log('loading image', imageMetadata);
|
|
|
|
const image = new CanvasImage(id, name || 'Test Image');
|
|
try {
|
|
await image.load();
|
|
} catch (error) {
|
|
console.log('image load failed', error);
|
|
console.log('creating new image', { id, name, width, height });
|
|
image.create(width, height);
|
|
await image.save();
|
|
}
|
|
|
|
if (image.width !== width || image.height !== height) {
|
|
throw new Error('Image file geometry mismatch');
|
|
}
|
|
|
|
this.images[image.id] = image;
|
|
}
|
|
|
|
/**
|
|
* Receives Redis pub/sub messages for this process. Determines if the message
|
|
* is for a channel we care about and, if so, calls the processor for that
|
|
* message type.
|
|
* @param {Object} message The message received
|
|
*/
|
|
async onPubSubMessage (channel, message) {
|
|
if (channel !== 'mpcvs:admin') {
|
|
return; // not one of our messages
|
|
}
|
|
|
|
message = JSON.parse(message);
|
|
switch (message.command) {
|
|
case 'image-create':
|
|
return this.onImageCreate(message);
|
|
default:
|
|
break;
|
|
}
|
|
|
|
throw new Error(`Invalid mpcsv:admin command: ${message.command}`);
|
|
}
|
|
|
|
async onImageCreate (message) {
|
|
const { id, name, width, height } = message.image;
|
|
console.log('creating image', { id, width, height });
|
|
|
|
const image = new CanvasImage(id, name);
|
|
image.create(width, height);
|
|
|
|
const IMAGE_KEY = image.key;
|
|
console.log('registering image in Redis', { id, key: IMAGE_KEY });
|
|
await this.app.redis.hset(IMAGE_KEY, 'name', image.name, 'width', image.width, 'height', image.height);
|
|
|
|
await image.save();
|
|
this.images[image.id] = image;
|
|
}
|
|
}
|