You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
356 lines
12 KiB
356 lines
12 KiB
// stickers.js
|
|
// Copyright (C) 2022 DTP Technologies, LLC
|
|
// License: Apache-2.0
|
|
|
|
'use strict';
|
|
|
|
// Target output height for encoded stickers. Passed to sharp's resize() for
// still images and to FFmpeg's scale filter for motion stickers; the width
// is left for those tools to derive.
const DTP_STICKER_HEIGHT = 100;
|
|
|
|
const path = require('path');
|
|
const fs = require('fs');
|
|
require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') });
|
|
|
|
const mongoose = require('mongoose');
|
|
|
|
const { SitePlatform, SiteLog, SiteWorker } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib'));
|
|
|
|
const sharp = require('sharp');
|
|
|
|
// Package manifest, loaded from two directories above this file. Its name
// and version are reported in the startup log line.
module.pkg = require(path.resolve(__dirname, '..', '..', 'package.json'));

// Worker configuration handed to SiteLog, SitePlatform and StickerWorker.
module.config = {
  // Runtime environment as provided by NODE_ENV (may be undefined).
  environment: process.env.NODE_ENV,

  // Project root: two directories above this file.
  root: path.resolve(__dirname, '..', '..'),

  // Identity of this component; the slug is also used as a work-path segment.
  component: {
    name: 'stickersWorker',
    slug: 'stickers-worker',
  },
};
|
|
|
|
/**
 * Job-queue worker that ingests and deletes stickers.
 *
 * Ingest: downloads the original media, removes any previously encoded
 * rendition, re-encodes the media with sharp (still images) or FFmpeg
 * (motion media), uploads the result and marks the Sticker document 'live'.
 *
 * Delete: removes the original and encoded media objects and then the
 * Sticker document itself.
 */
class StickerWorker extends SiteWorker {

  /**
   * @param {object} dtp Platform/module object; `dtp.config.component` is
   * forwarded to the SiteWorker base class.
   */
  constructor (dtp) {
    super(dtp, dtp.config.component);

    // Dispatch table keyed by job.data.processor, which fetchSticker()
    // selects based on the original media type.
    this.processors = {
      processStickerSharp: this.processStickerSharp.bind(this),
      processStickerFFMPEG: this.processStickerFFMPEG.bind(this),
    };
  }

  /**
   * Starts the base worker, obtains the 'sticker-ingest' job queue and
   * registers the ingest and delete processors on it (concurrency 1 each).
   */
  async start ( ) {
    await super.start();
    const { jobQueue: jobQueueService } = this.dtp.services;

    this.log.info('registering sticker-ingest job processor', {
      config: this.dtp.config.jobQueues['sticker-ingest'],
    });
    this.stickerProcessingQueue = jobQueueService.getJobQueue(
      'sticker-ingest',
      this.dtp.config.jobQueues['sticker-ingest'],
    );

    this.stickerProcessingQueue.process('sticker-ingest', 1, this.processStickerIngest.bind(this));
    this.stickerProcessingQueue.process('sticker-delete', 1, this.processStickerDelete.bind(this));
  }

  /**
   * Closes the job queue (if one was opened) and then stops the base worker.
   * A failure to close the queue is logged and does not prevent
   * super.stop() from running.
   */
  async stop ( ) {
    if (this.stickerProcessingQueue) {
      try {
        this.log.info('closing sticker-ingest job queue');
        await this.stickerProcessingQueue.close();
        delete this.stickerProcessingQueue;
      } catch (error) {
        this.log.error('failed to close sticker ingest job queue', { error });
        // fall through
      }
    }
    await super.stop();
  }

  /**
   * Queue processor for 'sticker-ingest' jobs.
   *
   * @param {object} job Queue job; `job.data.stickerId` identifies the
   * sticker. Working state (sticker, paths, processor name) is attached to
   * `job.data` by fetchSticker().
   * @throws Re-throws any fetch/reset/encode error so the queue records the
   * job as failed.
   */
  async processStickerIngest (job) {
    try {
      this.log.info('received sticker ingest job', { id: job.id, data: job.data });
      await this.fetchSticker(job); // defines job.data.processor
      await this.resetSticker(job);

      // call the chosen file processor to render the sticker for distribution
      await this.processors[job.data.processor](job);

      //TODO: emit a completion event which should cause a refresh of the
      // creator's view to display the processed sticker
    } catch (error) {
      this.log.error('failed to process sticker', { stickerId: job.data.stickerId, error });
      throw error;
    } finally {
      if (job.data.workPath) {
        this.log.info('cleaning up sticker work path', { workPath: job.data.workPath });
        try {
          // force: true tolerates a work directory that was never created.
          await fs.promises.rm(job.data.workPath, { recursive: true, force: true });
        } catch (cleanupError) {
          // Cleanup is best-effort: throwing from `finally` would replace
          // the processing error (or fail an otherwise successful job).
          this.log.error('failed to clean up sticker work path', {
            workPath: job.data.workPath,
            error: cleanupError,
          });
        }
      }
    }
  }

