Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions packages/api/src/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async function formatJobNode(node: JobNode): Promise<Job> {
progress = job.progress;
}

const state = mapJobState(await job.getState());
const state = mapJobState(await job.getState(), job.returnvalue);

const failedReason = state === "failed" ? job.failedReason : undefined;

Expand Down Expand Up @@ -150,7 +150,13 @@ async function formatJobNode(node: JobNode): Promise<Job> {
};
}

function mapJobState(jobState: JobState | "unknown"): Job["state"] {
function mapJobState(
jobState: JobState | "unknown",
returnValue: unknown,
): Job["state"] {
if (typeof returnValue === "string" && returnValue === "skipped") {
return "skipped";
}
if (jobState === "active" || jobState === "waiting-children") {
return "running";
}
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export const JobSchema = t.Recursive(
t.Literal("running"),
t.Literal("failed"),
t.Literal("completed"),
t.Literal("skipped"),
]),
progress: t.Number(),
createdOn: t.Number(),
Expand Down
1 change: 1 addition & 0 deletions packages/artisan/scripts/install-bin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ install_ffprobe
install_packager

chmod +x $bin_dir/ffmpeg
chmod +x $bin_dir/ffprobe
chmod +x $bin_dir/packager

info "Added exec permissions"
Expand Down
26 changes: 24 additions & 2 deletions packages/artisan/src/consumer/workers/ffmpeg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ import { FFmpeggy } from "ffmpeggy";
import { downloadFile, uploadFile } from "../s3";
import { TmpDir } from "../tmp-dir";
import { getBinaryPath } from "../helpers";
import { SKIP_JOB } from "./helpers";
import type { Job } from "bullmq";
import type { Stream, Input } from "../../types";
import type { SkippableJobResult } from "./helpers";

const ffmpegBin = await getBinaryPath("ffmpeg");
const ffprobeBin = await getBinaryPath("ffprobe");

FFmpeggy.DefaultConfig = {
...FFmpeggy.DefaultConfig,
ffmpegBin,
ffprobeBin,
};

// The guys at shaka-streamer did a great job implementing an ffmpeg pipeline, we can always learn from it:
Expand All @@ -28,10 +32,10 @@ export type FfmpegData = {
};
};

export type FfmpegResult = {
export type FfmpegResult = SkippableJobResult<{
name: string;
stream: Stream;
};
}>;

