Skip to content

Commit

Permalink
fix: Correct the behavior to defer closeSegmentIndex() calls during u…
Browse files Browse the repository at this point in the history
…pdates (#7217)

Resolves the issues reported by
#7213, which correctly
fixes #7156.

The latest comment
#7156 (comment)
goes further into detail on the problems of the initial PR.
  • Loading branch information
JulianDomingo authored and avelad committed Aug 29, 2024
1 parent e76f6b2 commit af80b87
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 31 deletions.
50 changes: 49 additions & 1 deletion lib/media/streaming_engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ shaka.media.StreamingEngine = class {
/** @private {?shaka.extern.StreamingConfiguration} */
this.config_ = null;

/**
* Retains a reference to the function used to close SegmentIndex objects
* for streams which were switched away from during an ongoing update_().
* @private {!Map.<string, !function()>}
*/
this.deferredCloseSegmentIndex_ = new Map();

/** @private {number} */
this.bufferingGoalScale_ = 1;

Expand Down Expand Up @@ -504,6 +511,24 @@ shaka.media.StreamingEngine = class {
}


/**
* Handles deferred releases of old SegmentIndexes for the mediaState's
* content type from a previous update.
* @param {!shaka.media.StreamingEngine.MediaState_} mediaState
* @private
*/
handleDeferredCloseSegmentIndexes_(mediaState) {
for (const [key, value] of this.deferredCloseSegmentIndex_.entries()) {
const streamId = /** @type {string} */ (key);
const closeSegmentIndex = /** @type {!function()} */ (value);
if (streamId.includes(mediaState.type)) {
closeSegmentIndex();
this.deferredCloseSegmentIndex_.delete(streamId);
}
}
}


/**
* Switches to the given Stream. |stream| may be from any Variant.
*
Expand Down Expand Up @@ -576,7 +601,23 @@ shaka.media.StreamingEngine = class {
// Do not close segment indexes we are prefetching.
if (!this.audioPrefetchMap_.has(mediaState.stream)) {
if (mediaState.stream.closeSegmentIndex) {
mediaState.stream.closeSegmentIndex();
if (mediaState.performingUpdate) {
const oldStreamTag =
shaka.media.StreamingEngine.logPrefix_(mediaState);
if (!this.deferredCloseSegmentIndex_.has(oldStreamTag)) {
// The ongoing update is still using the old stream's segment
// reference information.
// If we close the old stream now, the update will not complete
// correctly.
// The next onUpdate_() for this content type will resume the
// closeSegmentIndex() operation for the old stream once the ongoing
// update has finished, then immediately create a new segment index.
this.deferredCloseSegmentIndex_.set(
oldStreamTag, mediaState.stream.closeSegmentIndex);
}
} else {
mediaState.stream.closeSegmentIndex();
}
}
}

Expand Down Expand Up @@ -1169,6 +1210,13 @@ shaka.media.StreamingEngine = class {
return;
}

// If stream switches happened during the previous update_() for this
// content type, close out the old streams that were switched away from.
// Even if we had switched away from the active stream 'A' during the
// update_(), e.g. (A -> B -> A), closing 'A' is permissible here since we
// will immediately re-create it in the logic below.
this.handleDeferredCloseSegmentIndexes_(mediaState);

// Make sure the segment index exists. If not, create the segment index.
if (!mediaState.stream.segmentIndex) {
const thisStream = mediaState.stream;
Expand Down
166 changes: 142 additions & 24 deletions test/media/streaming_engine_unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,109 @@ describe('StreamingEngine', () => {
expect(mediaSourceEngine.resetCaptionParser).not.toHaveBeenCalled();
});

it('defers old stream cleanup on switchVariant during update', async () => {
// Delay the appendBuffer call until later so we are waiting for this to
// finish when we switch.
let p = new shaka.util.PublicPromise();
const old = mediaSourceEngine.appendBuffer;
// Replace the whole spy since we want to call the original.
mediaSourceEngine.appendBuffer =
jasmine.createSpy('appendBuffer')
.and.callFake(async (type, data, reference) => {
await p;
return Util.invokeSpy(old, type, data, reference);
});

// Starts with 'initialVariant' (video-11-%d/audio-10-%d).
await streamingEngine.start();
playing = true;

await Util.fakeEventLoop(1);

// Grab a reference to initialVariant's segmentIndex before the switch so
// we can test how its internal fields change overtime.
const initialVariantSegmentIndex = initialVariant.video.segmentIndex;

// Switch to 'differentVariant' (video-14-%d/audio-15-%d) in the middle of
// the update.
streamingEngine.switchVariant(differentVariant, /* clearBuffer= */ true);

// Finish the update for 'initialVariant'.
p.resolve();
// Create a new promise to delay the appendBuffer for 'differentVariant'.
p = new shaka.util.PublicPromise();
await Util.fakeEventLoop(1);

const segmentType = shaka.net.NetworkingEngine.RequestType.SEGMENT;
const segmentContext = {
type: shaka.net.NetworkingEngine.AdvancedRequestType.MEDIA_SEGMENT,
};

// Since a switch occurred in the middle of a fetch for a 'initialVariant'
// segment, the closing of the segment index for 'initialVariant' was
// deferred.
// We check the length of the segment references array to determine
// whether it was closed or not.
expect(initialVariantSegmentIndex.references.length).toBeGreaterThan(0);
netEngine.expectRequest('video-11-0.mp4', segmentType, segmentContext);
netEngine.expectRequest('audio-10-0.mp4', segmentType, segmentContext);
netEngine.expectNoRequest('video-14-0.mp4', segmentType, segmentContext);
netEngine.expectNoRequest('audio-15-0.mp4', segmentType, segmentContext);

// Finish the update for 'differentVariant'. At this point, the
// segmentIndex for 'initialVariant' has been closed.
p.resolve();
await Util.fakeEventLoop(2);
expect(initialVariantSegmentIndex.references.length).toBe(0);
});

