-
Notifications
You must be signed in to change notification settings - Fork 29.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
stream: add abort signal for ReadableStream and WritableStream
Refs: #39316 PR-URL: #46273 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
- Loading branch information
1 parent
1395e36
commit c60816a
Showing
6 changed files
with
233 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
const { finished, addAbortSignal } = require('stream'); | ||
const { ReadableStream, WritableStream } = require('stream/web'); | ||
const assert = require('assert'); | ||
|
||
function createTestReadableStream() { | ||
return new ReadableStream({ | ||
start(controller) { | ||
controller.enqueue('a'); | ||
controller.enqueue('b'); | ||
controller.enqueue('c'); | ||
controller.close(); | ||
} | ||
}); | ||
} | ||
|
||
function createTestWritableStream(values) { | ||
return new WritableStream({ | ||
write(chunk) { | ||
values.push(chunk); | ||
} | ||
}); | ||
} | ||
|
||
{ | ||
const rs = createTestReadableStream(); | ||
|
||
const reader = rs.getReader(); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, rs); | ||
|
||
finished(rs, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(reader.read(), /AbortError/).then(common.mustCall()); | ||
assert.rejects(reader.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
reader.read().then(common.mustCall((result) => { | ||
assert.strictEqual(result.value, 'a'); | ||
ac.abort(); | ||
})); | ||
} | ||
|
||
{ | ||
const rs = createTestReadableStream(); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, rs); | ||
|
||
assert.rejects((async () => { | ||
for await (const chunk of rs) { | ||
if (chunk === 'b') { | ||
ac.abort(); | ||
} | ||
} | ||
})(), /AbortError/).then(common.mustCall()); | ||
} | ||
|
||
{ | ||
const rs1 = createTestReadableStream(); | ||
|
||
const rs2 = createTestReadableStream(); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, rs1); | ||
addAbortSignal(ac.signal, rs2); | ||
|
||
const reader1 = rs1.getReader(); | ||
const reader2 = rs2.getReader(); | ||
|
||
finished(rs1, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(reader1.read(), /AbortError/).then(common.mustCall()); | ||
assert.rejects(reader1.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
finished(rs2, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(reader2.read(), /AbortError/).then(common.mustCall()); | ||
assert.rejects(reader2.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
ac.abort(); | ||
} | ||
|
||
{ | ||
const rs = createTestReadableStream(); | ||
|
||
const { 0: rs1, 1: rs2 } = rs.tee(); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, rs); | ||
|
||
const reader1 = rs1.getReader(); | ||
const reader2 = rs2.getReader(); | ||
|
||
finished(rs1, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(reader1.read(), /AbortError/).then(common.mustCall()); | ||
assert.rejects(reader1.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
finished(rs2, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(reader2.read(), /AbortError/).then(common.mustCall()); | ||
assert.rejects(reader2.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
ac.abort(); | ||
} | ||
|
||
{ | ||
const values = []; | ||
const ws = createTestWritableStream(values); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, ws); | ||
|
||
const writer = ws.getWriter(); | ||
|
||
finished(ws, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.deepStrictEqual(values, ['a']); | ||
assert.rejects(writer.write('b'), /AbortError/).then(common.mustCall()); | ||
assert.rejects(writer.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
writer.write('a').then(() => { | ||
ac.abort(); | ||
}); | ||
} | ||
|
||
{ | ||
const values = []; | ||
|
||
const ws1 = createTestWritableStream(values); | ||
const ws2 = createTestWritableStream(values); | ||
|
||
const ac = new AbortController(); | ||
|
||
addAbortSignal(ac.signal, ws1); | ||
addAbortSignal(ac.signal, ws2); | ||
|
||
const writer1 = ws1.getWriter(); | ||
const writer2 = ws2.getWriter(); | ||
|
||
finished(ws1, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(writer1.write('a'), /AbortError/).then(common.mustCall()); | ||
assert.rejects(writer1.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
finished(ws2, common.mustCall((err) => { | ||
assert.strictEqual(err.name, 'AbortError'); | ||
assert.rejects(writer2.write('a'), /AbortError/).then(common.mustCall()); | ||
assert.rejects(writer2.closed, /AbortError/).then(common.mustCall()); | ||
})); | ||
|
||
ac.abort(); | ||
} |