Skip to content

Commit

Permalink
Use SimpleQueue in WritableStream implementation
Browse files Browse the repository at this point in the history
Split out Queue into a separate file from ReadableStream and rename is
SimpleQueue to reduce the risk of name collisions on the binding object.

Add a peek() method as it is needed by WritableStream.

Modify WritableStream to use SimpleQueue.

BUG=681493

Review-Url: https://codereview.chromium.org/2752133003
Cr-Commit-Position: refs/heads/master@{#458324}
  • Loading branch information
ricea authored and Commit bot committed Mar 21, 2017
1 parent f2c7dc5 commit e123782
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 92 deletions.
1 change: 1 addition & 0 deletions .gn
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ default_args = {
# Dependencies used by the extra libraries. Putting them here causes them
# to be executed first during snapshot creation.
"//third_party/WebKit/Source/core/streams/CommonStrings.js",
"//third_party/WebKit/Source/core/streams/SimpleQueue.js",

# Extra libraries.
"//third_party/WebKit/Source/core/streams/ByteLengthQueuingStrategy.js",
Expand Down
82 changes: 6 additions & 76 deletions third_party/WebKit/Source/core/streams/ReadableStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@

this[_underlyingSource] = underlyingSource;

this[_queue] = new Queue();
this[_queue] = new binding.SimpleQueue();
this[_totalQueuedSize] = 0;

this[_readableStreamDefaultControllerBits] = 0b0;
Expand Down Expand Up @@ -501,7 +501,7 @@
}

function ReadableStreamDefaultControllerCancel(controller, reason) {
controller[_queue] = new Queue();
controller[_queue] = new binding.SimpleQueue();

const underlyingSource = controller[_underlyingSource];
return PromiseCallOrNoop(underlyingSource, 'cancel', reason, 'underlyingSource.cancel');
Expand Down Expand Up @@ -545,7 +545,7 @@

ReadableStreamReaderGenericInitialize(this, stream);

this[_readRequests] = new Queue();
this[_readRequests] = new binding.SimpleQueue();
}

get closed() {
Expand Down Expand Up @@ -688,7 +688,7 @@
}

function ReadableStreamDefaultControllerError(controller, e) {
controller[_queue] = new Queue();
controller[_queue] = new binding.SimpleQueue();
const stream = controller[_controlledReadableStream];
ReadableStreamError(stream, e);
}
Expand All @@ -704,7 +704,7 @@

if (IsReadableStreamDefaultReader(reader) === true) {
reader[_readRequests].forEach(request => v8.rejectPromise(request, e));
reader[_readRequests] = new Queue();
reader[_readRequests] = new binding.SimpleQueue();
}

v8.rejectPromise(reader[_closedPromise], e);
Expand All @@ -722,7 +722,7 @@
if (IsReadableStreamDefaultReader(reader) === true) {
reader[_readRequests].forEach(request =>
v8.resolvePromise(request, CreateIterResultObject(undefined, true)));
reader[_readRequests] = new Queue();
reader[_readRequests] = new binding.SimpleQueue();
}

v8.resolvePromise(reader[_closedPromise], undefined);
Expand Down Expand Up @@ -991,78 +991,8 @@

//
// Queue-with-sizes
// Modified from taking the queue (as in the spec) to taking the stream, so we
// can modify the queue size alongside.
//

// Simple queue structure. Avoids scalability issues with using
// InternalPackedArray directly by using multiple arrays
// in a linked list and keeping the array size bounded.
const QUEUE_MAX_ARRAY_SIZE = 16384;
class Queue {
constructor() {
this.front = {
elements: new v8.InternalPackedArray(),
next: undefined,
};
this.back = this.front;
// The cursor is used to avoid calling InternalPackedArray.shift().
this.cursor = 0;
this.size = 0;
}

get length() {
return this.size;
}

push(element) {
++this.size;
if (this.back.elements.length === QUEUE_MAX_ARRAY_SIZE) {
const oldBack = this.back;
this.back = {
elements: new v8.InternalPackedArray(),
next: undefined,
};
oldBack.next = this.back;
}
this.back.elements.push(element);
}

shift() {
// assert(this.size > 0);
--this.size;
if (this.front.elements.length === this.cursor) {
// assert(this.cursor === QUEUE_MAX_ARRAY_SIZE);
// assert(this.front.next !== undefined);
this.front = this.front.next;
this.cursor = 0;
}
const element = this.front.elements[this.cursor];
// Permit shifted element to be garbage collected.
this.front.elements[this.cursor] = undefined;
++this.cursor;

return element;
}

forEach(callback) {
let i = this.cursor;
let node = this.front;
let elements = node.elements;
while (i !== elements.length || node.next !== undefined) {
if (i === elements.length) {
// assert(node.next !== undefined);
// assert(i === QUEUE_MAX_ARRAY_SIZE);
node = node.next;
elements = node.elements;
i = 0;
}
callback(elements[i]);
++i;
}
}
}

function DequeueValue(controller) {
const result = controller[_queue].shift();
controller[_totalQueuedSize] -= result.size;
Expand Down
91 changes: 91 additions & 0 deletions third_party/WebKit/Source/core/streams/SimpleQueue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Queue implementation used by ReadableStream and WritableStream.

(function(global, binding, v8) {
'use strict';

// Simple queue structure. Avoids scalability issues with using
// InternalPackedArray directly by using multiple arrays in a linked list and
// keeping the array size bounded.
const QUEUE_MAX_ARRAY_SIZE = 16384;
class SimpleQueue {
constructor() {
this.front = {
elements: new v8.InternalPackedArray(),
next: undefined,
};
this.back = this.front;
// The cursor is used to avoid calling InternalPackedArray.shift().
this.cursor = 0;
this.size = 0;
}

get length() {
return this.size;
}

push(element) {
++this.size;
if (this.back.elements.length === QUEUE_MAX_ARRAY_SIZE) {
const oldBack = this.back;
this.back = {
elements: new v8.InternalPackedArray(),
next: undefined,
};
oldBack.next = this.back;
}
this.back.elements.push(element);
}

shift() {
// assert(this.size > 0);
--this.size;
if (this.front.elements.length === this.cursor) {
// assert(this.cursor === QUEUE_MAX_ARRAY_SIZE);
// assert(this.front.next !== undefined);
this.front = this.front.next;
this.cursor = 0;
}
const element = this.front.elements[this.cursor];
// Permit shifted element to be garbage collected.
this.front.elements[this.cursor] = undefined;
++this.cursor;

return element;
}

forEach(callback) {
let i = this.cursor;
let node = this.front;
let elements = node.elements;
while (i !== elements.length || node.next !== undefined) {
if (i === elements.length) {
// assert(node.next !== undefined);
// assert(i === QUEUE_MAX_ARRAY_SIZE);
node = node.next;
elements = node.elements;
i = 0;
}
callback(elements[i]);
++i;
}
}

// Return the element that would be returned if shift() was called now,
// without modifying the queue.
peek() {
// assert(this.size > 0);
if (this.front.elements.length === this.cursor) {
// assert(this.cursor === QUEUE_MAX_ARRAY_SIZE)
// assert(this.front.next !== undefined);
return this.front.next.elements[0];
}
return this.front.elements[this.cursor];
}
}

binding.SimpleQueue = SimpleQueue;
});
25 changes: 9 additions & 16 deletions third_party/WebKit/Source/core/streams/WritableStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,8 @@
setDefaultControllerFlag(controller, FLAG_INCLOSE, value);
}

function rejectPromises(array, e) {
// array is an InternalPackedArray so forEach won't work.
for (let i = 0; i < array.length; ++i) {
v8.rejectPromise(array[i], e);
}
function rejectPromises(queue, e) {
queue.forEach(promise => v8.rejectPromise(promise, e));
}

// https://tc39.github.io/ecma262/#sec-ispropertykey
Expand Down Expand Up @@ -178,7 +175,7 @@
this[_pendingWriteRequest] = undefined;
this[_pendingCloseRequest] = undefined;
this[_pendingAbortRequest] = undefined;
this[_writeRequests] = new v8.InternalPackedArray();
this[_writeRequests] = new binding.SimpleQueue();
const type = underlyingSink.type;
if (type !== undefined) {
throw new RangeError(streamErrors.invalidType);
Expand Down Expand Up @@ -331,7 +328,7 @@

const storedError = stream[_storedError];
rejectPromises(stream[_writeRequests], storedError);
stream[_writeRequests] = new v8.InternalPackedArray();
stream[_writeRequests] = new binding.SimpleQueue();

if (stream[_pendingCloseRequest] !== undefined) {
TEMP_ASSERT(
Expand Down Expand Up @@ -623,7 +620,7 @@
}
this[_controlledWritableStream] = stream;
this[_underlyingSink] = underlyingSink;
this[_queue] = new v8.InternalPackedArray();
this[_queue] = new binding.SimpleQueue();
this[_queueSize] = 0;
this[_defaultControllerFlags] = 0;
const normalizedStrategy =
Expand Down Expand Up @@ -665,7 +662,7 @@
}

function WritableStreamDefaultControllerAbort(controller, reason) {
controller[_queue] = v8.InternalPackedArray();
controller[_queue] = new binding.SimpleQueue();
controller[_queueSize] = 0;
const sinkAbortPromise =
PromiseInvokeOrNoop(controller[_underlyingSink], 'abort', [reason]);
Expand Down Expand Up @@ -805,7 +802,7 @@
stream[_pendingWriteRequest] = stream[_writeRequests].shift();

const sinkWritePromise = PromiseInvokeOrNoop(controller[_underlyingSink],
'write', [chunk, controller]);
'write', [chunk, controller]);
thenPromise(
sinkWritePromise,
() => {
Expand Down Expand Up @@ -881,16 +878,12 @@
TEMP_ASSERT(state === WRITABLE || state === CLOSING,
'stream.[[state]] is "writable" or "closing".');
WritableStreamError(stream, e);
controller[_queue] = new v8.InternalPackedArray();
controller[_queue] = new binding.SimpleQueue();
controller[_queueSize] = 0;
}

// Queue-with-Sizes Operations
//
// These differ from the versions in the standard: they take a controller
// argument in order to cache the total queue size. This is necessary to avoid
// O(N^2) behaviour.
//
// TODO(ricea): Share these operations with ReadableStream.js.
function DequeueValueForController(controller) {
TEMP_ASSERT(controller[_queue].length !== 0,
Expand All @@ -917,7 +910,7 @@
function PeekQueueValue(queue) {
TEMP_ASSERT(queue.length !== 0,
'queue is not empty.');
return queue[0].value;
return queue.peek().value;
}

// Miscellaneous Operations
Expand Down

0 comments on commit e123782

Please sign in to comment.