You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
270 lines
8.6 KiB
270 lines
8.6 KiB
// media/job/attachment-ingest.js
|
|
// Copyright (C) 2022 DTP Technologies, LLC
|
|
// License: Apache-2.0
|
|
|
|
'use strict';
|
|
|
|
const path = require('path');
|
|
const fs = require('fs');
|
|
const sharp = require('sharp');
|
|
|
|
// Target output height for processed attachments. Used as the `height` passed
// to sharp's resize() for still images and as the height in the ffmpeg scale
// filter for GIF-to-MP4 transcodes; width is derived by the respective tool.
const ATTACHMENT_IMAGE_HEIGHT = 540;
|
|
|
|
const mongoose = require('mongoose');
|
|
// Single-argument mongoose.model() looks up an already-registered model, so
// the Attachment schema must have been registered before this module loads.
const Attachment = mongoose.model('Attachment');
|
|
|
|
const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib'));
|
|
|
|
/**
 * Worker job that processes `attachment-ingest` jobs from the `media` queue.
 *
 * For each job it loads the attachment, clears any previously encoded output,
 * downloads the original file into a per-attachment work directory, runs the
 * processor selected from the original's MIME type (sharp for JPEG/PNG/WebP,
 * ffmpeg for GIF), uploads the result, and marks the attachment `live`.
 */
class AttachmentIngestJob extends SiteWorkerProcess {

  /**
   * Component descriptor passed to the SiteWorkerProcess constructor. The
   * `logId` is also used as a directory name beneath the attachment work path.
   */
  static get COMPONENT ( ) {
    return {
      logId: 'wrk:chat:attachment-ingest:job',
      index: 'attachmentIngestJob',
      className: 'AttachmentIngestJob',
    };
  }

  /**
   * @param {object} worker The owning worker, forwarded to SiteWorkerProcess.
   */
  constructor (worker) {
    super(worker, AttachmentIngestJob.COMPONENT);

    // Dispatch table keyed by the processor name that fetchAttachmentFile
    // stores in job.data.processor.
    this.processors = {
      processAttachmentSharp: this.processAttachmentSharp.bind(this),
      processAttachmentFFMPEG: this.processAttachmentFFMPEG.bind(this),
    };
  }

  /**
   * Starts the process, acquires the `media` job queue and registers the
   * `attachment-ingest` processor with a concurrency of 1.
   */
  async start ( ) {
    await super.start();

    this.queue = await this.getJobQueue('media', this.dtp.config.jobQueues.media);

    this.log.info('registering job processor', { queue: this.queue.name, name: 'attachment-ingest' });
    this.queue.process('attachment-ingest', 1, this.processAttachmentIngest.bind(this));
  }

  /**
   * Stops the process by delegating to SiteWorkerProcess.
   */
  async stop ( ) {
    await super.stop();
  }

  /**
   * Queue entry point for an `attachment-ingest` job.
   *
   * @param {object} job Queue job; `job.data.attachmentId` identifies the
   * attachment. Working state (attachment, workPath, file paths, processor)
   * is stored on `job.data` as the pipeline advances.
   * @throws {Error} Re-throws any failure after logging it. The work
   * directory, if one was created, is removed whether or not the job fails.
   */
  async processAttachmentIngest (job) {
    const { attachment: attachmentService } = this.dtp.services;

    const { attachmentId } = job.data;
    this.log.info('received attachment-ingest job', { id: job.id, attachmentId });

    try {
      job.data.attachment = await attachmentService.getById(attachmentId, { withOriginal: true });
      if (!job.data.attachment) {
        // Fail with a meaningful message instead of a TypeError when the
        // attachment no longer exists.
        throw new Error(`attachment not found: ${attachmentId}`);
      }

      await this.resetAttachment(job);
      await this.fetchAttachmentFile(job);
      await this.processors[job.data.processor](job);

      //TODO: emit a completion event which should cause a refresh of the
      // creator's view to display the processed attachment

    } catch (error) {
      this.log.error('failed to process attachment for ingest', { attachmentId: job.data.attachmentId, error });
      throw error;
    } finally {
      if (job.data.workPath) {
        this.log.info('removing attachment work path');
        // fs.promises.rm is the supported recursive removal; `force` makes an
        // already-missing directory a no-op (rmdir does not honour `force`).
        await fs.promises.rm(job.data.workPath, { recursive: true, force: true });
        delete job.data.workPath;
      }
    }
  }

