Skip to content

Commit

Permalink
Send permissions update on subscribe. (livekit#805)
Browse files Browse the repository at this point in the history
* Send permissions update on subscribe.

The permission `allowed` update was happening only when
processing pending subscriptions (which happens only on
subscription permissions update).

It is possible that subscription happens through other
paths (like subscribing new participant to tracks).
In that path, we were checking if the track has permissions
and adding to pending. But, we were not checking if
the track is in pending and if it is in there, removing
in on successful subscription and sending an update.
Fix that.

* log more fields in error
  • Loading branch information
boks1971 authored Jul 3, 2022
1 parent 5704a7b commit 7c35184
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions pkg/rtc/uptrackmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func (u *UpTrackManager) AddSubscriber(sub types.LocalParticipant, params types.
return n, err
}
n += 1

u.lock.Lock()
u.maybeRemovePendingSubscription(trackID, sub, true)
u.lock.Unlock()
}
return n, nil
}
Expand All @@ -155,7 +159,7 @@ func (u *UpTrackManager) RemoveSubscriber(sub types.LocalParticipant, trackID li
}

u.lock.Lock()
u.maybeRemovePendingSubscription(trackID, sub)
u.maybeRemovePendingSubscription(trackID, sub, false)
u.lock.Unlock()
}

Expand Down Expand Up @@ -429,13 +433,16 @@ func (u *UpTrackManager) maybeAddPendingSubscription(trackID livekit.TrackID, su
})
}

func (u *UpTrackManager) maybeRemovePendingSubscription(trackID livekit.TrackID, sub types.LocalParticipant) {
func (u *UpTrackManager) maybeRemovePendingSubscription(trackID livekit.TrackID, sub types.LocalParticipant, sendUpdate bool) {
subscriberIdentity := sub.Identity()

found := false

pending := u.pendingSubscriptions[trackID]
n := len(pending)
for idx, identity := range pending {
if identity == subscriberIdentity {
found = true
u.pendingSubscriptions[trackID][idx] = u.pendingSubscriptions[trackID][n-1]
u.pendingSubscriptions[trackID] = u.pendingSubscriptions[trackID][:n-1]
break
Expand All @@ -444,6 +451,13 @@ func (u *UpTrackManager) maybeRemovePendingSubscription(trackID livekit.TrackID,
if len(u.pendingSubscriptions[trackID]) == 0 {
delete(u.pendingSubscriptions, trackID)
}

if found && sendUpdate {
u.params.Logger.Debugw("removing pending subscription", "subscriberID", sub.ID(), "trackID", trackID)
u.opsQueue.Enqueue(func() {
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)
})
}
}

// creates subscriptions for tracks if permissions have been granted
Expand Down Expand Up @@ -473,13 +487,13 @@ func (u *UpTrackManager) processPendingSubscriptions(resolver func(participantId
}

if err := track.AddSubscriber(sub); err != nil {
u.params.Logger.Errorw("error reinstating pending subscription", err)
u.params.Logger.Errorw("error reinstating subscription", err, "subscirberID", sub.ID(), "trackID", trackID)
// keep it in pending on error in case the error is transient
updatedPending = append(updatedPending, identity)
continue
}

u.params.Logger.Debugw("reinstating pending subscription", "subscriberID", sub.ID(), "trackID", trackID)
u.params.Logger.Debugw("reinstating subscription", "subscriberID", sub.ID(), "trackID", trackID)
u.opsQueue.Enqueue(func() {
sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)
})
Expand Down

0 comments on commit 7c35184

Please sign in to comment.