async function prepareInput(job: Job, tmpDir: TmpDir, input: Input) {
const filePath = parseFilePath(input.path);
Expand Down Expand Up @@ -62,6 +66,10 @@ async function runJob(

job.log(`Input is ${inputFile.path}`);

const inputInfo = await FFmpeggy.probe(inputFile.path);

job.log(`Probed info (${JSON.stringify(inputInfo)})`);

const ffmpeg = new FFmpeggy({
input: inputFile.path,
globalOptions: ["-loglevel error"],
Expand All @@ -72,6 +80,20 @@ async function runJob(
const outputOptions: string[] = [];

if (params.stream.type === "video") {
const maxHeight = inputInfo.streams.reduce<number>((acc, stream) => {
if (!stream.height) {
return acc;
}
return acc > stream.height ? acc : stream.height;
}, 0);

if (maxHeight && params.stream.height > maxHeight) {
job.log(
`Skip upscale, requested ${params.stream.height} is larger than input ${maxHeight}`,
);
return SKIP_JOB;
}

name = `video_${params.stream.height}_${params.stream.bitrate}_${params.stream.codec}.m4v`;
outputOptions.push(
...getVideoOutputOptions(params.stream, params.segmentSize),
Expand Down
3 changes: 3 additions & 0 deletions packages/artisan/src/consumer/workers/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export const SKIP_JOB = "skipped";

export type SkippableJobResult<T> = typeof SKIP_JOB | T;
37 changes: 25 additions & 12 deletions packages/artisan/src/consumer/workers/transcode.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { addPackageJob } from "../../producer";
import { getFakeJob } from "../helpers";
import { uploadJson } from "../s3";
import { SKIP_JOB } from "./helpers";
import type { FfmpegResult } from "./ffmpeg";
import type { Stream } from "../../types";
import type { MetaFile } from "../meta-file";
import type { Job } from "bullmq";
import type { SkippableJobResult } from "./helpers";

export type TranscodeData = {
params: {
Expand All @@ -17,9 +19,9 @@ export type TranscodeData = {
};
};

export type TranscodeResult = {
export type TranscodeResult = SkippableJobResult<{
assetId: string;
};
}>;

/**
* The transcode job relies on the underlying ffmpeg jobs. It waits until these
Expand All @@ -34,18 +36,29 @@ export default async function (job: Job<TranscodeData, TranscodeResult>) {

const childrenValues = await fakeJob.getChildrenValues();

const streams = Object.entries(childrenValues).reduce<Record<string, Stream>>(
(acc, [key, value]) => {
if (key.startsWith("bull:ffmpeg")) {
const result: FfmpegResult = value;
if (result === SKIP_JOB) {
// We skipped this job, bail out early.
return acc;
}
acc[result.name] = result.stream;
}
return acc;
},
{},
);

if (!Object.keys(streams).length) {
job.log("Skip transcode, no streams found");
return SKIP_JOB;
}

const meta: MetaFile = {
version: 1,
streams: Object.entries(childrenValues).reduce<Record<string, Stream>>(
(acc, [key, value]) => {
if (key.startsWith("bull:ffmpeg")) {
const ffmpegResult: FfmpegResult = value;
acc[ffmpegResult.name] = ffmpegResult.stream;
}
return acc;
},
{},
),
streams,
segmentSize: params.segmentSize,
};

Expand Down
53 changes: 53 additions & 0 deletions packages/dashboard/src/components/JobLog.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { useEffect, useLayoutEffect, useRef, useState } from "react";
import { cn } from "@/lib/utils";
import ArrowDownFromLine from "lucide-react/icons/arrow-down-from-line";
import ArrowUpFromLine from "lucide-react/icons/arrow-up-from-line";

type JobLogProps = {
value: string;
index: number;
};

export function JobLog({ value, index }: JobLogProps) {
const ref = useRef<HTMLDivElement>(null);
const [showMore, setShowMore] = useState(false);
const [expanded, setExpanded] = useState(false);

useLayoutEffect(() => {
const onResize = () => {
if (!ref.current) {
return;
}
setShowMore(ref.current.clientHeight < ref.current.scrollHeight);
};
window.addEventListener("resize", onResize);
onResize();
return () => {
window.removeEventListener("resize", onResize);
};
}, []);

return (
<div className="border border-border rounded-md p-2 break-all flex">
<div className="mr-2 font-medium">{index + 1}</div>
<div>
<div
ref={ref}
className={cn("overflow-hidden", !expanded && "max-h-12")}
>
{value}
</div>
{showMore ? (
<button
className="text-xs font-medium"
onClick={() => {
setExpanded((v) => !v);
}}
>
{expanded ? "collapse" : "expand"}
</button>
) : null}
</div>
</div>
);
}
9 changes: 3 additions & 6 deletions packages/dashboard/src/components/JobLogs.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { api } from "@/api";
import { useQuery } from "@tanstack/react-query";
import { JobLog } from "./JobLog";

type JobLogsProps = {
id: string;
Expand All @@ -23,12 +24,8 @@ export function JobLogs({ id }: JobLogsProps) {
return (
<ul className="flex flex-col gap-2 text-xs">
{logs.map((it, index) => (
<li
key={index}
className="border border-border rounded-md p-2 break-all flex"
>
<div className="mr-2 font-medium">{index + 1}</div>
<div>{it}</div>
<li key={index}>
<JobLog value={it} index={index} />
</li>
))}
</ul>
Expand Down
4 changes: 4 additions & 0 deletions packages/dashboard/src/components/JobState.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Loader from "lucide-react/icons/loader";
import CircleDotDashed from "lucide-react/icons/circle-dot-dashed";
import Check from "lucide-react/icons/check";
import X from "lucide-react/icons/x";
import CircleOff from "lucide-react/icons/circle-off";
import { cn } from "@/lib/utils";
import type { Job } from "@/api";

Expand All @@ -15,6 +16,9 @@ export function JobState({ state }: { state: Job["state"] }) {
if (state === "running") {
return createCircle("bg-blue-200 text-blue-800", Loader, "animate-spin");
}
if (state === "skipped") {
return createCircle("bg-gray-200 text-gray-800", CircleOff);
}
return createCircle("bg-violet-200 text-violet-800", CircleDotDashed);
}

Expand Down
16 changes: 8 additions & 8 deletions packages/dashboard/src/components/JobsStats.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) {
let completed = 0;
let failed = 0;
let running = 0;
let waiting = 0;
let skipped = 0;

for (const job of jobs) {
if (job.state === "completed") {
Expand All @@ -25,8 +25,8 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) {
if (job.state === "failed") {
failed += 1;
}
if (job.state === "waiting") {
waiting += 1;
if (job.state === "skipped") {
skipped += 1;
}
}

Expand Down Expand Up @@ -62,11 +62,11 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) {
tooltip="Running"
/>
<JobStatsTile
value={waiting}
className="bg-violet-400"
onClick={() => filterJobState("waiting")}
active={filter.state === "waiting"}
tooltip="Waiting"
value={skipped}
className="bg-gray-400"
onClick={() => filterJobState("skipped")}
active={filter.state === "skipped"}
tooltip="Skipped"
/>
</div>
</TooltipProvider>
Expand Down