  /**
   * Creates the work directory, chooses file names and a processor based on
   * the original's MIME type, and downloads the original file into the work
   * directory.
   *
   * Sets on `job.data`: workPath, origFilePath, procFilePath, processor and,
   * for sharp-processed images, sharpFormat and sharpFormatParameters.
   *
   * @param {object} job Queue job with `job.data.attachment` populated.
   * @throws {Error} If the MIME type is unsupported or any step fails.
   */
  async fetchAttachmentFile (job) {
    const { minio: minioService } = this.dtp.services;
    try {
      const { attachment } = job.data;

      job.data.workPath = path.join(
        process.env.DTP_ATTACHMENT_WORK_PATH,
        AttachmentIngestJob.COMPONENT.logId,
        attachment._id.toString(),
      );

      this.jobLog(job, 'creating work directory', { workPath: job.data.workPath });
      await fs.promises.mkdir(job.data.workPath, { recursive: true });

      switch (attachment.original.mime) {
        case 'image/jpeg':
          job.data.origFilePath = path.join(job.data.workPath, `${attachment._id}.jpg`);
          job.data.procFilePath = path.join(job.data.workPath, `${attachment._id}.proc.jpg`);
          job.data.processor = 'processAttachmentSharp';
          job.data.sharpFormat = 'jpeg';
          job.data.sharpFormatParameters = { quality: 85 };
          break;

        case 'image/png':
          job.data.origFilePath = path.join(job.data.workPath, `${attachment._id}.png`);
          job.data.procFilePath = path.join(job.data.workPath, `${attachment._id}.proc.png`);
          job.data.processor = 'processAttachmentSharp';
          job.data.sharpFormat = 'png';
          // sharp's PNG option is `compressionLevel`; `compression` is not a
          // PNG option and was silently ignored.
          job.data.sharpFormatParameters = { compressionLevel: 9 };
          break;

        case 'image/gif': // transcoded to MP4 by ffmpeg
          job.data.origFilePath = path.join(job.data.workPath, `${attachment._id}.gif`);
          job.data.procFilePath = path.join(job.data.workPath, `${attachment._id}.proc.mp4`);
          job.data.processor = 'processAttachmentFFMPEG';
          break;

        case 'image/webp': // process as PNG
          job.data.origFilePath = path.join(job.data.workPath, `${attachment._id}.webp`);
          job.data.procFilePath = path.join(job.data.workPath, `${attachment._id}.proc.png`);
          job.data.processor = 'processAttachmentSharp';
          job.data.sharpFormat = 'png';
          job.data.sharpFormatParameters = { compressionLevel: 9 };
          break;

        default:
          throw new Error(`unsupported attachment type: ${attachment.original.mime}`);
      }

      this.jobLog(job, 'fetching attachment original file', {
        attachmentId: attachment._id,
        mime: attachment.original.mime,
        size: attachment.original.size,
        filePath: job.data.origFilePath,
      });
      await minioService.downloadFile({
        bucket: attachment.original.bucket,
        key: attachment.original.key,
        filePath: job.data.origFilePath,
      });
    } catch (error) {
      this.log.error('failed to fetch attachment file', { attachmentId: job.data.attachmentId, error });
      throw error;
    }
  }

  /**
   * Marks the attachment as `processing` and, if it already has an encoded
   * file, removes that object from storage and unsets `encoded` on both the
   * in-memory attachment and the stored document.
   *
   * @param {object} job Queue job with `job.data.attachment` populated.
   */
  async resetAttachment (job) {
    const { minio: minioService } = this.dtp.services;
    const { attachment } = job.data;

    const updateOp = { $set: { status: 'processing' } };

    if (attachment.encoded) {
      this.log.info('removing existing encoded attachment file', { file: attachment.encoded });
      await minioService.removeObject(attachment.encoded.bucket, attachment.encoded.key);
      delete attachment.encoded;
      updateOp.$unset = { encoded: '' };
    }

    await Attachment.updateOne({ _id: attachment._id }, updateOp);
  }

