Skip to content

Data tracks core#862

Open
ladvoc wants to merge 24 commits intomainfrom
ladvoc/data-tracks-core
Open

Data tracks core#862
ladvoc wants to merge 24 commits intomainfrom
ladvoc/data-tracks-core

Conversation

@ladvoc
Copy link
Contributor

@ladvoc ladvoc commented Feb 3, 2026

This PR introduces livekit-datatrack, a new crate that encapsulates the core functionality and APIs for data tracks. The crate is intentionally designed with no dependencies on libwebrtc or the signaling client, making it fully unit-testable and enabling incremental adoption in Swift and other languages via the new livekit-uniffi interface.

Key modules:

  • track: Public types for local and remote data tracks
  • frame: Public types for constructing and reading frames
  • packet: Packet serialization and deserialization
  • e2ee: Provider interfaces for end-to-end encryption
  • local: Publishing functionality, including packetization and the serialization pipeline
  • remote: Subscribing functionality, including depacketization and the deserialization pipeline

Given the large scope of data tracks, integration with the Rust client (including E2E tests and examples) as well as FFI bindings have been split up into separate PRs:

Data tracks is also being developed in parallel for the web:

Summary by CodeRabbit

  • New Features
    • Full data-track system: publish/subscribe workflows, per-track local & remote managers, lifecycle controls, subscribe API with timeouts.
    • Optional end-to-end encryption via pluggable providers; frame handling, packetization/serialization, MTU-aware chunking, depacketization/reassembly, timestamps, and reliable packet routing.
  • Chores
    • Added a new internal data-track crate to the workspace and required workspace dependencies.
  • Documentation
    • README stating crate is internal and recommending client SDKs.
  • Tests
    • Extensive unit tests covering managers, packetization, serialization, depacketization, pipelines, and conversions.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 3, 2026

📝 Walkthrough

Walkthrough

Adds a new livekit-datatrack crate to the workspace and implements local & remote data-track subsystems, packet model (serialize/deserialize), packetizer/depacketizer pipelines with optional E2EE traits, utilities (bytes chunking, counters), managers/event loops, protocol conversions, tests, and release metadata.

Changes

