Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ const app = new Elysia()
description:
"The source path, starting with http(s):// or s3://",
}),
height: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("audio"),
path: t.String({
description:
"The source path, starting with http(s):// or s3://",
}),
language: LangCodeSchema,
language: t.Optional(LangCodeSchema),
channels: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("text"),
Expand All @@ -97,15 +99,17 @@ const app = new Elysia()
type: t.Literal("video"),
codec: VideoCodecSchema,
height: t.Number(),
bitrate: t.Number({ description: "Bitrate in bps" }),
framerate: t.Number({ description: "Frames per second" }),
bitrate: t.Optional(t.Number({ description: "Bitrate in bps" })),
framerate: t.Optional(
t.Number({ description: "Frames per second" }),
),
}),
t.Object({
type: t.Literal("audio"),
codec: AudioCodecSchema,
bitrate: t.Number({ description: "Bitrate in bps" }),
language: LangCodeSchema,
channels: t.Number(),
bitrate: t.Optional(t.Number({ description: "Bitrate in bps" })),
language: t.Optional(LangCodeSchema),
channels: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("text"),
Expand Down
26 changes: 8 additions & 18 deletions packages/api/src/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ function findQueueByName(name: string): Queue {

function formatIdPair(id: string): [Queue, string] {
const queueName = id.split("_", 1)[0];
if (!queueName) {
throw new Error("Missing queueName as prefix when formatting id pair");
}
return [findQueueByName(queueName), id];
}

Expand Down Expand Up @@ -95,16 +98,16 @@ async function formatJobNode(node: JobNode): Promise<Job> {
progress = job.progress;
}

const state = mapJobState(await job.getState(), job.returnvalue);
const state = mapJobState(await job.getState());

const failedReason = state === "failed" ? job.failedReason : undefined;

const findParentSortKey = (job: BullMQJob): number => {
const value = job.data?.metadata?.parentSortKey;
const findParentSortIndex = (job: BullMQJob): number => {
const value = job.data?.parentSortIndex;
return typeof value === "number" ? value : 0;
};
(children ?? []).sort(
(a, b) => findParentSortKey(a.job) - findParentSortKey(b.job),
(a, b) => findParentSortIndex(a.job) - findParentSortIndex(b.job),
);

const jobChildren = await Promise.all((children ?? []).map(formatJobNode));
Expand Down Expand Up @@ -150,20 +153,7 @@ async function formatJobNode(node: JobNode): Promise<Job> {
};
}

// Keep these in sync with ocnsumer/workers/helpers.ts in artisan,
// we can treat the result as a string literal to indicate non standard
// job states such as "skipped".
type JobReturnValueStatus = "__JOB_SKIPPED__";

function mapJobState(
jobState: JobState | "unknown",
maybeReturnValue?: JobReturnValueStatus,
): Job["state"] {
// We pass maybeReturnValue as "any" from the input, it's not typed. But we
// can check whether it is a defined return value for non standard job states.
if (maybeReturnValue === "__JOB_SKIPPED__") {
return "skipped";
}
function mapJobState(jobState: JobState | "unknown"): Job["state"] {
if (jobState === "active" || jobState === "waiting-children") {
return "running";
}
Expand Down
1 change: 0 additions & 1 deletion packages/api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ export const JobSchema = t.Recursive(
t.Literal("running"),
t.Literal("failed"),
t.Literal("completed"),
t.Literal("skipped"),
]),
progress: t.Number(),
createdOn: t.Number(),
Expand Down
4 changes: 0 additions & 4 deletions packages/app/src/components/JobState.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import Loader from "lucide-react/icons/loader";
import CircleDotDashed from "lucide-react/icons/circle-dot-dashed";
import Check from "lucide-react/icons/check";
import X from "lucide-react/icons/x";
import CircleOff from "lucide-react/icons/circle-off";
import { cn } from "@/lib/utils";
import type { Job } from "@/api";

Expand All @@ -16,9 +15,6 @@ export function JobState({ state }: { state: Job["state"] }) {
if (state === "running") {
return createCircle("bg-blue-200 text-blue-800", Loader, "animate-spin");
}
if (state === "skipped") {
return createCircle("bg-gray-200 text-gray-800", CircleOff);
}
return createCircle("bg-violet-200 text-violet-800", CircleDotDashed);
}

Expand Down
11 changes: 0 additions & 11 deletions packages/app/src/components/JobsStats.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) {
let completed = 0;
let failed = 0;
let running = 0;
let skipped = 0;

for (const job of jobs) {
if (job.state === "completed") {
Expand All @@ -25,9 +24,6 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) {
if (job.state === "failed") {
failed += 1;
}
if (job.state === "skipped") {
skipped += 1;
}
}

const filterJobState = (state?: Job["state"]) => {
Expand Down Expand Up @@ -61,13 +57,6 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) {
active={filter.state === "running"}
tooltip="Running"
/>
<JobStatsTile
value={skipped}
className="bg-gray-400"
onClick={() => filterJobState("skipped")}
active={filter.state === "skipped"}
tooltip="Skipped"
/>
</div>
</TooltipProvider>
);
Expand Down
4 changes: 3 additions & 1 deletion packages/app/src/pages/JobsPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ export function JobsPage() {
{filteredJobs.length ? (
<JobsList jobs={filteredJobs} />
) : (
<p className="text-center">No jobs found...</p>
<p className="text-center py-16 text-muted-foreground">
Nothing here but tumbleweeds... and they're not clickable.
</p>
)}
</div>
</div>
Expand Down
8 changes: 8 additions & 0 deletions packages/artisan/src/assert.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export function assert<T>(
value: T,
message: string = "value is null",
): asserts value is NonNullable<T> {
if (value === null || value === undefined) {
throw Error(message);
}
}
47 changes: 24 additions & 23 deletions packages/artisan/src/consumer/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,7 @@
import { Job, Queue } from "bullmq";
import { connection } from "./env";

/**
* Gets the full job. This is particularly handy when you need
* children values from child jobs.
* @param job Full job
* @returns
*/
export async function getFakeJob<T>(job: Job) {
if (!job.id) {
throw new Error("Missing job id");
}

const queue = new Queue(job.queueName, { connection });
const fakeJob = await Job.fromId<T>(queue, job.id);

if (!fakeJob) {
throw new Error("Failed to fetch fake job");
}

return fakeJob;
}
import parseFilepath from "parse-filepath";
import { downloadFile } from "./s3";
import { Dir } from "./lib/dir";
import type { PartialInput } from "../types";

export async function getBinaryPath(name: string) {
const direct = `${process.cwd()}/bin/${name}`;
Expand All @@ -40,3 +21,23 @@ export async function getBinaryPath(name: string) {
`Failed to get bin dep "${name}", run scripts/bin-deps.sh to install binary dependencies.`,
);
}

export async function getInputPath(input: PartialInput, dir: Dir | string) {
const filePath = parseFilepath(input.path);

// If the input is on S3, download the file locally.
if (filePath.dir.startsWith("s3://")) {
const inDir = dir instanceof Dir ? await dir.createTempDir() : dir;
await downloadFile(inDir, filePath.path.replace("s3://", ""));
return parseFilepath(`${inDir}/${filePath.basename}`);
}

if (
filePath.dir.startsWith("http://") ||
filePath.dir.startsWith("https://")
) {
return filePath;
}

throw new Error("Failed to resolve input path");
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import * as os from "node:os";
/**
* Manager for temporary directories on file system.
*/
export class TmpDir {
export class Dir {
private dirs_ = new Set<string>();

/**
* Create a new temporary directory.
* @returns
*/
async create() {
async createTempDir() {
const dir = await fs.mkdtemp(
path.join(os.tmpdir(), `superstreamer-${crypto.randomUUID()}`),
);
Expand Down
20 changes: 20 additions & 0 deletions packages/artisan/src/consumer/lib/worker-processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Dir } from "./dir";
import type { Job } from "bullmq";

export type WorkerCallback<T, R> = (params: {
job: Job<T, R>;
token?: string | undefined;
dir: Dir;
}) => Promise<R>;

export function createWorkerProcessor<T, R>(callback: WorkerCallback<T, R>) {
const dir = new Dir();

return async (job: Job<T, R>, token?: string) => {
try {
return await callback({ job, token, dir });
} finally {
await dir.deleteAll();
}
};
}
9 changes: 8 additions & 1 deletion packages/artisan/src/consumer/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,21 @@ export async function downloadFolder(key: string, path: string) {
* @param key S3 key
*/
export async function downloadFile(path: string, key: string) {
const name = `${path}/${basename(key)}`;

if (await Bun.file(name).exists()) {
// If the file already exists, we have nothing to do.
return;
}

const response = await client.send(
new GetObjectCommand({
Bucket: env.S3_BUCKET,
Key: key,
}),
);

await writeFile(`${path}/${basename(key)}`, response.Body as Readable);
await writeFile(name, response.Body as Readable);
}

/**
Expand Down
Loading