it('defers old stream cleanup on fast switch during update', async () => {
setupVod();

// Delay the appendBuffer call until later so we are waiting for this to
// finish when we switch.
const p = new shaka.util.PublicPromise();
const old = mediaSourceEngine.appendBuffer;
mediaSourceEngine.appendBuffer =
jasmine.createSpy('appendBuffer')
.and.callFake(async (type, data, reference) => {
await p;
return Util.invokeSpy(old, type, data, reference);
});

streamingEngine.switchVariant(variant, /* clearBuffer= */ true);
await streamingEngine.start();
playing = true;

expect(variant.video.createSegmentIndex).not.toHaveBeenCalled();
await Util.fakeEventLoop(1);
expect(variant.video.createSegmentIndex).toHaveBeenCalledTimes(1);

// Switch from variant A -> B -> A ("fast switch") multiple times.
for (let i = 0; i < 5; i++) {
streamingEngine.switchVariant(
alternateVariant, /* clearBuffer= */ true);
streamingEngine.switchVariant(variant, /* clearBuffer= */ true);
}
// Can resolve now to ensure all the switches happened during the update.
p.resolve();

// Give enough time for the next scheduled update to execute with the
// currently active variant ('variant').
await runTest();

// During the next scheduled update for 'variant', we close all streams
// that were switched away from, regardless of whether it is the active
// stream.
expect(variant.video.closeSegmentIndex).toHaveBeenCalledTimes(1);
expect(alternateVariant.video.closeSegmentIndex).toHaveBeenCalledTimes(1);
// However, we close all the deferred streams right before the check to
// create a new segmentIndex for the currently active stream.
expect(variant.video.createSegmentIndex).toHaveBeenCalledTimes(2);
expect(variant.video.segmentIndex).not.toBe(null);
expect(alternateVariant.video.segmentIndex).toBe(null);
});

// See https://github.com/shaka-project/shaka-player/issues/2956
it('works with fast variant switches during update', async () => {
// Delay the appendBuffer call until later so we are waiting for this to
Expand Down Expand Up @@ -1188,7 +1291,6 @@ describe('StreamingEngine', () => {
netEngine.expectRequest('text-20-0.mp4', segmentType, segmentContext);
netEngine.expectNoRequest('text-20-init', segmentType, segmentContext);
netEngine.expectNoRequest('text-21-init', segmentType, segmentContext);
// TODO: huh?
});
});

Expand Down Expand Up @@ -2213,9 +2315,11 @@ describe('StreamingEngine', () => {
});

onError.and.callFake((error) => {
expect(error.severity).toBe(shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.BAD_HTTP_STATUS);
if (error instanceof shaka.util.Error) {
expect(error.severity).toBe(shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.BAD_HTTP_STATUS);
}
});

