Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit TrackUnpublished before TrackPublished within the same update #541

Merged
merged 7 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/curvy-years-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Emit TrackUnpublished before TrackPublished within the same update
12 changes: 11 additions & 1 deletion src/room/PCTransport.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import EventEmitter from 'events';
import { MediaDescription, parse, write } from 'sdp-transform';
import { debounce } from 'ts-debounce';
import log from '../logger';
Expand All @@ -10,8 +11,13 @@ interface TrackBitrateInfo {
maxbr: number;
}

export const PCEvents = {
NegotiationStarted: 'negotiationStarted',
NegotiationComplete: 'negotiationComplete',
} as const;

/** @internal */
export default class PCTransport {
export default class PCTransport extends EventEmitter {
pc: RTCPeerConnection;

pendingCandidates: RTCIceCandidateInit[] = [];
Expand All @@ -27,6 +33,7 @@ export default class PCTransport {
onOffer?: (offer: RTCSessionDescriptionInit) => void;

constructor(config?: RTCConfiguration) {
super();
this.pc = new RTCPeerConnection(config);
}

Expand Down Expand Up @@ -56,11 +63,14 @@ export default class PCTransport {
if (this.renegotiate) {
this.renegotiate = false;
this.createAndSendOffer();
} else if (sd.type === 'answer') {
this.emit(PCEvents.NegotiationComplete);
}
}

// debounced negotiate interface
negotiate = debounce((onError?: (e: Error) => void) => {
this.emit(PCEvents.NegotiationStarted);
try {
this.createAndSendOffer();
} catch (e) {
Expand Down
40 changes: 29 additions & 11 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
UnexpectedConnectionState,
} from './errors';
import { EngineEvent } from './events';
import PCTransport from './PCTransport';
import PCTransport, { PCEvents } from './PCTransport';
import type { ReconnectContext, ReconnectPolicy } from './ReconnectPolicy';
import type LocalTrack from './track/LocalTrack';
import type LocalVideoTrack from './track/LocalVideoTrack';
Expand Down Expand Up @@ -949,18 +949,36 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}

/** @internal */
negotiate() {
if (!this.publisher) {
return;
}
negotiate(): Promise<void> {
// observe signal state
return new Promise<void>((resolve, reject) => {
if (!this.publisher) {
reject(new NegotiationError('publisher is not defined'));
return;
}

this.hasPublished = true;
this.hasPublished = true;

this.publisher.negotiate((e) => {
if (e instanceof NegotiationError) {
this.fullReconnectOnNext = true;
}
this.handleDisconnect('negotiation');
const negotiationTimeout = setTimeout(() => {
reject('negotiation timed out');
this.handleDisconnect('negotiation');
}, this.peerConnectionTimeout);

this.publisher.once(PCEvents.NegotiationStarted, () => {
this.publisher?.once(PCEvents.NegotiationComplete, () => {
clearTimeout(negotiationTimeout);
resolve();
});
});

this.publisher.negotiate((e) => {
clearTimeout(negotiationTimeout);
reject(e);
if (e instanceof NegotiationError) {
this.fullReconnectOnNext = true;
}
this.handleDisconnect('negotiation');
});
});
}

Expand Down
40 changes: 29 additions & 11 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ export default class LocalParticipant extends Participant {
} else if (track && track.track) {
// screenshare cannot be muted, unpublish instead
if (source === Track.Source.ScreenShare) {
track = this.unpublishTrack(track.track);
track = await this.unpublishTrack(track.track);
const screenAudioTrack = this.getTrack(Track.Source.ScreenShareAudio);
if (screenAudioTrack && screenAudioTrack.track) {
this.unpublishTrack(screenAudioTrack.track);
Expand Down Expand Up @@ -694,10 +694,10 @@ export default class LocalParticipant extends Participant {
log.debug(`published ${videoCodec} for track ${track.sid}`, { encodings, trackInfo: ti });
}

unpublishTrack(
async unpublishTrack(
track: LocalTrack | MediaStreamTrack,
stopOnUnpublish?: boolean,
): LocalTrackPublication | undefined {
): Promise<LocalTrackPublication | undefined> {
// look through all published tracks to find the right ones
const publication = this.getPublicationForTrack(track);

Expand Down Expand Up @@ -744,7 +744,7 @@ export default class LocalParticipant extends Participant {
} catch (e) {
log.warn('failed to unpublish track', { error: e, method: 'unpublishTrack' });
} finally {
this.engine.negotiate();
await this.engine.negotiate();
}
}

Expand All @@ -769,15 +769,33 @@ export default class LocalParticipant extends Participant {
return publication;
}

unpublishTracks(tracks: LocalTrack[] | MediaStreamTrack[]): LocalTrackPublication[] {
const publications: LocalTrackPublication[] = [];
tracks.forEach((track: LocalTrack | MediaStreamTrack) => {
const pub = this.unpublishTrack(track);
if (pub) {
publications.push(pub);
async unpublishTracks(
tracks: LocalTrack[] | MediaStreamTrack[],
): Promise<LocalTrackPublication[]> {
const results = await Promise.all(tracks.map((track) => this.unpublishTrack(track)));
return results.filter(
(track) => track instanceof LocalTrackPublication,
) as LocalTrackPublication[];
}

async republishAllTracks(options?: TrackPublishOptions) {
const localPubs: LocalTrackPublication[] = [];
this.tracks.forEach((pub) => {
if (pub.track) {
if (options) {
pub.options = { ...pub.options, ...options };
}
localPubs.push(pub);
}
});
return publications;

await Promise.all(
localPubs.map(async (pub) => {
const track = pub.track!;
await this.unpublishTrack(track, false);
await this.publishTrack(track, pub.options);
}),
);
}

/**
Expand Down
10 changes: 5 additions & 5 deletions src/room/participant/RemoteParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,6 @@ export default class RemoteParticipant extends Participant {
validTracks.set(ti.sid, publication);
});

// always emit events for new publications, Room will not forward them unless it's ready
newTracks.forEach((publication) => {
this.emit(ParticipantEvent.TrackPublished, publication);
});

// detect removed tracks
this.tracks.forEach((publication) => {
if (!validTracks.has(publication.trackSid)) {
Expand All @@ -278,6 +273,11 @@ export default class RemoteParticipant extends Participant {
this.unpublishTrack(publication.trackSid, true);
}
});

// always emit events for new publications, Room will not forward them unless it's ready
newTracks.forEach((publication) => {
this.emit(ParticipantEvent.TrackPublished, publication);
});
}

/** @internal */
Expand Down