Skip to content

Add local video publisher & subscriber examples#830

Merged
ladvoc merged 16 commits intomainfrom
dc/local_video_examples
Jan 21, 2026
Merged

Add local video publisher & subscriber examples#830
ladvoc merged 16 commits intomainfrom
dc/local_video_examples

Conversation

@chenosaurus
Copy link
Contributor

@chenosaurus chenosaurus commented Jan 9, 2026

adding examples/local_video to demonstrate publishing & subscribing to video tracks using the Rust SDK.

Summary by CodeRabbit

  • New Features

    • Added a local-video example with publisher, subscriber, and device-listing tools; subscriber offers GPU-accelerated YUV rendering and simulcast quality controls
    • Publisher supports live capture → publish with codec choice and automatic fallback
  • Documentation

    • Added comprehensive example README and usage instructions
  • Chores

    • Workspace updated to include the new example
    • Build now optionally detects system JPEG support
    • Added new ignore pattern for .cursor

✏️ Tip: You can customize this high-level summary in your review settings.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds two comprehensive examples demonstrating local video capture and streaming with the LiveKit Rust SDK: a publisher that captures frames from a local camera and publishes them to a LiveKit room, and a subscriber that connects to a room and renders received video in a window with GPU acceleration.

Key changes:

  • Publisher example with camera capture, format conversion (YUYV/MJPEG/RGB24 to I420), and H.264/H.265 codec support
  • Subscriber example with GPU-accelerated YUV rendering using WGPU/egui and simulcast layer controls
  • Enhanced yuv-sys build script to detect and enable libjpeg/libjpeg-turbo for MJPEG fast-path decoding

Reviewed changes

Copilot reviewed 8 out of 10 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
yuv-sys/build.rs Adds pkg-config detection for system libjpeg to enable MJPEG fast-path conversion in libyuv
yuv-sys/Cargo.toml Adds pkg-config dependency for build-time library detection
examples/local_video/Cargo.toml New example package configuration with required dependencies for camera capture, video processing, and GPU rendering
examples/local_video/README.md Documentation for both publisher and subscriber examples with usage instructions
examples/local_video/src/publisher.rs Complete publisher implementation with camera capture, format detection/conversion, and LiveKit video track publishing
examples/local_video/src/subscriber.rs Complete subscriber implementation with video stream reception, GPU rendering, and simulcast controls
examples/local_video/src/yuv_shader.wgsl WGSL shader for YUV to RGB conversion and rendering in the subscriber
Cargo.toml Adds local_video example to workspace members
Cargo.lock Lock file updates for new dependencies
.gitignore Adds .cursor IDE directory to ignore list

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 45 to 51
--api-secret YOUR_SECRET

# subscribe to a specific participant's video only
cargo run -p local_video --bin subscriber -- \
--room-name demo \
--identity viewer-1 \
--participant alice
Copy link

Copilot AI Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent indentation: these lines use 3 spaces while lines 40-45 above use 4 spaces (with the exception of line 45). The indentation should be consistent throughout the command blocks.

Suggested change
--api-secret YOUR_SECRET
# subscribe to a specific participant's video only
cargo run -p local_video --bin subscriber -- \
--room-name demo \
--identity viewer-1 \
--participant alice
--api-secret YOUR_SECRET
# subscribe to a specific participant's video only
cargo run -p local_video --bin subscriber -- \
--room-name demo \
--identity viewer-1 \
--participant alice

Copilot uses AI. Check for mistakes.
chenosaurus and others added 4 commits January 9, 2026 21:21
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
env_logger::init();
let args = Args::parse();

let ctrl_c_received = Arc::new(AtomicBool::new(false));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: This can be handled more idiomatically by defining a run function that accepts the CLI arguments and using tokio::select!:

tokio::select! {
    _ = run(args) => {},
    _ = signal::ctrl_c() => {}
}

env_logger::init();
let args = Args::parse();

let ctrl_c_received = Arc::new(AtomicBool::new(false));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

