Add local video publisher & subscriber examples #830
Conversation
Pull request overview
This PR adds two comprehensive examples demonstrating local video capture and streaming with the LiveKit Rust SDK: a publisher that captures frames from a local camera and publishes them to a LiveKit room, and a subscriber that connects to a room and renders received video in a window with GPU acceleration.
Key changes:
- Publisher example with camera capture, format conversion (YUYV/MJPEG/RGB24 to I420), and H.264/H.265 codec support
- Subscriber example with GPU-accelerated YUV rendering using WGPU/egui and simulcast layer controls
- Enhanced yuv-sys build script to detect and enable libjpeg/libjpeg-turbo for MJPEG fast-path decoding
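As a rough illustration of that last point, a minimal `build.rs` sketch using the pkg-config crate is shown below. The probe name and the emitted cfg flag are assumptions for illustration only, not the PR's actual build script, which also has to feed the result into the libyuv build.

```rust
// Hypothetical build.rs sketch (not the PR's actual script): probe for a system
// libjpeg/libjpeg-turbo via pkg-config and expose a cfg flag when it is present.
fn main() {
    let has_libjpeg = pkg_config::Config::new().probe("libjpeg").is_ok();
    if has_libjpeg {
        // Downstream code (or a C define passed to libyuv) can key off this flag
        // to enable the MJPEG fast-path conversion.
        println!("cargo:rustc-cfg=has_system_libjpeg");
    } else {
        println!("cargo:warning=libjpeg not found via pkg-config; MJPEG fast path disabled");
    }
    println!("cargo:rerun-if-changed=build.rs");
}
```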
Reviewed changes
Copilot reviewed 8 out of 10 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| yuv-sys/build.rs | Adds pkg-config detection for system libjpeg to enable MJPEG fast-path conversion in libyuv |
| yuv-sys/Cargo.toml | Adds pkg-config dependency for build-time library detection |
| examples/local_video/Cargo.toml | New example package configuration with required dependencies for camera capture, video processing, and GPU rendering |
| examples/local_video/README.md | Documentation for both publisher and subscriber examples with usage instructions |
| examples/local_video/src/publisher.rs | Complete publisher implementation with camera capture, format detection/conversion, and LiveKit video track publishing |
| examples/local_video/src/subscriber.rs | Complete subscriber implementation with video stream reception, GPU rendering, and simulcast controls |
| examples/local_video/src/yuv_shader.wgsl | WGSL shader for YUV to RGB conversion and rendering in the subscriber |
| Cargo.toml | Adds local_video example to workspace members |
| Cargo.lock | Lock file updates for new dependencies |
| .gitignore | Adds .cursor IDE directory to ignore list |
examples/local_video/README.md
Outdated
   --api-secret YOUR_SECRET

# subscribe to a specific participant's video only
cargo run -p local_video --bin subscriber -- \
   --room-name demo \
   --identity viewer-1 \
   --participant alice
Inconsistent indentation: these lines use 3 spaces while lines 40-45 above use 4 spaces (with the exception of line 45). The indentation should be consistent throughout the command blocks.
    --api-secret YOUR_SECRET

# subscribe to a specific participant's video only
cargo run -p local_video --bin subscriber -- \
    --room-name demo \
    --identity viewer-1 \
    --participant alice
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This reverts commit 2a6aae3.
    env_logger::init();
    let args = Args::parse();

    let ctrl_c_received = Arc::new(AtomicBool::new(false));
suggestion: This can be handled more idiomatically by defining a run function that accepts the CLI arguments and using tokio::select!:
tokio::select! {
    _ = run(args) => {},
    _ = signal::ctrl_c() => {}
}
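Expanded into a self-contained form, the suggested shape might look like the sketch below; the `Args` fields and the `run` body are placeholders, not the example's actual code.

```rust
// Sketch of the suggested structure; `Args`/`run` stand in for the example's
// real CLI struct and connection logic. Requires tokio, clap, anyhow, env_logger, log.
use clap::Parser;
use tokio::signal;

#[derive(Parser)]
struct Args {
    #[arg(long, default_value = "demo")]
    room_name: String,
}

async fn run(args: Args) -> anyhow::Result<()> {
    // Connect to the room, publish the track, and loop until the room closes.
    let _ = args;
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    env_logger::init();
    let args = Args::parse();
    tokio::select! {
        res = run(args) => res?,
        _ = signal::ctrl_c() => log::info!("Ctrl-C received, shutting down"),
    }
    Ok(())
}
```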
while let Some(evt) = events.recv().await {
    debug!("Room event: {:?}", evt);
    match evt {
        RoomEvent::TrackSubscribed { track, publication, participant } => {
nitpick: Handling each event type in a separate function would improve readability.
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Adds a new example workspace

Changes
Sequence Diagram(s)

sequenceDiagram
participant User
participant Publisher as Publisher Binary
participant Camera as Camera (Nokhwa)
participant LiveKit as LiveKit Room
participant Network as Network
User->>Publisher: Run with camera index & room info
Publisher->>Camera: Open camera, negotiate format
Camera-->>Publisher: Handle + format
Publisher->>LiveKit: Create token & connect
LiveKit-->>Publisher: Connected
Publisher->>Publisher: Create LocalVideoTrack (H.265/H.264)
loop Capture & Publish
Publisher->>Camera: Read frame
Camera-->>Publisher: Frame data
Publisher->>Publisher: Decode/convert to I420
Publisher->>Publisher: Maintain RTP timestamp & pace
Publisher->>LiveKit: Push I420 frame
LiveKit->>Network: Transmit video
end
User->>Publisher: Ctrl-C
Publisher->>LiveKit: Unpublish & disconnect
sequenceDiagram
participant User
participant Subscriber as Subscriber Binary
participant LiveKit as LiveKit Room
participant Network as Network
participant GPU as GPU (WGPU)
participant UI as egui/eframe
User->>Subscriber: Run with room credentials
Subscriber->>LiveKit: Create token & connect (auto-subscribe)
LiveKit-->>Subscriber: Connected
loop Room Events
LiveKit->>Subscriber: TrackSubscribed
Subscriber->>Subscriber: Spawn frame sink thread
Network->>Subscriber: Deliver frames
Subscriber->>Subscriber: Convert to I420, write SharedYuv
UI->>Subscriber: Request frame
Subscriber->>GPU: Upload Y/U/V textures
GPU->>GPU: Run yuv_shader (YUV→RGB)
GPU-->>UI: Render frame
UI->>User: Display video + HUD + controls
User->>UI: Change simulcast quality
UI->>Subscriber: Update SimulcastState
Subscriber->>LiveKit: Request quality change
end
User->>Subscriber: Ctrl-C
Subscriber->>LiveKit: Disconnect & cleanup
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@examples/local_video/src/publisher.rs`:
- Line 1: Validate pace_fps before computing Duration::from_secs_f64 to avoid a
divide-by-zero panic: in the code paths that call Duration::from_secs_f64(1.0 /
pace_fps) (look for uses in publisher.rs around the publisher setup and the loop
where pace_fps is applied), check that pace_fps > 0 and return an Err or print a
clear error and exit when it is zero or negative; update both occurrences (the
initial interval computation and the repeated use at lines ~184-186) to perform
this guard and handle the invalid value gracefully instead of letting
Duration::from_secs_f64 panic.
♻️ Duplicate comments (3)
examples/local_video/README.md (1)
55-60: Inconsistent indentation in command examples. Lines 56-59 use 4-space indentation while the earlier block (lines 48-53) uses 3-space indentation. Consider aligning for consistency throughout the document.
examples/local_video/src/publisher.rs (2)
165-175: Verify the nokhwa API call name (`set_camera_request`). The method name appears misspelled as `set_camera_requset`, which would fail to compile if the API is actually `set_camera_request`. Please confirm against nokhwa's current API and fix both call sites if needed.

🔧 Possible fix (if the API is `set_camera_request`)
-    if let Err(_) = camera
-        .set_camera_requset(RequestedFormat::new::<RgbFormat>(RequestedFormatType::Exact(wanted)))
+    if let Err(_) = camera
+        .set_camera_request(RequestedFormat::new::<RgbFormat>(RequestedFormatType::Exact(wanted)))
     {
@@
-        let _ = camera
-            .set_camera_requset(RequestedFormat::new::<RgbFormat>(RequestedFormatType::Exact(alt)));
+        let _ = camera
+            .set_camera_request(RequestedFormat::new::<RgbFormat>(RequestedFormatType::Exact(alt)));
     }
283-317: Handle libyuv conversion failures instead of ignoring return codes.
`rs_YUY2ToI420`/`rs_RGB24ToI420` return non-zero on failure; ignoring them can publish corrupted frames. Consider logging and skipping the frame, and apply the same handling to the other conversion paths.

🔍 Example handling for the YUYV path
-    unsafe {
-        // returns 0 on success
-        let _ = yuv_sys::rs_YUY2ToI420(
-            src_bytes.as_ptr(),
-            src_stride,
-            data_y.as_mut_ptr(),
-            stride_y as i32,
-            data_u.as_mut_ptr(),
-            stride_u as i32,
-            data_v.as_mut_ptr(),
-            stride_v as i32,
-            width as i32,
-            height as i32,
-        );
-    }
+    let ret = unsafe {
+        // returns 0 on success
+        yuv_sys::rs_YUY2ToI420(
+            src_bytes.as_ptr(),
+            src_stride,
+            data_y.as_mut_ptr(),
+            stride_y as i32,
+            data_u.as_mut_ptr(),
+            stride_u as i32,
+            data_v.as_mut_ptr(),
+            stride_v as i32,
+            width as i32,
+            height as i32,
+        )
+    };
+    if ret != 0 {
+        log::warn!("YUYV->I420 conversion failed: {}", ret);
+        continue;
+    }

Also applies to: 362-374
🧹 Nitpick comments (4)
examples/local_video/README.md (1)
15-30: Add language specifier to fenced code blocks. The code blocks are missing language specifiers, which affects syntax highlighting and linting compliance.
📝 Suggested fix
-```
+```bash
 cargo run -p local_video --bin publisher -- --list-cameras

Apply similar changes to the code blocks starting at lines 33 and 43.
examples/local_video/src/subscriber.rs (2)
437-440: Minor redundancy in repaint requests.
`request_repaint()` on line 439 triggers an immediate repaint, while `request_repaint_after(16ms)` on line 520 schedules a delayed one. For video playback at ~60fps, the delayed repaint alone should suffice. The immediate request causes extra repaints but isn't harmful. Consider removing the immediate `request_repaint()` call if the 16ms delayed repaint provides sufficient smoothness.
992-998: Per-frame allocations for texture packing. The `packed`, `packed_u`, and `packed_v` vectors are allocated on every dirty frame. For 30-60fps video, this creates significant allocation pressure. Consider caching these buffers in `YuvGpuState` and reusing them across frames.

♻️ Optimization suggestion

Add cached buffers to `YuvGpuState`:

struct YuvGpuState {
    // ... existing fields ...
    packed_y: Vec<u8>,
    packed_u: Vec<u8>,
    packed_v: Vec<u8>,
}

Then resize and reuse instead of allocating new vectors each frame.
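As a rough illustration of the reuse pattern (the cache field name follows the suggestion above; the helper itself is an assumption, not the PR's code):

```rust
// Illustrative helper: tightly pack one plane into a cached buffer, reusing the
// allocation across frames. `cache` would be a field such as packed_y above.
fn pack_plane(cache: &mut Vec<u8>, src: &[u8], src_stride: usize, row_bytes: usize, rows: usize) {
    // resize() only reallocates when the frame size actually changes.
    cache.resize(row_bytes * rows, 0);
    for r in 0..rows {
        let src_off = r * src_stride;
        cache[r * row_bytes..(r + 1) * row_bytes]
            .copy_from_slice(&src[src_off..src_off + row_bytes]);
    }
}
```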
examples/local_video/src/list_devices.rs (1)
49-59: Consider graceful handling of per-format failures. If `compatible_list_by_resolution` fails for one fourcc (line 52), the entire function returns an error. For better robustness, consider skipping formats that fail to enumerate rather than aborting completely.

♻️ Suggested improvement

  for fourcc in fourccs {
-     let mut res_map = camera.compatible_list_by_resolution(fourcc)?;
+     let Ok(mut res_map) = camera.compatible_list_by_resolution(fourcc) else {
+         continue; // Skip formats that fail to enumerate
+     };
      let mut res_sorted = BTreeMap::new();
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (10)
- .gitignore
- Cargo.toml
- examples/local_video/Cargo.toml
- examples/local_video/README.md
- examples/local_video/src/list_devices.rs
- examples/local_video/src/publisher.rs
- examples/local_video/src/subscriber.rs
- examples/local_video/src/yuv_shader.wgsl
- yuv-sys/Cargo.toml
- yuv-sys/build.rs
🧰 Additional context used
🧬 Code graph analysis (1)
examples/local_video/src/publisher.rs (2)
yuv-sys/build.rs (1)
- `main` (108-174)

livekit/src/room/options.rs (1)

- `as_str` (30-38)
🪛 markdownlint-cli2 (0.18.1)
examples/local_video/README.md
15-15: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
33-33: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
43-43: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🔇 Additional comments (8)
Cargo.toml (1)
22-22: LGTM! The new workspace member is correctly added and maintains the alphabetical ordering of the examples list.
.gitignore (1)
6-7: LGTM! Sensible additions: `.env` for local environment variables (used by the new example for LiveKit credentials) and `.cursor` for Cursor IDE artifacts.

examples/local_video/src/yuv_shader.wgsl (1)
1-61: LGTM! The shader correctly implements:
- Fullscreen triangle rendering via vertex index
- BT.601 limited-range YUV to RGB conversion with proper coefficients
- Padding-aware UV coordinate scaling to avoid sampling padded columns
- Vertical flip for correct orientation
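For reference, the per-pixel math this kind of shader performs corresponds to the standard BT.601 limited-range mapping. A CPU-side sketch is below (in Rust rather than WGSL, using the commonly quoted coefficients, not necessarily the exact constants in the PR's shader):

```rust
// CPU reference for BT.601 limited-range YUV -> RGB (the shader does this on the GPU).
fn yuv_to_rgb_bt601_limited(y: u8, u: u8, v: u8) -> [u8; 3] {
    let c = y as f32 - 16.0;   // luma, offset by the limited-range black level
    let d = u as f32 - 128.0;  // Cb centered on zero
    let e = v as f32 - 128.0;  // Cr centered on zero
    let clamp = |x: f32| x.round().clamp(0.0, 255.0) as u8;
    [
        clamp(1.164 * c + 1.596 * e),             // R
        clamp(1.164 * c - 0.392 * d - 0.813 * e), // G
        clamp(1.164 * c + 2.017 * d),             // B
    ]
}
```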
examples/local_video/src/subscriber.rs (3)
23-27: LGTM! Simple and effective shutdown polling with appropriate memory ordering.

656-659: LGTM! Properly signals shutdown to background threads after the UI window closes, ensuring clean termination of the frame sink loop.
284-298: The review comment is incorrect. The I420 buffer slices returned by `data()` are guaranteed to match the calculated sizes because they use identical formulas. Looking at the implementation of `I420Buffer::data()` in `libwebrtc/src/native/video_frame.rs` (lines 304-314), the slice lengths are constructed using:
- Y plane: `stride_y() * height()`
- U plane: `stride_u() * chroma_height`
- V plane: `stride_v() * chroma_height`

The code in subscriber.rs calculates the exact same values:
- `y_size = sy * h` (where `sy` is from `strides()` and `h` is from `height()`)
- `u_size = su * ch` (where `su` is from `strides()` and `ch = (h + 1) / 2`)
- `v_size = sv * ch`

Since both use the same strides, height, and chroma height calculations from the same underlying I420 buffer object, the sizes are guaranteed to match. The `copy_from_slice` calls cannot panic.

Likely an incorrect or invalid review comment.
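A small sketch of that arithmetic, with stride and height values as returned by the buffer (helper name is illustrative):

```rust
// Plane-size arithmetic for an I420 (4:2:0) buffer: chroma planes cover half the
// rows (rounded up), and each plane's byte length is its stride times its rows.
fn i420_plane_sizes(stride_y: u32, stride_u: u32, stride_v: u32, height: u32) -> (usize, usize, usize) {
    let chroma_height = (height + 1) / 2;
    (
        (stride_y * height) as usize,
        (stride_u * chroma_height) as usize,
        (stride_v * chroma_height) as usize,
    )
}
```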
examples/local_video/src/list_devices.rs (2)
68-86: LGTM! Clean implementation that properly aggregates formats, deduplicates FPS values, and maintains sorted output via BTreeMap.

13-31: LGTM! The main function provides clear output with proper handling of the no-cameras case and graceful error reporting for capability enumeration failures.
@@ -0,0 +1,447 @@
use anyhow::Result;
Guard against --fps 0 causing a panic.
Duration::from_secs_f64(1.0 / pace_fps) will panic on zero; validate before computing the interval.
✅ Suggested fix
-use anyhow::Result;
+use anyhow::{bail, Result};
@@
- // Pace publishing at the requested FPS (not the camera-reported FPS) to hit desired cadence
- let pace_fps = args.fps as f64;
+ if args.fps == 0 {
+ bail!("--fps must be > 0");
+ }
+ // Pace publishing at the requested FPS (not the camera-reported FPS) to hit desired cadence
+ let pace_fps = args.fps as f64;

Also applies to: 184-186
🤖 Prompt for AI Agents
In `@examples/local_video/src/publisher.rs` at line 1, Validate pace_fps before
computing Duration::from_secs_f64 to avoid a divide-by-zero panic: in the code
paths that call Duration::from_secs_f64(1.0 / pace_fps) (look for uses in
publisher.rs around the publisher setup and the loop where pace_fps is applied),
check that pace_fps > 0 and return an Err or print a clear error and exit when
it is zero or negative; update both occurrences (the initial interval
computation and the repeated use at lines ~184-186) to perform this guard and
handle the invalid value gracefully instead of letting Duration::from_secs_f64
panic.
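A standalone sketch of the guard (the function name and error handling are illustrative assumptions, not the example's actual code):

```rust
// Illustrative guard: reject a zero FPS before it can turn into a division by
// zero and an overflowing Duration::from_secs_f64.
use std::time::Duration;

fn pacing_interval(fps: u32) -> anyhow::Result<Duration> {
    if fps == 0 {
        anyhow::bail!("--fps must be > 0");
    }
    Ok(Duration::from_secs_f64(1.0 / fps as f64))
}
```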
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@examples/local_video/src/subscriber.rs`:
- Around line 122-366: The background sink thread can keep running after the
active video track changes; inside the thread's main loop (the spawned closure
using sink.next()), add a guard that checks the shared active_sid (active_sid2)
against my_sid and breaks if they differ or if active is None so the old sink
stops immediately when a new track becomes active; perform this check each
iteration (e.g., right after waking/receiving next or before swapping buffers)
to avoid interleaving frames with a new subscription and ensure active SID
ownership is validated before writing into shared (SharedYuv) state.
🧹 Nitpick comments (2)
examples/local_video/src/list_devices.rs (1)
42-66: Add a fallback when FourCC probing yields no usable data.
If `compatible_fourcc()` returns `Ok` but empty (or a per-FourCC resolution query fails), the current path can leave `capabilities` empty even when `compatible_camera_formats()` would work. Consider falling back when the map is still empty, and skip failing FourCCs. Please verify this behavior with nokhwa v0.10.0.

♻️ Suggested tweak

-    if let Ok(mut fourccs) = camera.compatible_fourcc() {
-        fourccs.sort();
-        for fourcc in fourccs {
-            let mut res_map = camera.compatible_list_by_resolution(fourcc)?;
-            let mut res_sorted = BTreeMap::new();
-            for (res, mut fps_list) in res_map.drain() {
-                fps_list.sort();
-                res_sorted.insert(res, fps_list);
-            }
-            capabilities.insert(fourcc, res_sorted);
-        }
-    } else {
+    if let Ok(mut fourccs) = camera.compatible_fourcc() {
+        fourccs.sort();
+        for fourcc in fourccs {
+            if let Ok(mut res_map) = camera.compatible_list_by_resolution(fourcc) {
+                let mut res_sorted = BTreeMap::new();
+                for (res, mut fps_list) in res_map.drain() {
+                    fps_list.sort();
+                    res_sorted.insert(res, fps_list);
+                }
+                capabilities.insert(fourcc, res_sorted);
+            }
+        }
+        if capabilities.is_empty() {
+            let formats = camera.compatible_camera_formats()?;
+            capabilities = capabilities_from_formats(formats);
+        }
+    } else {
         let formats = camera.compatible_camera_formats()?;
         capabilities = capabilities_from_formats(formats);
     }

examples/local_video/src/subscriber.rs (1)
368-376: Optionally clear frame dimensions on unsubscribe.
Clearing only HUD data leaves the last frame displayed. Zeroing dims/buffers will blank the view after unpublish/unsubscribe.

♻️ Suggested tweak

 {
     let mut s = shared.lock();
     s.codec.clear();
     s.fps = 0.0;
+    s.width = 0;
+    s.height = 0;
+    s.stride_y = 0;
+    s.stride_u = 0;
+    s.stride_v = 0;
+    s.y.clear();
+    s.u.clear();
+    s.v.clear();
 }
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- examples/local_video/src/list_devices.rs
- examples/local_video/src/subscriber.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Test (x86_64-pc-windows-msvc)
- GitHub Check: Test (x86_64-apple-darwin)
- GitHub Check: Test (x86_64-unknown-linux-gnu)
🔇 Additional comments (23)
examples/local_video/src/list_devices.rs (4)
13-31: LGTM. Clear flow and early exit for no cameras.
33-40: LGTM.
68-84: LGTM.
86-108: LGTM.

examples/local_video/src/subscriber.rs (19)
23-27: LGTM.
29-55: LGTM.
57-69: LGTM.
71-90: LGTM.
92-96: LGTM.
98-115: LGTM.
117-120: LGTM.
378-391: LGTM.
393-406: LGTM.
408-413: LGTM.
415-515: LGTM.
517-533: LGTM.
535-643: LGTM.
647-649: LGTM.
651-705: LGTM.
707-709: LGTM.
711-718: LGTM.
720-1067: LGTM.
1069-1097: LGTM.
| async fn handle_track_subscribed( | ||
| track: livekit::track::RemoteTrack, | ||
| publication: RemoteTrackPublication, | ||
| participant: RemoteParticipant, | ||
| allowed_identity: &Option<String>, | ||
| shared: &Arc<Mutex<SharedYuv>>, | ||
| rt: &tokio::runtime::Handle, | ||
| active_sid: &Arc<Mutex<Option<TrackSid>>>, | ||
| ctrl_c_received: &Arc<AtomicBool>, | ||
| simulcast: &Arc<Mutex<SimulcastState>>, | ||
| ) { | ||
| // If a participant filter is set, skip others | ||
| if let Some(ref allow) = allowed_identity { | ||
| if participant.identity().as_str() != allow { | ||
| debug!("Skipping track from '{}' (filter set to '{}')", participant.identity(), allow); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| let livekit::track::RemoteTrack::Video(video_track) = track else { | ||
| return; | ||
| }; | ||
|
|
||
| let sid = publication.sid().clone(); | ||
| let codec = codec_label(&publication.mime_type()); | ||
| // Only handle if we don't already have an active video track | ||
| { | ||
| let mut active = active_sid.lock(); | ||
| if active.as_ref() == Some(&sid) { | ||
| debug!("Track {} already active, ignoring duplicate subscribe", sid); | ||
| return; | ||
| } | ||
| if active.is_some() { | ||
| debug!( | ||
| "A video track is already active ({}), ignoring new subscribe {}", | ||
| active.as_ref().unwrap(), | ||
| sid | ||
| ); | ||
| return; | ||
| } | ||
| *active = Some(sid.clone()); | ||
| } | ||
|
|
||
| // Update HUD codec label early (before first frame arrives) | ||
| { | ||
| let mut s = shared.lock(); | ||
| s.codec = codec; | ||
| } | ||
|
|
||
| info!( | ||
| "Subscribed to video track: {} (sid {}) from {} - codec: {}, simulcast: {}, dimension: {}x{}", | ||
| publication.name(), | ||
| publication.sid(), | ||
| participant.identity(), | ||
| publication.mime_type(), | ||
| publication.simulcasted(), | ||
| publication.dimension().0, | ||
| publication.dimension().1 | ||
| ); | ||
|
|
||
| // Try to fetch inbound RTP/codec stats for more details | ||
| match video_track.get_stats().await { | ||
| Ok(stats) => { | ||
| let mut codec_by_id: HashMap<String, (String, String)> = HashMap::new(); | ||
| let mut inbound: Option<livekit::webrtc::stats::InboundRtpStats> = None; | ||
| for s in stats.iter() { | ||
| match s { | ||
| livekit::webrtc::stats::RtcStats::Codec(c) => { | ||
| codec_by_id.insert( | ||
| c.rtc.id.clone(), | ||
| (c.codec.mime_type.clone(), c.codec.sdp_fmtp_line.clone()), | ||
| ); | ||
| } | ||
| livekit::webrtc::stats::RtcStats::InboundRtp(i) => { | ||
| if i.stream.kind == "video" { | ||
| inbound = Some(i.clone()); | ||
| } | ||
| } | ||
| _ => {} | ||
| } | ||
| } | ||
|
|
||
| if let Some(i) = inbound { | ||
| if let Some((mime, fmtp)) = codec_by_id.get(&i.stream.codec_id) { | ||
| info!("Inbound codec: {} (fmtp: {})", mime, fmtp); | ||
| } else { | ||
| info!("Inbound codec id: {}", i.stream.codec_id); | ||
| } | ||
| info!( | ||
| "Inbound current layer: {}x{} ~{:.1} fps, decoder: {}, power_efficient: {}", | ||
| i.inbound.frame_width, | ||
| i.inbound.frame_height, | ||
| i.inbound.frames_per_second, | ||
| i.inbound.decoder_implementation, | ||
| i.inbound.power_efficient_decoder | ||
| ); | ||
| } | ||
| } | ||
| Err(e) => debug!("Failed to get stats for video track: {:?}", e), | ||
| } | ||
|
|
||
| // Start background sink thread | ||
| let shared2 = shared.clone(); | ||
| let active_sid2 = active_sid.clone(); | ||
| let my_sid = sid.clone(); | ||
| let rt_clone = rt.clone(); | ||
| let ctrl_c_sink = ctrl_c_received.clone(); | ||
| // Initialize simulcast state for this publication | ||
| { | ||
| let mut sc = simulcast.lock(); | ||
| sc.available = publication.simulcasted(); | ||
| let dim = publication.dimension(); | ||
| sc.full_dims = Some((dim.0, dim.1)); | ||
| sc.requested_quality = None; | ||
| sc.active_quality = None; | ||
| sc.publication = Some(publication.clone()); | ||
| } | ||
| let simulcast2 = simulcast.clone(); | ||
| std::thread::spawn(move || { | ||
| let mut sink = NativeVideoStream::new(video_track.rtc_track()); | ||
| let mut frames: u64 = 0; | ||
| let mut last_log = Instant::now(); | ||
| let mut logged_first = false; | ||
| let mut last_stats = Instant::now(); | ||
| let mut fps_window_frames: u64 = 0; | ||
| let mut fps_window_start = Instant::now(); | ||
| let mut fps_smoothed: f32 = 0.0; | ||
| // YUV buffers reused to avoid per-frame allocations | ||
| let mut y_buf: Vec<u8> = Vec::new(); | ||
| let mut u_buf: Vec<u8> = Vec::new(); | ||
| let mut v_buf: Vec<u8> = Vec::new(); | ||
| loop { | ||
| if ctrl_c_sink.load(Ordering::Acquire) { | ||
| break; | ||
| } | ||
| let next = rt_clone.block_on(async { | ||
| tokio::select! { | ||
| _ = wait_for_shutdown(ctrl_c_sink.clone()) => None, | ||
| frame = sink.next() => frame, | ||
| } | ||
| }); | ||
| let Some(frame) = next else { break }; | ||
| let w = frame.buffer.width(); | ||
| let h = frame.buffer.height(); | ||
|
|
||
| if !logged_first { | ||
| debug!("First frame: {}x{}, type {:?}", w, h, frame.buffer.buffer_type()); | ||
| logged_first = true; | ||
| } | ||
|
|
||
| // Convert to I420 on CPU, but keep planes separate for GPU sampling | ||
| let i420 = frame.buffer.to_i420(); | ||
| let (sy, su, sv) = i420.strides(); | ||
| let (dy, du, dv) = i420.data(); | ||
|
|
||
| let ch = (h + 1) / 2; | ||
|
|
||
| // Ensure capacity and copy full plane slices | ||
| let y_size = (sy * h) as usize; | ||
| let u_size = (su * ch) as usize; | ||
| let v_size = (sv * ch) as usize; | ||
| if y_buf.len() != y_size { | ||
| y_buf.resize(y_size, 0); | ||
| } | ||
| if u_buf.len() != u_size { | ||
| u_buf.resize(u_size, 0); | ||
| } | ||
| if v_buf.len() != v_size { | ||
| v_buf.resize(v_size, 0); | ||
| } | ||
| y_buf.copy_from_slice(dy); | ||
| u_buf.copy_from_slice(du); | ||
| v_buf.copy_from_slice(dv); | ||
|
|
||
| // Swap buffers into shared state | ||
| let mut s = shared2.lock(); | ||
| s.width = w as u32; | ||
| s.height = h as u32; | ||
| s.stride_y = sy as u32; | ||
| s.stride_u = su as u32; | ||
| s.stride_v = sv as u32; | ||
| std::mem::swap(&mut s.y, &mut y_buf); | ||
| std::mem::swap(&mut s.u, &mut u_buf); | ||
| std::mem::swap(&mut s.v, &mut v_buf); | ||
| s.dirty = true; | ||
|
|
||
| // Update smoothed FPS (~500ms window) | ||
| fps_window_frames += 1; | ||
| let win_elapsed = fps_window_start.elapsed(); | ||
| if win_elapsed >= Duration::from_millis(500) { | ||
| let inst_fps = (fps_window_frames as f32) / (win_elapsed.as_secs_f32().max(0.001)); | ||
| fps_smoothed = if fps_smoothed <= 0.0 { | ||
| inst_fps | ||
| } else { | ||
| // light EMA smoothing to reduce jitter | ||
| (fps_smoothed * 0.7) + (inst_fps * 0.3) | ||
| }; | ||
| s.fps = fps_smoothed; | ||
| fps_window_frames = 0; | ||
| fps_window_start = Instant::now(); | ||
| } | ||
|
|
||
| frames += 1; | ||
| let elapsed = last_log.elapsed(); | ||
| if elapsed >= Duration::from_secs(2) { | ||
| let fps = frames as f64 / elapsed.as_secs_f64(); | ||
| info!("Receiving video: {}x{}, ~{:.1} fps", w, h, fps); | ||
| frames = 0; | ||
| last_log = Instant::now(); | ||
| } | ||
| // Periodically infer active simulcast quality from inbound stats | ||
| if last_stats.elapsed() >= Duration::from_secs(1) { | ||
| if let Ok(stats) = rt_clone.block_on(video_track.get_stats()) { | ||
| let mut inbound: Option<livekit::webrtc::stats::InboundRtpStats> = None; | ||
| for s in stats.iter() { | ||
| if let livekit::webrtc::stats::RtcStats::InboundRtp(i) = s { | ||
| if i.stream.kind == "video" { | ||
| inbound = Some(i.clone()); | ||
| } | ||
| } | ||
| } | ||
| if let Some(i) = inbound { | ||
| if let Some((fw, fh)) = simulcast_state_full_dims(&simulcast2) { | ||
| let q = infer_quality_from_dims( | ||
| fw, | ||
| fh, | ||
| i.inbound.frame_width as u32, | ||
| i.inbound.frame_height as u32, | ||
| ); | ||
| let mut sc = simulcast2.lock(); | ||
| sc.active_quality = Some(q); | ||
| } | ||
| } | ||
| } | ||
| last_stats = Instant::now(); | ||
| } | ||
| } | ||
| info!("Video stream ended for {}", my_sid); | ||
| // Clear active sid if still ours | ||
| let mut active = active_sid2.lock(); | ||
| if active.as_ref() == Some(&my_sid) { | ||
| *active = None; | ||
| } | ||
| }); | ||
| } |
Stop stale sink threads when the active track changes.
If the active SID is cleared/changed, the old sink can still write frames until the stream ends, which can interleave with a new track. Add a guard inside the loop to exit when no longer active.
🐛 Proposed fix
loop {
if ctrl_c_sink.load(Ordering::Acquire) {
break;
}
+ {
+ let active = active_sid2.lock();
+ if active.as_ref() != Some(&my_sid) {
+ break;
+ }
+ }
let next = rt_clone.block_on(async {
tokio::select! {
_ = wait_for_shutdown(ctrl_c_sink.clone()) => None,
            frame = sink.next() => frame,

🤖 Prompt for AI Agents
In `@examples/local_video/src/subscriber.rs` around lines 122 - 366, The
background sink thread can keep running after the active video track changes;
inside the thread's main loop (the spawned closure using sink.next()), add a
guard that checks the shared active_sid (active_sid2) against my_sid and breaks
if they differ or if active is None so the old sink stops immediately when a new
track becomes active; perform this check each iteration (e.g., right after
waking/receiving next or before swapping buffers) to avoid interleaving frames
with a new subscription and ensure active SID ownership is validated before
writing into shared (SharedYuv) state.
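A standalone sketch of the ownership check (with simplified types: `String` stands in for `TrackSid`, and a std mutex is used here even though the example itself appears to use a non-poisoning mutex):

```rust
// Simplified sketch of the per-iteration guard: the sink thread keeps running
// only while it still owns the active track slot.
use std::sync::{Arc, Mutex};

fn still_active(active_sid: &Arc<Mutex<Option<String>>>, my_sid: &str) -> bool {
    match active_sid.lock().unwrap().as_deref() {
        Some(sid) => sid == my_sid, // still the active track: keep processing frames
        None => false,              // no active track: stop the sink loop
    }
}
```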
Adding `examples/local_video` to demonstrate publishing & subscribing to video tracks using the Rust SDK.

Summary by CodeRabbit
New Features
Documentation
Chores