📝 Walkthrough

Adds a new `livekit-datatrack` crate.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    rect rgba(200,200,255,0.5)
        participant Client
        participant LocalMgr as Local Manager
        participant SFU as SFU
        participant Pipeline
        participant Packetizer
        participant Output as Output Events
    end
    Client->>LocalMgr: publish_track(options)
    LocalMgr->>SFU: SfuPublishRequest(handle,name,uses_e2ee)
    SFU-->>LocalMgr: SfuPublishResponse(handle,result)
    LocalMgr-->>Client: LocalDataTrack (on success)
    Client->>LocalMgr: try_push(frame)
    LocalMgr->>Pipeline: process_frame(frame)
    Pipeline->>Packetizer: packetize(frame/encrypted_payload)
    Packetizer-->>LocalMgr: Vec<Packet>
    LocalMgr->>Output: PacketsAvailable(packets)
```
```mermaid
sequenceDiagram
    rect rgba(200,255,200,0.5)
        participant Client
        participant RemoteMgr as Remote Manager
        participant SFU as SFU
        participant Pipeline
        participant Depacketizer
        participant Subscriber as Subscriber Stream
    end
    Client->>RemoteMgr: subscribe(sid)
    RemoteMgr->>SFU: SubscribeRequest
    SFU-->>RemoteMgr: SfuPublicationUpdates & SubscriberHandles
    SFU->>RemoteMgr: Packet
    RemoteMgr->>Pipeline: process_packet(packet)
    Pipeline->>Depacketizer: depacketize(packet)
    Depacketizer-->>Pipeline: DepacketizerFrame
    Pipeline-->>RemoteMgr: DataTrackFrame
    RemoteMgr->>Subscriber: deliver frame
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 15
🤖 Fix all issues with AI agents
In `@livekit-datatrack/Cargo.toml`:
- Line 13: The crate livekit-datatrack is using a pinned dependency thiserror =
"2.0.17" because it isn't part of the workspace members; either add
livekit-datatrack to the workspace members in the root Cargo.toml and change
thiserror in livekit-datatrack/Cargo.toml to thiserror = { workspace = true }
(then coordinate a workspace upgrade to thiserror 2.x), or if livekit-datatrack
is intentionally external, remove the TODO and add a short comment in
livekit-datatrack/Cargo.toml documenting why it is excluded from the workspace
and why it pins thiserror; update the thiserror entry and comments accordingly.
- Around line 1-7: The workspace is missing registration for the new crate: add
"livekit-datatrack" to the workspace members array so Cargo includes it in
builds/tests/publishing; update the [workspace] members list (near existing
"livekit-runtime") to include the exact member name livekit-datatrack.
In `@livekit-datatrack/src/frame.rs`:
- Around line 50-62: The current duration_since_timestamp may panic because
SystemTime + Duration can overflow; update duration calculation in
duration_since_timestamp to use SystemTime::checked_add instead of the Add impl:
take the optional user_timestamp, build a Duration::from_millis, call
UNIX_EPOCH.checked_add(...) to get an Option<SystemTime>, and then if
Some(epoch) call SystemTime::now().duration_since(epoch).inspect_err(...).ok()
(returning None if checked_add returns None or duration_since errors). Keep the
existing log::error call for duration_since errors and preserve the Option
return semantics.
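A self-contained sketch of the suggested shape (the function name follows the review's description; the real signature may differ, and `eprintln!` stands in for the crate's `log::error`):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Sketch of the suggested fix: build the epoch-relative timestamp with
/// `checked_add` so an absurd `user_timestamp` yields `None` instead of panicking.
fn duration_since_timestamp(user_timestamp: Option<u64>) -> Option<Duration> {
    let millis = user_timestamp?;
    // `UNIX_EPOCH + Duration` can panic on overflow; `checked_add` returns None.
    let epoch = UNIX_EPOCH.checked_add(Duration::from_millis(millis))?;
    SystemTime::now()
        .duration_since(epoch)
        .inspect_err(|e| eprintln!("timestamp in the future: {e}"))
        .ok()
}

fn main() {
    // A past timestamp produces Some(elapsed).
    assert!(duration_since_timestamp(Some(0)).is_some());
    // A huge timestamp (overflow or far future) produces None, never a panic.
    assert!(duration_since_timestamp(Some(u64::MAX)).is_none());
    assert!(duration_since_timestamp(None).is_none());
}
```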
In `@livekit-datatrack/src/local/manager.rs`:
- Around line 240-250: In remove_descriptor, the logic in the Descriptor::Active
branch inverts the publish-check: it currently sends false only when
published_tx is already false. Change the condition to check if
*published_tx.borrow() is true and then send false to notify unpublish; i.e., in
the remove_descriptor implementation (handling Descriptor::Active and
published_tx) send false when the track is currently published so subscribers
are notified.
- Around line 32-40: The struct field name is wrong: in ManagerOptions rename
decryption_provider to encryption_provider and update its doc comment to
"Provider to use for encrypting outgoing frame payloads." Keep the type as
Option<Arc<dyn EncryptionProvider>> and then update all references/usages
(constructors, Manager::new, tests, builders, and any pattern matches) from
decryption_provider to encryption_provider to avoid breakage.
In `@livekit-datatrack/src/local/packetizer.rs`:
- Around line 56-86: The closure in packetize moves the non-Copy Header via
`..header`, causing a move on the first iteration; fix by creating a cloned base
header for each packet (e.g., call header.clone() or clone a prepared
base_header) inside the map/iterator so you set per-packet fields (marker,
sequence, extensions) on that clone rather than using `..header`; ensure the
Header type implements Clone if it doesn't already and update the map to use the
cloned header when building each Packet (see packetize, Header, frame_marker,
and sequence references).
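A toy illustration of the clone-per-packet pattern; `Header` here is a stand-in with illustrative fields, not the crate's real type:

```rust
/// Toy stand-in for the crate's Header; field names are illustrative only.
#[derive(Clone, Debug)]
struct Header {
    sequence: u16,
    marker: bool,
}

/// Sketch of the fix: clone a base header for every packet instead of moving
/// it with `..header`, which moves the non-Copy struct on the first iteration.
fn packetize(base: Header, chunks: Vec<Vec<u8>>) -> Vec<(Header, Vec<u8>)> {
    let count = chunks.len();
    chunks
        .into_iter()
        .enumerate()
        .map(|(i, payload)| {
            let mut header = base.clone(); // fresh copy per packet
            header.sequence = base.sequence.wrapping_add(i as u16);
            header.marker = i + 1 == count; // frame marker on the final packet
            (header, payload)
        })
        .collect()
}

fn main() {
    let base = Header { sequence: 10, marker: false };
    let packets = packetize(base, vec![vec![1], vec![2], vec![3]]);
    assert_eq!(packets.len(), 3);
    assert_eq!(packets[2].0.sequence, 12);
    assert!(packets[2].0.marker && !packets[0].0.marker);
}
```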
In `@livekit-datatrack/src/local/pipeline.rs`:
- Around line 45-51: The constructor Pipeline::new currently uses
debug_assert_eq! to validate that options.info.uses_e2ee matches
options.encryption_provider.is_some(), which is stripped in release builds;
change that check to a runtime assertion (replace debug_assert_eq! with
assert_eq!) so mismatches are caught in all builds (include a clear error
message referencing uses_e2ee vs encryption_provider). Update the assertion in
Pipeline::new (and keep the rest of the initialization untouched) so
encrypt_if_needed cannot be left running without a provider when the server
indicates E2EE is required.
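A minimal sketch of the debug-assert-to-runtime-assert change; the parameter names mirror the review text, not necessarily the real struct:

```rust
/// Sketch: `assert_eq!` runs in release builds, unlike `debug_assert_eq!`,
/// so a uses_e2ee/provider mismatch is caught everywhere.
fn check_e2ee_invariant(uses_e2ee: bool, has_encryption_provider: bool) {
    assert_eq!(
        uses_e2ee, has_encryption_provider,
        "uses_e2ee must match encryption_provider presence"
    );
}

fn main() {
    check_e2ee_invariant(true, true); // ok
    check_e2ee_invariant(false, false); // ok
    // A mismatch now panics even with debug assertions disabled.
    let mismatch = std::panic::catch_unwind(|| check_e2ee_invariant(true, false));
    assert!(mismatch.is_err());
}
```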
In `@livekit-datatrack/src/packet/deserialize.rs`:
- Around line 78-88: The ext_words u16 can overflow when computing ext_len by
doing ext_words + 1 in u16; update the calculation in the ext_flag branch (where
ext_words is read with raw.get_u16()) to convert ext_words to usize before
adding 1 and multiplying (i.e., compute ext_len using ext_words as usize: 4 *
(ext_words_as_usize + 1)) and keep the existing bounds checks
(MissingExtWords/HeaderOverrun) intact so malformed inputs cannot wrap and
bypass the header length validation.
- Around line 97-131: The deserializer currently ignores the len field and can
go out-of-sync; update Extensions::deserialize to use the len value for
alignment: after reading tag and len, validate that raw.remaining() >= len (or
return MalformedExt), and for known tags (E2eeExt::TAG, UserTimestampExt::TAG)
check that len equals the serialized payload-length (i.e. expected_payload_len -
1 as written by serialize.rs) before reading fields (E2eeExt::LEN and
UserTimestampExt::LEN should be used to compute the expected len), then read
exactly that payload; for EXT_TAG_PADDING and unknown tags, advance by len (or
len + 1 if serialize writes len as payload_len - 1 and you need to skip the
extra byte) to preserve alignment instead of just continuing, returning
MalformedExt on mismatch. Use the existing symbols Extensions::deserialize,
E2eeExt::TAG/LEN, UserTimestampExt::TAG/LEN, and EXT_TAG_PADDING to locate where
to add these checks and advances.
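The overflow fix for the first comment can be sketched in isolation; the function name is illustrative:

```rust
/// Promote the u16 word count to usize before the `+ 1`, so `u16::MAX`
/// cannot wrap and bypass the header-length validation.
fn ext_len(ext_words: u16) -> usize {
    4 * (ext_words as usize + 1)
}

fn main() {
    assert_eq!(ext_len(0), 4);
    // In u16 arithmetic `u16::MAX + 1` would wrap to 0 (in release builds)
    // and the computed length would collapse; in usize it stays correct.
    assert_eq!(ext_len(u16::MAX), 4 * 65536);
}
```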
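The alignment rule from the second comment can be illustrated with a toy tag/len/payload walker; the wire layout, tag values, and padding convention here are simplified stand-ins, not the crate's actual extension format:

```rust
const TAG_PADDING: u8 = 0; // illustrative padding tag

/// Toy TLV walker: validate `len` against the remaining bytes and advance by
/// `len` for every tag (known, unknown, or padding) so parsing never desyncs.
fn parse_extensions(mut raw: &[u8]) -> Result<Vec<(u8, Vec<u8>)>, &'static str> {
    let mut exts = Vec::new();
    while let [tag, len, rest @ ..] = raw {
        let len = *len as usize;
        if rest.len() < len {
            return Err("MalformedExt"); // len overruns the buffer
        }
        if *tag != TAG_PADDING {
            exts.push((*tag, rest[..len].to_vec()));
        }
        raw = &rest[len..]; // advance by len even for tags we don't recognize
    }
    Ok(exts)
}

fn main() {
    // tag 1 (3 bytes), tag 9 (2 bytes), tag 1 again (1 byte)
    let raw = [1, 3, 0xAA, 0xBB, 0xCC, 9, 2, 0, 0, 1, 1, 0xDD];
    let exts = parse_extensions(&raw).unwrap();
    assert_eq!(
        exts,
        vec![(1, vec![0xAA, 0xBB, 0xCC]), (9, vec![0, 0]), (1, vec![0xDD])]
    );
    // A len that overruns the buffer is rejected instead of desyncing.
    assert!(parse_extensions(&[1, 5, 0xAA]).is_err());
}
```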
In `@livekit-datatrack/src/packet/time.rs`:
- Around line 52-79: The Clock::at method currently calls
instant.duration_since(self.epoch), which can panic if instant is earlier than
epoch; change that to instant.saturating_duration_since(self.epoch) to
defensively handle stale instants and avoid future panics. Update the
computation in Clock::at (function name: at, struct: Clock) to use
saturating_duration_since when computing elapsed before calling
duration_to_ticks, preserving the remaining logic (wrapping_add, monotonicity
check with prev) unchanged.
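The saturating behavior can be demonstrated with plain `std::time::Instant` (the function name is illustrative; `Clock::at` itself is not reproduced here):

```rust
use std::time::{Duration, Instant};

/// Sketch of the defensive elapsed-time computation: clamp to zero instead of
/// panicking when the instant predates the epoch.
fn elapsed_since_epoch(epoch: Instant, instant: Instant) -> Duration {
    instant.saturating_duration_since(epoch)
}

fn main() {
    let now = Instant::now();
    let future_epoch = now + Duration::from_secs(5);
    // A stale instant earlier than the epoch yields zero, not a panic.
    assert_eq!(elapsed_since_epoch(future_epoch, now), Duration::ZERO);
    // The normal case is unchanged.
    assert!(elapsed_since_epoch(now, now + Duration::from_millis(10)) >= Duration::from_millis(10));
}
```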
In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Around line 151-158: The expected packet count calculation in the
DepacketizerDropError construction is wrong when u16 sequence numbers wrap;
update the computation used in DepacketizerDropReason::Incomplete (where you
reference partial.frame_number, partial.start_sequence and end_sequence) to use
wrapping arithmetic instead of plain subtraction—e.g., compute expected by using
end_sequence.wrapping_sub(partial.start_sequence).wrapping_add(1) (and
cast/promote to the expected integer type such as usize or u32) so wrapped
ranges like start=65530 and end=5 produce the correct inclusive count.
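The wrap-safe inclusive count can be sketched directly (function name illustrative):

```rust
/// Wrap-safe inclusive packet count for u16 sequence numbers: wrapping
/// subtraction handles ranges that cross the u16 boundary.
fn expected_packet_count(start_sequence: u16, end_sequence: u16) -> u32 {
    u32::from(end_sequence.wrapping_sub(start_sequence)) + 1
}

fn main() {
    assert_eq!(expected_packet_count(0, 4), 5);
    // Wrapped range: 65530..=65535 (6 packets) plus 0..=5 (6 packets) = 12.
    assert_eq!(expected_packet_count(65530, 5), 12);
    // Plain subtraction would underflow here; wrapping_sub stays correct.
    assert_eq!(expected_packet_count(u16::MAX, 0), 2);
}
```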
In `@livekit-datatrack/src/remote/manager.rs`:
- Around line 210-219: handle_track_unpublished currently removes the
subscription entry but does not await the running subscription task, risking
races; change handle_track_unpublished to be async (matching shutdown()) and
when matching SubscriptionState::Active { sub_handle, .. } use
self.sub_handles.remove(&sub_handle) to take the task's JoinHandle (or
equivalent task_handle) and await it (e.g., task_handle.await) before
proceeding, then send published_tx(false) and remove descriptor; use the same
JoinHandle type and awaiting pattern used in shutdown() to ensure the background
task finishes before the manager continues.
- Around line 142-156: on_unsubscribe_request currently returns early when
descriptor.subscription matches SubscriptionState::Pending, leaving the
descriptor.subscription and any pending result receivers unresolved; update
on_unsubscribe_request (the match on descriptor.subscription) to, when
SubscriptionState::Pending is found, set descriptor.subscription =
SubscriptionState::None, clear any pending result receivers (notify or send an
error/closure to the pending result channel stored on the descriptor), and still
send the SfuUpdateSubscription event via self.event_out_tx; ensure you also
remove any related entries from self.sub_handles if applicable so state and
receivers are consistently cleaned up.
- Around line 253-257: The code creates a pipeline for tracks that use E2EE
(descriptor.info.uses_e2ee()) even when self.decryption_provider is None, which
can cause silent decryption failures; update the manager to detect this mismatch
and refuse subscription (or at minimum log a warning) before constructing the
pipeline: when descriptor.info.uses_e2ee() is true and
self.decryption_provider.is_none(), either return an Err (or skip subscription)
from the function creating the pipeline or emit a clear warning via the existing
logger, and only build Pipeline::new with the decryption_provider variable when
Some(Arc) is present so Pipeline::new never receives None for an E2EE track.
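The join-before-cleanup pattern from the unpublish comment can be shown with `std::thread` as a stand-in for tokio tasks; the map name and handle type mirror the review's description but are illustrative:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// std::thread analogue of the suggested async fix: take the task handle out
/// of the map and join it before tearing down related state, so the task
/// cannot race with cleanup. In tokio this would be `task_handle.await`.
fn handle_track_unpublished(sub_handles: &mut HashMap<u64, JoinHandle<()>>, sub_handle: u64) {
    if let Some(task_handle) = sub_handles.remove(&sub_handle) {
        task_handle.join().expect("subscription task panicked");
    }
    // ...then notify published_tx(false) and remove the descriptor safely.
}

fn main() {
    let finished = Arc::new(AtomicBool::new(false));
    let flag = finished.clone();
    let mut sub_handles = HashMap::new();
    sub_handles.insert(7, thread::spawn(move || flag.store(true, Ordering::SeqCst)));

    handle_track_unpublished(&mut sub_handles, 7);
    // After joining, the background task is guaranteed to have completed.
    assert!(finished.load(Ordering::SeqCst));
    assert!(sub_handles.is_empty());
}
```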
In `@livekit-datatrack/src/remote/pipeline.rs`:
- Around line 23-84: The code currently relies on
debug_assert_eq!(options.info.uses_e2ee, options.decryption_provider.is_some())
which is removed in release builds and can cause encrypted payloads to be
treated as plaintext; fix by performing a runtime invariant check in
Pipeline::new and by making decryption decision based on the track's E2EE flag.
Specifically: in Pipeline::new (where PipelineOptions, publisher_identity, and
decryption_provider are passed) replace the debug_assert with a runtime check —
if options.info.uses_e2ee is true but options.decryption_provider.is_none(), log
an error and fail construction (return a Result or panic) or set a safe
fallback; if uses_e2ee is false but a decryption_provider is provided, ignore it
or log a warning. Also add a field to Pipeline (e.g., uses_e2ee: bool or store
Arc<DataTrackInfo>) and update decrypt_if_needed to first check that stored
uses_e2ee is true before attempting decryption, and if uses_e2ee is true but
e2ee_provider is missing, log and return None to avoid emitting ciphertext as
plaintext.
🧹 Nitpick comments (10)
livekit-datatrack/src/remote/proto.rs (1)
66-74: Use `map_or` for broader Rust version compatibility. The crate does not declare an MSRV, and `Option::is_none_or` requires Rust 1.82.0 (stabilized October 2024). To support a wider range of Rust versions without explicitly committing to an MSRV, use the equivalent and more compatible `map_or` approach:

Suggested change:

```diff
-        .filter(|participant| {
-            local_participant_identity.is_none_or(|identity| participant.identity != identity)
-        })
+        .filter(|participant| {
+            local_participant_identity.map_or(true, |identity| participant.identity != identity)
+        })
```

livekit-datatrack/src/remote/mod.rs (1)
55-97: Extract subscription timeouts into named constants. This aligns with the TODO and avoids magic numbers in the API flow.

♻️ Suggested refactor:

```diff
+const SUBSCRIBE_SEND_TIMEOUT_MS: u64 = 50;
+const SUBSCRIBE_RESULT_TIMEOUT_SECS: u64 = 10;
+
 pub async fn subscribe(&self) -> Result<impl Stream<Item = DataTrackFrame>, SubscribeError> {
     let (result_tx, result_rx) = oneshot::channel();
     let subscribe_event = SubscribeRequest { sid: self.info.sid.clone(), result_tx };
     self.inner()
         .event_in_tx
         .upgrade()
         .ok_or(SubscribeError::Disconnected)?
-        .send_timeout(subscribe_event.into(), Duration::from_millis(50))
+        .send_timeout(subscribe_event.into(), Duration::from_millis(SUBSCRIBE_SEND_TIMEOUT_MS))
         .await
         .map_err(|_| {
             SubscribeError::Internal(anyhow!("Failed to send subscribe event").into())
         })?;
     // TODO: standardize timeout
-    let frame_rx = timeout(Duration::from_secs(10), result_rx)
+    let frame_rx = timeout(Duration::from_secs(SUBSCRIBE_RESULT_TIMEOUT_SECS), result_rx)
         .await
         .map_err(|_| SubscribeError::Timeout)?
         .map_err(|_| SubscribeError::Disconnected)??;
```

livekit-datatrack/src/remote/manager.rs (2)
74-75: Consider documenting or increasing channel buffer sizes. The `mpsc::channel(4)` buffer size is quite small. With high-frequency events or bursts of publications/subscriptions, this could cause backpressure. The TODO comment acknowledges this, but it would be good to either document the rationale or plan to tune this before production use.
466-467: Minor: Redundant field name in struct initialization. The field initialization can be simplified.

✨ Suggested fix:

```diff
         let task =
-            TrackTask { info: info, pipeline, published_rx, packet_rx, frame_tx, event_in_tx };
+            TrackTask { info, pipeline, published_rx, packet_rx, frame_tx, event_in_tx };
```

livekit-datatrack/src/remote/depacketizer.rs (1)
386-415: Test could be more comprehensive for duplicate sequence handling. The test verifies that duplicate sequences result in the later packet's payload being used, but doesn't validate:

- The final frame length (should be 6 bytes: 3 from seq 1 + 3 from seq 2, not 9)
- Complete frame content

The comment "Should retain the second packet" is slightly ambiguous; it replaces the first packet with the duplicate sequence, not retains both.

✨ Suggested test improvement:

```diff
         let result = depacketizer.push(packet.clone());
         assert!(result.drop_error.is_none());
         let frame = result.frame.unwrap();
-        assert!(frame.payload.starts_with(&[0xCD; 3]));
-        // Should retain the second packet with duplicate sequence number
+        // Duplicate sequence replaces the first packet's payload
+        assert_eq!(frame.payload.len(), 6); // seq 1 (replaced) + seq 2
+        assert_eq!(&frame.payload[..3], &[0xCD; 3]); // Replaced payload
+        assert_eq!(&frame.payload[3..], &[0xEF; 3]); // Final payload
     }
```

livekit-datatrack/src/track.rs (1)
108-118: Consider adding length validation for DataTrackSid. The `TryFrom<String>` implementation only validates the prefix. A SID like `"DTR_"` (empty ID portion) or `"DTR_x"` (very short ID) would be accepted. If the SFU has specific requirements for SID format, consider adding validation here.

livekit-datatrack/src/local/manager.rs (3)
284-290: Consider using `biased` select for consistent priority handling. Unlike the remote `TrackTask`, this `tokio::select!` doesn't use `biased`, meaning the `published_rx.changed()` branch may not be prioritized over frame processing. For consistent behavior and responsive unpublish handling, consider adding `biased`.

✨ Suggested fix:

```diff
         while is_published {
             tokio::select! {
+                biased;
                 _ = self.published_rx.changed() => {
                     is_published = *self.published_rx.borrow();
                 }
                 Some(frame) = self.frame_rx.recv() => self.process_and_send(frame)
             }
         }
```
355-358: Inconsistent timeout API usage. `publish_track` uses `tokio::time::timeout`, but tests in this file use `livekit_runtime::timeout`. For consistency and to ensure proper runtime behavior across different execution contexts, consider using `livekit_runtime::timeout` here as well.

✨ Suggested fix:

```diff
-        let track = tokio::time::timeout(Self::PUBLISH_TIMEOUT, result_rx)
+        let track = livekit_runtime::timeout(Self::PUBLISH_TIMEOUT, result_rx)
             .await
             .map_err(|_| PublishError::Timeout)?
             .map_err(|_| PublishError::Disconnected)??;
```
433-436: Minor: Prefer `first()` or `next()` over `nth(0)`. Using `into_iter().nth(0)` is less idiomatic than alternatives.

✨ Suggested fix:

```diff
             OutputEvent::PacketsAvailable(packets) => {
-                let packet = packets.into_iter().nth(0).unwrap();
+                let packet = packets.into_iter().next().unwrap();
                 let payload = Packet::deserialize(packet).unwrap().payload;
```

livekit-datatrack/src/local/mod.rs (1)
137-149: Consider validating non-empty track name. The doc comment states the name "Must not be empty", but `new()` accepts any string including empty. Consider adding validation to enforce this requirement.

✨ Suggested validation:

```diff
     /// Creates options with the given track name.
     ///
     /// The track name is used to identify the track to other participants.
     ///
     /// # Requirements
     /// - Must not be empty
     /// - Must be unique per publisher
     ///
-    pub fn new(name: impl Into<String>) -> Self {
-        Self { name: name.into() }
+    /// # Panics
+    /// Panics if `name` is empty.
+    ///
+    pub fn new(name: impl Into<String>) -> Self {
+        let name = name.into();
+        assert!(!name.is_empty(), "Track name must not be empty");
+        Self { name }
     }
```

Alternatively, return a `Result` for a non-panicking API.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)

- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (29)

- Cargo.toml
- livekit-datatrack/Cargo.toml
- livekit-datatrack/README.md
- livekit-datatrack/src/e2ee.rs
- livekit-datatrack/src/error.rs
- livekit-datatrack/src/frame.rs
- livekit-datatrack/src/lib.rs
- livekit-datatrack/src/local/events.rs
- livekit-datatrack/src/local/manager.rs
- livekit-datatrack/src/local/mod.rs
- livekit-datatrack/src/local/packetizer.rs
- livekit-datatrack/src/local/pipeline.rs
- livekit-datatrack/src/local/proto.rs
- livekit-datatrack/src/packet/deserialize.rs
- livekit-datatrack/src/packet/extension.rs
- livekit-datatrack/src/packet/handle.rs
- livekit-datatrack/src/packet/mod.rs
- livekit-datatrack/src/packet/serialize.rs
- livekit-datatrack/src/packet/time.rs
- livekit-datatrack/src/remote/depacketizer.rs
- livekit-datatrack/src/remote/events.rs
- livekit-datatrack/src/remote/manager.rs
- livekit-datatrack/src/remote/mod.rs
- livekit-datatrack/src/remote/pipeline.rs
- livekit-datatrack/src/remote/proto.rs
- livekit-datatrack/src/track.rs
- livekit-datatrack/src/utils/bytes.rs
- livekit-datatrack/src/utils/counter.rs
- livekit-datatrack/src/utils/mod.rs
🧰 Additional context used
🧬 Code graph analysis (16)
livekit-datatrack/src/utils/mod.rs (1)
- livekit-api/src/http_client.rs (1): bytes (81-83)

livekit-datatrack/src/packet/mod.rs (6)
- livekit-api/src/http_client.rs (1): bytes (81-83)
- livekit-datatrack/src/frame.rs (2): fmt (90-95), payload (41-43)
- livekit-datatrack/src/packet/extension.rs (1): fmt (36-39)
- livekit-datatrack/src/packet/handle.rs (1): fmt (64-66)
- livekit-datatrack/src/packet/deserialize.rs (3): deserialize (44-49), deserialize (53-94), deserialize (98-134)
- livekit-datatrack/src/packet/serialize.rs (1): serialize (50-57)

livekit-datatrack/src/lib.rs (1)
- livekit-datatrack/src/packet/serialize.rs (1): packet (178-193)

livekit-datatrack/src/frame.rs (2)
- livekit-datatrack/src/remote/depacketizer.rs (2): from (190-192), from (196-198)
- livekit-datatrack/src/local/pipeline.rs (1): from (81-89)

livekit-datatrack/src/packet/handle.rs (2)
- livekit-datatrack/src/local/proto.rs (6): try_from (45-48), try_from (54-58), try_from (64-73), from (27-31), from (35-37), from (97-109)
- livekit-datatrack/src/remote/proto.rs (2): try_from (28-39), from (94-101)

livekit-datatrack/src/utils/counter.rs (1)
- livekit-datatrack/src/packet/time.rs (1): wrapping_add (39-41)

livekit-datatrack/src/local/packetizer.rs (4)
- livekit-datatrack/src/packet/serialize.rs (1): packet (178-193)
- livekit-datatrack/src/packet/time.rs (2): new (54-56), random (23-25)
- livekit-datatrack/src/utils/counter.rs (1): new (21-23)
- livekit-datatrack/src/packet/handle.rs (2): from (52-54), from (58-60)

livekit-datatrack/src/remote/depacketizer.rs (1)
- livekit-datatrack/src/packet/serialize.rs (1): packet (178-193)

livekit-datatrack/src/packet/deserialize.rs (4)
- livekit-datatrack/src/frame.rs (6): from (101-103), from (107-109), from (113-115), from (119-121), payload (41-43), user_timestamp (46-48)
- livekit-datatrack/src/packet/handle.rs (2): from (52-54), from (58-60)
- livekit-datatrack/src/packet/time.rs (1): from_ticks (27-29)
- livekit-datatrack/src/packet/serialize.rs (1): packet (178-193)

livekit-datatrack/src/remote/proto.rs (1)
- livekit-datatrack/src/packet/handle.rs (5): try_from (34-39), try_from (45-48), from (52-54), from (58-60), get (78-81)

livekit-datatrack/src/remote/pipeline.rs (1)
- livekit-datatrack/src/remote/depacketizer.rs (3): new (39-41), from (190-192), from (196-198)

livekit-datatrack/src/remote/manager.rs (3)
- livekit-datatrack/src/remote/mod.rs (4): new (43-45), publisher_identity (100-102), published_rx (113-115), inner (47-52)
- livekit-datatrack/src/remote/pipeline.rs (1): new (39-47)
- livekit-datatrack/src/track.rs (4): info (38-40), sid (83-85), published_rx (63-68), uses_e2ee (91-93)

livekit-datatrack/src/packet/extension.rs (6)
- livekit-datatrack/src/frame.rs (1): fmt (90-95)
- livekit-datatrack/src/local/mod.rs (1): fmt (231-236)
- livekit-datatrack/src/packet/handle.rs (1): fmt (64-66)
- livekit-datatrack/src/packet/mod.rs (1): fmt (60-65)
- livekit-datatrack/src/remote/depacketizer.rs (1): fmt (228-237)
- livekit-datatrack/src/track.rs (1): fmt (127-129)

livekit-datatrack/src/local/manager.rs (5)
- livekit-datatrack/src/local/mod.rs (4): new (41-43), new (146-148), new (203-205), inner (45-50)
- livekit-datatrack/src/local/pipeline.rs (1): new (47-51)
- livekit-datatrack/src/remote/pipeline.rs (1): new (39-47)
- livekit-datatrack/src/track.rs (4): info (38-40), sid (83-85), uses_e2ee (91-93), name (87-89)
- livekit-runtime/src/dispatcher.rs (1): timeout (115-128)

livekit-datatrack/src/packet/time.rs (1)
- livekit-datatrack/src/local/packetizer.rs (1): new (46-54)

livekit-datatrack/src/track.rs (2)
- livekit-datatrack/src/remote/mod.rs (2): published_rx (113-115), inner (47-52)
- livekit-datatrack/src/local/proto.rs (3): from (27-31), from (35-37), from (97-109)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Test (x86_64-unknown-linux-gnu)
- GitHub Check: Test (x86_64-pc-windows-msvc)
- GitHub Check: Test (x86_64-apple-darwin)
🔇 Additional comments (71)
livekit-datatrack/README.md (1)
1-6: Clear and appropriately scoped README. Concise guidance that the crate is internal and should be used via client SDK APIs is helpful and unambiguous.

Cargo.toml (1)
58-73: LGTM! The new workspace dependencies (`from_variants`, `futures-core`, `tokio-stream`) are properly added in alphabetical order and align with existing dependency versions in the workspace (e.g., `futures-core = "0.3"` matches `futures = "0.3"` and `futures-util = "0.3"`).

livekit-datatrack/Cargo.toml (1)
9-25: Dependencies are well-structured. Good use of workspace dependencies for shared crates, and the inline comment on `anyhow` clarifying its intended scope ("For internal error handling only") is helpful for maintainability.

livekit-datatrack/src/error.rs (1)
17-24: LGTM: clear internal error wrapper.

livekit-datatrack/src/utils/bytes.rs (3)
17-32: LGTM: clean chunking extension API.
34-52: LGTM: iterator flow is straightforward and safe.
55-85: Tests cover empty input and varied sizes.

livekit-datatrack/src/frame.rs (3)
33-48: LGTM: compact frame container and accessors.
65-86: LGTM: builder-style constructors are clear.
89-121: LGTM: Debug and From impls improve ergonomics.

livekit-datatrack/src/e2ee.rs (1)
22-57: LGTM: E2EE payload and provider traits are clear.

livekit-datatrack/src/utils/mod.rs (1)
15-21: LGTM: simple utils re-export layer.

livekit-datatrack/src/local/pipeline.rs (7)
25-35: LGTM: pipeline options and state are minimal and clear.
37-43: LGTM: error composition is straightforward.
53-57: LGTM: process_frame flow is clean.
59-74: LGTM: encryption path is straightforward.
76-77: LGTM: MTU constant is clearly scoped.
80-89: LGTM: frame-to-packetizer conversion is clear.
92-120: LGTM: test validates packetization output.

livekit-datatrack/src/lib.rs (1)
17-59: LGTM: clear public API and backend re-exports.

livekit-datatrack/src/utils/counter.rs (2)
15-36: Clean counter API. Monotonic wrap-around behavior is concise and easy to use.
38-53: WrappingIncrement impls look good. Macro coverage for unsigned ints is consistent.
livekit-datatrack/src/packet/deserialize.rs (3)
22-41: Clear error taxonomy. Variants map cleanly to parsing failure cases.
43-49: Straightforward packet assembly. Header parse + payload copy path is clean.
137-275: Good coverage of error and extension cases. The tests exercise critical parsing paths and edge cases well.
livekit-datatrack/src/packet/handle.rs (3)
18-67: Handle type + conversions are solid. Validation and display formatting are clear.
69-82: Allocator behavior looks correct. Monotonic allocation with overflow guard is fine.
84-88: Test-only Dummy impl is fine. Keeps test scaffolding concise.
livekit-datatrack/src/packet/mod.rs (4)
15-57: Core packet types are clean and focused. Public surface is minimal and readable.
59-65: Custom Debug output is appropriate. Avoids payload dumps while keeping shape visible.
68-90: Serialization constants are clear. Bitfield and size constants are easy to reason about.
92-109: Round-trip test is a good sanity check. Validates serialization ↔ deserialization cohesion.
livekit-datatrack/src/packet/serialize.rs (4)
19-58: Packet serialization API looks consistent. Surface area aligns well with deserialization.
60-127: Header sizing + serialization logic reads correctly. Metrics and write order are clear.
130-168: Extension serialization is straightforward. Marker/length/payload layout is consistent with tests.
170-241: Serialization tests cover key paths. Good validation of lengths, padding, and payload layout.
livekit-datatrack/src/local/packetizer.rs (3)
22-54: Packetizer setup looks good. State fields and defaults are sensible.
90-99: Frame marker helper is clear. The branching is easy to follow and testable.
102-145: Packetization tests cover marker and sizing cases. Good breadth across payload sizes and MTUs.
livekit-datatrack/src/remote/proto.rs (5)
25-39: Subscriber handle mapping is clear. Error mapping and collection are clean.
47-64: Join/update extraction logic looks fine. Ownership transfer via `mem::take` is well documented.
82-89: Track-info extraction via `mem::take` is clean. Avoids clones and preserves intent.
93-101: Subscription update mapping is correct. The protocol payload is populated consistently.
104-158: Tests exercise the conversion paths well. Covers handle mapping and track extraction.
livekit-datatrack/src/packet/extension.rs (2)
17-33: LGTM: extension container and typed wrappers are clean and test-friendly. The derives and Option-wrapped fields keep the API minimal and clear.
35-52: Redacted Debug plus explicit TAG/LEN constants look solid. Good balance between observability and security hygiene.

livekit-datatrack/src/packet/time.rs (2)
18-42: Timestamp encapsulation and wraparound helpers look good.
91-122: Tests and Dummy impl provide good coverage and test ergonomics.

livekit-datatrack/src/remote/pipeline.rs (1)
87-123: Frame conversion and the no-E2EE test path look good.

livekit-datatrack/src/local/proto.rs (6)
26-38: Publish/unpublish request conversions are clear and consistent.
42-59: Response conversions handle missing info and handle parsing cleanly.
61-73: DataTrackInfo conversion maps encryption and SID as expected.
76-92: RequestResponse error mapping looks sensible.
96-120: Sync-state conversion helpers are tidy and efficient.
122-204: Tests cover the critical conversion paths well.

livekit-datatrack/src/remote/mod.rs (3)
42-53: Type-state wiring for `Remote` looks good.
99-115: Publisher identity accessor and inner state look good.
118-128: SubscribeError variants are clear and user-facing.

livekit-datatrack/src/remote/events.rs (2)
24-54: Input/Output event enums are well-scoped and readable.
56-126: Event payload structs are clear and map cleanly to protocol intent.

livekit-datatrack/src/local/events.rs (2)
24-44: Input/Output event enums are well-structured.
48-131: Local event payload structs are clear and consistent.

livekit-datatrack/src/remote/manager.rs (2)
378-411: LGTM! The `TrackTask::run` implementation correctly uses `biased` select to prioritize state updates, properly handles subscriber drop detection via `frame_tx.closed()`, and cleanly sends an unsubscribe request when all subscribers are gone.
419-424: LGTM! The `send` method appropriately uses `try_send` for synchronous contexts and properly wraps the error with context using `anyhow`.

livekit-datatrack/src/remote/depacketizer.rs (2)
115-121: LGTM! The buffer overflow protection correctly drops the entire frame and resets state. This prevents memory exhaustion from large fragmented frames while providing clear error reporting.
177-199: LGTM! The `DepacketizerPushResult` design elegantly handles the case where a single push operation can both complete a frame and report a dropped frame. The `From` implementations provide clean ergonomic construction.

livekit-datatrack/src/track.rs (2)
21-69: LGTM! The `DataTrack<L>` generic type with phantom marker for location is a clean design pattern. The `published_rx()` method correctly dispatches to the appropriate inner type, and `wait_for_unpublish()` properly handles the already-unpublished case.
132-145: LGTM! The `fake::Dummy` implementation correctly generates realistic test SIDs using a base-57 alphabet that excludes ambiguous characters. The 12-character random suffix provides sufficient uniqueness for testing.

livekit-datatrack/src/local/mod.rs (3)
40-96: LGTM! The `DataTrack<Local>` implementation provides a clean API for frame publishing. The `try_push` method correctly checks publication state before sending, and the error handling preserves the frame for potential retry. The `unpublish` method consuming `self` ensures the track can't be used after unpublishing.
114-119: LGTM! The `Drop` implementation ensures the track is properly unpublished even if the user doesn't explicitly call `unpublish()`. This is good defensive programming for resource cleanup.
163-237: LGTM! The error types are well-designed with clear documentation. `PushFrameError` preserving the frame via `into_frame()` enables application-level retry logic, which is a thoughtful API design choice.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@livekit-datatrack/src/local/manager.rs`:
- Around line 298-311: process_and_send currently uses
event_out_tx.try_send(...) which silently drops packets on full channel; change
the behavior to propagate backpressure instead of dropping: make
process_and_send async (or add a new async variant) and replace try_send(...)
with event_out_tx.send(packets.into()).await, returning or propagating the
Result so callers (e.g. try_push) can handle backpressure/failure; update
callers of process_and_send/try_push to await and handle the send error rather
than relying on the non-blocking try_send drop semantics.
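The hazard being fixed can be demonstrated with std's bounded channel as a stand-in for tokio's mpsc: once the buffer is full, `try_send` fails rather than waiting, so a caller that ignores the error silently loses data.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Single-slot bounded channel, analogous to a full tokio mpsc buffer.
    let (tx, rx) = sync_channel::<u32>(1);
    tx.try_send(1).unwrap(); // fills the buffer

    // A second try_send fails instead of applying backpressure...
    assert!(matches!(tx.try_send(2), Err(TrySendError::Full(2))));

    // ...so `2` was never enqueued; only `1` is ever received.
    assert_eq!(rx.try_recv().unwrap(), 1);
    assert!(rx.try_recv().is_err());
}
```

The suggested fix is the awaiting `send(...).await` path, which blocks until buffer space frees up and surfaces a real error instead of dropping.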
In `@livekit-datatrack/src/local/mod.rs`:
- Around line 143-149: The constructor new currently accepts any string but the
doc says "Must not be empty", so add validation: convert the incoming name (in
new) to a String, check if .is_empty(), and if so return an error instead of
constructing the struct; update new's signature to return Result<Self,
SomeErrorType> (create a simple error enum like InvalidTrackName or reuse an
existing error type) and return Ok(Self { name }) on success; reference the new
function and the name field so reviewers can locate the change.
In `@livekit-datatrack/src/remote/mod.rs`:
- Around line 118-121: The SubscribeError enum defines an unused variant
SubscribeError::Unpublished; either remove this dead variant or wire it into the
subscription flow: update the subscribe() function (and any helpers in remote
module) to detect when a track becomes unpublished and return
SubscribeError::Unpublished at that point, or delete the Unpublished variant and
any references to it to avoid dead code. Ensure references to SubscribeError and
match arms in callers are adjusted accordingly.
🧹 Nitpick comments (5)
livekit-datatrack/src/remote/mod.rs (1)
94-96: Silently dropping lagged frames is intentional but worth noting. The `filter_map(|result| async move { result.ok() })` silently discards `BroadcastStream` lag errors (when a subscriber can't keep up). This is a reasonable design choice for real-time data, but subscribers won't know if they've missed frames. Consider adding a brief doc comment noting this behavior for users who need guaranteed delivery.
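A minimal sketch of this drop-on-error semantics, using a plain iterator of `Result`s in place of the async `BroadcastStream` (illustrative only, not the crate's code):

```rust
// Mirrors `stream.filter_map(|result| async move { result.ok() })`:
// errors (lagged notifications) are silently filtered out, so consumers
// only ever see successfully received frames.
fn drop_lagged(results: Vec<Result<u32, usize>>) -> Vec<u32> {
    results.into_iter().filter_map(|r| r.ok()).collect()
}

fn main() {
    // A "Lagged(2)" style error between frames 1 and 4 simply disappears;
    // the subscriber has no way to tell that frames were skipped.
    let received = drop_lagged(vec![Ok(1), Err(2), Ok(4)]);
    assert_eq!(received, vec![1, 4]);
}
```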
livekit-datatrack/src/track.rs (1)
43-47: Minor: Simplifyis_published()implementation.The intermediate variable is unnecessary.
♻️ Suggested simplification
pub fn is_published(&self) -> bool { let published_rx = self.published_rx(); - let published = *published_rx.borrow(); - published + *published_rx.borrow() }livekit-datatrack/src/local/manager.rs (3)
252-265: Consider parallel task shutdown for efficiency. The shutdown awaits each active task sequentially. With many published tracks, this could delay shutdown. Consider `futures::future::join_all` for parallel completion.

♻️ Parallel shutdown suggestion

```diff
 async fn shutdown(self) {
+    let mut task_handles = Vec::new();
     for (_, descriptor) in self.descriptors {
         match descriptor {
             Descriptor::Pending(result_tx) => {
                 _ = result_tx.send(Err(PublishError::Disconnected))
             }
             Descriptor::Active { published_tx, task_handle, .. } => {
                 _ = published_tx.send(false);
-                task_handle.await;
+                task_handles.push(task_handle);
             }
         }
     }
+    futures::future::join_all(task_handles).await;
 }
```
367-376: Inconsistent channel send behavior. `publish_track` uses `try_send` (non-blocking), while `query_tracks` uses `send().await` (blocking). This inconsistency is likely intentional since queries are less time-critical, but it could cause unexpected blocking if the manager is overwhelmed.
433-436: Minor: Prefer `first()` over `nth(0)`.

♻️ Style suggestion

```diff
 OutputEvent::PacketsAvailable(packets) => {
-    let packet = packets.into_iter().nth(0).unwrap();
+    let packet = packets.into_iter().next().unwrap();
     let payload = Packet::deserialize(packet).unwrap().payload;
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (15)
- Cargo.toml
- livekit-datatrack/src/frame.rs
- livekit-datatrack/src/local/events.rs
- livekit-datatrack/src/local/manager.rs
- livekit-datatrack/src/local/mod.rs
- livekit-datatrack/src/local/pipeline.rs
- livekit-datatrack/src/packet/mod.rs
- livekit-datatrack/src/remote/depacketizer.rs
- livekit-datatrack/src/remote/mod.rs
- livekit-datatrack/src/track.rs
- livekit-ffi/src/conversion/participant.rs
- livekit/src/proto.rs
- livekit/src/room/mod.rs
- livekit/src/room/participant/mod.rs
- release-plz.toml
✅ Files skipped from review due to trivial changes (1)
- livekit-ffi/src/conversion/participant.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- Cargo.toml
🧰 Additional context used
🧬 Code graph analysis (5)
livekit-datatrack/src/track.rs (2)
- livekit-datatrack/src/remote/mod.rs (1): `published_rx` (113-115)
- livekit-datatrack/src/local/proto.rs (3): `try_from` (45-48), `try_from` (54-58), `try_from` (64-73)

livekit/src/room/mod.rs (7)
- livekit-api/src/access_token.rs (2): `default` (100-118), `default` (131-133)
- webrtc-sys/src/peer_connection.rs (1): `default` (223-234)
- livekit-api/src/signal_client/mod.rs (2): `default` (82-84), `default` (96-102)
- livekit/src/room/options.rs (1): `default` (94-106)
- livekit/src/room/participant/rpc.rs (1): `default` (29-36)
- libwebrtc/src/data_channel.rs (1): `default` (34-44)
- libwebrtc/src/native/peer_connection_factory.rs (1): `default` (44-55)

livekit-datatrack/src/local/mod.rs (1)
- livekit-datatrack/src/remote/mod.rs (2): `inner` (47-52), `published_rx` (113-115)

livekit-datatrack/src/local/pipeline.rs (3)
- livekit-datatrack/src/packet/serialize.rs (1): `packet` (178-193)
- livekit-datatrack/src/frame.rs (2): `new` (69-71), `payload` (41-43)
- livekit-datatrack/src/local/packetizer.rs (1): `new` (46-54)

livekit-datatrack/src/remote/mod.rs (3)
- livekit-datatrack/src/local/mod.rs (5): `new` (41-43), `new` (147-149), `new` (204-206), `inner` (45-50), `published_rx` (110-112)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Build (x86_64-linux-android)
- GitHub Check: Test (x86_64-apple-darwin)
- GitHub Check: Test (x86_64-unknown-linux-gnu)
- GitHub Check: Test (x86_64-pc-windows-msvc)
- GitHub Check: Build (x86_64-unknown-linux-gnu)
- GitHub Check: Build (x86_64-apple-darwin)
- GitHub Check: Build (aarch64-apple-ios-sim)
- GitHub Check: Build (aarch64-pc-windows-msvc)
- GitHub Check: Build (aarch64-linux-android)
- GitHub Check: Build (aarch64-apple-ios)
- GitHub Check: Build (armv7-linux-androideabi)
- GitHub Check: Build (aarch64-apple-darwin)
- GitHub Check: Build (x86_64-pc-windows-msvc)
- GitHub Check: Build (aarch64-unknown-linux-gnu)
🔇 Additional comments (11)
livekit/src/room/participant/mod.rs (1)
48-48: LGTM! Trailing comma additions to enum variants improve consistency and reduce diff noise for future additions.
Also applies to: 57-57
livekit-datatrack/src/remote/mod.rs (1)
39-52: LGTM! The type-state pattern with `PhantomData<Remote>` and the `inner()` accessor is correctly implemented. The `unreachable!()` is safe due to compile-time guarantees.

livekit-datatrack/src/track.rs (3)
71-94: LGTM! `DataTrackInfo` provides clean encapsulation with internal fields and appropriate public accessors.
108-118: LGTM! The prefix-based validation is appropriate for SFU-assigned identifiers. The implementation correctly rejects malformed SIDs.
132-145: LGTM! The test fake implementation correctly generates valid SIDs with the required prefix and a base57 suffix for better readability in test output.
livekit-datatrack/src/local/mod.rs (3)
83-91: LGTM! The `try_push` implementation correctly handles the race between checking `is_published()` and sending. The early check provides a clear `TrackUnpublished` error, while `try_send` failure catches edge cases with the `Dropped` error.
115-120: LGTM! The `Drop` implementation ensures proper cleanup by signaling unpublish, preventing resource leaks if the user forgets to call `unpublish()`.
164-238: LGTM! The error types are well-designed:
- `PublishError` covers all failure modes clearly
- `PushFrameError` allows frame recovery for retry logic
- Good documentation on each variant
livekit-datatrack/src/local/manager.rs (3)
61-77: LGTM! The manager setup is clean. The TODO for buffer size tuning is noted; this may need adjustment based on real-world usage patterns.
139-156: LGTM! The `forward_publish_result` task correctly uses a `biased` select to prioritize the result channel, and properly notifies the manager on cancellation via the weak sender.
390-463: LGTM! Good test coverage for the manager lifecycle and publish flow. The tests verify the complete round-trip including SFU response handling and packet emission.
```rust
/// # Requirements
/// - Must not be empty
/// - Must be unique per publisher
///
pub fn new(name: impl Into<String>) -> Self {
    Self { name: name.into() }
}
```
Consider validating non-empty name requirement.
The documentation states the track name "Must not be empty", but new() doesn't enforce this. Consider adding validation or clarifying that empty names are validated server-side.
🛡️ Optional validation

```diff
 pub fn new(name: impl Into<String>) -> Self {
-    Self { name: name.into() }
+    let name = name.into();
+    debug_assert!(!name.is_empty(), "Track name must not be empty");
+    Self { name }
 }
```
🤖 Prompt for AI Agents
In `@livekit-datatrack/src/local/mod.rs` around lines 143 - 149, The constructor
new currently accepts any string but the doc says "Must not be empty", so add
validation: convert the incoming name (in new) to a String, check if
.is_empty(), and if so return an error instead of constructing the struct;
update new's signature to return Result<Self, SomeErrorType> (create a simple
error enum like InvalidTrackName or reuse an existing error type) and return
Ok(Self { name }) on success; reference the new function and the name field so
reviewers can locate the change.
```rust
#[derive(Debug, Error)]
pub enum SubscribeError {
    #[error("The track has been unpublished and is no longer available")]
    Unpublished,
```
🧩 Analysis chain
🏁 Script executed:
```shell
rg -n 'SubscribeError::Unpublished' livekit-datatrack/
```

Repository: livekit/rust-sdks
Length of output: 43
🏁 Script executed:
```shell
rg -n 'subscribe' livekit-datatrack/src/remote/mod.rs -A 5 -B 2
```

Repository: livekit/rust-sdks
Length of output: 1875
🏁 Script executed:
```shell
sed -n '75,100p' livekit-datatrack/src/remote/mod.rs
```

Repository: livekit/rust-sdks
Length of output: 1158
Remove the unused `SubscribeError::Unpublished` variant or implement its use.
The `Unpublished` variant is defined in the enum but is never returned by `subscribe()` or used elsewhere in the crate. Either remove it as dead code or implement the logic to return this error when the track becomes unpublished.
🤖 Prompt for AI Agents
In `@livekit-datatrack/src/remote/mod.rs` around lines 118 - 121, The
SubscribeError enum defines an unused variant SubscribeError::Unpublished;
either remove this dead variant or wire it into the subscription flow: update
the subscribe() function (and any helpers in remote module) to detect when a
track becomes unpublished and return SubscribeError::Unpublished at that point,
or delete the Unpublished variant and any references to it to avoid dead code.
Ensure references to SubscribeError and match arms in callers are adjusted
accordingly.
```rust
let remaining_initial = buf.remaining_mut();

if buf.remaining_mut() < serialized_len {
    Err(SerializeError::TooSmallForHeader)?
```
question: Is there a benefit to `Err(...)?` vs `return Err(...)`? I've never encountered the former form before, but it logically makes sense and both seem equivalent.
Functionally the same, I just prefer the former because it's shorter
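For readers unfamiliar with the form, a small self-contained comparison showing the two styles behave identically here (the error enum and `MIN_LEN` constant below are invented for illustration, not from the crate):

```rust
#[derive(Debug, PartialEq)]
enum SerializeError {
    TooSmallForHeader,
}

// Hypothetical minimum buffer size, for illustration only.
const MIN_LEN: usize = 4;

// Using `Err(...)?` — the `?` operator propagates the error early.
fn check_question(len: usize) -> Result<(), SerializeError> {
    if len < MIN_LEN {
        Err(SerializeError::TooSmallForHeader)?
    }
    Ok(())
}

// Using an explicit `return Err(...)` — functionally identical here.
fn check_return(len: usize) -> Result<(), SerializeError> {
    if len < MIN_LEN {
        return Err(SerializeError::TooSmallForHeader);
    }
    Ok(())
}

fn main() {
    assert_eq!(check_question(2), check_return(2)); // both Err
    assert_eq!(check_question(8), check_return(8)); // both Ok
}
```

One subtle difference: `?` also applies `From::from` to the error, so `Err(e)?` can convert the error type implicitly where a plain `return Err(e)` cannot.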
```rust
#[test]
fn test_serialize() {
    let mut buf = packet().serialize().try_into_mut().unwrap();
    assert_eq!(buf.len(), 1070);

    // Base header
    assert_eq!(buf.get_u8(), 0xC); // Version 0, final, extension
    assert_eq!(buf.get_u8(), 0); // Reserved
    assert_eq!(buf.get_u16(), 0x8811); // Track handle
    assert_eq!(buf.get_u16(), 0x4422); // Sequence
    assert_eq!(buf.get_u16(), 0x4411); // Frame number
```
suggestion(if-minor): It might be worth porting over more of the serialization tests I wrote in the js implementation. In particular, a test to serialize a packet without any extensions and one to try to serialize a packet only containing each extension individually might be nice.
```rust
E2eeExt::TAG => {
    if raw.remaining() < E2eeExt::LEN {
        Err(DeserializeError::MalformedExt(tag))?
    }
    let key_index = raw.get_u8();
    let mut iv = [0u8; 12];
    raw.copy_to_slice(&mut iv);
    extensions.e2ee = E2eeExt { key_index, iv }.into();
}
UserTimestampExt::TAG => {
    if raw.remaining() < UserTimestampExt::LEN {
        Err(DeserializeError::MalformedExt(tag))?
    }
    extensions.user_timestamp = UserTimestampExt(raw.get_u64()).into()
}
```
suggestion(if-minor): You might consider refactoring these individual bits of packet deserializing logic into distinct deserialize methods to match the convention used in Extensions, Headers, etc up the chain (and fwiw, it is asymmetrical right now because E2eeExt / UserTimestampExt have serialize_into methods, so they follow the convention on that end, but not for deserialize).
Done. Later we might consider doing lazy deserialization of extensions (e.g., ExtensionMap replaces Extensions). This would also help reduce boilerplate.
```rust
E2eeExt::TAG => {
    if raw.remaining() < E2eeExt::LEN {
        Err(DeserializeError::MalformedExt(tag))?
    }
    let key_index = raw.get_u8();
    let mut iv = [0u8; 12];
    raw.copy_to_slice(&mut iv);
    extensions.e2ee = E2eeExt { key_index, iv }.into();
}
UserTimestampExt::TAG => {
    if raw.remaining() < UserTimestampExt::LEN {
        Err(DeserializeError::MalformedExt(tag))?
    }
    extensions.user_timestamp = UserTimestampExt(raw.get_u64()).into()
}
```
suggestion: Right now in processing both E2eeExt and UserTimestampExt you are assuming that the logic in each switch case will always consume all the bytes in that section, which while it is an ok assumption right now, could break in the future if the structure of the extensions were to evolve (ie, maybe a new section is added to the E2eeExt packet and in the future new E2eeExt packets have a longer len value encoded), breaking "forward compatibility".
IMO, The ideal way to ensure "forward compatibility" is to ensure that after processing each extension, the offset of raw is always advanced so that all len bytes (from the extension header) are consumed, disregarding any extra bytes that are larger than expected.
(this is being done in the js implementation right now)
This was my thinking: since these are tagged extensions (with 2 bytes used to represent each tag), we can treat them similarly to fields in protobuf. For example, if we need to support a new E2EE type, we can introduce a new extension (e.g., E2eeV2Ext) with a new tag number and structure. Newer clients that understand this extension will decode it, while older clients will continue decoding the original extension they know about and safely skip the v2 extension by length. This is effectively the same pattern as deprecating a field in a protobuf message and introducing a new field with a different field number. Let me know if this makes sense.
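That skip-by-length behavior can be sketched in isolation. The tag/length layout below is illustrative only and not the actual wire format:

```rust
// Length-prefixed extension parsing that skips unknown tags by their
// declared length, preserving forward compatibility: older clients keep
// decoding the tags they know and walk past everything else.
fn parse_extensions(mut raw: &[u8]) -> Vec<(u8, Vec<u8>)> {
    let mut known = Vec::new();
    while raw.len() >= 2 {
        let tag = raw[0];
        let len = raw[1] as usize;
        if raw.len() < 2 + len {
            break; // malformed: fewer bytes than the declared length
        }
        let body = &raw[2..2 + len];
        match tag {
            // Known tags: decode the body we understand.
            1 | 2 => known.push((tag, body.to_vec())),
            // Unknown tags: decode nothing, but still skip `len` bytes.
            _ => {}
        }
        raw = &raw[2 + len..]; // always advance by the declared length
    }
    known
}

fn main() {
    // tag=1 len=1 [0xAA], tag=9 (unknown) len=2, tag=2 len=1 [0xBB]
    let wire = [1, 1, 0xAA, 9, 2, 0, 0, 2, 1, 0xBB];
    assert_eq!(
        parse_extensions(&wire),
        vec![(1, vec![0xAA]), (2, vec![0xBB])]
    );
}
```

This mirrors the protobuf-style evolution described above: a newer `E2eeV2Ext`-style extension with a fresh tag is simply skipped by clients that predate it.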
```rust
/// Result from a call to [`Depacketizer::push`].
///
/// The reason this type is used instead of [`core::result::Result`] is due to the fact a single
/// call to push can result in both a complete frame being delivered and a previous
/// frame being dropped.
///
#[derive(Debug, Default)]
pub struct DepacketizerPushResult {
    pub frame: Option<DepacketizerFrame>,
    pub drop_error: Option<DepacketizerDropError>,
}
```
nitpick: I'll leave it up to you if you want to propagate any of the changes I made here into the rust implementation, given it sounds like there's still some evolution that is expected to happen for statistics reporting.
1egoman left a comment
Two more packetizer / depacketizer comments!
```rust
/// A frame that has been fully reassembled by [`Depacketizer`].
#[derive(Debug)]
pub struct DepacketizerFrame {
    pub payload: Bytes,
    pub extensions: Extensions,
}
```
question: Is it worth consolidating DepacketizerFrame into the already existing DataTrackFrame given they are so similar?
Keeping these types separate was a deliberate design decision. At the pipeline level, access to all the extensions is required (e.g., E2EE metadata). However, when the frame is delivered to the user, there's no reason to keep them all around. In the future, there will likely be more extensions that serve a role at the pipeline level but do not need to be propagated up.
As for DepacketizerFrame and PacketizerFrame being identical, while these could be combined, I think keeping them separate could make sense in case their fields need to diverge in the future—don't have a strong opinion though.
```rust
/// Frame packetized by [`Packetizer`].
pub struct PacketizerFrame {
    pub payload: Bytes,
    pub extensions: Extensions,
}
```
question: Is it worth consolidating PacketizerFrame into the already existing DataTrackFrame given they are so similar?
See previous comment.
Fixes an issue where duplicate packets cause the length to be incorrect
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Around line 82-88: The struct initialization for PartialFrame (involving
start_sequence and payloads) is failing formatting CI; edit the payloads field
where BTreeMap::from([(start_sequence, packet.payload)]) is used and add a
trailing comma after the single entry so it becomes
BTreeMap::from([(start_sequence, packet.payload),]), keeping the rest of the
PartialFrame fields (frame_number, start_sequence, extensions) intact.
- Around line 143-154: The reassembly loop in depacketizer.rs incorrectly uses
sequence < end_sequence, which fails when u16 wraps; update the loop termination
logic in the block that processes partial.start_sequence / sequence /
end_sequence so it continues advancing with sequence = sequence.wrapping_add(1)
until sequence == end_sequence (i.e., replace the `<` check with a `sequence !=
end_sequence` style condition), ensuring you still call
payload.extend(partial_payload) and return DepacketizerFrame { payload:
payload.freeze(), extensions: partial.extensions } only after the final packet
is appended; keep using wrapping_add and partial.payloads.remove(&sequence) as
before.
🧹 Nitpick comments (1)
livekit-datatrack/src/remote/depacketizer.rs (1)
266-296: Consider adding a test for sequence number wrapping. The test suite covers many scenarios but lacks a test where sequence numbers wrap around `u16::MAX` (e.g., start at `65534`, end at `1`). This would exercise the wrapping bug flagged in `finalize`.
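Such a test would exercise the wrap-around walk, which can be sketched in isolation with `wrapping_add` (standalone illustration, not the crate's `finalize` code):

```rust
// Walks the u16 sequence space from `start` to `end` inclusive, the way
// the reassembly loop must: a plain `seq < end` comparison would
// terminate immediately once the counter wraps (e.g. start=65534, end=1),
// while `seq != end` with a wrapping increment handles both cases.
fn sequence_range(start: u16, end: u16) -> Vec<u16> {
    let mut out = vec![start];
    let mut seq = start;
    while seq != end {
        seq = seq.wrapping_add(1);
        out.push(seq);
    }
    out
}

fn main() {
    // Wrapped range crosses u16::MAX cleanly.
    assert_eq!(sequence_range(65534, 1), vec![65534, 65535, 0, 1]);
    // Normal range still works.
    assert_eq!(sequence_range(3, 5), vec![3, 4, 5]);
}
```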
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
livekit-datatrack/src/remote/depacketizer.rs
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
📚 Learning: 2026-02-03T01:23:11.346Z
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
Applied to files:
livekit-datatrack/src/remote/depacketizer.rs
🪛 GitHub Actions: Rust Formatting
livekit-datatrack/src/remote/depacketizer.rs
[error] 84-84: Code formatting issue detected by cargo fmt: trailing comma placement in payloads map. Run 'cargo fmt' to fix formatting.
🔇 Additional comments (2)
livekit-datatrack/src/remote/depacketizer.rs (2)
386-415: Good coverage of duplicate sequence behavior. The test correctly verifies that the latest payload replaces the earlier one and that the assembled frame reflects this.
180-202: Clean design for the push result type. Using a struct with both `frame` and `drop_error` fields (instead of `Result`) is a thoughtful choice since a single `push` can simultaneously complete a frame and report a drop.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Around line 278-287: The test increments packet.header.sequence with `+= 1`
which panics on u16 overflow in debug builds; update the test to use wrapping
arithmetic (e.g., replace the `packet.header.sequence += 1` increment used
around the FrameMarker::Inter/Final loop with a wrapping increment such as
assigning packet.header.sequence = packet.header.sequence.wrapping_add(1)) so
the test matches production semantics, and apply the same change to the
`frame_number` increment in the `test_interrupted` test (use wrapping_add(1) for
frame_number).
🧹 Nitpick comments (2)
livekit-datatrack/src/packet/deserialize.rs (1)
273-282: `test_ext_unknown` doesn't exercise non-zero payload skipping. The test writes `put_bytes(0, 6)` after the tag, making `len=0`, so the unknown-extension skip path (`raw.advance(len)`) advances by 0 and the remaining zeros parse as padding. Consider adding a test with a real non-zero `len` payload to validate skipping actually works (this would also catch the `len` vs `len + 1` misalignment noted above).

livekit-datatrack/src/remote/depacketizer.rs (1)
95-111: Incoming packet silently dropped on frame-number mismatch. When `push_to_partial` detects a frame-number mismatch (line 105), it reports `Interrupted` for the old partial frame, but the incoming packet is also consumed without any corresponding error. The caller has no indication that a packet for `packet.header.frame_number` was lost, only that the old frame was interrupted. This is fine if intentional, but worth a brief comment in the code to document that the new packet is intentionally discarded without its own drop error.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
livekit-datatrack/src/packet/deserialize.rslivekit-datatrack/src/remote/depacketizer.rs
🧰 Additional context used
🧬 Code graph analysis (2)
livekit-datatrack/src/packet/deserialize.rs (4)
- livekit-datatrack/src/frame.rs (6): `from` (103-105), `from` (109-111), `from` (115-117), `from` (121-123), `payload` (41-43), `user_timestamp` (46-48)
- livekit-datatrack/src/packet/handle.rs (2): `from` (52-54), `from` (58-60)
- livekit-datatrack/src/packet/time.rs (1): `from_ticks` (27-29)
- livekit-datatrack/src/packet/serialize.rs (1): `packet` (178-193)

livekit-datatrack/src/remote/depacketizer.rs (1)
- livekit-datatrack/src/remote/pipeline.rs (1): `new` (39-47)
🔇 Additional comments (7)
livekit-datatrack/src/packet/deserialize.rs (3)
52-95: Header deserialization looks solid. The overflow fix for `ext_words` (line 84) is properly applied. The base header field parsing, version check, marker decoding, and extension block slicing all look correct.
139-153: Clean fixed-size deserialization. Both `UserTimestampExt::deserialize` and `E2eeExt::deserialize` correctly parse from fixed-size byte arrays. The slice range `&raw[1..13]` in `E2eeExt` is safe given `LEN` is 13.
284-292: Good edge-case coverage for alignment enforcement. This test correctly validates that a short extension block (3 bytes instead of the required 4) triggers an error.
livekit-datatrack/src/remote/depacketizer.rs (4)
34-50: Clean dispatch logic, LGTM. The `push` method cleanly routes packets by marker type. The separation of `frame_from_single`, `begin_partial`, and `push_to_partial` keeps each path focused and readable.
136-164: Wrapping-safe reassembly looks correct. The wrapping arithmetic fixes from prior reviews are properly applied: `sequence != end_sequence` on line 149 and `wrapping_sub`/`wrapping_add` on line 160. The loop correctly handles both normal and wrapped sequence ranges.
167-241: Well-structured result and error types. The `DepacketizerPushResult` design, allowing both a completed frame and a drop error from a single push, is a pragmatic choice well-documented in the doc comment. The `From` impls and `Display` are clean.
386-415: Duplicate-sequence test is thorough; nice edge-case coverage. The test verifies that a duplicate sequence number replaces the earlier payload and that reassembly still succeeds. Good coverage of `BTreeMap` insert-overwrite semantics.
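The `BTreeMap` insert-overwrite behavior that test relies on can be demonstrated in isolation (standalone sketch, names illustrative):

```rust
use std::collections::BTreeMap;

// Inserts two payloads under the same sequence number and returns the one
// that survives — `BTreeMap::insert` replaces the value for an existing
// key rather than keeping both.
fn latest_payload(
    seq: u16,
    first: &'static str,
    second: &'static str,
) -> &'static str {
    let mut payloads: BTreeMap<u16, &str> = BTreeMap::new();
    payloads.insert(seq, first);
    // Duplicate key: the earlier value is overwritten, not appended,
    // so the map length stays 1.
    payloads.insert(seq, second);
    payloads[&seq]
}

fn main() {
    assert_eq!(latest_payload(7, "first", "second"), "second");
}
```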
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Line 309: The code incorrectly doubles frame_number by using "+=" with
wrapping_add; replace the faulty increment at packet.header.frame_number (the
line currently reading "packet.header.frame_number +=
packet.header.frame_number.wrapping_add(1);") with a single wrapping increment
so the value becomes packet.header.frame_number =
packet.header.frame_number.wrapping_add(1); — this preserves wrapping semantics
and avoids checked-add overflow panics in tests (affects the frame_number
handling in the depacketizer logic and Faker-generated values).
- Line 329: The test modifies packet.header.sequence with plain addition which
can overflow in debug builds when Faker generates values near u16::MAX; change
the increment to a wrapping operation (use wrapping_add(3)) on
packet.header.sequence to match other tests and avoid panic during debug builds.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
livekit-datatrack/src/remote/depacketizer.rs
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
📚 Learning: 2026-02-03T01:23:11.346Z
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
Applied to files:
livekit-datatrack/src/remote/depacketizer.rs
🧬 Code graph analysis (1)
livekit-datatrack/src/remote/depacketizer.rs (1)
- livekit-datatrack/src/packet/serialize.rs (1): `packet` (178-193)
🔇 Additional comments (6)
livekit-datatrack/src/remote/depacketizer.rs (6)
1-50: Clean public API and dispatch logic — LGTM. The `Depacketizer` struct, `DepacketizerFrame`, and `push` dispatch are well-structured. The marker-based routing is clear and correct.
52-92: Single-frame and begin-partial paths look correct. Both `frame_from_single` and `begin_partial` properly drain any existing partial frame as `Interrupted` before proceeding. Good.
94-134: `push_to_partial`: frame-number mismatch silently discards the incoming packet. When `packet.header.frame_number != partial.frame_number` on line 105, the old partial is dropped as `Interrupted`, but the incoming packet (which belongs to a different, unknown frame) is also silently lost with no error reported for it. This is arguably fine since `DepacketizerPushResult` can only carry one `drop_error`, but worth noting as a design trade-off: callers won't know about the lost packet from the new frame.
136-164: Wrapping arithmetic in `finalize` looks correct.

The `sequence != end_sequence` loop condition and `wrapping_sub`/`wrapping_add` in the `Incomplete` expected count both handle `u16` wraparound properly.
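The wraparound-safe expected count can be checked in isolation (a sketch of the pattern, not the crate's exact code; the helper name is made up):

```rust
/// Packets expected for a frame spanning the begin..=end sequence
/// numbers, tolerant of u16 wraparound. This mirrors the
/// wrapping_sub/wrapping_add pattern the review refers to.
fn expected_packet_count(begin: u16, end: u16) -> u16 {
    end.wrapping_sub(begin).wrapping_add(1)
}

fn main() {
    assert_eq!(expected_packet_count(10, 14), 5); // 10..=14, no wrap
    assert_eq!(expected_packet_count(65534, 1), 4); // 65534, 65535, 0, 1
}
```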
167-241: Type definitions and Display/Error impls are clean.
`PartialFrame`, `DepacketizerPushResult`, `DepacketizerDropError`, `DepacketizerDropReason`, and the `From` conversions are all well-structured and idiomatic.
417-423: Faker impl for `Packet` looks good.

Generates realistic random packets for testing with variable payload sizes. Clean and useful test scaffolding.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
```rust
assert!(result.frame.is_none() && result.drop_error.is_none());

let first_frame_number = packet.header.frame_number;
packet.header.frame_number += packet.header.frame_number.wrapping_add(1); // Next frame
```
Bug: += combined with wrapping_add corrupts frame_number.
Line 309 reads:

```rust
packet.header.frame_number += packet.header.frame_number.wrapping_add(1);
```

This computes `frame_number = frame_number + (frame_number + 1) = 2 * frame_number + 1`, not the intended `frame_number + 1`. Worse, the `+=` uses checked arithmetic in debug builds, so if Faker generates a `frame_number` ≥ 32768 (roughly half of `u16::MAX`), the addition overflows and the test panics — causing flaky test failures.
The test still passes when it doesn't panic because the result happens to differ from the original value, but the intent is clearly wrong.
🐛 Proposed fix

```diff
- packet.header.frame_number += packet.header.frame_number.wrapping_add(1); // Next frame
+ packet.header.frame_number = packet.header.frame_number.wrapping_add(1); // Next frame
```

🤖 Prompt for AI Agents
In `@livekit-datatrack/src/remote/depacketizer.rs` at line 309, The code
incorrectly doubles frame_number by using "+=" with wrapping_add; replace the
faulty increment at packet.header.frame_number (the line currently reading
"packet.header.frame_number += packet.header.frame_number.wrapping_add(1);")
with a single wrapping increment so the value becomes packet.header.frame_number
= packet.header.frame_number.wrapping_add(1); — this preserves wrapping
semantics and avoids checked-add overflow panics in tests (affects the
frame_number handling in the depacketizer logic and Faker-generated values).
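The arithmetic behind this fix can be verified directly. Both variants are written with wrapping ops below so the buggy one cannot itself panic while we inspect it:

```rust
/// The buggy increment from the review: x + (x + 1) = 2*x + 1.
fn buggy_next_frame(frame: u16) -> u16 {
    frame.wrapping_add(frame.wrapping_add(1))
}

/// The intended increment: a single wrapping +1.
fn next_frame(frame: u16) -> u16 {
    frame.wrapping_add(1)
}

fn main() {
    assert_eq!(buggy_next_frame(5), 11); // 2*5 + 1, not 6
    assert_eq!(next_frame(5), 6);
    assert_eq!(next_frame(u16::MAX), 0); // wraps cleanly, no panic
    // With plain `+=` and checked arithmetic, frame >= 32768 overflows,
    // since 2*32768 + 1 exceeds u16::MAX; here it wraps instead.
    assert_eq!(buggy_next_frame(32768), 1); // 65537 mod 65536
}
```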
```rust
depacketizer.push(packet.clone());

packet.header.sequence += 3;
```
`sequence += 3` can overflow in debug builds with random Faker values.

Same class of issue flagged in past reviews for other fields: if Faker generates a `sequence` near `u16::MAX`, `+= 3` panics in debug mode. Consider using `wrapping_add(3)` for consistency with the rest of the test file.
Proposed fix

```diff
- packet.header.sequence += 3;
+ packet.header.sequence = packet.header.sequence.wrapping_add(3);
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
- packet.header.sequence += 3;
+ packet.header.sequence = packet.header.sequence.wrapping_add(3);
```
🤖 Prompt for AI Agents
In `@livekit-datatrack/src/remote/depacketizer.rs` at line 329, The test modifies
packet.header.sequence with plain addition which can overflow in debug builds
when Faker generates values near u16::MAX; change the increment to a wrapping
operation (use wrapping_add(3)) on packet.header.sequence to match other tests
and avoid panic during debug builds.
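The panic-free advance can be demonstrated in isolation (a sketch; the gap of 3 matches the test, the helper name is made up):

```rust
/// Advance a u16 sequence number by 3 without risking a debug-build
/// overflow panic near u16::MAX (illustrative helper).
fn advance_sequence(seq: u16) -> u16 {
    seq.wrapping_add(3)
}

fn main() {
    assert_eq!(advance_sequence(100), 103); // ordinary case
    // Plain `+= 3` on 65534 would panic under checked arithmetic;
    // wrapping_add makes the wraparound explicit: 65534 -> 65535 -> 0 -> 1.
    assert_eq!(advance_sequence(u16::MAX - 1), 1);
}
```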
This PR introduces livekit-datatrack, a new crate that encapsulates the core functionality and APIs for data tracks. The crate is intentionally designed with no dependencies on libwebrtc or the signaling client, making it fully unit-testable and enabling incremental adoption in Swift and other languages via the new livekit-uniffi interface.
Key modules:
Given the large scope of data tracks, integration with the Rust client (including E2E tests and examples) and the FFI bindings have been split into separate PRs:
Data tracks is also being developed in parallel for the web:
Summary by CodeRabbit