  /**
   * Loads the Sticker record, creates a per-sticker work directory, picks
   * the processor and file paths for the original media type, and downloads
   * the original media into the work directory.
   *
   * Sets on job.data: sticker, workPath, origFilePath, outFilePath,
   * processor, and (for sharp) sharpFormat / sharpFormatParameters.
   *
   * @param {object} job Queue job carrying `data.stickerId`.
   * @throws {Error} If the sticker cannot be loaded, the work path is not
   * configured, or the original media type is unsupported.
   */
  async fetchSticker (job) {
    const { minio: minioService, sticker: stickerService } = this.dtp.services;

    // NOTE(review): the meaning of the second argument (true) is defined by
    // the sticker service, which is not visible here.
    job.data.sticker = await stickerService.getById(job.data.stickerId, true);
    if (!job.data.sticker) {
      throw new Error(`sticker not found: ${job.data.stickerId}`);
    }

    if (!process.env.DTP_STICKER_WORK_PATH) {
      throw new Error('DTP_STICKER_WORK_PATH is not set');
    }

    const stickerId = job.data.sticker._id;
    job.data.workPath = path.join(
      process.env.DTP_STICKER_WORK_PATH,
      this.dtp.config.component.slug,
      stickerId.toString(),
    );

    await this.jobLog(job, 'creating work directory', { workPath: job.data.workPath });
    await fs.promises.mkdir(job.data.workPath, { recursive: true });

    switch (job.data.sticker.original.type) {
      case 'image/jpeg':
        job.data.origFilePath = path.join(job.data.workPath, `${stickerId}.jpg`);
        job.data.outFilePath = path.join(job.data.workPath, `${stickerId}.proc.jpg`);
        job.data.processor = 'processStickerSharp';
        job.data.sharpFormat = 'jpeg';
        job.data.sharpFormatParameters = { quality: 85 };
        break;

      case 'image/png':
        job.data.origFilePath = path.join(job.data.workPath, `${stickerId}.png`);
        job.data.outFilePath = path.join(job.data.workPath, `${stickerId}.proc.png`);
        job.data.processor = 'processStickerSharp';
        job.data.sharpFormat = 'png';
        // sharp's PNG option is `compressionLevel`; `compression` is not a
        // PNG option and would be ignored.
        job.data.sharpFormatParameters = { compressionLevel: 9 };
        break;

      case 'image/gif':
        job.data.origFilePath = path.join(job.data.workPath, `${stickerId}.gif`);
        job.data.outFilePath = path.join(job.data.workPath, `${stickerId}.proc.mp4`);
        job.data.processor = 'processStickerFFMPEG';
        break;

      case 'image/webp': // process as PNG
        job.data.origFilePath = path.join(job.data.workPath, `${stickerId}.webp`);
        job.data.outFilePath = path.join(job.data.workPath, `${stickerId}.proc.png`);
        job.data.processor = 'processStickerSharp';
        job.data.sharpFormat = 'png';
        job.data.sharpFormatParameters = { compressionLevel: 9 };
        break;

      // WebM's registered media type is video/webm. 'image/webm' is kept so
      // records stored with that type continue to be processed.
      case 'video/webm':
      case 'image/webm': // process as MP4
        job.data.origFilePath = path.join(job.data.workPath, `${stickerId}.webm`);
        job.data.outFilePath = path.join(job.data.workPath, `${stickerId}.proc.mp4`);
        job.data.processor = 'processStickerFFMPEG';
        break;

      case 'video/mp4':
        job.data.origFilePath = path.join(job.data.workPath, `${stickerId}.mp4`);
        job.data.outFilePath = path.join(job.data.workPath, `${stickerId}.proc.mp4`);
        job.data.processor = 'processStickerFFMPEG';
        break;

      default:
        throw new Error(`unsupported sticker type: ${job.data.sticker.original.type}`);
    }

    await this.jobLog(job, 'fetching original media', {
      stickerId,
      slug: job.data.sticker.slug,
      type: job.data.sticker.original.type,
      filePath: job.data.origFilePath,
    });
    await minioService.downloadFile({
      bucket: job.data.sticker.original.bucket,
      key: job.data.sticker.original.key,
      filePath: job.data.origFilePath,
    });
  }