while let Some(evt) = events.recv().await {
debug!("Room event: {:?}", evt);
match evt {
RoomEvent::TrackSubscribed { track, publication, participant } => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: Handling each event type in a separate function would improve readability.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 17, 2026

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Walkthrough

Adds a new example workspace examples/local_video with three binaries (camera listing, publisher, subscriber) for camera enumeration, LiveKit publishing, and GUI-based subscribing with YUV rendering; updates workspace manifest, libyuv build to optionally detect system JPEG via pkg-config, and minor .gitignore change.

Changes

Cohort / File(s) Summary
Workspace / VCS
\.gitignore`, `Cargo.toml``
Added .cursor to .gitignore; added examples/local_video to workspace members in Cargo.toml.
Example Manifest & Docs
\examples/local_video/Cargo.toml`, `examples/local_video/README.md``
New crate manifest with three binaries and many dependencies; README documents usage, flags, and examples.
Camera enumeration
\examples/local_video/src/list_devices.rs``
New binary enumerating cameras and printing capabilities (platform-specific paths for macOS and others).
Video publisher
\examples/local_video/src/publisher.rs``
New publisher binary: camera capture, format negotiation (YUYV/MJPEG), multi-path conversion to I420, pacing, LiveKit connection and publish (H.265/H.264 fallback), simulcast and metrics.
Video subscriber & UI
\examples/local_video/src/subscriber.rs``
New subscriber binary with LiveKit connection, track subscribe/unsubscribe handling, SharedYuv buffer, simulcast state, WGPU-based YUV rendering, and egui/eframe GUI.
Shader
\examples/local_video/src/yuv_shader.wgsl``
WGSL shader implementing YUV→RGB conversion and sampling for Y/U/V planes.
libyuv build integration
\yuv-sys/Cargo.toml`, `yuv-sys/build.rs``
Added pkg-config build-dependency; build script now optionally detects system libjpeg/libjpeg-turbo/jpeg and sets HAVE_JPEG/include paths for libyuv build.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Publisher as Publisher Binary
    participant Camera as Camera (Nokhwa)
    participant LiveKit as LiveKit Room
    participant Network as Network

    User->>Publisher: Run with camera index & room info
    Publisher->>Camera: Open camera, negotiate format
    Camera-->>Publisher: Handle + format
    Publisher->>LiveKit: Create token & connect
    LiveKit-->>Publisher: Connected
    Publisher->>Publisher: Create LocalVideoTrack (H.265/H.264)

    loop Capture & Publish
        Publisher->>Camera: Read frame
        Camera-->>Publisher: Frame data
        Publisher->>Publisher: Decode/convert to I420
        Publisher->>Publisher: Maintain RTP timestamp & pace
        Publisher->>LiveKit: Push I420 frame
        LiveKit->>Network: Transmit video
    end

    User->>Publisher: Ctrl-C
    Publisher->>LiveKit: Unpublish & disconnect
Loading
sequenceDiagram
    participant User
    participant Subscriber as Subscriber Binary
    participant LiveKit as LiveKit Room
    participant Network as Network
    participant GPU as GPU (WGPU)
    participant UI as egui/eframe

    User->>Subscriber: Run with room credentials
    Subscriber->>LiveKit: Create token & connect (auto-subscribe)
    LiveKit-->>Subscriber: Connected

    loop Room Events
        LiveKit->>Subscriber: TrackSubscribed
        Subscriber->>Subscriber: Spawn frame sink thread
        Network->>Subscriber: Deliver frames
        Subscriber->>Subscriber: Convert to I420, write SharedYuv

        UI->>Subscriber: Request frame
        Subscriber->>GPU: Upload Y/U/V textures
        GPU->>GPU: Run yuv_shader (YUV→RGB)
        GPU-->>UI: Render frame
        UI->>User: Display video + HUD + controls

        User->>UI: Change simulcast quality
        UI->>Subscriber: Update SimulcastState
        Subscriber->>LiveKit: Request quality change
    end

    User->>Subscriber: Ctrl-C
    Subscriber->>LiveKit: Disconnect & cleanup
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

🐰 I hopped through cameras, codecs, and light,
Publishing frames into the night—
A shader, a GUI, simulcast delight,
Streams stitched together, pixels just right. 🎨📹

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 16.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Add local video publisher & subcriber examples' accurately describes the main change—adding new example code for video publishing and subscribing functionality to the Rust SDK.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@examples/local_video/src/publisher.rs`:
- Line 1: Validate pace_fps before computing Duration::from_secs_f64 to avoid a
divide-by-zero panic: in the code paths that call Duration::from_secs_f64(1.0 /
pace_fps) (look for uses in publisher.rs around the publisher setup and the loop
where pace_fps is applied), check that pace_fps > 0 and return an Err or print a
clear error and exit when it is zero or negative; update both occurrences (the
initial interval computation and the repeated use at lines ~184-186) to perform
this guard and handle the invalid value gracefully instead of letting
Duration::from_secs_f64 panic.
♻️ Duplicate comments (3)
examples/local_video/README.md (1)

55-60: Inconsistent indentation in command examples.

Lines 56-59 use 4-space indentation while the earlier block (lines 48-53) uses 3-space indentation. Consider aligning for consistency throughout the document.

examples/local_video/src/publisher.rs (2)

165-175: Verify the nokhwa API call name (set_camera_request).

The method name appears misspelled as set_camera_requset, which would fail to compile if the API is actually set_camera_request. Please confirm against nokhwa’s current API and fix both call sites if needed.

🔧 Possible fix (if the API is `set_camera_request`)
-    if let Err(_) = camera
-        .set_camera_requset(RequestedFormat::new::<RgbFormat>(RequestedFormatType::Exact(wanted)))
+    if let Err(_) = camera
+        .set_camera_request(RequestedFormat::new::<RgbFormat>(RequestedFormatType::Exact(wanted)))
     {
@@
-        let _ = camera
-            .set_camera_requset(RequestedFormat::new::<RgbFormat>(RequestedFormatType::Exact(alt)));
+        let _ = camera
+            .set_camera_request(RequestedFormat::new::<RgbFormat>(RequestedFormatType::Exact(alt)));
     }

283-317: Handle libyuv conversion failures instead of ignoring return codes.

rs_YUY2ToI420 / rs_RGB24ToI420 return non‑zero on failure; ignoring them can publish corrupted frames. Consider logging and skipping the frame, and apply the same handling to the other conversion paths.

🔍 Example handling for the YUYV path
-            unsafe {
-                // returns 0 on success
-                let _ = yuv_sys::rs_YUY2ToI420(
-                    src_bytes.as_ptr(),
-                    src_stride,
-                    data_y.as_mut_ptr(),
-                    stride_y as i32,
-                    data_u.as_mut_ptr(),
-                    stride_u as i32,
-                    data_v.as_mut_ptr(),
-                    stride_v as i32,
-                    width as i32,
-                    height as i32,
-                );
-            }
+            let ret = unsafe {
+                // returns 0 on success
+                yuv_sys::rs_YUY2ToI420(
+                    src_bytes.as_ptr(),
+                    src_stride,
+                    data_y.as_mut_ptr(),
+                    stride_y as i32,
+                    data_u.as_mut_ptr(),
+                    stride_u as i32,
+                    data_v.as_mut_ptr(),
+                    stride_v as i32,
+                    width as i32,
+                    height as i32,
+                )
+            };
+            if ret != 0 {
+                log::warn!("YUYV->I420 conversion failed: {}", ret);
+                continue;
+            }

Also applies to: 362-374

🧹 Nitpick comments (4)
examples/local_video/README.md (1)

15-30: Add language specifier to fenced code blocks.

The code blocks are missing language specifiers, which affects syntax highlighting and linting compliance.

📝 Suggested fix
-```
+```bash
  cargo run -p local_video --bin publisher -- --list-cameras

Apply similar changes to the code blocks starting at lines 33 and 43.

examples/local_video/src/subscriber.rs (2)

437-440: Minor redundancy in repaint requests.

request_repaint() on line 439 triggers an immediate repaint, while request_repaint_after(16ms) on line 520 schedules a delayed one. For video playback at ~60fps, the delayed repaint alone should suffice. The immediate request causes extra repaints but isn't harmful.

Consider removing the immediate request_repaint() call if the 16ms delayed repaint provides sufficient smoothness.


992-998: Per-frame allocations for texture packing.

The packed, packed_u, and packed_v vectors are allocated on every dirty frame. For 30-60fps video, this creates significant allocation pressure. Consider caching these buffers in YuvGpuState and reusing them across frames.

♻️ Optimization suggestion

Add cached buffers to YuvGpuState:

struct YuvGpuState {
    // ... existing fields ...
    packed_y: Vec<u8>,
    packed_u: Vec<u8>,
    packed_v: Vec<u8>,
}

Then resize and reuse instead of allocating new vectors each frame.

examples/local_video/src/list_devices.rs (1)

49-59: Consider graceful handling of per-format failures.

If compatible_list_by_resolution fails for one fourcc (line 52), the entire function returns an error. For better robustness, consider skipping formats that fail to enumerate rather than aborting completely.

♻️ Suggested improvement
         for fourcc in fourccs {
-            let mut res_map = camera.compatible_list_by_resolution(fourcc)?;
+            let Ok(mut res_map) = camera.compatible_list_by_resolution(fourcc) else {
+                continue; // Skip formats that fail to enumerate
+            };
             let mut res_sorted = BTreeMap::new();
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2d672f5 and eb88d86.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (10)
  • .gitignore
  • Cargo.toml
  • examples/local_video/Cargo.toml
  • examples/local_video/README.md
  • examples/local_video/src/list_devices.rs
  • examples/local_video/src/publisher.rs
  • examples/local_video/src/subscriber.rs
  • examples/local_video/src/yuv_shader.wgsl
  • yuv-sys/Cargo.toml
  • yuv-sys/build.rs
🧰 Additional context used
🧬 Code graph analysis (1)
examples/local_video/src/publisher.rs (2)
yuv-sys/build.rs (1)
  • main (108-174)
livekit/src/room/options.rs (1)
  • as_str (30-38)
🪛 markdownlint-cli2 (0.18.1)
examples/local_video/README.md

15-15: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


33-33: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


43-43: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (8)
Cargo.toml (1)

22-22: LGTM!

The new workspace member is correctly added and maintains the alphabetical ordering of the examples list.

.gitignore (1)

6-7: LGTM!

Sensible additions: .env for local environment variables (used by the new example for LiveKit credentials) and .cursor for Cursor IDE artifacts.

examples/local_video/src/yuv_shader.wgsl (1)

1-61: LGTM!

The shader correctly implements:

  • Fullscreen triangle rendering via vertex index
  • BT.601 limited-range YUV to RGB conversion with proper coefficients
  • Padding-aware UV coordinate scaling to avoid sampling padded columns
  • Vertical flip for correct orientation
examples/local_video/src/subscriber.rs (3)

23-27: LGTM!

Simple and effective shutdown polling with appropriate memory ordering.


656-659: LGTM!

Properly signals shutdown to background threads after the UI window closes, ensuring clean termination of the frame sink loop.


284-298: The review comment is incorrect. The I420 buffer slices returned by data() are guaranteed to match the calculated sizes because they use identical formulas.

Looking at the implementation of I420Buffer::data() in libwebrtc/src/native/video_frame.rs (lines 304-314), the slice lengths are constructed using:

  • Y plane: stride_y() * height()
  • U plane: stride_u() * chroma_height
  • V plane: stride_v() * chroma_height

The code in subscriber.rs calculates the exact same values:

  • y_size = sy * h (where sy is from strides() and h is from height())
  • u_size = su * ch (where su is from strides() and ch = (h + 1) / 2)
  • v_size = sv * ch

Since both use the same strides, height, and chroma height calculations from the same underlying I420 buffer object, the sizes are guaranteed to match. The copy_from_slice calls cannot panic.

Likely an incorrect or invalid review comment.

examples/local_video/src/list_devices.rs (2)

68-86: LGTM!

Clean implementation that properly aggregates formats, deduplicates FPS values, and maintains sorted output via BTreeMap.


13-31: LGTM!

The main function provides clear output with proper handling of the no-cameras case and graceful error reporting for capability enumeration failures.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

@@ -0,0 +1,447 @@
use anyhow::Result;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard against --fps 0 causing a panic.

Duration::from_secs_f64(1.0 / pace_fps) will panic on zero; validate before computing the interval.

✅ Suggested fix
-use anyhow::Result;
+use anyhow::{bail, Result};
@@
-    // Pace publishing at the requested FPS (not the camera-reported FPS) to hit desired cadence
-    let pace_fps = args.fps as f64;
+    if args.fps == 0 {
+        bail!("--fps must be > 0");
+    }
+    // Pace publishing at the requested FPS (not the camera-reported FPS) to hit desired cadence
+    let pace_fps = args.fps as f64;

Also applies to: 184-186

🤖 Prompt for AI Agents
In `@examples/local_video/src/publisher.rs` at line 1, Validate pace_fps before
computing Duration::from_secs_f64 to avoid a divide-by-zero panic: in the code
paths that call Duration::from_secs_f64(1.0 / pace_fps) (look for uses in
publisher.rs around the publisher setup and the loop where pace_fps is applied),
check that pace_fps > 0 and return an Err or print a clear error and exit when
it is zero or negative; update both occurrences (the initial interval
computation and the repeated use at lines ~184-186) to perform this guard and
handle the invalid value gracefully instead of letting Duration::from_secs_f64
panic.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@examples/local_video/src/subscriber.rs`:
- Around line 122-366: The background sink thread can keep running after the
active video track changes; inside the thread's main loop (the spawned closure
using sink.next()), add a guard that checks the shared active_sid (active_sid2)
against my_sid and breaks if they differ or if active is None so the old sink
stops immediately when a new track becomes active; perform this check each
iteration (e.g., right after waking/receiving next or before swapping buffers)
to avoid interleaving frames with a new subscription and ensure active SID
ownership is validated before writing into shared (SharedYuv) state.
🧹 Nitpick comments (2)
examples/local_video/src/list_devices.rs (1)

42-66: Add a fallback when FourCC probing yields no usable data.
If compatible_fourcc() returns Ok but empty (or a per‑FourCC resolution query fails), the current path can leave capabilities empty even when compatible_camera_formats() would work. Consider falling back when the map is still empty, and skip failing FourCCs. Please verify this behavior with nokhwa v0.10.0.

♻️ Suggested tweak
-    if let Ok(mut fourccs) = camera.compatible_fourcc() {
-        fourccs.sort();
-        for fourcc in fourccs {
-            let mut res_map = camera.compatible_list_by_resolution(fourcc)?;
-            let mut res_sorted = BTreeMap::new();
-            for (res, mut fps_list) in res_map.drain() {
-                fps_list.sort();
-                res_sorted.insert(res, fps_list);
-            }
-            capabilities.insert(fourcc, res_sorted);
-        }
-    } else {
+    if let Ok(mut fourccs) = camera.compatible_fourcc() {
+        fourccs.sort();
+        for fourcc in fourccs {
+            if let Ok(mut res_map) = camera.compatible_list_by_resolution(fourcc) {
+                let mut res_sorted = BTreeMap::new();
+                for (res, mut fps_list) in res_map.drain() {
+                    fps_list.sort();
+                    res_sorted.insert(res, fps_list);
+                }
+                capabilities.insert(fourcc, res_sorted);
+            }
+        }
+        if capabilities.is_empty() {
+            let formats = camera.compatible_camera_formats()?;
+            capabilities = capabilities_from_formats(formats);
+        }
+    } else {
         let formats = camera.compatible_camera_formats()?;
         capabilities = capabilities_from_formats(formats);
     }
examples/local_video/src/subscriber.rs (1)

368-376: Optionally clear frame dimensions on unsubscribe.
Clearing only HUD data leaves the last frame displayed. Zeroing dims/buffers will blank the view after unpublish/unsubscribe.

♻️ Suggested tweak
     {
         let mut s = shared.lock();
         s.codec.clear();
         s.fps = 0.0;
+        s.width = 0;
+        s.height = 0;
+        s.stride_y = 0;
+        s.stride_u = 0;
+        s.stride_v = 0;
+        s.y.clear();
+        s.u.clear();
+        s.v.clear();
     }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0887902 and 1beec5f.

📒 Files selected for processing (2)
  • examples/local_video/src/list_devices.rs
  • examples/local_video/src/subscriber.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Test (x86_64-pc-windows-msvc)
  • GitHub Check: Test (x86_64-apple-darwin)
  • GitHub Check: Test (x86_64-unknown-linux-gnu)
🔇 Additional comments (23)
examples/local_video/src/list_devices.rs (4)

13-31: LGTM. Clear flow and early exit for no cameras.


33-40: LGTM.


68-84: LGTM.


86-108: LGTM.

examples/local_video/src/subscriber.rs (19)

23-27: LGTM.


29-55: LGTM.


57-69: LGTM.


71-90: LGTM.


92-96: LGTM.


98-115: LGTM.


117-120: LGTM.


378-391: LGTM.


393-406: LGTM.


408-413: LGTM.


415-515: LGTM.


517-533: LGTM.


535-643: LGTM.


647-649: LGTM.


651-705: LGTM.


707-709: LGTM.


711-718: LGTM.


720-1067: LGTM.


1069-1097: LGTM.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +122 to +366
async fn handle_track_subscribed(
track: livekit::track::RemoteTrack,
publication: RemoteTrackPublication,
participant: RemoteParticipant,
allowed_identity: &Option<String>,
shared: &Arc<Mutex<SharedYuv>>,
rt: &tokio::runtime::Handle,
active_sid: &Arc<Mutex<Option<TrackSid>>>,
ctrl_c_received: &Arc<AtomicBool>,
simulcast: &Arc<Mutex<SimulcastState>>,
) {
// If a participant filter is set, skip others
if let Some(ref allow) = allowed_identity {
if participant.identity().as_str() != allow {
debug!("Skipping track from '{}' (filter set to '{}')", participant.identity(), allow);
return;
}
}

let livekit::track::RemoteTrack::Video(video_track) = track else {
return;
};

let sid = publication.sid().clone();
let codec = codec_label(&publication.mime_type());
// Only handle if we don't already have an active video track
{
let mut active = active_sid.lock();
if active.as_ref() == Some(&sid) {
debug!("Track {} already active, ignoring duplicate subscribe", sid);
return;
}
if active.is_some() {
debug!(
"A video track is already active ({}), ignoring new subscribe {}",
active.as_ref().unwrap(),
sid
);
return;
}
*active = Some(sid.clone());
}

// Update HUD codec label early (before first frame arrives)
{
let mut s = shared.lock();
s.codec = codec;
}

info!(
"Subscribed to video track: {} (sid {}) from {} - codec: {}, simulcast: {}, dimension: {}x{}",
publication.name(),
publication.sid(),
participant.identity(),
publication.mime_type(),
publication.simulcasted(),
publication.dimension().0,
publication.dimension().1
);

// Try to fetch inbound RTP/codec stats for more details
match video_track.get_stats().await {
Ok(stats) => {
let mut codec_by_id: HashMap<String, (String, String)> = HashMap::new();
let mut inbound: Option<livekit::webrtc::stats::InboundRtpStats> = None;
for s in stats.iter() {
match s {
livekit::webrtc::stats::RtcStats::Codec(c) => {
codec_by_id.insert(
c.rtc.id.clone(),
(c.codec.mime_type.clone(), c.codec.sdp_fmtp_line.clone()),
);
}
livekit::webrtc::stats::RtcStats::InboundRtp(i) => {
if i.stream.kind == "video" {
inbound = Some(i.clone());
}
}
_ => {}
}
}

if let Some(i) = inbound {
if let Some((mime, fmtp)) = codec_by_id.get(&i.stream.codec_id) {
info!("Inbound codec: {} (fmtp: {})", mime, fmtp);
} else {
info!("Inbound codec id: {}", i.stream.codec_id);
}
info!(
"Inbound current layer: {}x{} ~{:.1} fps, decoder: {}, power_efficient: {}",
i.inbound.frame_width,
i.inbound.frame_height,
i.inbound.frames_per_second,
i.inbound.decoder_implementation,
i.inbound.power_efficient_decoder
);
}
}
Err(e) => debug!("Failed to get stats for video track: {:?}", e),
}

// Start background sink thread
let shared2 = shared.clone();
let active_sid2 = active_sid.clone();
let my_sid = sid.clone();
let rt_clone = rt.clone();
let ctrl_c_sink = ctrl_c_received.clone();
// Initialize simulcast state for this publication
{
let mut sc = simulcast.lock();
sc.available = publication.simulcasted();
let dim = publication.dimension();
sc.full_dims = Some((dim.0, dim.1));
sc.requested_quality = None;
sc.active_quality = None;
sc.publication = Some(publication.clone());
}
let simulcast2 = simulcast.clone();
std::thread::spawn(move || {
let mut sink = NativeVideoStream::new(video_track.rtc_track());
let mut frames: u64 = 0;
let mut last_log = Instant::now();
let mut logged_first = false;
let mut last_stats = Instant::now();
let mut fps_window_frames: u64 = 0;
let mut fps_window_start = Instant::now();
let mut fps_smoothed: f32 = 0.0;
// YUV buffers reused to avoid per-frame allocations
let mut y_buf: Vec<u8> = Vec::new();
let mut u_buf: Vec<u8> = Vec::new();
let mut v_buf: Vec<u8> = Vec::new();
loop {
if ctrl_c_sink.load(Ordering::Acquire) {
break;
}
let next = rt_clone.block_on(async {
tokio::select! {
_ = wait_for_shutdown(ctrl_c_sink.clone()) => None,
frame = sink.next() => frame,
}
});
let Some(frame) = next else { break };
let w = frame.buffer.width();
let h = frame.buffer.height();

if !logged_first {
debug!("First frame: {}x{}, type {:?}", w, h, frame.buffer.buffer_type());
logged_first = true;
}

// Convert to I420 on CPU, but keep planes separate for GPU sampling
let i420 = frame.buffer.to_i420();
let (sy, su, sv) = i420.strides();
let (dy, du, dv) = i420.data();

let ch = (h + 1) / 2;

// Ensure capacity and copy full plane slices
let y_size = (sy * h) as usize;
let u_size = (su * ch) as usize;
let v_size = (sv * ch) as usize;
if y_buf.len() != y_size {
y_buf.resize(y_size, 0);
}
if u_buf.len() != u_size {
u_buf.resize(u_size, 0);
}
if v_buf.len() != v_size {
v_buf.resize(v_size, 0);
}
y_buf.copy_from_slice(dy);
u_buf.copy_from_slice(du);
v_buf.copy_from_slice(dv);

// Swap buffers into shared state
let mut s = shared2.lock();
s.width = w as u32;
s.height = h as u32;
s.stride_y = sy as u32;
s.stride_u = su as u32;
s.stride_v = sv as u32;
std::mem::swap(&mut s.y, &mut y_buf);
std::mem::swap(&mut s.u, &mut u_buf);
std::mem::swap(&mut s.v, &mut v_buf);
s.dirty = true;

// Update smoothed FPS (~500ms window)
fps_window_frames += 1;
let win_elapsed = fps_window_start.elapsed();
if win_elapsed >= Duration::from_millis(500) {
let inst_fps = (fps_window_frames as f32) / (win_elapsed.as_secs_f32().max(0.001));
fps_smoothed = if fps_smoothed <= 0.0 {
inst_fps
} else {
// light EMA smoothing to reduce jitter
(fps_smoothed * 0.7) + (inst_fps * 0.3)
};
s.fps = fps_smoothed;
fps_window_frames = 0;
fps_window_start = Instant::now();
}

frames += 1;
let elapsed = last_log.elapsed();
if elapsed >= Duration::from_secs(2) {
let fps = frames as f64 / elapsed.as_secs_f64();
info!("Receiving video: {}x{}, ~{:.1} fps", w, h, fps);
frames = 0;
last_log = Instant::now();
}
// Periodically infer active simulcast quality from inbound stats
if last_stats.elapsed() >= Duration::from_secs(1) {
if let Ok(stats) = rt_clone.block_on(video_track.get_stats()) {
let mut inbound: Option<livekit::webrtc::stats::InboundRtpStats> = None;
for s in stats.iter() {
if let livekit::webrtc::stats::RtcStats::InboundRtp(i) = s {
if i.stream.kind == "video" {
inbound = Some(i.clone());
}
}
}
if let Some(i) = inbound {
if let Some((fw, fh)) = simulcast_state_full_dims(&simulcast2) {
let q = infer_quality_from_dims(
fw,
fh,
i.inbound.frame_width as u32,
i.inbound.frame_height as u32,
);
let mut sc = simulcast2.lock();
sc.active_quality = Some(q);
}
}
}
last_stats = Instant::now();
}
}
info!("Video stream ended for {}", my_sid);
// Clear active sid if still ours
let mut active = active_sid2.lock();
if active.as_ref() == Some(&my_sid) {
*active = None;
}
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Stop stale sink threads when the active track changes.
If the active SID is cleared/changed, the old sink can still write frames until the stream ends, which can interleave with a new track. Add a guard inside the loop to exit when no longer active.

🐛 Proposed fix
         loop {
             if ctrl_c_sink.load(Ordering::Acquire) {
                 break;
             }
+            {
+                let active = active_sid2.lock();
+                if active.as_ref() != Some(&my_sid) {
+                    break;
+                }
+            }
             let next = rt_clone.block_on(async {
                 tokio::select! {
                     _ = wait_for_shutdown(ctrl_c_sink.clone()) => None,
                     frame = sink.next() => frame,
🤖 Prompt for AI Agents
In `@examples/local_video/src/subscriber.rs` around lines 122 - 366, The
background sink thread can keep running after the active video track changes;
inside the thread's main loop (the spawned closure using sink.next()), add a
guard that checks the shared active_sid (active_sid2) against my_sid and breaks
if they differ or if active is None so the old sink stops immediately when a new
track becomes active; perform this check each iteration (e.g., right after
waking/receiving next or before swapping buffers) to avoid interleaving frames
with a new subscription and ensure active SID ownership is validated before
writing into shared (SharedYuv) state.

@ladvoc ladvoc merged commit 1b63e11 into main Jan 21, 2026
11 checks passed
@ladvoc ladvoc deleted the dc/local_video_examples branch January 21, 2026 23:12
@ladvoc ladvoc changed the title Add local video publisher & subcriber examples Add local video publisher & subscriber examples Jan 21, 2026
@github-actions github-actions bot mentioned this pull request Jan 21, 2026
@coderabbitai coderabbitai bot mentioned this pull request Jan 28, 2026
This was referenced Jan 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants