Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,45 @@ AZURE_UPLOAD_BATCH_SIZE=20
AZURE_UPLOAD_RETRIES=3

# Queue Stability
# WORKER_CONCURRENCY = number of jobs processed in parallel. Each job spawns an FFmpeg
# process that uses FFMPEG_THREADS cores (0 = auto-detect; see PERFORMANCE TUNING below).
# Set to 1 for maximum single-job speed, or 2 to process two videos simultaneously
# (the CPU cores are then split between them).
WORKER_CONCURRENCY=1
JOB_LOCK_DURATION_MS=120000
JOB_LOCK_RENEW_MS=30000

# The lock must outlive the entire encode pipeline (which can take 30+ minutes for
# full-length content). Keep the renewal interval aggressive (15s) so the lock is
# refreshed in time even when heavy encoding work starves the Node.js event loop.
JOB_LOCK_DURATION_MS=1800000
JOB_LOCK_RENEW_MS=15000

# Video Pipeline Settings
# "SINGLE_FILE" (Byte-range fMP4) or "SEGMENTED" (Standard chunks)
HLS_OUTPUT_MODE="SEGMENTED"

# CDN base URL prepended to HLS segment and init-segment URIs in variant manifests.
# Leave unset for relative paths (local dev). Set to full CDN URL for production.
DOMAIN_SUBDOMAIN_NAME=https://vod-cdn.{SUBDOMAIN}.{DOMAIN}.com

# ==============================================================================
# PERFORMANCE TUNING
# ==============================================================================

# Global FFmpeg thread count. 0 = auto-detect (recommended).
# FFmpeg uses this for demuxing, filtering, and muxing threads.
FFMPEG_THREADS=0

# x265 (HEVC/Dolby Vision) thread pool size. Set to your vCPU count for max utilization.
# This is the BIGGEST performance lever — the previous x265 setting (pools=none) disabled ALL threading.
# Example: 32-core machine → X265_POOL_SIZE=32
X265_POOL_SIZE=32

# x265 frame-level parallelism: how many frames are encoded simultaneously.
# 4 is a sensible default for most machines; higher values use more RAM but increase throughput.
# Rule of thumb: 2-6 depending on available RAM (each frame buffer is ~50-200MB for 4K).
X265_FRAME_THREADS=4

# Developer Override: Force the system to use ONLY one group of profiles.
# Values: 'avc_sdr', 'hvc_sdr', 'hvc_pq', 'dvh_pq', 'ALL'
TEST_VIDEO_PROFILE=ALL


# ==============================================================================
# PRODUCTION ONLY
Expand Down
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,7 @@ vite.config.js.timestamp-*
vite.config.ts.timestamp-*

.DS_Store
video
output
output