  /**
   * If the sticker already has encoded media, removes that object from
   * storage, sets the Sticker document back to 'processing' and unsets its
   * `encoded` field. Does nothing when there is no encoded media.
   *
   * @param {object} job Queue job with `data.sticker` populated.
   */
  async resetSticker (job) {
    const { minio: minioService } = this.dtp.services;
    const { sticker } = job.data;

    if (!sticker.encoded) {
      return;
    }

    this.log.info('removing existing encoded sticker media', { media: sticker.encoded });
    await minioService.removeObject(sticker.encoded.bucket, sticker.encoded.key);

    // switch sticker back to 'processing' status to prevent use in the app
    const Sticker = mongoose.model('Sticker');
    await Sticker.updateOne(
      { _id: sticker._id },
      {
        $set: {
          status: 'processing',
        },
        $unset: {
          encoded: '',
        },
      },
    );

    // keep the in-memory copy consistent with the stored document
    delete sticker.encoded;
  }

  /**
   * Encodes a still-image sticker with sharp: converts to sRGB, resizes to
   * DTP_STICKER_HEIGHT, writes it in job.data.sharpFormat, uploads the
   * result and marks the Sticker 'live' with its `encoded` descriptor.
   *
   * @param {object} job Queue job prepared by fetchSticker().
   */
  async processStickerSharp (job) {
    const { minio: minioService } = this.dtp.services;
    const { sticker, sharpFormat, sharpFormatParameters } = job.data;

    const sharpImage = sharp(job.data.origFilePath);
    const metadata = await sharpImage.metadata();
    this.log.info('sticker metadata from Sharp', { stickerId: sticker._id, metadata });

    let chain = sharpImage
      .clone()
      .toColorspace('srgb')
      .resize({ height: DTP_STICKER_HEIGHT });

    // sharpFormat names the sharp output method to call ('jpeg' or 'png')
    chain = chain[sharpFormat](sharpFormatParameters);

    await chain.toFile(job.data.outFilePath);

    job.data.outFileStat = await fs.promises.stat(job.data.outFilePath);

    const bucket = process.env.MINIO_VIDEO_BUCKET;
    const key = `/stickers/${sticker._id.toString().slice(0, 3)}/${sticker._id}.encoded.${sharpFormat}`;

    await minioService.uploadFile({
      bucket,
      key,
      filePath: job.data.outFilePath,
      metadata: {
        'Content-Type': `image/${sharpFormat}`,
        'Content-Length': job.data.outFileStat.size,
      },
    });

    const Sticker = mongoose.model('Sticker');
    await Sticker.updateOne(
      { _id: sticker._id },
      {
        $set: {
          status: 'live',
          encoded: {
            bucket,
            key,
            type: `image/${sharpFormat}`,
            size: job.data.outFileStat.size,
          },
        },
      },
    );
  }

