// canvas-io-server.js // Copyright (C) 2022 Rob Colbert @rob@nicecrew.digital // License: Apache-2.0 'use strict'; import fs from 'fs'; import { dirname } from 'path'; import { fileURLToPath } from 'url'; const __dirname = dirname(fileURLToPath(import.meta.url)); // jshint ignore:line import { Server } from 'socket.io'; import { createAdapter } from '@socket.io/redis-adapter'; import { RateLimiterRedis } from 'rate-limiter-flexible'; import { CronJob } from 'cron'; import CanvasImage from './lib/canvas-image.js'; export default class CanvasIoServer { constructor (app) { this.app = app; this.limiters = { getImageData: new RateLimiterRedis({ storeClient: this.app.redis, points: 10, duration: 60 }), setPixel: new RateLimiterRedis({ storeClient: this.app.redis, points: 10, duration: 60 }), }; this.images = { }; } async start ( ) { const images = await CanvasImage.getImages(this.app.redis); for (const image of images) { await this.loadImage(image); } const pubClient = this.app.redis.duplicate(); pubClient.on('error', this.onRedisError.bind(this)); const subClient = this.app.redis.duplicate(); subClient.on('error', this.onRedisError.bind(this)); const transports = ['websocket'/*, 'polling'*/]; const adapter = createAdapter(pubClient, subClient); await subClient.subscribe('mpcvs:admin'); subClient.on('message', this.onPubSubMessage.bind(this)); this.io = new Server(this.app.httpServer, { adapter, transports }); this.io.on('connection', this.onSocketConnect.bind(this)); console.log('starting image backup cron'); this.backupCron = new CronJob( '*/15 * * * * *', this.onImageBackup.bind(this), null, true, 'America/New_York', ); console.log('CanvasIoServer ready'); } async onRedisError (error) { console.log('Redis error', error); } async onSocketConnect (socket) { const CONNECT_TOKEN = socket.handshake.auth.token; console.log('socket connection', { sid: socket.id, token: CONNECT_TOKEN }); const TOKEN_KEY = `connect-token:${CONNECT_TOKEN}`; const tokenData = await this.app.redis.get(TOKEN_KEY); if (!tokenData) { console.log('rejecting invalid socket token', { sid: socket.sid, handshake: socket.handshake, }); socket.disconnect(true); return; } await this.app.redis.del(TOKEN_KEY); // immediately remove const token = JSON.parse(tokenData); const IMAGE_ID = token.imageId; const session = { CONNECT_TOKEN, IMAGE_ID, socket }; session.onSocketDisconnect = this.onSocketDisconnect.bind(this, session); session.onJoinChannel = this.onJoinChannel.bind(this, session); session.onLeaveChannel = this.onLeaveChannel.bind(this, session); session.onGetImageData = this.onGetImageData.bind(this, session); session.onSetPixel = this.onSetPixel.bind(this, session); socket.on('disconnect', session.onSocketDisconnect); socket.on('join', session.onJoinChannel); socket.on('leave', session.onLeaveChannel); socket.on('getimagedata', session.onGetImageData); socket.on('setpixel', session.onSetPixel); socket.emit('authenticated', { message: 'token verified' }); } async onSocketDisconnect (session, reason) { const { CONNECT_TOKEN, IMAGE_ID, socket } = session; console.log('socket disconnect', { sid: socket.id, CONNECT_TOKEN, IMAGE_ID, reason }); session.socket.off('disconnect', session.onSocketDisconnect); session.socket.off('join', session.onJoinChannel); session.socket.off('leave', session.onLeaveChannel); } async onJoinChannel (session, message) { const { channelId } = message; console.log('socket joins channel', { sid: session.socket.id, token: session.CONNECT_TOKEN, channelId }); session.socket.join(channelId); session.socket.emit('join-result', { channelId }); } async onLeaveChannel (session, message) { const { channelId } = message; console.log('socket leaves channel', { sid: session.socket.id, token: session.CONNECT_TOKEN, channelId }); session.socket.leave(channelId); } async onGetImageData (session) { const { CONNECT_TOKEN, IMAGE_ID, socket } = session; console.log('onGetImageData', { sid: session.socket.id, CONNECT_TOKEN, IMAGE_ID }); try { const limiterKey = `${IMAGE_ID}:${CONNECT_TOKEN}`; await this.limiters.getImageData.consume(limiterKey, 1); } catch (error) { console.log('onGetImageData rate limit exceeded', { CONNECT_TOKEN, IMAGE_ID }); socket.emit('canvas-error', { command: 'getimagedata', message: error.message }); return; } const image = this.images[IMAGE_ID]; if (!image) { socket.emit('canvas-error', { message: 'image not found' }); return; } session.socket.emit('canvas-image-data', { width: image.image.bitmap.width, height: image.image.bitmap.height, data: Buffer.from(image.image.bitmap.data), }); } async onSetPixel (session, message) { const { CONNECT_TOKEN, IMAGE_ID, socket } = session; try { const limiterKey = `${IMAGE_ID}:${CONNECT_TOKEN}`; const limiter = await this.limiters.setPixel.consume(limiterKey, 1); socket.emit('canvas-limiter-update', limiter); } catch (error) { console.log('failed to setpixel', error); socket.emit('canvas-error', { command: 'setpixel', error }); return; } const image = this.images[IMAGE_ID]; if (!image) { socket.emit('canvas-error', { command: 'setpixel', message: 'Image does not exist' }); return; } console.log('onSetPixel', IMAGE_ID, message); image.setPixel(message); this.io.in(IMAGE_ID).emit('canvas-setpixel', { x: message.x, y: message.y, r: message.r, g: message.g, b: message.b, }); } async onImageBackup ( ) { for (const imageId in this.images) { const image = this.images[imageId]; if (image.dirty) { await image.save(); } } } async loadImage (imageMetadata) { const { id, name, width, height } = imageMetadata; console.log('loading image', imageMetadata); const image = new CanvasImage(id, name || 'Test Image'); try { await image.load(); } catch (error) { console.log('image load failed', error); console.log('creating new image', { id, name, width, height }); image.create(width, height); await image.save(); } if (image.width !== width || image.height !== height) { throw new Error('Image file geometry mismatch'); } this.images[image.id] = image; } /** * Receives Redis pub/sub messages for this process. Determines if the message * is for a channel we care about and, if so, calls the processor for that * message type. * @param {Object} message The message received */ async onPubSubMessage (channel, message) { if (channel !== 'mpcvs:admin') { return; // not one of our messages } message = JSON.parse(message); switch (message.command) { case 'image-create': return this.onImageCreate(message); default: break; } throw new Error(`Invalid mpcsv:admin command: ${message.command}`); } async onImageCreate (message) { const { id, name, width, height } = message.image; console.log('creating image', { id, width, height }); const image = new CanvasImage(id, name); image.create(width, height); const IMAGE_KEY = image.key; console.log('registering image in Redis', { id, key: IMAGE_KEY }); await this.app.redis.hset(IMAGE_KEY, 'name', image.name, 'width', image.width, 'height', image.height); await image.save(); this.images[image.id] = image; } }