  /**
   * Processes a still image with sharp: converts to sRGB, resizes to
   * ATTACHMENT_IMAGE_HEIGHT, encodes with the format and parameters chosen by
   * fetchAttachmentFile, uploads the result to MINIO_ATTACHMENT_BUCKET and
   * records it on the attachment with status `live`.
   *
   * @param {object} job Queue job prepared by fetchAttachmentFile.
   */
  async processAttachmentSharp (job) {
    const { attachment: attachmentService, minio: minioService } = this.dtp.services;
    const { attachment } = job.data;
    const attachmentId = attachment._id;

    const sharpImage = sharp(job.data.origFilePath);
    const metadata = await sharpImage.metadata();
    this.log.info('attachment metadata from Sharp', { attachmentId, metadata });

    let chain = sharpImage
      .clone()
      .toColorspace('srgb')
      .resize({ height: ATTACHMENT_IMAGE_HEIGHT });
    // sharpFormat names the sharp output method to call ('jpeg' or 'png').
    chain = chain[job.data.sharpFormat](job.data.sharpFormatParameters);
    await chain.toFile(job.data.procFilePath);

    job.data.outFileStat = await fs.promises.stat(job.data.procFilePath);

    const bucket = process.env.MINIO_ATTACHMENT_BUCKET;
    const key = attachmentService.getAttachmentKey(attachment, 'processed');
    const mime = `image/${job.data.sharpFormat}`;

    const response = await minioService.uploadFile({
      bucket,
      key,
      filePath: job.data.procFilePath,
      metadata: {
        'Content-Type': mime,
        'Content-Length': job.data.outFileStat.size,
      },
    });

    await Attachment.updateOne(
      { _id: attachmentId },
      {
        $set: {
          status: 'live',
          encoded: {
            bucket,
            key,
            mime,
            size: job.data.outFileStat.size,
            etag: response.etag,
          },
        },
      },
    );
  }

  /**
   * Transcodes an animated original to MP4 with ffmpeg, uploads the result to
   * MINIO_VIDEO_BUCKET and records it on the attachment with status `live`.
   *
   * @param {object} job Queue job prepared by fetchAttachmentFile.
   */
  async processAttachmentFFMPEG (job) {
    const {
      attachment: attachmentService,
      media: mediaService,
      minio: minioService,
    } = this.dtp.services;

    const { attachment } = job.data;
    const codecVideo = (process.env.DTP_ENABLE_GPU === 'enabled') ? 'h264_nvenc' : 'libx264';

    // Generate the encoded attachment.
    // Output height is ATTACHMENT_IMAGE_HEIGHT lines by [aspect] width, with
    // width and height padded to be divisible by 2. The video stream is given
    // a bit rate of 128Kbps, and the media is flagged for +faststart. Audio is
    // stripped if present.

    const ffmpegArgs = [
      '-y', '-i', job.data.origFilePath,
      '-vf', `scale=-1:${ATTACHMENT_IMAGE_HEIGHT},pad=ceil(iw/2)*2:ceil(ih/2)*2`,
      '-pix_fmt', 'yuv420p',
      '-c:v', codecVideo,
      '-b:v', '128k',
      '-movflags', '+faststart',
      '-an',
      job.data.procFilePath,
    ];

    this.log.debug('transcoding attachment', { ffmpegArgs });
    await mediaService.ffmpeg(ffmpegArgs);

    job.data.outFileStat = await fs.promises.stat(job.data.procFilePath);

    const bucket = process.env.MINIO_VIDEO_BUCKET;
    const key = attachmentService.getAttachmentKey(attachment, 'processed');

    this.jobLog(job, 'uploading processed media file');
    const response = await minioService.uploadFile({
      bucket, key,
      filePath: job.data.procFilePath,
      metadata: {
        'Content-Type': 'video/mp4',
        'Content-Length': job.data.outFileStat.size,
      },
    });

    await Attachment.updateOne(
      { _id: attachment._id },
      {
        $set: {
          status: 'live',
          encoded: {
            bucket,
            key,
            mime: 'video/mp4',
            size: job.data.outFileStat.size,
            etag: response.etag,
          },
        },
      },
    );
  }
}
|
|
|
|
// The job class itself is the module's sole export.
module.exports = AttachmentIngestJob;
|