  /**
   * Encodes a motion sticker to MP4 with FFmpeg, uploads the result and
   * marks the Sticker 'live' with its `encoded` descriptor.
   *
   * @param {object} job Queue job prepared by fetchSticker().
   */
  async processStickerFFMPEG (job) {
    const { media: mediaService, minio: minioService } = this.dtp.services;
    const { sticker } = job.data;

    const codecVideo = (process.env.DTP_GPU_ACCELERATION === 'enabled') ? 'h264_nvenc' : 'libx264';

    // generate the encoded sticker
    // Output height is DTP_STICKER_HEIGHT lines by [aspect] width with width
    // and height being padded to be divisible by 2. The video stream is
    // given a bit rate of 128Kbps, and the media is flagged for +faststart.
    // Audio is stripped if present.

    const ffmpegStickerArgs = [
      '-y', '-i', job.data.origFilePath,
      '-vf', `scale=-1:${DTP_STICKER_HEIGHT},pad=ceil(iw/2)*2:ceil(ih/2)*2`,
      '-pix_fmt', 'yuv420p',
      '-c:v', codecVideo,
      '-b:v', '128k',
      '-movflags', '+faststart',
      '-an',
      job.data.outFilePath,
    ];

    await this.jobLog(job, `transcoding motion sticker: ${sticker.slug}`);
    this.log.debug('transcoding motion sticker', { ffmpegStickerArgs });
    await mediaService.ffmpeg(ffmpegStickerArgs);

    job.data.outFileStat = await fs.promises.stat(job.data.outFilePath);

    const bucket = process.env.MINIO_VIDEO_BUCKET;
    const key = `/stickers/${sticker._id.toString().slice(0, 3)}/${sticker._id}.encoded.mp4`;

    await this.jobLog(job, 'uploading encoded media file');
    await minioService.uploadFile({
      bucket, key,
      filePath: job.data.outFilePath,
      metadata: {
        'Content-Type': 'video/mp4',
        'Content-Length': job.data.outFileStat.size,
      },
    });

    await this.jobLog(job, 'updating Sticker to live status');

    const Sticker = mongoose.model('Sticker');
    await Sticker.updateOne(
      { _id: sticker._id },
      {
        $set: {
          status: 'live',
          encoded: {
            bucket,
            key,
            type: 'video/mp4',
            size: job.data.outFileStat.size,
          },
        },
      },
    );
  }

  /**
   * Queue processor for 'sticker-delete' jobs: removes the original media,
   * the encoded media (when present) and finally the Sticker document.
   *
   * @param {object} job Queue job carrying `data.stickerId`.
   * @throws Re-throws any failure so the queue records the job as failed.
   */
  async processStickerDelete (job) {
    const { minio: minioService, sticker: stickerService } = this.dtp.services;
    const Sticker = mongoose.model('Sticker');
    try {
      const sticker = await stickerService.getById(job.data.stickerId, true);

      this.log.info('removing original media', { stickerId: sticker._id, slug: sticker.slug });
      await minioService.removeObject(sticker.original.bucket, sticker.original.key);

      if (sticker.encoded) {
        this.log.info('removing encoded media', { stickerId: sticker._id, slug: sticker.slug });
        await minioService.removeObject(sticker.encoded.bucket, sticker.encoded.key);
      }

      this.log.info('removing sticker', { stickerId: sticker._id, slug: sticker.slug });
      await Sticker.deleteOne({ _id: sticker._id });
    } catch (error) {
      this.log.error('failed to delete sticker', { stickerId: job.data.stickerId, error });
      throw error; // for job report
    }
  }

  /**
   * Writes a message to the job's own log (job.log) and to the worker log.
   * A failure of job.log() is logged and swallowed so that progress logging
   * can never fail the job or leave an unhandled rejection behind.
   *
   * @param {object} job Queue job; its `log()` method and `id` are used.
   * @param {string} message Message to record.
   * @param {object} [data] Extra fields merged into the worker log entry.
   */
  async jobLog (job, message, data = { }) {
    try {
      await job.log(message);
    } catch (error) {
      this.log.error('failed to write job log entry', { jobId: job.id, error });
    }
    this.log.info(message, { jobId: job.id, ...data });
  }
}
|
|
|
|
/*
 * Process entry point: creates the logger, starts the platform, starts the
 * sticker worker and, when a sticker id is given as the first command-line
 * argument, enqueues a 'sticker-ingest' job for it. Any startup failure is
 * logged and the process exits with -1.
 */
(async ( ) => {
  try {
    module.log = new SiteLog(module, module.config.component);

    /*
     * Platform startup
     */
    await SitePlatform.startPlatform(module, module.config.component);

    /*
     * Worker startup
     */
    module.worker = new StickerWorker(module);
    await module.worker.start();

    if (process.argv[2]) {
      // `new` is accepted by every Mongoose version; calling ObjectId as a
      // plain function is rejected by newer releases.
      const stickerId = new mongoose.Types.ObjectId(process.argv[2]);

      // `this` inside a module-level arrow function is not the module, so
      // the module logger has to be referenced explicitly.
      module.log.info('creating sticker processing job', { stickerId });
      await module.worker.stickerProcessingQueue.add('sticker-ingest', { stickerId });
    }

    module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.component.name} started`);
  } catch (error) {
    if (module.log) {
      module.log.error('failed to start worker', {
        component: module.config.component,
        error,
      });
    } else {
      // The logger itself failed to construct; report on stderr instead.
      console.error('failed to start worker', error);
    }
    process.exit(-1);
  }
})();
|