Skip to content

Commit

Permalink
fix: Client.stream writableNeedDrain
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 12, 2020
1 parent fb7fe1d commit 03c41a5
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 20 deletions.
6 changes: 6 additions & 0 deletions lib/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ class StreamHandler extends AsyncResource {
})

this.res = res

const needDrain = res.writableNeedDrain !== undefined
? res.writableNeedDrain
: res._writableState && res._writableState.needDrain

return needDrain !== true
}

onData (chunk) {
Expand Down
82 changes: 63 additions & 19 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ const {
const {
kUrl,
kReset,
kPause,
kHost,
kResume,
kClient,
Expand Down Expand Up @@ -376,9 +375,37 @@ class Parser extends HTTPParser {
this.shouldKeepAlive = false
this.read = 0
this.request = null
this.paused = false
this.queue = []

this._resume = () => {
this.paused = false

while (this.queue.length) {
const [fn, ...args] = this.queue.shift()

fn.apply(this, args)

if (this.paused) {
return
}
}

socketResume(socket)
}

this._pause = () => {
this.paused = true
socketPause(socket)
}
}

[HTTPParser.kOnHeaders] (rawHeaders) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnHeaders], rawHeaders])
return
}

if (this.headers) {
Array.prototype.push.apply(this.headers, rawHeaders)
} else {
Expand All @@ -387,6 +414,11 @@ class Parser extends HTTPParser {
}

[HTTPParser.kOnExecute] (ret) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnExecute], ret])
return
}

const { upgrade, socket } = this

if (!Number.isFinite(ret)) {
Expand Down Expand Up @@ -438,8 +470,6 @@ class Parser extends HTTPParser {
setImmediate((self) => self.close(), socket[kParser])
socket[kParser] = null

socket[kPause] = null
socket[kResume] = null
socket[kClient] = null
socket[kError] = null
socket
Expand All @@ -459,6 +489,12 @@ class Parser extends HTTPParser {

[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnHeadersComplete], versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive])
return
}

const { client, socket } = this

const request = client[kQueue][client[kRunningIdx]]
Expand Down Expand Up @@ -541,8 +577,8 @@ class Parser extends HTTPParser {
}

try {
if (request.onHeaders(statusCode, headers, socket[kResume]) === false) {
socket[kPause]()
if (request.onHeaders(statusCode, headers, this._resume) === false) {
this._pause()
}
} catch (err) {
util.destroy(socket, err)
Expand All @@ -553,6 +589,11 @@ class Parser extends HTTPParser {
}

[HTTPParser.kOnBody] (chunk, offset, length) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnBody], chunk, offset, length])
return
}

const { socket, statusCode, request } = this

if (socket.destroyed) {
Expand All @@ -563,14 +604,19 @@ class Parser extends HTTPParser {

try {
if (request.onBody(chunk, offset, length) === false) {
socket[kPause]()
this._pause()
}
} catch (err) {
util.destroy(socket, err)
}
}

[HTTPParser.kOnMessageComplete] () {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnMessageComplete]])
return
}

const { client, socket, statusCode, headers, upgrade, request, trailers } = this

if (socket.destroyed) {
Expand Down Expand Up @@ -636,7 +682,7 @@ class Parser extends HTTPParser {
// have been queued since then.
util.destroy(socket, new InformationalError('reset'))
} else {
socket[kResume]()
socketResume(socket)
resume(client)
}
}
Expand Down Expand Up @@ -766,8 +812,6 @@ function connect (client) {

socket[kIdleTimeout] = null
socket[kIdleTimeoutValue] = null
socket[kPause] = socketPause.bind(socket)
socket[kResume] = socketResume.bind(socket)
socket[kError] = null
socket[kParser] = parser
socket[kClient] = client
Expand All @@ -779,24 +823,24 @@ function connect (client) {
.on('close', onSocketClose)
}

function socketPause () {
function socketPause (socket) {
// TODO: Pause parser.
if (this._handle && this._handle.reading) {
this._handle.reading = false
const err = this._handle.readStop()
if (socket._handle && socket._handle.reading) {
socket._handle.reading = false
const err = socket._handle.readStop()
if (err) {
this.destroy(util.errnoException(err, 'read'))
socket.destroy(util.errnoException(err, 'read'))
}
}
}

function socketResume () {
function socketResume (socket) {
// TODO: Resume parser.
if (this._handle && !this._handle.reading) {
this._handle.reading = true
const err = this._handle.readStart()
if (socket._handle && !socket._handle.reading) {
socket._handle.reading = true
const err = socket._handle.readStart()
if (err) {
this.destroy(util.errnoException(err, 'read'))
socket.destroy(util.errnoException(err, 'read'))
}
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class Request {

onBody (chunk, offset, length) {
assert(!this.aborted)
assert(!this[kPaused])

if (this[kTimeout] && this[kTimeout].refresh) {
this[kTimeout].refresh()
Expand Down
1 change: 0 additions & 1 deletion lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ module.exports = {
kQueue: Symbol('queue'),
kConnect: Symbol('connect'),
kResume: Symbol('resume'),
kPause: Symbol('pause'),
kIdleTimeout: Symbol('idle timeout'),
kIdleTimeoutValue: Symbol('idle timeout value'),
kKeepAliveDefaultTimeout: Symbol('default keep alive timeout'),
Expand Down
44 changes: 44 additions & 0 deletions test/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -657,3 +657,47 @@ test('stream body destroyed on invalid callback', (t) => {
}
})
})

test('stream needDrain', (t) => {
t.plan(1)

const server = createServer((req, res) => {
res.end(Buffer.alloc(4096))
})
t.tearDown(server.close.bind(server))

server.listen(0, async () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(() => {
console.error(3)
client.destroy()
})

const dst = new PassThrough()
dst.pause()

while (dst.write(Buffer.alloc(4096))) {

}

const orgWrite = dst.write
dst.write = () => t.fail()
const p = client.stream({
path: '/',
method: 'GET'
}, () => {
return dst
})

setTimeout(() => {
dst.write = (...args) => {
orgWrite.call(dst, ...args)
}
dst.resume()
}, 1e3)

await p

t.pass()
})
})

0 comments on commit 03c41a5

Please sign in to comment.