Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions livekit-ffi/protocol/participant.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package livekit.proto;
option csharp_namespace = "LiveKit.Proto";

import "handle.proto";
import "track.proto";

message ParticipantInfo {
required string sid = 1;
Expand All @@ -28,6 +29,7 @@ message ParticipantInfo {
required ParticipantKind kind = 6;
required DisconnectReason disconnect_reason = 7;
repeated ParticipantKindDetail kind_details = 8;
optional ParticipantPermission permission = 9;
}

message OwnedParticipant {
Expand Down Expand Up @@ -81,4 +83,24 @@ enum DisconnectReason {
SIP_TRUNK_FAILURE = 13;
CONNECTION_TIMEOUT = 14;
MEDIA_FAILURE = 15;
}

// copied from livekit-protocol/protocol/protobufs/livekit_models.proto and removed deprecated fields
message ParticipantPermission {
// allow participant to subscribe to other tracks in the room
required bool can_subscribe = 1;
// allow participant to publish new tracks to room
required bool can_publish = 2;
// allow participant to publish data
required bool can_publish_data = 3;
// sources that are allowed to be published
repeated TrackSource can_publish_sources = 9;
// indicates that it's hidden to others
required bool hidden = 7;
// indicates that participant can update own metadata and attributes
required bool can_update_metadata = 10;
// if a participant can subscribe to metrics
required bool can_subscribe_metrics = 12;

// NEXT_ID: 13
}
6 changes: 6 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ message RoomEvent {
// carry over all participant info updates, including sid
ParticipantsUpdated participants_updated = 38;
ParticipantEncryptionStatusChanged participant_encryption_status_changed = 39;
ParticipantPermissionChanged participant_permission_changed = 41;
TokenRefreshed token_refreshed = 40;
}
}
Expand Down Expand Up @@ -509,6 +510,11 @@ message ParticipantNameChanged {
required string name = 2;
}

message ParticipantPermissionChanged {
required string participant_identity = 1;
optional ParticipantPermission permission = 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be optional? seems like it should always be there

}

message ConnectionQualityChanged {
required string participant_identity = 1;
required ConnectionQuality quality = 2;
Expand Down
19 changes: 19 additions & 0 deletions livekit-ffi/src/conversion/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use livekit::prelude::*;
use livekit::DisconnectReason;
use livekit::ParticipantKind;
use livekit::ParticipantKindDetail;
use livekit_protocol as livekit_proto;

impl From<&FfiParticipant> for proto::ParticipantInfo {
fn from(value: &FfiParticipant) -> Self {
Expand All @@ -26,6 +27,9 @@ impl From<&FfiParticipant> for proto::ParticipantInfo {

impl From<&Participant> for proto::ParticipantInfo {
fn from(participant: &Participant) -> Self {
// Convert permission if present
let permission = participant.permission().map(|p| (&p).into());

Self {
sid: participant.sid().into(),
name: participant.name(),
Expand All @@ -40,6 +44,7 @@ impl From<&Participant> for proto::ParticipantInfo {
.into_iter()
.map(|k| proto::ParticipantKindDetail::from(k).into())
.collect(),
permission,
}
}
}
Expand Down Expand Up @@ -92,3 +97,17 @@ impl From<DisconnectReason> for proto::DisconnectReason {
}
}
}

impl From<&livekit_proto::ParticipantPermission> for proto::ParticipantPermission {
fn from(perm: &livekit_proto::ParticipantPermission) -> Self {
proto::ParticipantPermission {
can_subscribe: perm.can_subscribe,
can_publish: perm.can_publish,
can_publish_data: perm.can_publish_data,
can_publish_sources: perm.can_publish_sources.clone(),
hidden: perm.hidden,
can_update_metadata: perm.can_update_metadata,
can_subscribe_metrics: perm.can_subscribe_metrics,
}
}
}
9 changes: 9 additions & 0 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,15 @@ async fn forward_event(
.into(),
);
}
RoomEvent::ParticipantPermissionChanged { participant, permission } => {
let _ = send_event(
proto::ParticipantPermissionChanged {
participant_identity: participant.identity().to_string(),
permission: permission.map(|p| (&p).into()),
}
.into(),
);
}
RoomEvent::ActiveSpeakersChanged { speakers } => {
let participant_identities =
speakers.iter().map(|p| p.identity().to_string()).collect::<Vec<_>>();
Expand Down
25 changes: 25 additions & 0 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ pub enum RoomEvent {
participant: Participant,
is_encrypted: bool,
},
ParticipantPermissionChanged {
participant: Participant,
permission: Option<proto::ParticipantPermission>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same concern here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same concerns from the beginning and have been struggling with this decision.

The root cause is that the protocol layer uses proto3 syntax, which makes the generated nested structs optional by default. Even though we know in practice it will never be empty (and we can get the semantic default value even if it's empty in the proto3 world), after careful consideration, I decided to make the permission field optional in the ParticipantInfo struct in the FFI layer to avoid potential breaking changes.

I completely agree that it should be required in the ParticipantPermissionChanged event. However, I initially thought it might be better to keep consistency with the ParticipantInfo struct. I'm happy to change it to required if you think that's the better approach.

BTW, I made a new commit to initialize the permission field(It should be discovered earlier if choose required instead, LOL)

},
ActiveSpeakersChanged {
speakers: Vec<Participant>,
},
Expand Down Expand Up @@ -505,6 +509,7 @@ impl Room {
pi.metadata,
pi.attributes,
e2ee_manager.encryption_type(),
pi.permission,
);

let dispatcher = Dispatcher::<RoomEvent>::default();
Expand Down Expand Up @@ -580,6 +585,14 @@ impl Room {
}
});

local_participant.on_permission_changed({
let dispatcher = dispatcher.clone();
move |participant, permission| {
let event = RoomEvent::ParticipantPermissionChanged { participant, permission };
dispatcher.dispatch(&event);
}
});

let (incoming_stream_manager, open_rx) = IncomingStreamManager::new();
let (outgoing_stream_manager, packet_rx) = OutgoingStreamManager::new();

Expand Down Expand Up @@ -649,6 +662,7 @@ impl Room {
pi.name,
pi.metadata,
pi.attributes,
pi.permission,
)
};
participant.update_info(pi.clone());
Expand Down Expand Up @@ -1011,6 +1025,7 @@ impl RoomSession {
pi.name,
pi.metadata,
pi.attributes,
pi.permission,
)
};

Expand Down Expand Up @@ -1616,6 +1631,7 @@ impl RoomSession {
name: String,
metadata: String,
attributes: HashMap<String, String>,
permission: Option<proto::ParticipantPermission>,
) -> RemoteParticipant {
let participant = RemoteParticipant::new(
self.rtc_engine.clone(),
Expand All @@ -1627,6 +1643,7 @@ impl RoomSession {
metadata,
attributes,
self.options.auto_subscribe,
permission,
);

participant.on_track_published({
Expand Down Expand Up @@ -1724,6 +1741,14 @@ impl RoomSession {
}
});

participant.on_permission_changed({
let dispatcher = self.dispatcher.clone();
move |participant, permission| {
let event = RoomEvent::ParticipantPermissionChanged { participant, permission };
dispatcher.dispatch(&event);
}
});

participant.on_encryption_status_changed({
let dispatcher = self.dispatcher.clone();
move |participant, is_encrypted| {
Expand Down
13 changes: 13 additions & 0 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl LocalParticipant {
metadata: String,
attributes: HashMap<String, String>,
encryption_type: EncryptionType,
permission: Option<proto::ParticipantPermission>,
) -> Self {
Self {
inner: super::new_inner(
Expand All @@ -127,6 +128,7 @@ impl LocalParticipant {
attributes,
kind,
kind_details,
permission,
),
local: Arc::new(LocalInfo {
events: LocalEvents::default(),
Expand Down Expand Up @@ -216,6 +218,13 @@ impl LocalParticipant {
super::on_attributes_changed(&self.inner, handler)
}

pub(crate) fn on_permission_changed(
&self,
handler: impl Fn(Participant, Option<proto::ParticipantPermission>) + Send + 'static,
) {
super::on_permission_changed(&self.inner, handler)
}

pub(crate) fn add_publication(&self, publication: TrackPublication) {
super::add_publication(&self.inner, &Participant::Local(self.clone()), publication);
}
Expand Down Expand Up @@ -755,6 +764,10 @@ impl LocalParticipant {
self.inner.info.read().disconnect_reason
}

pub fn permission(&self) -> Option<proto::ParticipantPermission> {
self.inner.info.read().permission.clone()
}

pub async fn perform_rpc(&self, data: PerformRpcData) -> Result<String, RpcError> {
// Maximum amount of time it should ever take for an RPC request to reach the destination, and the ACK to come back
// This is set to 7 seconds to account for various relay timeouts and retries in LiveKit Cloud that occur in rare cases
Expand Down
21 changes: 21 additions & 0 deletions livekit/src/room/participant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl Participant {
pub fn kind_details(self: &Self) -> Vec<ParticipantKindDetail>;
pub fn disconnect_reason(self: &Self) -> DisconnectReason;
pub fn is_encrypted(self: &Self) -> bool;
pub fn permission(self: &Self) -> Option<proto::ParticipantPermission>;

pub(crate) fn update_info(self: &Self, info: proto::ParticipantInfo) -> ();

Expand Down Expand Up @@ -128,6 +129,7 @@ struct ParticipantInfo {
pub kind: ParticipantKind,
pub kind_details: Vec<ParticipantKindDetail>,
pub disconnect_reason: DisconnectReason,
pub permission: Option<proto::ParticipantPermission>,
}

type TrackMutedHandler = Box<dyn Fn(Participant, TrackPublication) + Send>;
Expand All @@ -136,6 +138,8 @@ type MetadataChangedHandler = Box<dyn Fn(Participant, String, String) + Send>;
type AttributesChangedHandler = Box<dyn Fn(Participant, HashMap<String, String>) + Send>;
type NameChangedHandler = Box<dyn Fn(Participant, String, String) + Send>;
type EncryptionStatusChangedHandler = Box<dyn Fn(Participant, bool) + Send>;
type PermissionChangedHandler =
Box<dyn Fn(Participant, Option<proto::ParticipantPermission>) + Send>;

#[derive(Default)]
struct ParticipantEvents {
Expand All @@ -145,6 +149,7 @@ struct ParticipantEvents {
attributes_changed: Mutex<Option<AttributesChangedHandler>>,
name_changed: Mutex<Option<NameChangedHandler>>,
encryption_status_changed: Mutex<Option<EncryptionStatusChangedHandler>>,
permission_changed: Mutex<Option<PermissionChangedHandler>>,
}

pub(super) struct ParticipantInner {
Expand Down Expand Up @@ -172,6 +177,7 @@ pub(super) fn new_inner(
attributes: HashMap<String, String>,
kind: ParticipantKind,
kind_details: Vec<ParticipantKindDetail>,
permission: Option<proto::ParticipantPermission>,
) -> Arc<ParticipantInner> {
Arc::new(ParticipantInner {
rtc_engine,
Expand All @@ -187,6 +193,7 @@ pub(super) fn new_inner(
audio_level: 0.0,
connection_quality: ConnectionQuality::Excellent,
disconnect_reason: DisconnectReason::UnknownReason,
permission,
}),
track_publications: Default::default(),
events: Default::default(),
Expand Down Expand Up @@ -229,6 +236,13 @@ pub(super) fn update_info(
cb(participant.clone(), changed_attributes);
}
}

let old_permission = std::mem::replace(&mut info.permission, new_info.permission.clone());
if old_permission != new_info.permission {
if let Some(cb) = inner.events.permission_changed.lock().as_ref() {
cb(participant.clone(), new_info.permission.clone());
}
}
}

pub(super) fn set_speaking(
Expand Down Expand Up @@ -297,6 +311,13 @@ pub(super) fn on_encryption_status_changed(
*inner.events.encryption_status_changed.lock() = Some(Box::new(handler));
}

pub(super) fn on_permission_changed(
inner: &Arc<ParticipantInner>,
handler: impl Fn(Participant, Option<proto::ParticipantPermission>) + Send + 'static,
) {
*inner.events.permission_changed.lock() = Some(Box::new(handler));
}

pub(super) fn update_encryption_status(inner: &Arc<ParticipantInner>, participant: &Participant) {
use crate::e2ee::EncryptionType;

Expand Down
13 changes: 13 additions & 0 deletions livekit/src/room/participant/remote_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl RemoteParticipant {
metadata: String,
attributes: HashMap<String, String>,
auto_subscribe: bool,
permission: Option<proto::ParticipantPermission>,
) -> Self {
Self {
inner: super::new_inner(
Expand All @@ -95,6 +96,7 @@ impl RemoteParticipant {
attributes,
kind,
kind_details,
permission,
),
remote: Arc::new(RemoteInfo { events: Default::default(), auto_subscribe }),
}
Expand Down Expand Up @@ -310,6 +312,13 @@ impl RemoteParticipant {
super::on_attributes_changed(&self.inner, handler)
}

pub(crate) fn on_permission_changed(
&self,
handler: impl Fn(Participant, Option<proto::ParticipantPermission>) + Send + 'static,
) {
super::on_permission_changed(&self.inner, handler)
}

pub(crate) fn on_encryption_status_changed(
&self,
handler: impl Fn(Participant, bool) + Send + 'static,
Expand Down Expand Up @@ -548,6 +557,10 @@ impl RemoteParticipant {
self.inner.info.read().disconnect_reason
}

pub fn permission(&self) -> Option<proto::ParticipantPermission> {
self.inner.info.read().permission.clone()
}

pub fn is_encrypted(&self) -> bool {
*self.inner.is_encrypted.read()
}
Expand Down