# DEVELOPMENT ONLY local files
tmp
49 changes: 34 additions & 15 deletions src/application/video.process.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import fs from 'node:fs';
import path from 'node:path';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import type {
JobData,
ProcessVideoUseCase,
Expand All @@ -12,11 +16,7 @@ import { pino } from 'pino';
const logger = pino({ name: 'ProcessVideo' });

/**
* Orchestrates the core video processing pipeline: Probe -> Transcode -> Upload.
*
* @remarks
* - Idempotency: If a job fails midway, retrying it will safely overwrite existing partial state.
* - Cleanup: Guaranteed to remove local intermediate files on both success and failure pathways.
* Orchestrates the core video processing pipeline: Download -> Probe -> Transcode -> Upload.
*/
export class ProcessVideo implements ProcessVideoUseCase {
constructor(
Expand All @@ -25,29 +25,48 @@ export class ProcessVideo implements ProcessVideoUseCase {
private readonly db: VideoRepository,
) {}

/**
* Executes the transcoding pipeline and synchronizes state with the database and webhook.
*
* @param job - Job payload from BullMQ. `videoId` acts as the idempotency key in DB/Storage.
* @throws {WorkerError} If any step fails. Process catches this, cleans up, and rethrows
* so the BullMQ wrapper can handle the retry/failure logic based on `.retryable`.
*/
async execute(job: JobData, onProgress?: ProgressCallback): Promise<void> {
const { videoId, sourceUrl, webhookUrl } = job;
logger.info({ videoId, sourceUrl, webhookUrl }, 'Starting video processing pipeline');

await this.db.updateStatus(videoId, 'processing');

try {
logger.info({ videoId }, 'Step 1/3: Probing source');
const probeResult = await this.ffmpeg.probe(sourceUrl);
const parsedUrl = new URL(sourceUrl);
const extension = path.extname(parsedUrl.pathname);

logger.info({ videoId, extension }, 'Step 0/3: Downloading source video locally');

const workDir = `/tmp/worker/${videoId}`;
await fs.promises.mkdir(workDir, { recursive: true });

const localSourcePath = path.join(workDir, `source${extension}`);

const response = await fetch(sourceUrl);
if (!response.ok) {
throw new Error(
`Failed to download source video: ${response.status} ${response.statusText}`,
);
}
if (!response.body) {
throw new Error('Response body from source video is empty');
}

await pipeline(
Readable.fromWeb(response.body as any),
fs.createWriteStream(localSourcePath),
);
logger.info({ videoId }, 'Source video successfully downloaded to worker disk');

logger.info({ videoId }, 'Step 1/3: Probing source');
const probeResult = await this.ffmpeg.probe(localSourcePath);
await this.db.updateStatus(videoId, 'processing');
logger.info(
{
videoId,
duration: probeResult.duration,
resolution: `${probeResult.width}x${probeResult.height}`,
format: extension,
},
'Probe complete',
);
Expand All @@ -58,7 +77,7 @@ export class ProcessVideo implements ProcessVideoUseCase {
await this.db.updateStatus(videoId, 'transcoding');

const { outputDir, renditions } = await this.ffmpeg.transcodeHLS(
sourceUrl,
localSourcePath,
videoId,
probeResult.width,
probeResult.height,
Expand Down
12 changes: 10 additions & 2 deletions src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ const envSchema = z.object({
WORKER_CONCURRENCY: z.coerce.number().default(1),

TEST_DURATION_SECONDS: z.coerce.number().optional(),
TEST_VIDEO_PROFILE: unquotedString
.pipe(z.enum(['avc_sdr', 'hvc_sdr', 'hvc_pq', 'dvh_pq', 'ALL']))
.default('ALL'),
HLS_OUTPUT_MODE: unquotedString.pipe(z.enum(['SINGLE_FILE', 'SEGMENTED'])).default('SEGMENTED'),

JOB_LOCK_DURATION_MS: z.coerce.number().default(120000),
JOB_LOCK_RENEW_MS: z.coerce.number().default(30000),
JOB_LOCK_DURATION_MS: z.coerce.number().default(1800000),
JOB_LOCK_RENEW_MS: z.coerce.number().default(15000),

AZURE_UPLOAD_BATCH_SIZE: z.coerce.number().default(20),
AZURE_UPLOAD_RETRIES: z.coerce.number().default(3),
Expand All @@ -36,6 +39,11 @@ const envSchema = z.object({
),
CORS_ORIGIN: unquotedString.default('*'),
DATABASE_URL: unquotedString.pipe(z.string().min(1, 'DATABASE_URL is required')),
DOMAIN_SUBDOMAIN_NAME: unquotedString.optional(),

FFMPEG_THREADS: z.coerce.number().default(0),
X265_POOL_SIZE: z.coerce.number().default(32),
X265_FRAME_THREADS: z.coerce.number().default(4),
});

/**
Expand Down
1 change: 1 addition & 0 deletions src/domain/job.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export interface JobData {

export interface AudioStreamInfo {
index: number;
codec: string;
language: string;
channels: number;
title: string;
Expand Down
19 changes: 1 addition & 18 deletions src/infrastructure/ffmpeg/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
computeAudioMetadata,
} from './encoding/profiles.js';
import { probe } from './core/probe.js';
import { probeComplexity } from './core/complexity.js';
import { processMasterPipeline } from './hls/pipeline.js';
import { writeMasterPlaylist } from './hls/playlist.js';
import { HLS_CONSTANTS } from './constants.js';
Expand All @@ -35,14 +34,6 @@ const ISO_639_1_MAP: Record<string, string> = {
und: 'und',
};

/**
* The "Brain" of the transcoding engine.
*
* @remarks
* - Orchestrates the entire lifecycle: Probing -> Complexity Analysis -> Transcoding -> Manifest Mapping.
* - Implements a Dispersed Hash Tree schema (via `blobPathFromUuid`) to prevent directory iteration attacks in public storage.
* - Employs a "Smart Per-Title" intelligence: Probes the file's visual complexity before assigning final bitrates and renditions.
*/
export class FFmpegAdapter implements TranscodeProvider {
constructor(private readonly workDir: string = DEFAULT_WORK_DIR) {}
async probe(sourceUrl: string): Promise<ProbeResult> {
Expand All @@ -63,15 +54,7 @@ export class FFmpegAdapter implements TranscodeProvider {
const outputDir = path.join(this.workDir, videoId, 'hls');
const activeProfiles = filterActiveVideoProfiles(sourceWidth, sourceHeight, videoRange);

logger.info({ videoId }, 'Analyzing video complexity for Smart Per-Title Bitrate adaptation');

const { multiplier: complexityMultiplier } = await probeComplexity(
sourceUrl,
sourceDuration,
videoId,
sourceWidth,
sourceHeight,
);
const complexityMultiplier = 1.0;

const rawVideoVariants = computeVideoMetadata(
activeProfiles,
Expand Down
10 changes: 0 additions & 10 deletions src/infrastructure/ffmpeg/constants.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
export const HLS_CONSTANTS = {
MASTER_PLAYLIST_NAME: 'playlist.m3u8',

VIDEO_SEGMENT_NAME: 'data_%03d.m4s',
SINGLE_VIDEO_NAME: 'data.m4s',

INIT_SEGMENT_NAME: 'init.mp4',

VARIANT_PLAYLIST_NAME: 'manifest.m3u8',

AUDIO_TIERS: {
SURROUND: 'a1',
STEREO: 'a2',
},
} as const;
Loading
Loading