Skip to content

Commit

Permalink
Merge branch 'feat/demux-livemix' into v5-release-candidate
Browse files Browse the repository at this point in the history
  • Loading branch information
Nfrederiksen committed Oct 2, 2024
2 parents f749ed4 + 013d10c commit 01f1987
Show file tree
Hide file tree
Showing 12 changed files with 2,677 additions and 866 deletions.
2 changes: 1 addition & 1 deletion engine/playhead_state.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class PlayheadStateStore extends SharedStateStore {
constructor(opts) {
super("playhead", opts, {
state: PlayheadState.IDLE,
tickInterval: 3,
tickInterval: (opts.averageSegmentDuration/1000) || 3,
mediaSeq: 0,
vodMediaSeqVideo: 0,
vodMediaSeqAudio: 0,
Expand Down
132 changes: 77 additions & 55 deletions engine/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ export class ChannelEngine {
cacheTTL: options.sharedStoreCacheTTL,
volatileKeyTTL: options.volatileKeyTTL,
}),
playheadStateStore: new PlayheadStateStore({
playheadStateStore: new PlayheadStateStore({
averageSegmentDuration: options.averageSegmentDuration,
redisUrl: options.redisUrl,
memcachedUrl: options.memcachedUrl,
cacheTTL: options.sharedStoreCacheTTL,
Expand Down Expand Up @@ -452,7 +453,7 @@ export class ChannelEngine {
try {
await Promise.all(channels.map(channel => getSwitchStatusAndPerformSwitch(channel)));
} catch (err) {
debug('Problem occured when updating streamSwitchers');
debug('Problem occurred when updating streamSwitchers');
throw new Error (err);
}

Expand Down Expand Up @@ -497,13 +498,15 @@ export class ChannelEngine {
useDemuxedAudio: options.useDemuxedAudio,
dummySubtitleEndpoint: this.dummySubtitleEndpoint,
subtitleSliceEndpoint: this.subtitleSliceEndpoint,
useVTTSubtitles: this.useVTTSubtitles,
useVTTSubtitles: false,
cloudWatchMetrics: this.logCloudWatchMetrics,
profile: channel.profile,
audioTracks: channel.audioTracks
}, this.sessionLiveStore);

sessionSwitchers[channel.id] = new StreamSwitcher({
sessionId: channel.id,
useDemuxedAudio: options.useDemuxedAudio,
streamSwitchManager: this.streamSwitchManager ? this.streamSwitchManager : null
});

Expand Down Expand Up @@ -772,17 +775,80 @@ export class ChannelEngine {
next(this._gracefulErrorHandler("Could not find a valid session"));
}
}

async _handleMediaManifest(req, res, next) {
debug(`x-playback-session-id=${req.headers["x-playback-session-id"]} req.url=${req.url}`);
debug(req.params);
const session = sessions[req.params[1]];
const sessionLive = sessionsLive[req.params[1]];
if (session && sessionLive) {
try {
let ts1 = Date.now();
let body = null;
if (!this.streamSwitchManager) {
debug(`[${req.params[1]}]: Responding with VOD2Live manifest`);
body = await session.getCurrentMediaManifestAsync(req.params[0], req.headers["x-playback-session-id"]);
} else {
while (switcherStatus[req.params[1]] === null || switcherStatus[req.params[1]] === undefined) {
debug(`[${req.params[1]}]: (${switcherStatus[req.params[1]]}) Waiting for streamSwitcher to respond`);
await timer(500);
}
debug(`switcherStatus[${req.params[1]}]=[${switcherStatus[req.params[1]]}]`);
if (switcherStatus[req.params[1]]) {
debug(`[${req.params[1]}]: Responding with Live-stream manifest`);
body = await sessionLive.getCurrentMediaManifestAsync(req.params[0]);
} else {
debug(`[${req.params[1]}]: Responding with VOD2Live manifest`);
body = await session.getCurrentMediaManifestAsync(req.params[0], req.headers["x-playback-session-id"]);
}
debug(`[${req.params[1]}]: Media Manifest Request Took (${Date.now() - ts1})ms`);
}
//verbose(`[${session.sessionId}] body=`);
//verbose(body);
res.sendRaw(200, Buffer.from(body, 'utf8'), {
"Content-Type": "application/vnd.apple.mpegurl",
"Access-Control-Allow-Origin": "*",
"Cache-Control": `max-age=${this.streamerOpts.cacheTTL || '4'}`,
"X-Instance-Id": this.instanceId + `<${version}>`,
});
next();
} catch (err) {
next(this._gracefulErrorHandler(err));
}
} else {
const err = new errs.NotFoundError('Invalid session(s)');
next(err);
}
}

async _handleAudioManifest(req, res, next) {
debug(`req.url=${req.url}`);
debug(`x-playback-session-id=${req.headers["x-playback-session-id"]} req.url=${req.url}`);
debug(req.params);
const session = sessions[req.params[2]];
if (session) {
const sessionLive = sessionsLive[req.params[2]];
if (session && sessionLive) {
try {
const body = await session.getCurrentAudioManifestAsync(
req.params[0],
req.params[1],
req.headers["x-playback-session-id"]
);
let body = null;
let ts1: number = Date.now();
if (!this.streamSwitchManager) {
debug(`[${req.params[2]}]: Responding with VOD2Live manifest`);
body = await session.getCurrentAudioManifestAsync(req.params[0], req.params[1], req.headers["x-playback-session-id"]);
} else {
while (switcherStatus[req.params[2]] === null || switcherStatus[req.params[2]] === undefined) {
debug(`[${req.params[2]}]: (${switcherStatus[req.params[1]]}) Waiting for streamSwitcher to respond`);
await timer(500);
}
debug(`switcherStatus[${req.params[1]}]=[${switcherStatus[req.params[2]]}]`);
if (switcherStatus[req.params[2]]) {
debug(`[${req.params[2]}]: Responding with Live-stream manifest`);
body = await sessionLive.getCurrentAudioManifestAsync(req.params[0], req.params[1]);
} else {
debug(`[${req.params[2]}]: Responding with VOD2Live manifest`);
body = await session.getCurrentAudioManifestAsync(req.params[0], req.params[1], req.headers["x-playback-session-id"]);
}
debug(`[${req.params[2]}]: Audio Manifest Request Took (${Date.now() - ts1})ms`);
}

res.sendRaw(200, Buffer.from(body, 'utf8'), {
"Content-Type": "application/vnd.apple.mpegurl",
"Access-Control-Allow-Origin": "*",
Expand All @@ -794,7 +860,7 @@ export class ChannelEngine {
next(this._gracefulErrorHandler(err));
}
} else {
const err = new errs.NotFoundError('Invalid session');
const err = new errs.NotFoundError('Invalid session(s)');
next(err);
}
}
Expand Down Expand Up @@ -858,50 +924,6 @@ export class ChannelEngine {
}
}

async _handleMediaManifest(req, res, next) {
debug(`x-playback-session-id=${req.headers["x-playback-session-id"]} req.url=${req.url}`);
debug(req.params);
const session = sessions[req.params[1]];
const sessionLive = sessionsLive[req.params[1]];
if (session && sessionLive) {
try {
let body = null;
if (!this.streamSwitchManager) {
debug(`[${req.params[1]}]: Responding with VOD2Live manifest`);
body = await session.getCurrentMediaManifestAsync(req.params[0], req.headers["x-playback-session-id"]);
} else {
while (switcherStatus[req.params[1]] === null || switcherStatus[req.params[1]] === undefined) {
debug(`[${req.params[1]}]: (${switcherStatus[req.params[1]]}) Waiting for streamSwitcher to respond`);
await timer(500);
}
debug(`switcherStatus[${req.params[1]}]=[${switcherStatus[req.params[1]]}]`);
if (switcherStatus[req.params[1]]) {
debug(`[${req.params[1]}]: Responding with Live-stream manifest`);
body = await sessionLive.getCurrentMediaManifestAsync(req.params[0]);
} else {
debug(`[${req.params[1]}]: Responding with VOD2Live manifest`);
body = await session.getCurrentMediaManifestAsync(req.params[0], req.headers["x-playback-session-id"]);
}
}

//verbose(`[${session.sessionId}] body=`);
//verbose(body);
res.sendRaw(200, Buffer.from(body, 'utf8'), {
"Content-Type": "application/vnd.apple.mpegurl",
"Access-Control-Allow-Origin": "*",
"Cache-Control": `max-age=${this.streamerOpts.cacheTTL || '4'}`,
"X-Instance-Id": this.instanceId + `<${version}>`,
});
next();
} catch (err) {
next(this._gracefulErrorHandler(err));
}
} else {
const err = new errs.NotFoundError('Invalid session(s)');
next(err);
}
}

_handleEventStream(req, res, next) {
debug(`req.url=${req.url}`);
const eventStream = eventStreams[req.params.sessionId];
Expand Down
Loading

0 comments on commit 01f1987

Please sign in to comment.