Skip to content

Commit

Permalink
feat: add image processor input metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed Jul 14, 2024
1 parent 7db05a6 commit d2076ea
Show file tree
Hide file tree
Showing 14 changed files with 208 additions and 108 deletions.
43 changes: 28 additions & 15 deletions image-processor/proto/scuffle/image_processor/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ message DrivePath {
// Possible template argument values are:
// - {id} - The id of the task.
string path = 2;
// The acl to use for the drive. (used when uploading)
optional string acl = 3;
}

// The resize method determines how the image processor should resize the image.
Expand Down Expand Up @@ -111,6 +113,8 @@ message InputMetadata {
uint32 width = 3;
// If this is different from the actual height the image processor will generate a fatal error.
uint32 height = 4;
// The loop count of the input image.
int32 loop_count = 5;
}

// InputUpload is used to upload an image to a drive configured in the image processor config.
Expand Down Expand Up @@ -229,33 +233,43 @@ message OutputFormatOptions {

message OutputFile {
// The path to the output file.
string path = 1;
DrivePath path = 1;
// The content type of the output file.
string content_type = 2;
// The acl of the output file.
optional string acl = 3;
// Width of the output image.
uint32 width = 4;
uint32 width = 3;
// Height of the output image.
uint32 height = 5;
uint32 height = 4;
// The frame count of the output image.
uint32 frame_count = 6;
uint32 frame_count = 5;
// The duration of the output image.
uint32 duration_ms = 7;
uint32 duration_ms = 6;
// The size of the output image in bytes.
uint32 size = 8;
uint32 size = 7;
// The format of the output image.
OutputFormat format = 9;
OutputFormat format = 8;
// Loop count of the output image.
int32 loop_count = 9;
}

// Returned after the image is processed.
message InputFileMetadata {
// The final path of the input image.
DrivePath path = 1;
// The content type of the input image.
string content_type = 2;
// The width of the input image.
uint32 width = 1;
uint32 width = 3;
// The height of the input image.
uint32 height = 2;
uint32 height = 4;
// The frame count of the input image.
uint32 frame_count = 3;
uint32 frame_count = 5;
// The duration of the input image.
uint32 duration_ms = 6;
// The size of the input image in bytes.
uint32 size = 7;
// The loop count of the input image.
int32 loop_count = 8;
}

message Output {
Expand All @@ -274,9 +288,8 @@ message Output {
// - {ext} - The extension of the output image. (e.g. 'webp', 'avif', etc.)
DrivePath drive_path = 1;

// Override the acl of the output images.
// By default this will use the ACL specified by the output drive config.
optional string acl_override = 2;
// The path to the input image to re-upload.
optional DrivePath input_reupload_path = 2;

// The desired format to encode the output image.
repeated OutputFormatOptions formats = 3;
Expand Down
3 changes: 2 additions & 1 deletion image-processor/src/management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl ManagementServer {

let drive_path = DrivePath {
drive: drive_path.drive,
acl: drive_path.acl,
path: path.clone(),
};

Expand All @@ -86,7 +87,7 @@ impl ManagementServer {
&path,
Bytes::from(input_upload.binary),
Some(DriveWriteOptions {
acl: input_upload.acl,
acl: drive_path.acl.clone(),
cache_control: input_upload.cache_control,
content_disposition: input_upload.content_disposition,
content_type: input_upload.content_type,
Expand Down
9 changes: 9 additions & 0 deletions image-processor/src/management/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ pub fn validate_output(global: &Arc<Global>, mut fragment: Fragment, output: Opt
],
)?;

if let Some(input_reupload_path) = output.input_reupload_path.as_ref() {
validate_drive_path(
global,
fragment.push("path"),
Some(input_reupload_path),
&["id", "width", "height", "ext"],
)?;
}

if output.formats.is_empty() {
return Err(Error {
code: ErrorCode::InvalidInput as i32,
Expand Down
51 changes: 33 additions & 18 deletions image-processor/src/worker/process/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use bytes::Bytes;
use file_format::FileFormat;
use scuffle_image_processor_proto::{animation_config, InputFileMetadata, Output, OutputFormat, OutputFormatOptions, Task};
use scuffle_image_processor_proto::{animation_config, Output, OutputFormat, OutputFormatOptions, Task};
use tokio::sync::OwnedSemaphorePermit;

use super::decoder::{AnyDecoder, Decoder, DecoderFrontend, DecoderInfo, LoopCount};
Expand All @@ -13,7 +13,7 @@ use super::resize::{ImageResizer, ResizeOutputTarget};
use super::JobError;

pub struct JobOutput {
pub input: InputFileMetadata,
pub input: InputImage,
pub output: Vec<OutputImage>,
}

Expand All @@ -28,6 +28,16 @@ pub struct OutputImage {
pub data: Vec<u8>,
pub frame_count: usize,
pub duration_ms: u64,
pub loop_count: LoopCount,
}

/// Metadata describing the decoded input image of a job.
///
/// Built when the blocking task is constructed and returned to the caller as
/// the `input` field of `JobOutput`.
pub struct InputImage {
/// File format detected from the raw input bytes (`FileFormat::from_bytes`).
pub format: FileFormat,
/// Width of the input as reported by the decoder info.
pub width: usize,
/// Height of the input as reported by the decoder info.
pub height: usize,
/// Loop count of the input as reported by the decoder info.
pub loop_count: LoopCount,
/// Duration of the input in milliseconds. Initialised to 0 at construction
/// and overwritten in `finish` with the decoder's `duration_ms()`; a value
/// that does not fit in a `u64` (e.g. negative) is stored as 0.
pub duration_ms: u64,
/// Number of frames in the input as reported by the decoder info.
pub frame_count: usize,
}

#[derive(Clone, Copy)]
Expand Down Expand Up @@ -59,11 +69,7 @@ impl Drop for CancelToken {
}
}

pub async fn spawn(
task: Task,
input: Bytes,
permit: Arc<OwnedSemaphorePermit>,
) -> Result<(DecoderInfo, JobOutput), JobError> {
pub async fn spawn(task: Task, input: Bytes, permit: Arc<OwnedSemaphorePermit>) -> Result<JobOutput, JobError> {
let cancel_token = CancelToken::new();
let _cancel_guard = cancel_token.clone();

Expand All @@ -86,16 +92,15 @@ pub async fn spawn(
}
}

let info = task.decoder_info.clone();

task.finish().map(|out| (info, out))
task.finish()
})
.await?
}

struct BlockingTask<'a> {
decoder: AnyDecoder<'a>,
decoder_info: DecoderInfo,
input: InputImage,
frame_configs: Vec<Option<FrameConfig>>,
resizer: ImageResizer,
static_encoders: Vec<(usize, Vec<(ResizeOutputTarget, AnyEncoder)>)>,
Expand All @@ -104,6 +109,7 @@ struct BlockingTask<'a> {
frame_idx: usize,
duration_carried_ms: f64,
frame_rate_factor: Option<f64>,
loop_count: LoopCount,
}

fn split_formats(output: &Output) -> (Vec<(usize, &OutputFormatOptions)>, Vec<(usize, &OutputFormatOptions)>) {
Expand Down Expand Up @@ -166,8 +172,8 @@ impl<'a> BlockingTask<'a> {
return Err(JobError::InvalidJob);
}

let file_format = DecoderFrontend::from_format(FileFormat::from_bytes(input))?;
let decoder = file_format.build(task, Cow::Borrowed(input))?;
let file_format = FileFormat::from_bytes(input);
let decoder = DecoderFrontend::from_format(file_format.clone())?.build(task, Cow::Borrowed(input))?;

let decoder_info = decoder.info();

Expand Down Expand Up @@ -291,6 +297,15 @@ impl<'a> BlockingTask<'a> {
static_frame_idx,
frame_idx: 0,
duration_carried_ms: 0.0,
input: InputImage {
format: file_format,
width: decoder_info.width,
height: decoder_info.height,
loop_count: decoder_info.loop_count,
duration_ms: 0,
frame_count: decoder_info.frame_count,
},
loop_count,
frame_rate_factor: anim_config.and_then(|config| match config.frame_rate.as_ref()? {
animation_config::FrameRate::FrameRateFactor(factor) => Some(*factor),
_ => None,
Expand Down Expand Up @@ -362,7 +377,7 @@ impl<'a> BlockingTask<'a> {
Ok(true)
}

pub fn finish(self) -> Result<JobOutput, JobError> {
pub fn finish(mut self) -> Result<JobOutput, JobError> {
let output = self
.static_encoders
.into_iter()
Expand All @@ -381,17 +396,17 @@ impl<'a> BlockingTask<'a> {
frame_count: info.frame_count,
duration_ms: info.duration,
data: encoder.finish()?,
loop_count: self.loop_count,
})
})
})
.collect::<Result<_, EncoderError>>()?;

// Update the duration of the input.
self.input.duration_ms = u64::try_from(self.decoder.duration_ms()).unwrap_or(0);

Ok(JobOutput {
input: InputFileMetadata {
width: self.decoder_info.width as u32,
height: self.decoder_info.height as u32,
frame_count: self.decoder_info.frame_count as u32,
},
input: self.input,
output,
})
}
Expand Down
20 changes: 15 additions & 5 deletions image-processor/src/worker/process/decoder/ffmpeg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct FfmpegDecoder<'data> {
info: DecoderInfo,
input_stream_index: i32,
average_frame_duration: u64,
duration_ms: i64,
previous_timestamp: Option<u64>,
send_packet: bool,
eof: bool,
Expand Down Expand Up @@ -76,15 +77,20 @@ impl<'data> FfmpegDecoder<'data> {
}
}

let duration_ms =
(input_stream_duration * input_stream_time_base.num as i64 * 1000) / input_stream_time_base.den as i64;

if duration_ms < 0 {
return Err(DecoderError::InvalidTimeBase);
}

if let Some(max_input_duration_ms) = task.limits.as_ref().and_then(|l| l.max_input_duration_ms) {
// actual duration
// = duration * (time_base.num / time_base.den) * 1000
// = (duration * time_base.num * 1000) / time_base.den
let duration =
(input_stream_duration * input_stream_time_base.num as i64 * 1000) / input_stream_time_base.den as i64;

if duration > max_input_duration_ms as i64 {
return Err(DecoderError::TooLong(duration));
if duration_ms > max_input_duration_ms as i64 {
return Err(DecoderError::TooLong(duration_ms));
}
}

Expand All @@ -98,7 +104,6 @@ impl<'data> FfmpegDecoder<'data> {
)?;

let info = DecoderInfo {
decoder: DecoderFrontend::Ffmpeg,
width: decoder.width() as usize,
height: decoder.height() as usize,
frame_count: input_stream_frames as usize,
Expand All @@ -125,6 +130,7 @@ impl<'data> FfmpegDecoder<'data> {
send_packet: true,
frame,
average_frame_duration,
duration_ms,
previous_timestamp: Some(0),
})
}
Expand Down Expand Up @@ -221,6 +227,10 @@ impl Decoder for FfmpegDecoder<'_> {
}
}

/// Returns the input stream duration in milliseconds.
///
/// The value is computed once when the decoder is constructed, from the
/// stream duration and time base, and construction fails if it is negative.
fn duration_ms(&self) -> i64 {
self.duration_ms
}

fn info(&self) -> DecoderInfo {
self.info
}
Expand Down
5 changes: 4 additions & 1 deletion image-processor/src/worker/process/decoder/libavif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl<'data> AvifDecoder<'data> {
let image = AvifRgbImage::new(decoder.as_ref());

let info = DecoderInfo {
decoder: DecoderFrontend::LibAvif,
width: image.width as usize,
height: image.height as usize,
loop_count: if decoder.as_ref().repetitionCount <= 0 {
Expand Down Expand Up @@ -133,4 +132,8 @@ impl Decoder for AvifDecoder<'_> {
duration_ts,
)))
}

/// Returns the duration accumulated in `total_duration`, converted from
/// timescale ticks to milliseconds (`ticks * 1000 / timescale`).
///
/// NOTE(review): integer division panics if `info.timescale` is 0 — confirm
/// the decoder never reports a zero timescale.
fn duration_ms(&self) -> i64 {
    let elapsed_ticks = self.total_duration as i64;
    let ticks_per_second = self.info.timescale as i64;

    // Multiply before dividing so sub-second precision is not truncated away.
    elapsed_ticks * 1000 / ticks_per_second
}
}
13 changes: 8 additions & 5 deletions image-processor/src/worker/process/decoder/libwebp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,13 @@ impl<'data> WebpDecoder<'data> {

Ok(Self {
info: DecoderInfo {
decoder: DecoderFrontend::LibWebp,
width: info.canvas_width as _,
height: info.canvas_height as _,
width: info.canvas_width as usize,
height: info.canvas_height as usize,
loop_count: match info.loop_count {
0 => LoopCount::Infinite,
_ => LoopCount::Finite(info.loop_count as _),
_ => LoopCount::Finite(info.loop_count as usize),
},
frame_count: info.frame_count as _,
frame_count: info.frame_count as usize,
timescale: 1000,
},
max_input_duration: task
Expand Down Expand Up @@ -130,4 +129,8 @@ impl Decoder for WebpDecoder<'_> {

Ok(Some(FrameRef::new(buf, self.info.width, self.info.height, duration_ts)))
}

/// Returns `total_duration` as an `i64`, without unit conversion.
///
/// This decoder reports a timescale of 1000 in its `DecoderInfo`, so
/// `total_duration` is presumably already in milliseconds — TODO confirm
/// against where `total_duration` is accumulated.
fn duration_ms(&self) -> i64 {
self.total_duration as i64
}
}
Loading

0 comments on commit d2076ea

Please sign in to comment.