disableStream.and.callFake((stream, time) => {
Expand All @@ -2238,6 +2342,7 @@ describe('StreamingEngine', () => {

await runTest();
expect(disableStream).toHaveBeenCalledTimes(1);
expect(onError).toHaveBeenCalled();
});

it('does not temporarily disables stream if not configured to',
Expand Down Expand Up @@ -2289,9 +2394,12 @@ describe('StreamingEngine', () => {
});

onError.and.callFake((error) => {
expect(error.severity).toBe(shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.SEGMENT_MISSING);
if (error instanceof shaka.util.Error) {
expect(error.severity).toBe(
shaka.util.Error.Severity.RECOVERABLE);
expect(error.category).toBe(shaka.util.Error.Category.NETWORK);
expect(error.code).toBe(shaka.util.Error.Code.SEGMENT_MISSING);
}
});

disableStream.and.callFake((stream, time) => {
Expand All @@ -2314,6 +2422,7 @@ describe('StreamingEngine', () => {

await runTest();
expect(disableStream).toHaveBeenCalledTimes(1);
expect(onError).toHaveBeenCalled();
});

it('throws recoverable error if try to disable stream succeeded',
Expand Down Expand Up @@ -4390,34 +4499,43 @@ describe('StreamingEngine', () => {
* @param {shaka.extern.Stream} alternateStream
*/
function createAlternateSegmentIndex(baseStream, alternateStream) {
const closeSegmentIndexSpy = Util.funcSpy(
/** @type {!function()} */ (alternateStream.closeSegmentIndex));
const createSegmentIndexSpy =
Util.funcSpy(alternateStream.createSegmentIndex);
const altSegmentIndex = new shaka.test.FakeSegmentIndex();

altSegmentIndex.find.and.callFake(
(time) => baseStream.segmentIndex.find(time));
createSegmentIndexSpy.and.callFake(() => {
const altSegmentIndex = new shaka.test.FakeSegmentIndex();

altSegmentIndex.getNumReferences.and.callFake(
() => baseStream.segmentIndex.getNumReferences());
altSegmentIndex.find.and.callFake(
(time) => baseStream.segmentIndex.find(time));

altSegmentIndex.get.and.callFake((pos) => {
const ref = baseStream.segmentIndex.get(pos);
altSegmentIndex.getNumReferences.and.callFake(
() => baseStream.segmentIndex.getNumReferences());

if (ref) {
const altInitUri = ref.initSegmentReference.getUris()[0] + '_alt';
const altSegmentUri = ref.getUris()[0] + '_alt';
altSegmentIndex.get.and.callFake((pos) => {
const ref = baseStream.segmentIndex.get(pos);

ref.initSegmentReference.getUris = () => [altInitUri];
ref.getUris = () => [altSegmentUri];
return ref;
}
if (ref) {
const altInitUri = ref.initSegmentReference.getUris()[0] + '_alt';
const altSegmentUri = ref.getUris()[0] + '_alt';

return null;
});
ref.initSegmentReference.getUris = () => [altInitUri];
ref.getUris = () => [altSegmentUri];
return ref;
}

createSegmentIndexSpy.and.callFake(() => {
return null;
});
alternateStream.segmentIndex = altSegmentIndex;
return Promise.resolve();
});
closeSegmentIndexSpy.and.callFake(() => {
if (alternateStream.segmentIndex) {
alternateStream.segmentIndex.release();
}
alternateStream.segmentIndex = null;
return Promise.resolve();
});
}
});
22 changes: 17 additions & 5 deletions test/test/util/manifest_generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -528,22 +528,34 @@ shaka.test.ManifestGenerator.Stream = class {
}

if (!isPartial) {
const shaka_ = manifest ? manifest.shaka_ : shaka;

/** @type {shaka.media.SegmentIndex} */
this.segmentIndex = shaka_.media.SegmentIndex.forSingleSegment(
/* startTime= */ 0, /* duration= */ 10, ['testUri']);

const create =
jasmine.createSpy('createSegmentIndex').and.callFake(() => {
this.segmentIndex = shaka_.media.SegmentIndex.forSingleSegment(
/* startTime= */ 0, /* duration= */ 10, ['testUri']);
return Promise.resolve();
});
const shaka_ = manifest ? manifest.shaka_ : shaka;
const segmentIndex = shaka_.media.SegmentIndex.forSingleSegment(
/* startTime= */ 0, /* duration= */ 10, ['testUri']);
const close = jasmine.createSpy('closeSegmentIndex').and.callFake(() => {
if (this.segmentIndex) {
this.segmentIndex.release();
}
this.segmentIndex = null;
return Promise.resolve();
});

/** @type {?string} */
this.originalId = null;
/** @type {?string} */
this.groupId = null;
/** @type {shaka.extern.CreateSegmentIndexFunction} */
this.createSegmentIndex = shaka.test.Util.spyFunc(create);
/** @type {shaka.media.SegmentIndex} */
this.segmentIndex = segmentIndex;
/** @type {!function()|undefined} */
this.closeSegmentIndex = shaka.test.Util.spyFunc(close);
/** @type {string} */
this.mimeType = defaultMimeType;
/** @type {string} */
Expand Down
Loading

0 comments on commit af80b87

Please sign in to comment.