Open source web app engine for the Digital Telepresence Platform.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

267 lines
8.7 KiB

// media/job/sticker-ingest.js
// Copyright (C) 2022 DTP Technologies, LLC
// License: Apache-2.0
'use strict';
const DTP_STICKER_HEIGHT = 100;
const path = require('path');
const fs = require('fs');
const mongoose = require('mongoose');
const Sticker = mongoose.model('Sticker');
const sharp = require('sharp');
const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib'));
class StickerIngestJob extends SiteWorkerProcess {
static get COMPONENT ( ) {
return {
name: 'stickerIngestJob',
slug: 'sticker-ingest-job',
};
}
constructor (worker) {
super(worker, StickerIngestJob.COMPONENT);
this.processors = {
processStickerSharp: this.processStickerSharp.bind(this),
processStickerFFMPEG: this.processStickerFFMPEG.bind(this),
};
}
async start ( ) {
await super.start();
this.queue = await this.getJobQueue('media', this.dtp.config.jobQueues.media);
this.log.info('registering job processor', { queue: this.queue.name, name: 'sticker-ingest' });
this.queue.process('sticker-ingest', 1, this.processStickerIngest.bind(this));
}
async stop ( ) {
await super.stop();
}
async processStickerIngest (job) {
try {
this.log.info('received sticker ingest job', { id: job.id, data: job.data });
await this.fetchSticker(job);
await this.resetSticker(job);
// call the chosen file processor to render the sticker for distribution
await this.processors[job.data.processor](job);
//TODO: emit a completion event which should cause a refresh of the
// creator's view to display the processed sticker
} catch (error) {
this.log.error('failed to process sticker', { stickerId: job.data.stickerId, error });
throw error;
} finally {
if (job.data.workPath) {
this.log.info('cleaning up sticker work path', { workPath: job.data.workPath });
await fs.promises.rm(job.data.workPath, { recursive: true });
}
}
}
async fetchSticker (job) {
const { minio: minioService, sticker: stickerService } = this.dtp.services;
job.data.sticker = await stickerService.getById(job.data.stickerId, true);
job.data.workPath = path.join(
process.env.DTP_STICKER_WORK_PATH,
this.dtp.config.component.slug,
job.data.sticker._id.toString(),
);
this.jobLog(job, 'creating work directory', { worthPath: job.data.workPath });
await fs.promises.mkdir(job.data.workPath, { recursive: true });
switch (job.data.sticker.original.type) {
case 'image/jpeg':
job.data.origFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.jpg`);
job.data.procFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.proc.jpg`);
job.data.processor = 'processStickerSharp';
job.data.sharpFormat = 'jpeg';
job.data.sharpFormatParameters = { quality: 85 };
break;
case 'image/png':
job.data.origFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.png`);
job.data.procFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.proc.png`);
job.data.processor = 'processStickerSharp';
job.data.sharpFormat = 'png';
job.data.sharpFormatParameters = { compression: 9 };
break;
case 'image/gif':
job.data.origFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.gif`);
job.data.procFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.proc.mp4`);
job.data.processor = 'processStickerFFMPEG';
break;
case 'image/webp': // process as PNG
job.data.origFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.webp`);
job.data.procFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.proc.png`);
job.data.processor = 'processStickerSharp';
job.data.sharpFormat = 'png';
job.data.sharpFormatParameters = { compression: 9 };
break;
case 'image/webm': // process as MP4
job.data.origFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.webm`);
job.data.procFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.proc.mp4`);
job.data.processor = 'processStickerFFMPEG';
break;
case 'video/mp4':
job.data.origFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.mp4`);
job.data.procFilePath = path.join(job.data.workPath, `${job.data.sticker._id}.proc.mp4`);
job.data.processor = 'processStickerFFMPEG';
break;
default:
throw new Error(`unsupported sticker type: ${job.data.sticker.original.type}`);
}
this.jobLog(job, 'fetching original media', { // blah 62096adcb69874552a0f87bc
stickerId: job.data.sticker._id,
slug: job.data.sticker.slug,
type: job.data.sticker.original.type,
worthPath: job.data.origFilePath,
});
await minioService.downloadFile({
bucket: job.data.sticker.original.bucket,
key: job.data.sticker.original.key,
filePath: job.data.origFilePath,
});
}
async resetSticker (job) {
const { minio: minioService } = this.dtp.services;
const { sticker } = job.data;
const updateOp = { $set: { status: 'processing' } };
if (sticker.encoded) {
this.log.info('removing existing encoded sticker media', { media: sticker.encoded });
await minioService.removeObject(sticker.encoded.bucket, sticker.encoded.key);
delete sticker.encoded;
updateOp.$unset = { encoded: '' };
}
await Sticker.updateOne({ _id: sticker._id }, updateOp);
}
async processStickerSharp (job) {
const { minio: minioService } = this.dtp.services;
const sharpImage = sharp(job.data.origFilePath);
const metadata = await sharpImage.metadata();
this.log.info('sticker metadata from Sharp', { stickerId: job.data.sticker._id, metadata });
let chain = sharpImage
.clone()
.toColorspace('srgb')
.resize({ height: DTP_STICKER_HEIGHT });
chain = chain[job.data.sharpFormat](job.data.sharpFormatParameters);
await chain.toFile(job.data.procFilePath);
job.data.outFileStat = await fs.promises.stat(job.data.procFilePath);
const bucket = process.env.MINIO_VIDEO_BUCKET;
const key = `/stickers/${job.data.sticker._id.toString().slice(0, 3)}/${job.data.sticker._id}.encoded.${job.data.sharpFormat}`;
await minioService.uploadFile({
bucket,
key,
filePath: job.data.procFilePath,
metadata: {
'Content-Type': `image/${job.data.sharpFormat}`,
'Content-Length': job.data.outFileStat.size,
},
});
await Sticker.updateOne(
{ _id: job.data.sticker._id },
{
$set: {
status: 'live',
encoded: {
bucket,
key,
type: `image/${job.data.sharpFormat}`,
size: job.data.outFileStat.size,
}
},
},
);
}
async processStickerFFMPEG (job) {
const { media: mediaService, minio: minioService } = this.dtp.services;
const codecVideo = (process.env.DTP_ENABLE_GPU === 'enabled') ? 'h264_nvenc' : 'libx264';
// generate the encoded sticker
// Output height is 100 lines by [aspect] width with width and height being
// padded to be divisible by 2. The video stream is given a bit rate of
// 128Kbps, and the media is flagged for +faststart. Audio is stripped if
// present.
const ffmpegStickerArgs = [
'-y', '-i', job.data.origFilePath,
'-vf', `scale=-1:${DTP_STICKER_HEIGHT},pad=ceil(iw/2)*2:ceil(ih/2)*2`,
'-pix_fmt', 'yuv420p',
'-c:v', codecVideo,
'-b:v', '128k',
'-movflags', '+faststart',
'-an',
job.data.procFilePath,
];
this.jobLog(job, `transcoding motion sticker: ${job.data.sticker.slug}`);
this.log.debug('transcoding motion sticker', { ffmpegStickerArgs });
await mediaService.ffmpeg(ffmpegStickerArgs);
job.data.outFileStat = await fs.promises.stat(job.data.procFilePath);
const bucket = process.env.MINIO_VIDEO_BUCKET;
const key = `/stickers/${job.data.sticker._id.toString().slice(0, 3)}/${job.data.sticker._id}.encoded.mp4`;
this.jobLog(job, 'uploading encoded media file');
await minioService.uploadFile({
bucket, key,
filePath: job.data.procFilePath,
metadata: {
'Content-Type': 'video/mp4',
'Content-Length': job.data.outFileStat.size,
},
});
this.jobLog(job, 'updating Sticker to live status');
await Sticker.updateOne(
{ _id: job.data.sticker._id },
{
$set: {
status: 'live',
encoded: {
bucket,
key,
type: 'video/mp4',
size: job.data.outFileStat.size,
},
},
},
);
}
}
module.exports = StickerIngestJob;