Cohort / File(s) Summary
Workspace & release
Cargo.toml, release-plz.toml
Added livekit-datatrack workspace member and workspace deps (from_variants, futures-core, tokio-stream); release/publish config for new crate.
Crate manifest & docs
livekit-datatrack/Cargo.toml, livekit-datatrack/README.md
New crate manifest and README describing the crate and recommended public SDK usage.
Crate root & API surface
livekit-datatrack/src/lib.rs, livekit-datatrack/src/track.rs
Introduces module layout, api and backend re-exports, DataTrack types, DataTrackInfo, DataTrackSid and accessors.
Frame, E2EE & errors
livekit-datatrack/src/frame.rs, livekit-datatrack/src/e2ee.rs, livekit-datatrack/src/error.rs
Adds DataTrackFrame with timestamp helpers, EncryptedPayload, Encryption/Decryption provider traits, and InternalError wrapper.
Local subsystem
livekit-datatrack/src/local/...
mod.rs, manager.rs, events.rs, packetizer.rs, pipeline.rs, proto.rs
Local publish manager, input/output events, TrackTask lifecycle, packetizer (fragmentation/markers), pipeline with optional E2EE, protocol conversions and tests.
Remote subsystem
livekit-datatrack/src/remote/...
mod.rs, manager.rs, events.rs, depacketizer.rs, pipeline.rs, proto.rs
Remote manager, subscription lifecycle, depacketizer (reassembly), per-track pipeline with optional decryption, proto converters and tests.
Packet model & codec
livekit-datatrack/src/packet/...
mod.rs, handle.rs, extension.rs, time.rs, serialize.rs, deserialize.rs
Packet, Header, FrameMarker, Handle allocator, Extensions (E2EE, user timestamp), Clock/Timestamp, full serialize/deserialize implementations, error types and tests.
Utilities
livekit-datatrack/src/utils/*
mod.rs, bytes.rs, counter.rs
Bytes zero-copy chunk iterator and generic wrap-around Counter with trait impls and re-exports.
Minor formatting
livekit-ffi/src/conversion/participant.rs, livekit/src/*
Small trailing-comma/formatting edits with no behavioral changes.

Sequence Diagram(s)

sequenceDiagram
    rect rgba(200,200,255,0.5)
    participant Client
    participant LocalMgr as Local Manager
    participant SFU as SFU
    participant Pipeline
    participant Packetizer
    participant Output as Output Events
    end

    Client->>LocalMgr: publish_track(options)
    LocalMgr->>SFU: SfuPublishRequest(handle,name,uses_e2ee)
    SFU-->>LocalMgr: SfuPublishResponse(handle,result)
    LocalMgr-->>Client: LocalDataTrack (on success)

    Client->>LocalMgr: try_push(frame)
    LocalMgr->>Pipeline: process_frame(frame)
    Pipeline->>Packetizer: packetize(frame/encrypted_payload)
    Packetizer-->>LocalMgr: Vec<Packet>
    LocalMgr->>Output: PacketsAvailable(packets)
Loading
sequenceDiagram
    rect rgba(200,255,200,0.5)
    participant Client
    participant RemoteMgr as Remote Manager
    participant SFU as SFU
    participant Pipeline
    participant Depacketizer
    participant Subscriber as Subscriber Stream
    end

    Client->>RemoteMgr: subscribe(sid)
    RemoteMgr->>SFU: SubscribeRequest
    SFU-->>RemoteMgr: SfuPublicationUpdates & SubscriberHandles
    SFU->>RemoteMgr: Packet
    RemoteMgr->>Pipeline: process_packet(packet)
    Pipeline->>Depacketizer: depacketize(packet)
    Depacketizer-->>Pipeline: DepacketizerFrame
    Pipeline-->>RemoteMgr: DataTrackFrame
    RemoteMgr->>Subscriber: deliver frame
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • pblazej
  • xianshijing-lk

Poem

🐰
I hop on bytes through moonlit tracks,
I stitch the pieces, guard the backs,
IVs tucked, handles bright,
Packets leap into the night,
Carrots cheer — data hops, hooray!

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Data tracks core' accurately reflects the main purpose of the PR—introducing a new livekit-datatrack crate with core data track functionality.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch ladvoc/data-tracks-core

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@ladvoc ladvoc requested review from a team and 1egoman February 3, 2026 00:26
@ladvoc ladvoc mentioned this pull request Feb 3, 2026
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 15

🤖 Fix all issues with AI agents
In `@livekit-datatrack/Cargo.toml`:
- Line 13: The crate livekit-datatrack is using a pinned dependency thiserror =
"2.0.17" because it isn't part of the workspace members; either add
livekit-datatrack to the workspace members in the root Cargo.toml and change
thiserror in livekit-datatrack/Cargo.toml to thiserror = { workspace = true }
(then coordinate a workspace upgrade to thiserror 2.x), or if livekit-datatrack
is intentionally external, remove the TODO and add a short comment in
livekit-datatrack/Cargo.toml documenting why it is excluded from the workspace
and why it pins thiserror; update the thiserror entry and comments accordingly.
- Around line 1-7: The workspace is missing registration for the new crate: add
"livekit-datatrack" to the workspace members array so Cargo includes it in
builds/tests/publishing; update the [workspace] members list (near existing
"livekit-runtime") to include the exact member name livekit-datatrack.

In `@livekit-datatrack/src/frame.rs`:
- Around line 50-62: The current duration_since_timestamp may panic because
SystemTime + Duration can overflow; update duration calculation in
duration_since_timestamp to use SystemTime::checked_add instead of the Add impl:
take the optional user_timestamp, build a Duration::from_millis, call
UNIX_EPOCH.checked_add(...) to get an Option<SystemTime>, and then if
Some(epoch) call SystemTime::now().duration_since(epoch).inspect_err(...).ok()
(returning None if checked_add returns None or duration_since errors). Keep the
existing log::error call for duration_since errors and preserve the Option
return semantics.

In `@livekit-datatrack/src/local/manager.rs`:
- Around line 240-250: In remove_descriptor, the logic in the Descriptor::Active
branch inverts the publish-check: it currently sends false only when
published_tx is already false. Change the condition to check if
*published_tx.borrow() is true and then send false to notify unpublish; i.e., in
the remove_descriptor implementation (handling Descriptor::Active and
published_tx) send false when the track is currently published so subscribers
are notified.
- Around line 32-40: The struct field name is wrong: in ManagerOptions rename
decryption_provider to encryption_provider and update its doc comment to
"Provider to use for encrypting outgoing frame payloads." Keep the type as
Option<Arc<dyn EncryptionProvider>> and then update all references/usages
(constructors, Manager::new, tests, builders, and any pattern matches) from
decryption_provider to encryption_provider to avoid breakage.

In `@livekit-datatrack/src/local/packetizer.rs`:
- Around line 56-86: The closure in packetize moves the non-Copy Header via
`..header`, causing a move on the first iteration; fix by creating a cloned base
header for each packet (e.g., call header.clone() or clone a prepared
base_header) inside the map/iterator so you set per-packet fields (marker,
sequence, extensions) on that clone rather than using `..header`; ensure the
Header type implements Clone if it doesn't already and update the map to use the
cloned header when building each Packet (see packetize, Header, frame_marker,
and sequence references).

In `@livekit-datatrack/src/local/pipeline.rs`:
- Around line 45-51: The constructor Pipeline::new currently uses
debug_assert_eq! to validate that options.info.uses_e2ee matches
options.encryption_provider.is_some(), which is stripped in release builds;
change that check to a runtime assertion (replace debug_assert_eq! with
assert_eq!) so mismatches are caught in all builds (include a clear error
message referencing uses_e2ee vs encryption_provider). Update the assertion in
Pipeline::new (and keep the rest of the initialization untouched) so
encrypt_if_needed cannot be left running without a provider when the server
indicates E2EE is required.

In `@livekit-datatrack/src/packet/deserialize.rs`:
- Around line 78-88: The ext_words u16 can overflow when computing ext_len by
doing ext_words + 1 in u16; update the calculation in the ext_flag branch (where
ext_words is read with raw.get_u16()) to convert ext_words to usize before
adding 1 and multiplying (i.e., compute ext_len using ext_words as usize: 4 *
(ext_words_as_usize + 1)) and keep the existing bounds checks
(MissingExtWords/HeaderOverrun) intact so malformed inputs cannot wrap and
bypass the header length validation.
- Around line 97-131: The deserializer currently ignores the len field and can
go out-of-sync; update Extensions::deserialize to use the len value for
alignment: after reading tag and len, validate that raw.remaining() >= len (or
return MalformedExt), and for known tags (E2eeExt::TAG, UserTimestampExt::TAG)
check that len equals the serialized payload-length (i.e. expected_payload_len -
1 as written by serialize.rs) before reading fields (E2eeExt::LEN and
UserTimestampExt::LEN should be used to compute the expected len), then read
exactly that payload; for EXT_TAG_PADDING and unknown tags, advance by len (or
len + 1 if serialize writes len as payload_len - 1 and you need to skip the
extra byte) to preserve alignment instead of just continuing, returning
MalformedExt on mismatch. Use the existing symbols Extensions::deserialize,
E2eeExt::TAG/LEN, UserTimestampExt::TAG/LEN, and EXT_TAG_PADDING to locate where
to add these checks and advances.

In `@livekit-datatrack/src/packet/time.rs`:
- Around line 52-79: The Clock::at method currently calls
instant.duration_since(self.epoch), which can panic if instant is earlier than
epoch; change that to instant.saturating_duration_since(self.epoch) to
defensively handle stale instants and avoid future panics. Update the
computation in Clock::at (function name: at, struct: Clock) to use
saturating_duration_since when computing elapsed before calling
duration_to_ticks, preserving the remaining logic (wrapping_add, monotonicity
check with prev) unchanged.

In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Around line 151-158: The expected packet count calculation in the
DepacketizerDropError construction is wrong when u16 sequence numbers wrap;
update the computation used in DepacketizerDropReason::Incomplete (where you
reference partial.frame_number, partial.start_sequence and end_sequence) to use
wrapping arithmetic instead of plain subtraction—e.g., compute expected by using
end_sequence.wrapping_sub(partial.start_sequence).wrapping_add(1) (and
cast/promote to the expected integer type such as usize or u32) so wrapped
ranges like start=65530 and end=5 produce the correct inclusive count.

In `@livekit-datatrack/src/remote/manager.rs`:
- Around line 210-219: handle_track_unpublished currently removes the
subscription entry but does not await the running subscription task, risking
races; change handle_track_unpublished to be async (matching shutdown()) and
when matching SubscriptionState::Active { sub_handle, .. } use
self.sub_handles.remove(&sub_handle) to take the task's JoinHandle (or
equivalent task_handle) and await it (e.g., task_handle.await) before
proceeding, then send published_tx(false) and remove descriptor; use the same
JoinHandle type and awaiting pattern used in shutdown() to ensure the background
task finishes before the manager continues.
- Around line 142-156: on_unsubscribe_request currently returns early when
descriptor.subscription matches SubscriptionState::Pending, leaving the
descriptor.subscription and any pending result receivers unresolved; update
on_unsubscribe_request (the match on descriptor.subscription) to, when
SubscriptionState::Pending is found, set descriptor.subscription =
SubscriptionState::None, clear any pending result receivers (notify or send an
error/closure to the pending result channel stored on the descriptor), and still
send the SfuUpdateSubscription event via self.event_out_tx; ensure you also
remove any related entries from self.sub_handles if applicable so state and
receivers are consistently cleaned up.
- Around line 253-257: The code creates a pipeline for tracks that use E2EE
(descriptor.info.uses_e2ee()) even when self.decryption_provider is None, which
can cause silent decryption failures; update the manager to detect this mismatch
and refuse subscription (or at minimum log a warning) before constructing the
pipeline: when descriptor.info.uses_e2ee() is true and
self.decryption_provider.is_none(), either return an Err (or skip subscription)
from the function creating the pipeline or emit a clear warning via the existing
logger, and only build Pipeline::new with the decryption_provider variable when
Some(Arc) is present so Pipeline::new never receives None for an E2EE track.

In `@livekit-datatrack/src/remote/pipeline.rs`:
- Around line 23-84: The code currently relies on
debug_assert_eq!(options.info.uses_e2ee, options.decryption_provider.is_some())
which is removed in release builds and can cause encrypted payloads to be
treated as plaintext; fix by performing a runtime invariant check in
Pipeline::new and by making decryption decision based on the track's E2EE flag.
Specifically: in Pipeline::new (where PipelineOptions, publisher_identity, and
decryption_provider are passed) replace the debug_assert with a runtime check —
if options.info.uses_e2ee is true but options.decryption_provider.is_none(), log
an error and fail construction (return a Result or panic) or set a safe
fallback; if uses_e2ee is false but a decryption_provider is provided, ignore it
or log a warning. Also add a field to Pipeline (e.g., uses_e2ee: bool or store
Arc<DataTrackInfo>) and update decrypt_if_needed to first check that stored
uses_e2ee is true before attempting decryption, and if uses_e2ee is true but
e2ee_provider is missing, log and return None to avoid emitting ciphertext as
plaintext.
🧹 Nitpick comments (10)
livekit-datatrack/src/remote/proto.rs (1)

66-74: Use map_or for broader Rust version compatibility.

The crate does not declare an MSRV, and Option::is_none_or requires Rust 1.82.0 (stabilized October 2024). To support a wider range of Rust versions without explicitly committing to an MSRV, use the equivalent and more compatible map_or approach:

Suggested change
-        .filter(|participant| {
-            local_participant_identity.is_none_or(|identity| participant.identity != identity)
-        })
+        .filter(|participant| {
+            local_participant_identity.map_or(true, |identity| participant.identity != identity)
+        })
livekit-datatrack/src/remote/mod.rs (1)

55-97: Extract subscription timeouts into named constants.
This aligns with the TODO and avoids magic numbers in the API flow.

♻️ Suggested refactor
+const SUBSCRIBE_SEND_TIMEOUT_MS: u64 = 50;
+const SUBSCRIBE_RESULT_TIMEOUT_SECS: u64 = 10;
+
     pub async fn subscribe(&self) -> Result<impl Stream<Item = DataTrackFrame>, SubscribeError> {
         let (result_tx, result_rx) = oneshot::channel();
         let subscribe_event = SubscribeRequest { sid: self.info.sid.clone(), result_tx };
         self.inner()
             .event_in_tx
             .upgrade()
             .ok_or(SubscribeError::Disconnected)?
-            .send_timeout(subscribe_event.into(), Duration::from_millis(50))
+            .send_timeout(subscribe_event.into(), Duration::from_millis(SUBSCRIBE_SEND_TIMEOUT_MS))
             .await
             .map_err(|_| {
                 SubscribeError::Internal(anyhow!("Failed to send subscribe event").into())
             })?;
 
         // TODO: standardize timeout
-        let frame_rx = timeout(Duration::from_secs(10), result_rx)
+        let frame_rx = timeout(Duration::from_secs(SUBSCRIBE_RESULT_TIMEOUT_SECS), result_rx)
             .await
             .map_err(|_| SubscribeError::Timeout)?
             .map_err(|_| SubscribeError::Disconnected)??;
livekit-datatrack/src/remote/manager.rs (2)

74-75: Consider documenting or increasing channel buffer sizes.

The mpsc::channel(4) buffer size is quite small. With high-frequency events or bursts of publications/subscriptions, this could cause backpressure. The TODO comment acknowledges this, but it would be good to either document the rationale or plan to tune this before production use.


466-467: Minor: Redundant field name in struct initialization.

The field initialization can be simplified.

✨ Suggested fix
         let task =
-            TrackTask { info: info, pipeline, published_rx, packet_rx, frame_tx, event_in_tx };
+            TrackTask { info, pipeline, published_rx, packet_rx, frame_tx, event_in_tx };
livekit-datatrack/src/remote/depacketizer.rs (1)

386-415: Test could be more comprehensive for duplicate sequence handling.

The test verifies that duplicate sequences result in the later packet's payload being used, but doesn't validate:

  1. The final frame length (should be 6 bytes: 3 from seq 1 + 3 from seq 2, not 9)
  2. Complete frame content

The comment "Should retain the second packet" is slightly ambiguous—it replaces the first packet with the duplicate sequence, not retains both.

✨ Suggested test improvement
         let result = depacketizer.push(packet.clone());
         assert!(result.drop_error.is_none());
         let frame = result.frame.unwrap();

-        assert!(frame.payload.starts_with(&[0xCD; 3]));
-        // Should retain the second packet with duplicate sequence number
+        // Duplicate sequence replaces the first packet's payload
+        assert_eq!(frame.payload.len(), 6); // seq 1 (replaced) + seq 2
+        assert_eq!(&frame.payload[..3], &[0xCD; 3]); // Replaced payload
+        assert_eq!(&frame.payload[3..], &[0xEF; 3]); // Final payload
     }
livekit-datatrack/src/track.rs (1)

108-118: Consider adding length validation for DataTrackSid.

The TryFrom<String> implementation only validates the prefix. A SID like "DTR_" (empty ID portion) or "DTR_x" (very short ID) would be accepted. If the SFU has specific requirements for SID format, consider adding validation here.

livekit-datatrack/src/local/manager.rs (3)

284-290: Consider using biased select for consistent priority handling.

Unlike the remote TrackTask, this tokio::select! doesn't use biased, meaning the published_rx.changed() branch may not be prioritized over frame processing. For consistent behavior and responsive unpublish handling, consider adding biased.

✨ Suggested fix
         while is_published {
             tokio::select! {
+                biased;
                 _ = self.published_rx.changed() => {
                     is_published = *self.published_rx.borrow();
                 }
                 Some(frame) = self.frame_rx.recv() => self.process_and_send(frame)
             }
         }

355-358: Inconsistent timeout API usage.

publish_track uses tokio::time::timeout, but tests in this file use livekit_runtime::timeout. For consistency and to ensure proper runtime behavior across different execution contexts, consider using livekit_runtime::timeout here as well.

✨ Suggested fix
-        let track = tokio::time::timeout(Self::PUBLISH_TIMEOUT, result_rx)
+        let track = livekit_runtime::timeout(Self::PUBLISH_TIMEOUT, result_rx)
             .await
-            .map_err(|_| PublishError::Timeout)?
+            .map_err(|_| PublishError::Timeout)?
             .map_err(|_| PublishError::Disconnected)??;

433-436: Minor: Prefer first() or next() over nth(0).

Using into_iter().nth(0) is less idiomatic than alternatives.

✨ Suggested fix
                     OutputEvent::PacketsAvailable(packets) => {
-                        let packet = packets.into_iter().nth(0).unwrap();
+                        let packet = packets.into_iter().next().unwrap();
                         let payload = Packet::deserialize(packet).unwrap().payload;
livekit-datatrack/src/local/mod.rs (1)

137-149: Consider validating non-empty track name.

The doc comment states the name "Must not be empty", but new() accepts any string including empty. Consider adding validation to enforce this requirement.

✨ Suggested validation
     /// Creates options with the given track name.
     ///
     /// The track name is used to identify the track to other participants.
     ///
     /// # Requirements
     /// - Must not be empty
     /// - Must be unique per publisher
     ///
-    pub fn new(name: impl Into<String>) -> Self {
-        Self { name: name.into() }
+    /// # Panics
+    /// Panics if `name` is empty.
+    ///
+    pub fn new(name: impl Into<String>) -> Self {
+        let name = name.into();
+        assert!(!name.is_empty(), "Track name must not be empty");
+        Self { name }
     }

Alternatively, return a Result for a non-panicking API.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7424141 and 7a90118.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (29)
  • Cargo.toml
  • livekit-datatrack/Cargo.toml
  • livekit-datatrack/README.md
  • livekit-datatrack/src/e2ee.rs
  • livekit-datatrack/src/error.rs
  • livekit-datatrack/src/frame.rs
  • livekit-datatrack/src/lib.rs
  • livekit-datatrack/src/local/events.rs
  • livekit-datatrack/src/local/manager.rs
  • livekit-datatrack/src/local/mod.rs
  • livekit-datatrack/src/local/packetizer.rs
  • livekit-datatrack/src/local/pipeline.rs
  • livekit-datatrack/src/local/proto.rs
  • livekit-datatrack/src/packet/deserialize.rs
  • livekit-datatrack/src/packet/extension.rs
  • livekit-datatrack/src/packet/handle.rs
  • livekit-datatrack/src/packet/mod.rs
  • livekit-datatrack/src/packet/serialize.rs
  • livekit-datatrack/src/packet/time.rs
  • livekit-datatrack/src/remote/depacketizer.rs
  • livekit-datatrack/src/remote/events.rs
  • livekit-datatrack/src/remote/manager.rs
  • livekit-datatrack/src/remote/mod.rs
  • livekit-datatrack/src/remote/pipeline.rs
  • livekit-datatrack/src/remote/proto.rs
  • livekit-datatrack/src/track.rs
  • livekit-datatrack/src/utils/bytes.rs
  • livekit-datatrack/src/utils/counter.rs
  • livekit-datatrack/src/utils/mod.rs
🧰 Additional context used
🧬 Code graph analysis (16)
livekit-datatrack/src/utils/mod.rs (1)
livekit-api/src/http_client.rs (1)
  • bytes (81-83)
livekit-datatrack/src/packet/mod.rs (6)
livekit-api/src/http_client.rs (1)
  • bytes (81-83)
livekit-datatrack/src/frame.rs (2)
  • fmt (90-95)
  • payload (41-43)
livekit-datatrack/src/packet/extension.rs (1)
  • fmt (36-39)
livekit-datatrack/src/packet/handle.rs (1)
  • fmt (64-66)
livekit-datatrack/src/packet/deserialize.rs (3)
  • deserialize (44-49)
  • deserialize (53-94)
  • deserialize (98-134)
livekit-datatrack/src/packet/serialize.rs (1)
  • serialize (50-57)
livekit-datatrack/src/lib.rs (1)
livekit-datatrack/src/packet/serialize.rs (1)
  • packet (178-193)
livekit-datatrack/src/frame.rs (2)
livekit-datatrack/src/remote/depacketizer.rs (2)
  • from (190-192)
  • from (196-198)
livekit-datatrack/src/local/pipeline.rs (1)
  • from (81-89)
livekit-datatrack/src/packet/handle.rs (2)
livekit-datatrack/src/local/proto.rs (6)
  • try_from (45-48)
  • try_from (54-58)
  • try_from (64-73)
  • from (27-31)
  • from (35-37)
  • from (97-109)
livekit-datatrack/src/remote/proto.rs (2)
  • try_from (28-39)
  • from (94-101)
livekit-datatrack/src/utils/counter.rs (1)
livekit-datatrack/src/packet/time.rs (1)
  • wrapping_add (39-41)
livekit-datatrack/src/local/packetizer.rs (4)
livekit-datatrack/src/packet/serialize.rs (1)
  • packet (178-193)
livekit-datatrack/src/packet/time.rs (2)
  • new (54-56)
  • random (23-25)
livekit-datatrack/src/utils/counter.rs (1)
  • new (21-23)
livekit-datatrack/src/packet/handle.rs (2)
  • from (52-54)
  • from (58-60)
livekit-datatrack/src/remote/depacketizer.rs (1)
livekit-datatrack/src/packet/serialize.rs (1)
  • packet (178-193)
livekit-datatrack/src/packet/deserialize.rs (4)
livekit-datatrack/src/frame.rs (6)
  • from (101-103)
  • from (107-109)
  • from (113-115)
  • from (119-121)
  • payload (41-43)
  • user_timestamp (46-48)
livekit-datatrack/src/packet/handle.rs (2)
  • from (52-54)
  • from (58-60)
livekit-datatrack/src/packet/time.rs (1)
  • from_ticks (27-29)
livekit-datatrack/src/packet/serialize.rs (1)
  • packet (178-193)
livekit-datatrack/src/remote/proto.rs (1)
livekit-datatrack/src/packet/handle.rs (5)
  • try_from (34-39)
  • try_from (45-48)
  • from (52-54)
  • from (58-60)
  • get (78-81)
livekit-datatrack/src/remote/pipeline.rs (1)
livekit-datatrack/src/remote/depacketizer.rs (3)
  • new (39-41)
  • from (190-192)
  • from (196-198)
livekit-datatrack/src/remote/manager.rs (3)
livekit-datatrack/src/remote/mod.rs (4)
  • new (43-45)
  • publisher_identity (100-102)
  • published_rx (113-115)
  • inner (47-52)
livekit-datatrack/src/remote/pipeline.rs (1)
  • new (39-47)
livekit-datatrack/src/track.rs (4)
  • info (38-40)
  • sid (83-85)
  • published_rx (63-68)
  • uses_e2ee (91-93)
livekit-datatrack/src/packet/extension.rs (6)
livekit-datatrack/src/frame.rs (1)
  • fmt (90-95)
livekit-datatrack/src/local/mod.rs (1)
  • fmt (231-236)
livekit-datatrack/src/packet/handle.rs (1)
  • fmt (64-66)
livekit-datatrack/src/packet/mod.rs (1)
  • fmt (60-65)
livekit-datatrack/src/remote/depacketizer.rs (1)
  • fmt (228-237)
livekit-datatrack/src/track.rs (1)
  • fmt (127-129)
livekit-datatrack/src/local/manager.rs (5)
livekit-datatrack/src/local/mod.rs (4)
  • new (41-43)
  • new (146-148)
  • new (203-205)
  • inner (45-50)
livekit-datatrack/src/local/pipeline.rs (1)
  • new (47-51)
livekit-datatrack/src/remote/pipeline.rs (1)
  • new (39-47)
livekit-datatrack/src/track.rs (4)
  • info (38-40)
  • sid (83-85)
  • uses_e2ee (91-93)
  • name (87-89)
livekit-runtime/src/dispatcher.rs (1)
  • timeout (115-128)
livekit-datatrack/src/packet/time.rs (1)
livekit-datatrack/src/local/packetizer.rs (1)
  • new (46-54)
livekit-datatrack/src/track.rs (2)
livekit-datatrack/src/remote/mod.rs (2)
  • published_rx (113-115)
  • inner (47-52)
livekit-datatrack/src/local/proto.rs (3)
  • from (27-31)
  • from (35-37)
  • from (97-109)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Test (x86_64-unknown-linux-gnu)
  • GitHub Check: Test (x86_64-pc-windows-msvc)
  • GitHub Check: Test (x86_64-apple-darwin)
🔇 Additional comments (71)
livekit-datatrack/README.md (1)

1-6: Clear and appropriately scoped README.

Concise guidance that the crate is internal and should be used via client SDK APIs is helpful and unambiguous.

Cargo.toml (1)

58-73: LGTM!

The new workspace dependencies (from_variants, futures-core, tokio-stream) are properly added in alphabetical order and align with existing dependency versions in the workspace (e.g., futures-core = "0.3" matches futures = "0.3" and futures-util = "0.3").

livekit-datatrack/Cargo.toml (1)

9-25: Dependencies are well-structured.

Good use of workspace dependencies for shared crates, and the inline comment on anyhow clarifying its intended scope ("For internal error handling only") is helpful for maintainability.

livekit-datatrack/src/error.rs (1)

17-24: LGTM: clear internal error wrapper.

livekit-datatrack/src/utils/bytes.rs (3)

17-32: LGTM: clean chunking extension API.


34-52: LGTM: iterator flow is straightforward and safe.


55-85: Tests cover empty input and varied sizes.

livekit-datatrack/src/frame.rs (3)

33-48: LGTM: compact frame container and accessors.


65-86: LGTM: builder-style constructors are clear.


89-121: LGTM: Debug and From impls improve ergonomics.

livekit-datatrack/src/e2ee.rs (1)

22-57: LGTM: E2EE payload and provider traits are clear.

livekit-datatrack/src/utils/mod.rs (1)

15-21: LGTM: simple utils re-export layer.

livekit-datatrack/src/local/pipeline.rs (7)

25-35: LGTM: pipeline options and state are minimal and clear.


37-43: LGTM: error composition is straightforward.


53-57: LGTM: process_frame flow is clean.


59-74: LGTM: encryption path is straightforward.


76-77: LGTM: MTU constant is clearly scoped.


80-89: LGTM: frame-to-packetizer conversion is clear.


92-120: LGTM: test validates packetization output.

livekit-datatrack/src/lib.rs (1)

17-59: LGTM: clear public API and backend re-exports.

livekit-datatrack/src/utils/counter.rs (2)

15-36: Clean counter API.

Monotonic wrap-around behavior is concise and easy to use.


38-53: WrappingIncrement impls look good.

Macro coverage for unsigned ints is consistent.

livekit-datatrack/src/packet/deserialize.rs (3)

22-41: Clear error taxonomy.

Variants map cleanly to parsing failure cases.


43-49: Straightforward packet assembly.

Header parse + payload copy path is clean.


137-275: Good coverage of error and extension cases.

The tests exercise critical parsing paths and edge cases well.

livekit-datatrack/src/packet/handle.rs (3)

18-67: Handle type + conversions are solid.

Validation and display formatting are clear.


69-82: Allocator behavior looks correct.

Monotonic allocation with overflow guard is fine.


84-88: Test-only Dummy impl is fine.

Keeps test scaffolding concise.

livekit-datatrack/src/packet/mod.rs (4)

15-57: Core packet types are clean and focused.

Public surface is minimal and readable.


59-65: Custom Debug output is appropriate.

Avoids payload dumps while keeping shape visible.


68-90: Serialization constants are clear.

Bitfield and size constants are easy to reason about.


92-109: Round‑trip test is a good sanity check.

Validates serialization ↔ deserialization cohesion.

livekit-datatrack/src/packet/serialize.rs (4)

19-58: Packet serialization API looks consistent.

Surface area aligns well with deserialization.


60-127: Header sizing + serialization logic reads correctly.

Metrics and write order are clear.


130-168: Extension serialization is straightforward.

Marker/length/payload layout is consistent with tests.


170-241: Serialization tests cover key paths.

Good validation of lengths, padding, and payload layout.

livekit-datatrack/src/local/packetizer.rs (3)

22-54: Packetizer setup looks good.

State fields and defaults are sensible.


90-99: Frame marker helper is clear.

The branching is easy to follow and testable.


102-145: Packetization tests cover marker and sizing cases.

Good breadth across payload sizes and MTUs.

livekit-datatrack/src/remote/proto.rs (5)

25-39: Subscriber handle mapping is clear.

Error mapping and collection are clean.


47-64: Join/update extraction logic looks fine.

Ownership transfer via mem::take is well‑documented.


82-89: Track-info extraction via mem::take is clean.

Avoids clones and preserves intent.


93-101: Subscription update mapping is correct.

The protocol payload is populated consistently.


104-158: Tests exercise the conversion paths well.

Covers handle mapping and track extraction.

livekit-datatrack/src/packet/extension.rs (2)

17-33: LGTM: extension container and typed wrappers are clean and test-friendly.
The derives and Option-wrapped fields keep the API minimal and clear.


35-52: Redacted Debug plus explicit TAG/LEN constants look solid.
Good balance between observability and security hygiene.

livekit-datatrack/src/packet/time.rs (2)

18-42: Timestamp encapsulation and wraparound helpers look good.


91-122: Tests and Dummy impl provide good coverage and test ergonomics.

livekit-datatrack/src/remote/pipeline.rs (1)

87-123: Frame conversion and the no‑E2EE test path look good.

livekit-datatrack/src/local/proto.rs (6)

26-38: Publish/unpublish request conversions are clear and consistent.


42-59: Response conversions handle missing info and handle parsing cleanly.


61-73: DataTrackInfo conversion maps encryption and SID as expected.


76-92: RequestResponse error mapping looks sensible.


96-120: Sync-state conversion helpers are tidy and efficient.


122-204: Tests cover the critical conversion paths well.

livekit-datatrack/src/remote/mod.rs (3)

42-53: Type-state wiring for Remote looks good.


99-115: Publisher identity accessor and inner state look good.


118-128: SubscribeError variants are clear and user-facing.

livekit-datatrack/src/remote/events.rs (2)

24-54: Input/Output event enums are well-scoped and readable.


56-126: Event payload structs are clear and map cleanly to protocol intent.

livekit-datatrack/src/local/events.rs (2)

24-44: Input/Output event enums are well-structured.


48-131: Local event payload structs are clear and consistent.

livekit-datatrack/src/remote/manager.rs (2)

378-411: LGTM!

The TrackTask::run implementation correctly uses biased select to prioritize state updates, properly handles subscriber drop detection via frame_tx.closed(), and cleanly sends an unsubscribe request when all subscribers are gone.


419-424: LGTM!

The send method appropriately uses try_send for synchronous contexts and properly wraps the error with context using anyhow.

livekit-datatrack/src/remote/depacketizer.rs (2)

115-121: LGTM!

The buffer overflow protection correctly drops the entire frame and resets state. This prevents memory exhaustion from large fragmented frames while providing clear error reporting.


177-199: LGTM!

The DepacketizerPushResult design elegantly handles the case where a single push operation can both complete a frame and report a dropped frame. The From implementations provide clean ergonomic construction.

livekit-datatrack/src/track.rs (2)

21-69: LGTM!

The DataTrack<L> generic type with phantom marker for location is a clean design pattern. The published_rx() method correctly dispatches to the appropriate inner type, and wait_for_unpublish() properly handles the already-unpublished case.


132-145: LGTM!

The fake::Dummy implementation correctly generates realistic test SIDs using a base-57 alphabet that excludes ambiguous characters. The 12-character random suffix provides sufficient uniqueness for testing.

livekit-datatrack/src/local/mod.rs (3)

40-96: LGTM!

The DataTrack<Local> implementation provides a clean API for frame publishing. The try_push method correctly checks publication state before sending, and the error handling preserves the frame for potential retry. The unpublish method consuming self ensures the track can't be used after unpublishing.


114-119: LGTM!

The Drop implementation ensures the track is properly unpublished even if the user doesn't explicitly call unpublish(). This is good defensive programming for resource cleanup.


163-237: LGTM!

The error types are well-designed with clear documentation. PushFrameError preserving the frame via into_frame() enables application-level retry logic, which is a thoughtful API design choice.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@livekit-datatrack/src/local/manager.rs`:
- Around line 298-311: process_and_send currently uses
event_out_tx.try_send(...) which silently drops packets on full channel; change
the behavior to propagate backpressure instead of dropping: make
process_and_send async (or add a new async variant) and replace try_send(...)
with event_out_tx.send(packets.into()).await, returning or propagating the
Result so callers (e.g. try_push) can handle backpressure/failure; update
callers of process_and_send/try_push to await and handle the send error rather
than relying on the non-blocking try_send drop semantics.

In `@livekit-datatrack/src/local/mod.rs`:
- Around line 143-149: The constructor new currently accepts any string but the
doc says "Must not be empty", so add validation: convert the incoming name (in
new) to a String, check if .is_empty(), and if so return an error instead of
constructing the struct; update new's signature to return Result<Self,
SomeErrorType> (create a simple error enum like InvalidTrackName or reuse an
existing error type) and return Ok(Self { name }) on success; reference the new
function and the name field so reviewers can locate the change.

In `@livekit-datatrack/src/remote/mod.rs`:
- Around line 118-121: The SubscribeError enum defines an unused variant
SubscribeError::Unpublished; either remove this dead variant or wire it into the
subscription flow: update the subscribe() function (and any helpers in remote
module) to detect when a track becomes unpublished and return
SubscribeError::Unpublished at that point, or delete the Unpublished variant and
any references to it to avoid dead code. Ensure references to SubscribeError and
match arms in callers are adjusted accordingly.
🧹 Nitpick comments (5)
livekit-datatrack/src/remote/mod.rs (1)

94-96: Silently dropping lagged frames is intentional but worth noting.

The filter_map(|result| async move { result.ok() }) silently discards BroadcastStream lag errors (when a subscriber can't keep up). This is a reasonable design choice for real-time data, but subscribers won't know if they've missed frames.

Consider adding a brief doc comment noting this behavior for users who need guaranteed delivery.

livekit-datatrack/src/track.rs (1)

43-47: Minor: Simplify is_published() implementation.

The intermediate variable is unnecessary.

♻️ Suggested simplification
     pub fn is_published(&self) -> bool {
         let published_rx = self.published_rx();
-        let published = *published_rx.borrow();
-        published
+        *published_rx.borrow()
     }
livekit-datatrack/src/local/manager.rs (3)

252-265: Consider parallel task shutdown for efficiency.

The shutdown awaits each active task sequentially. With many published tracks, this could delay shutdown. Consider futures::future::join_all for parallel completion.

♻️ Parallel shutdown suggestion
     async fn shutdown(self) {
+        let mut task_handles = Vec::new();
         for (_, descriptor) in self.descriptors {
             match descriptor {
                 Descriptor::Pending(result_tx) => {
                     _ = result_tx.send(Err(PublishError::Disconnected))
                 }
                 Descriptor::Active { published_tx, task_handle, .. } => {
                     _ = published_tx.send(false);
-                    task_handle.await;
+                    task_handles.push(task_handle);
                 }
             }
         }
+        futures::future::join_all(task_handles).await;
     }

367-376: Inconsistent channel send behavior.

publish_track uses try_send (non-blocking), while query_tracks uses send().await (blocking). This inconsistency is likely intentional since queries are less time-critical, but it could cause unexpected blocking if the manager is overwhelmed.


433-436: Minor: Prefer first() over nth(0).

♻️ Style suggestion
                     OutputEvent::PacketsAvailable(packets) => {
-                        let packet = packets.into_iter().nth(0).unwrap();
+                        let packet = packets.into_iter().next().unwrap();
                         let payload = Packet::deserialize(packet).unwrap().payload;
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7a90118 and 1347840.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (15)
  • Cargo.toml
  • livekit-datatrack/src/frame.rs
  • livekit-datatrack/src/local/events.rs
  • livekit-datatrack/src/local/manager.rs
  • livekit-datatrack/src/local/mod.rs
  • livekit-datatrack/src/local/pipeline.rs
  • livekit-datatrack/src/packet/mod.rs
  • livekit-datatrack/src/remote/depacketizer.rs
  • livekit-datatrack/src/remote/mod.rs
  • livekit-datatrack/src/track.rs
  • livekit-ffi/src/conversion/participant.rs
  • livekit/src/proto.rs
  • livekit/src/room/mod.rs
  • livekit/src/room/participant/mod.rs
  • release-plz.toml
✅ Files skipped from review due to trivial changes (1)
  • livekit-ffi/src/conversion/participant.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • Cargo.toml
🧰 Additional context used
🧬 Code graph analysis (5)
livekit-datatrack/src/track.rs (2)
livekit-datatrack/src/remote/mod.rs (1)
  • published_rx (113-115)
livekit-datatrack/src/local/proto.rs (3)
  • try_from (45-48)
  • try_from (54-58)
  • try_from (64-73)
livekit/src/room/mod.rs (7)
livekit-api/src/access_token.rs (2)
  • default (100-118)
  • default (131-133)
webrtc-sys/src/peer_connection.rs (1)
  • default (223-234)
livekit-api/src/signal_client/mod.rs (2)
  • default (82-84)
  • default (96-102)
livekit/src/room/options.rs (1)
  • default (94-106)
livekit/src/room/participant/rpc.rs (1)
  • default (29-36)
libwebrtc/src/data_channel.rs (1)
  • default (34-44)
libwebrtc/src/native/peer_connection_factory.rs (1)
  • default (44-55)
livekit-datatrack/src/local/mod.rs (1)
livekit-datatrack/src/remote/mod.rs (2)
  • inner (47-52)
  • published_rx (113-115)
livekit-datatrack/src/local/pipeline.rs (3)
livekit-datatrack/src/packet/serialize.rs (1)
  • packet (178-193)
livekit-datatrack/src/frame.rs (2)
  • new (69-71)
  • payload (41-43)
livekit-datatrack/src/local/packetizer.rs (1)
  • new (46-54)
livekit-datatrack/src/remote/mod.rs (3)
livekit-datatrack/src/local/mod.rs (5)
  • new (41-43)
  • new (147-149)
  • new (204-206)
  • inner (45-50)
  • published_rx (110-112)
livekit-datatrack/src/remote/pipeline.rs (1)
  • new (39-47)
livekit-datatrack/src/track.rs (2)
  • info (38-40)
  • published_rx (63-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Build (x86_64-linux-android)
  • GitHub Check: Test (x86_64-apple-darwin)
  • GitHub Check: Test (x86_64-unknown-linux-gnu)
  • GitHub Check: Test (x86_64-pc-windows-msvc)
  • GitHub Check: Build (x86_64-unknown-linux-gnu)
  • GitHub Check: Build (x86_64-apple-darwin)
  • GitHub Check: Build (aarch64-apple-ios-sim)
  • GitHub Check: Build (aarch64-pc-windows-msvc)
  • GitHub Check: Build (aarch64-linux-android)
  • GitHub Check: Build (aarch64-apple-ios)
  • GitHub Check: Build (armv7-linux-androideabi)
  • GitHub Check: Build (aarch64-apple-darwin)
  • GitHub Check: Build (x86_64-pc-windows-msvc)
  • GitHub Check: Build (aarch64-unknown-linux-gnu)
🔇 Additional comments (11)
livekit/src/room/participant/mod.rs (1)

48-48: LGTM!

Trailing comma additions to enum variants improve consistency and reduce diff noise for future additions.

Also applies to: 57-57

livekit-datatrack/src/remote/mod.rs (1)

39-52: LGTM!

The type-state pattern with PhantomData<Remote> and the inner() accessor is correctly implemented. The unreachable!() is safe due to compile-time guarantees.

livekit-datatrack/src/track.rs (3)

71-94: LGTM!

DataTrackInfo provides clean encapsulation with internal fields and appropriate public accessors.


108-118: LGTM!

The prefix-based validation is appropriate for SFU-assigned identifiers. The implementation correctly rejects malformed SIDs.


132-145: LGTM!

The test fake implementation correctly generates valid SIDs with the required prefix and a base57 suffix for better readability in test output.

livekit-datatrack/src/local/mod.rs (3)

83-91: LGTM!

The try_push implementation correctly handles the race between checking is_published() and sending. The early check provides a clear TrackUnpublished error, while try_send failure catches edge cases with the Dropped error.


115-120: LGTM!

The Drop implementation ensures proper cleanup by signaling unpublish, preventing resource leaks if the user forgets to call unpublish().


164-238: LGTM!

The error types are well-designed:

  • PublishError covers all failure modes clearly
  • PushFrameError allows frame recovery for retry logic
  • Good documentation on each variant
livekit-datatrack/src/local/manager.rs (3)

61-77: LGTM!

The manager setup is clean. The TODO for buffer size tuning is noted - this may need adjustment based on real-world usage patterns.


139-156: LGTM!

The forward_publish_result task correctly uses biased select to prioritize the result channel, and properly notifies the manager on cancellation via the weak sender.


390-463: LGTM!

Good test coverage for the manager lifecycle and publish flow. The tests verify the complete round-trip including SFU response handling and packet emission.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +143 to +149
/// # Requirements
/// - Must not be empty
/// - Must be unique per publisher
///
pub fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Consider validating non-empty name requirement.

The documentation states the track name "Must not be empty", but new() doesn't enforce this. Consider adding validation or clarifying that empty names are validated server-side.

🛡️ Optional validation
     pub fn new(name: impl Into<String>) -> Self {
-        Self { name: name.into() }
+        let name = name.into();
+        debug_assert!(!name.is_empty(), "Track name must not be empty");
+        Self { name }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// # Requirements
/// - Must not be empty
/// - Must be unique per publisher
///
pub fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
}
/// # Requirements
/// - Must not be empty
/// - Must be unique per publisher
///
pub fn new(name: impl Into<String>) -> Self {
let name = name.into();
debug_assert!(!name.is_empty(), "Track name must not be empty");
Self { name }
}
🤖 Prompt for AI Agents
In `@livekit-datatrack/src/local/mod.rs` around lines 143 - 149, The constructor
new currently accepts any string but the doc says "Must not be empty", so add
validation: convert the incoming name (in new) to a String, check if
.is_empty(), and if so return an error instead of constructing the struct;
update new's signature to return Result<Self, SomeErrorType> (create a simple
error enum like InvalidTrackName or reuse an existing error type) and return
Ok(Self { name }) on success; reference the new function and the name field so
reviewers can locate the change.

Comment on lines +118 to +121
#[derive(Debug, Error)]
pub enum SubscribeError {
#[error("The track has been unpublished and is no longer available")]
Unpublished,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

rg -n 'SubscribeError::Unpublished' livekit-datatrack/

Repository: livekit/rust-sdks

Length of output: 43


🏁 Script executed:

rg -n 'subscribe' livekit-datatrack/src/remote/mod.rs -A 5 -B 2

Repository: livekit/rust-sdks

Length of output: 1875


🏁 Script executed:

sed -n '75,100p' livekit-datatrack/src/remote/mod.rs

Repository: livekit/rust-sdks

Length of output: 1158


Remove the unused SubscribeError::Unpublished variant or implement its use.

The Unpublished variant is defined in the enum but is never returned by subscribe() or used elsewhere in the crate. Either remove it as dead code or implement the logic to return this error when the track becomes unpublished.

🤖 Prompt for AI Agents
In `@livekit-datatrack/src/remote/mod.rs` around lines 118 - 121, The
SubscribeError enum defines an unused variant SubscribeError::Unpublished;
either remove this dead variant or wire it into the subscription flow: update
the subscribe() function (and any helpers in remote module) to detect when a
track becomes unpublished and return SubscribeError::Unpublished at that point,
or delete the Unpublished variant and any references to it to avoid dead code.
Ensure references to SubscribeError and match arms in callers are adjusted
accordingly.

@ladvoc ladvoc mentioned this pull request Feb 3, 2026
Copy link
Contributor

@1egoman 1egoman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: These are my comments for exclusively the packet serialization/deserialization, and frame packetizer/depacketizer.

I will go through the managers (as requested by @ladvoc) in a separate upcoming review.

let remaining_initial = buf.remaining_mut();

if buf.remaining_mut() < serialized_len {
Err(SerializeError::TooSmallForHeader)?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Is there a benefit to Err(...)? vs return Err(...)? I've never encountered this former form before but it logically makes sense and both seem equivilent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functionally the same, I just prefer the former because it's shorter

Comment on lines +211 to +221
#[test]
fn test_serialize() {
let mut buf = packet().serialize().try_into_mut().unwrap();
assert_eq!(buf.len(), 1070);

// Base header
assert_eq!(buf.get_u8(), 0xC); // Version 0, final, extension
assert_eq!(buf.get_u8(), 0); // Reserved
assert_eq!(buf.get_u16(), 0x8811); // Track handle
assert_eq!(buf.get_u16(), 0x4422); // Sequence
assert_eq!(buf.get_u16(), 0x4411); // Frame number
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(if-minor): It might be worth porting over more of the serialization tests I wrote in the js implementation. In particular, a test to serialize a packet without any extensions and one to try to serialize a packet only containing each extension individually might be nice.

Comment on lines 108 to 122
E2eeExt::TAG => {
if raw.remaining() < E2eeExt::LEN {
Err(DeserializeError::MalformedExt(tag))?
}
let key_index = raw.get_u8();
let mut iv = [0u8; 12];
raw.copy_to_slice(&mut iv);
extensions.e2ee = E2eeExt { key_index, iv }.into();
}
UserTimestampExt::TAG => {
if raw.remaining() < UserTimestampExt::LEN {
Err(DeserializeError::MalformedExt(tag))?
}
extensions.user_timestamp = UserTimestampExt(raw.get_u64()).into()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(if-minor): You might consider refactoring these individual bits of packet deserializing logic into distinct deserialize methods to match the convention used in Extensions, Headers, etc up the chain (and fwiw, it is asymmetrical right now because E2eeExt / UserTimestampExt have serialize_into methods, so they follow the convention on that end, but not for deserialize).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Later we might consider doing lazy deserialization of extensions (e.g., ExtensionMap replaces Extensions). This would also help reduce boilerplate.

Comment on lines 108 to 122
E2eeExt::TAG => {
if raw.remaining() < E2eeExt::LEN {
Err(DeserializeError::MalformedExt(tag))?
}
let key_index = raw.get_u8();
let mut iv = [0u8; 12];
raw.copy_to_slice(&mut iv);
extensions.e2ee = E2eeExt { key_index, iv }.into();
}
UserTimestampExt::TAG => {
if raw.remaining() < UserTimestampExt::LEN {
Err(DeserializeError::MalformedExt(tag))?
}
extensions.user_timestamp = UserTimestampExt(raw.get_u64()).into()
}
Copy link
Contributor

@1egoman 1egoman Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Right now in processing both E2eeExt and UserTimestampExt you are assuming that the logic in each switch case will always consume all the bytes in that section, which while it is an ok assumption right now, could break in the future if the structure of the extensions were to evolve (ie, maybe a new section is added to the E2eeExt packet and in the future new E2eeExt packets have a longer len value encoded), breaking "forward compatibility".

IMO, The ideal way to ensure "forward compatibility" is to ensure that after processing each extension, the offset of raw is always advanced so that all len bytes (from the extension header) are consumed, disregarding any extra bytes that are larger than expected.

(this is being done in the js implementation right now)

Copy link
Contributor Author

@ladvoc ladvoc Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was my thinking: since these are tagged extensions (with 2 bytes used to represent each tag), we can treat them similarly to fields in protobuf. For example, if we need to support a new E2EE type, we can introduce a new extension (e.g., E2eeV2Ext) with a new tag number and structure. Newer clients that understand this extension will decode it, while older clients will continue decoding the original extension they know about and safely skip the v2 extension by length. This is effectively the same pattern as deprecating a field in a protobuf message and introducing a new field with a different field number. Let me know if this makes sense.

Comment on lines +177 to +187
/// Result from a call to [`Depacketizer::push`].
///
/// The reason this type is used instead of [`core::result::Result`] is due to the fact a single
/// call to push can result in both a complete frame being delivered and a previous
/// frame being dropped.
///
#[derive(Debug, Default)]
pub struct DepacketizerPushResult {
pub frame: Option<DepacketizerFrame>,
pub drop_error: Option<DepacketizerDropError>,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I'll leave it up to you if you want to propagate any of the changes I made here into the rust implementation, given it sounds like there's still some evolution that is expected to happen for statistics reporting.

Copy link
Contributor

@1egoman 1egoman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two more packetizer / depacketizer comments!

Comment on lines +27 to +32
/// A frame that has been fully reassembled by [`Depacketizer`].
#[derive(Debug)]
pub struct DepacketizerFrame {
pub payload: Bytes,
pub extensions: Extensions,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Is it worth consolidating DepacketizerFrame into the already existing DataTrackFrame given they are so similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping these types separate was a deliberate design decision. At the pipeline level, access to all the extensions is required (e.g., E2EE metadata). However, when the frame is delivered to the user, there's no reason to keep them all around. In the future, there will likely be more extensions that serve a role at the pipeline level but do not need to be propagated up.

As for DepacketizerFrame and PacketizerFrame being identical, while these could be combined, I think keeping them separate could make sense in case their fields need to diverge in the future—don't have a strong opinion though.

Comment on lines +32 to +36
/// Frame packetized by [`Packetizer`].
pub struct PacketizerFrame {
pub payload: Bytes,
pub extensions: Extensions,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Is it worth consolidating PacketizerFrame into the already existing DataTrackFrame given they are so similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See previous comment.

Fixes an issue where duplicate packets cause the length to be incorrect
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Around line 82-88: The struct initialization for PartialFrame (involving
start_sequence and payloads) is failing formatting CI; edit the payloads field
where BTreeMap::from([(start_sequence, packet.payload)]) is used and add a
trailing comma after the single entry so it becomes
BTreeMap::from([(start_sequence, packet.payload),]), keeping the rest of the
PartialFrame fields (frame_number, start_sequence, extensions) intact.
- Around line 143-154: The reassembly loop in depacketizer.rs incorrectly uses
sequence < end_sequence, which fails when u16 wraps; update the loop termination
logic in the block that processes partial.start_sequence / sequence /
end_sequence so it continues advancing with sequence = sequence.wrapping_add(1)
until sequence == end_sequence (i.e., replace the `<` check with a `sequence !=
end_sequence` style condition), ensuring you still call
payload.extend(partial_payload) and return DepacketizerFrame { payload:
payload.freeze(), extensions: partial.extensions } only after the final packet
is appended; keep using wrapping_add and partial.payloads.remove(&sequence) as
before.
🧹 Nitpick comments (1)
livekit-datatrack/src/remote/depacketizer.rs (1)

266-296: Consider adding a test for sequence number wrapping.

The test suite covers many scenarios but lacks a test where sequence numbers wrap around u16::MAX (e.g., start at 65534, end at 1). This would exercise the wrapping bug flagged in finalize.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8239631 and 5ec6439.

📒 Files selected for processing (1)
  • livekit-datatrack/src/remote/depacketizer.rs
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
📚 Learning: 2026-02-03T01:23:11.346Z
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.

Applied to files:

  • livekit-datatrack/src/remote/depacketizer.rs
🪛 GitHub Actions: Rust Formatting
livekit-datatrack/src/remote/depacketizer.rs

[error] 84-84: Code formatting issue detected by cargo fmt: trailing comma placement in payloads map. Run 'cargo fmt' to fix formatting.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Build (aarch64-linux-android)
  • GitHub Check: Build (x86_64-linux-android)
  • GitHub Check: Build (armv7-linux-androideabi)
  • GitHub Check: Build (x86_64-pc-windows-msvc)
  • GitHub Check: Build (aarch64-pc-windows-msvc)
  • GitHub Check: Build (x86_64-unknown-linux-gnu)
  • GitHub Check: Build (aarch64-apple-ios-sim)
  • GitHub Check: Build (aarch64-unknown-linux-gnu)
  • GitHub Check: Build (aarch64-apple-darwin)
  • GitHub Check: Build (x86_64-apple-darwin)
  • GitHub Check: Build (aarch64-apple-ios)
  • GitHub Check: Test (x86_64-pc-windows-msvc)
  • GitHub Check: Test (x86_64-unknown-linux-gnu)
  • GitHub Check: Test (x86_64-apple-darwin)
🔇 Additional comments (2)
livekit-datatrack/src/remote/depacketizer.rs (2)

386-415: Good coverage of duplicate sequence behavior. The test correctly verifies that the latest payload replaces the earlier one and that the assembled frame reflects this.


180-202: Clean design for the push result type. Using a struct with both frame and drop_error fields (instead of Result) is a thoughtful choice since a single push can simultaneously complete a frame and report a drop.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Around line 278-287: The test increments packet.header.sequence with `+= 1`
which panics on u16 overflow in debug builds; update the test to use wrapping
arithmetic (e.g., replace the `packet.header.sequence += 1` increment used
around the FrameMarker::Inter/Final loop with a wrapping increment such as
assigning packet.header.sequence = packet.header.sequence.wrapping_add(1)) so
the test matches production semantics, and apply the same change to the
`frame_number` increment in the `test_interrupted` test (use wrapping_add(1) for
frame_number).
🧹 Nitpick comments (2)
livekit-datatrack/src/packet/deserialize.rs (1)

273-282: test_ext_unknown doesn't exercise non-zero payload skipping.

The test writes put_bytes(0, 6) after the tag, making len=0 — so the unknown-extension skip path (raw.advance(len)) advances by 0 and the remaining zeros parse as padding. Consider adding a test with a real non-zero len payload to validate skipping actually works (this would also catch the len vs len + 1 misalignment noted above).

livekit-datatrack/src/remote/depacketizer.rs (1)

95-111: Incoming packet silently dropped on frame-number mismatch.

When push_to_partial detects a frame-number mismatch (line 105), it reports Interrupted for the old partial frame but the incoming packet is also consumed without any corresponding error. The caller has no indication that a packet for packet.header.frame_number was lost—only that the old frame was interrupted.

This is fine if intentional, but worth a brief comment in the code to document that the new packet is intentionally discarded without its own drop error.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 01c550a and b920d42.

📒 Files selected for processing (2)
  • livekit-datatrack/src/packet/deserialize.rs
  • livekit-datatrack/src/remote/depacketizer.rs
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
📚 Learning: 2026-02-03T01:23:11.346Z
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.

Applied to files:

  • livekit-datatrack/src/remote/depacketizer.rs
🧬 Code graph analysis (2)
livekit-datatrack/src/packet/deserialize.rs (4)
livekit-datatrack/src/frame.rs (6)
  • from (103-105)
  • from (109-111)
  • from (115-117)
  • from (121-123)
  • payload (41-43)
  • user_timestamp (46-48)
livekit-datatrack/src/packet/handle.rs (2)
  • from (52-54)
  • from (58-60)
livekit-datatrack/src/packet/time.rs (1)
  • from_ticks (27-29)
livekit-datatrack/src/packet/serialize.rs (1)
  • packet (178-193)
livekit-datatrack/src/remote/depacketizer.rs (1)
livekit-datatrack/src/remote/pipeline.rs (1)
  • new (39-47)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Build (aarch64-apple-darwin)
  • GitHub Check: Build (x86_64-linux-android)
  • GitHub Check: Build (x86_64-pc-windows-msvc)
  • GitHub Check: Build (armv7-linux-androideabi)
  • GitHub Check: Build (aarch64-pc-windows-msvc)
  • GitHub Check: Build (aarch64-apple-ios)
  • GitHub Check: Build (aarch64-unknown-linux-gnu)
  • GitHub Check: Build (aarch64-linux-android)
  • GitHub Check: Build (x86_64-unknown-linux-gnu)
  • GitHub Check: Build (x86_64-apple-darwin)
  • GitHub Check: Build (aarch64-apple-ios-sim)
  • GitHub Check: Test (x86_64-apple-darwin)
  • GitHub Check: Test (x86_64-unknown-linux-gnu)
  • GitHub Check: Test (x86_64-pc-windows-msvc)
🔇 Additional comments (7)
livekit-datatrack/src/packet/deserialize.rs (3)

52-95: Header deserialization looks solid.

The overflow fix for ext_words (line 84) is properly applied. The base header field parsing, version check, marker decoding, and extension block slicing all look correct.


139-153: Clean fixed-size deserialization.

Both UserTimestampExt::deserialize and E2eeExt::deserialize correctly parse from fixed-size byte arrays. The slice range &raw[1..13] in E2eeExt is safe given LEN is 13.


284-292: Good edge-case coverage for alignment enforcement.

This test correctly validates that a short extension block (3 bytes instead of the required 4) triggers an error.

livekit-datatrack/src/remote/depacketizer.rs (4)

34-50: Clean dispatch logic, LGTM.

The push method cleanly routes packets by marker type. The separation of frame_from_single, begin_partial, and push_to_partial keeps each path focused and readable.


136-164: Wrapping-safe reassembly looks correct.

The wrapping arithmetic fixes from prior reviews are properly applied—sequence != end_sequence on line 149 and wrapping_sub/wrapping_add on line 160. The loop correctly handles both normal and wrapped sequence ranges.


167-241: Well-structured result and error types.

The DepacketizerPushResult design—allowing both a completed frame and a drop error from a single push—is a pragmatic choice well-documented in the doc comment. The From impls and Display are clean.


386-415: Duplicate-sequence test is thorough, nice edge-case coverage.

The test verifies that a duplicate sequence number replaces the earlier payload and that reassembly still succeeds. Good coverage of BTreeMap insert-overwrite semantics.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Line 309: The code incorrectly doubles frame_number by using "+=" with
wrapping_add; replace the faulty increment at packet.header.frame_number (the
line currently reading "packet.header.frame_number +=
packet.header.frame_number.wrapping_add(1);") with a single wrapping increment
so the value becomes packet.header.frame_number =
packet.header.frame_number.wrapping_add(1); — this preserves wrapping semantics
and avoids checked-add overflow panics in tests (affects the frame_number
handling in the depacketizer logic and Faker-generated values).
- Line 329: The test modifies packet.header.sequence with plain addition which
can overflow in debug builds when Faker generates values near u16::MAX; change
the increment to a wrapping operation (use wrapping_add(3)) on
packet.header.sequence to match other tests and avoid panic during debug builds.
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b920d42 and 301aedf.

📒 Files selected for processing (1)
  • livekit-datatrack/src/remote/depacketizer.rs
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
📚 Learning: 2026-02-03T01:23:11.346Z
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.

Applied to files:

  • livekit-datatrack/src/remote/depacketizer.rs
🧬 Code graph analysis (1)
livekit-datatrack/src/remote/depacketizer.rs (1)
livekit-datatrack/src/packet/serialize.rs (1)
  • packet (178-193)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Build (x86_64-unknown-linux-gnu)
  • GitHub Check: Build (aarch64-unknown-linux-gnu)
  • GitHub Check: Build (x86_64-pc-windows-msvc)
  • GitHub Check: Build (aarch64-apple-ios-sim)
  • GitHub Check: Build (armv7-linux-androideabi)
  • GitHub Check: Build (x86_64-linux-android)
  • GitHub Check: Build (aarch64-apple-ios)
  • GitHub Check: Build (x86_64-apple-darwin)
  • GitHub Check: Build (aarch64-linux-android)
  • GitHub Check: Build (aarch64-pc-windows-msvc)
  • GitHub Check: Build (aarch64-apple-darwin)
  • GitHub Check: Test (x86_64-unknown-linux-gnu)
  • GitHub Check: Test (x86_64-apple-darwin)
  • GitHub Check: Test (x86_64-pc-windows-msvc)
🔇 Additional comments (6)
livekit-datatrack/src/remote/depacketizer.rs (6)

1-50: Clean public API and dispatch logic — LGTM.

The Depacketizer struct, DepacketizerFrame, and push dispatch are well-structured. The marker-based routing is clear and correct.


52-92: Single-frame and begin-partial paths look correct.

Both frame_from_single and begin_partial properly drain any existing partial frame as Interrupted before proceeding. Good.


94-134: push_to_partial — frame-number mismatch silently discards the incoming packet.

When packet.header.frame_number != partial.frame_number on line 105, the old partial is dropped as Interrupted, but the incoming packet (which belongs to a different, unknown frame) is also silently lost with no error reported for it. This is arguably fine since DepacketizerPushResult can only carry one drop_error, but worth noting as a design trade-off — callers won't know about the lost packet from the new frame.


136-164: Wrapping arithmetic in finalize looks correct.

The sequence != end_sequence loop condition and wrapping_sub/wrapping_add in the Incomplete expected count both handle u16 wraparound properly.


167-241: Type definitions and Display/Error impls are clean.

PartialFrame, DepacketizerPushResult, DepacketizerDropError, DepacketizerDropReason, and the From conversions are all well-structured and idiomatic.


417-423: Faker impl for Packet looks good.

Generates realistic random packets for testing with variable payload sizes. Clean and useful test scaffolding.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

assert!(result.frame.is_none() && result.drop_error.is_none());

let first_frame_number = packet.header.frame_number;
packet.header.frame_number += packet.header.frame_number.wrapping_add(1); // Next frame
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Bug: += combined with wrapping_add corrupts frame_number.

Line 309 reads:

packet.header.frame_number += packet.header.frame_number.wrapping_add(1);

This computes frame_number = frame_number + (frame_number + 1) = 2 * frame_number + 1, not the intended frame_number + 1. Worse, the += uses checked arithmetic in debug builds, so if Faker generates a frame_number ≥ 32768 (roughly half of u16::MAX), the addition overflows and the test panics — causing flaky test failures.

The test still passes when it doesn't panic because the result happens to differ from the original value, but the intent is clearly wrong.

🐛 Proposed fix
-        packet.header.frame_number += packet.header.frame_number.wrapping_add(1); // Next frame
+        packet.header.frame_number = packet.header.frame_number.wrapping_add(1); // Next frame
🤖 Prompt for AI Agents
In `@livekit-datatrack/src/remote/depacketizer.rs` at line 309, The code
incorrectly doubles frame_number by using "+=" with wrapping_add; replace the
faulty increment at packet.header.frame_number (the line currently reading
"packet.header.frame_number += packet.header.frame_number.wrapping_add(1);")
with a single wrapping increment so the value becomes packet.header.frame_number
= packet.header.frame_number.wrapping_add(1); — this preserves wrapping
semantics and avoids checked-add overflow panics in tests (affects the
frame_number handling in the depacketizer logic and Faker-generated values).


depacketizer.push(packet.clone());

packet.header.sequence += 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

sequence += 3 can overflow in debug builds with random Faker values.

Same class of issue flagged in past reviews for other fields: if Faker generates a sequence near u16::MAX, += 3 panics in debug mode. Consider using wrapping_add(3) for consistency with the rest of the test file.

Proposed fix
-        packet.header.sequence += 3;
+        packet.header.sequence = packet.header.sequence.wrapping_add(3);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
packet.header.sequence += 3;
packet.header.sequence = packet.header.sequence.wrapping_add(3);
🤖 Prompt for AI Agents
In `@livekit-datatrack/src/remote/depacketizer.rs` at line 329, The test modifies
packet.header.sequence with plain addition which can overflow in debug builds
when Faker generates values near u16::MAX; change the increment to a wrapping
operation (use wrapping_add(3)) on packet.header.sequence to match other tests
and avoid panic during debug builds.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants