-
-
Notifications
You must be signed in to change notification settings - Fork 32.1k
[WIP] lib: merge onread handlers for http2 streams & net.Socket #20993
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -113,7 +113,9 @@ const { | |
const { | ||
createWriteWrap, | ||
writeGeneric, | ||
writevGeneric | ||
writevGeneric, | ||
_onread, | ||
kUpdateTimer | ||
} = require('internal/stream_base_commons'); | ||
|
||
const { ShutdownWrap } = process.binding('stream_wrap'); | ||
|
@@ -146,7 +148,6 @@ const kServer = Symbol('server'); | |
const kSession = Symbol('session'); | ||
const kState = Symbol('state'); | ||
const kType = Symbol('type'); | ||
const kUpdateTimer = Symbol('update-timer'); | ||
|
||
const kDefaultSocketTimeout = 2 * 60 * 1000; | ||
|
||
|
@@ -361,31 +362,10 @@ function onStreamClose(code) { | |
// Receives a chunk of data for a given stream and forwards it on | ||
// to the Http2Stream Duplex for processing. | ||
function onStreamRead(nread, buf) { | ||
const handle = this; | ||
const stream = this[kOwner]; | ||
if (nread >= 0 && !stream.destroyed) { | ||
debug(`Http2Stream ${stream[kID]} [Http2Session ` + | ||
`${sessionName(stream[kSession][kType])}]: receiving data chunk ` + | ||
`of size ${nread}`); | ||
stream[kUpdateTimer](); | ||
if (!stream.push(buf)) { | ||
if (!stream.destroyed) // we have to check a second time | ||
this.readStop(); | ||
} | ||
return; | ||
} | ||
|
||
// Last chunk was received. End the readable side. | ||
debug(`Http2Stream ${stream[kID]} [Http2Session ` + | ||
`${sessionName(stream[kSession][kType])}]: ending readable.`); | ||
|
||
// defer this until we actually emit end | ||
if (stream._readableState.endEmitted) { | ||
stream[kMaybeDestroy](); | ||
} else { | ||
stream.on('end', stream[kMaybeDestroy]); | ||
stream.push(null); | ||
stream.read(0); | ||
} | ||
_onread(handle, stream, nread, buf); | ||
} | ||
|
||
// Called when the remote peer settings have been updated. | ||
|
@@ -2122,6 +2102,7 @@ function afterOpen(session, options, headers, streamOptions, err, fd) { | |
class ServerHttp2Stream extends Http2Stream { | ||
constructor(session, handle, id, options, headers) { | ||
super(session, options); | ||
handle.owner = this; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it might be better to attach Generally, it would imo be a good idea to have a standardized way to access the JS wrapper object from a C++ handle, and |
||
this[kInit](id, handle); | ||
this[kProtocol] = headers[HTTP2_HEADER_SCHEME]; | ||
this[kAuthority] = headers[HTTP2_HEADER_AUTHORITY]; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,10 @@ | ||
'use strict'; | ||
|
||
const { Buffer } = require('buffer'); | ||
const errors = require('internal/errors'); | ||
const { WriteWrap } = process.binding('stream_wrap'); | ||
|
||
const errnoException = errors.errnoException; | ||
const { UV_EOF } = process.binding('uv'); | ||
const { errnoException } = require('internal/errors'); | ||
const kUpdateTimer = Symbol('update-timer'); | ||
|
||
function handleWriteReq(req, data, encoding) { | ||
const { handle } = req; | ||
|
@@ -81,8 +81,40 @@ function afterWriteDispatched(self, req, err, cb) { | |
} | ||
} | ||
|
||
function _onread(handle, stream, nread, buf) { | ||
stream[kUpdateTimer](); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this deviating from http2's original implementation? Previously for http2 this was only called inside the conditional below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don’t think that’s an issue either, and arguably a bit more correct this way. |
||
|
||
if (nread > 0 && !stream.destroyed) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not the same as the previous http2 implementation, which executed the logic within this block when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don’t think omitting 0-sized reads is an issue. |
||
const ret = stream.push(buf); | ||
if (handle.reading && !ret) { | ||
handle.reading = false; | ||
const err = handle.readStop(); | ||
if (err && stream.destroy) | ||
stream.destroy(errnoException(err, 'read')); | ||
} | ||
|
||
return; | ||
} | ||
|
||
if (nread === 0) { | ||
return; | ||
} | ||
|
||
if (nread !== UV_EOF) { | ||
return stream.destroy(errnoException(nread, 'read')); | ||
} | ||
|
||
// push a null to signal the end of data. | ||
// Do it before `maybeDestroy` for correct order of events: | ||
// `end` -> `close` | ||
stream.push(null); | ||
stream.read(0); | ||
} | ||
|
||
module.exports = { | ||
createWriteWrap, | ||
writevGeneric, | ||
writeGeneric | ||
writeGeneric, | ||
_onread, | ||
kUpdateTimer | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,8 +36,7 @@ const { | |
const assert = require('assert'); | ||
const { | ||
UV_EADDRINUSE, | ||
UV_EINVAL, | ||
UV_EOF | ||
UV_EINVAL | ||
} = process.binding('uv'); | ||
|
||
const { Buffer } = require('buffer'); | ||
|
@@ -61,7 +60,9 @@ const { | |
const { | ||
createWriteWrap, | ||
writevGeneric, | ||
writeGeneric | ||
writeGeneric, | ||
_onread, | ||
kUpdateTimer | ||
} = require('internal/stream_base_commons'); | ||
const errors = require('internal/errors'); | ||
const { | ||
|
@@ -513,6 +514,12 @@ Object.defineProperty(Socket.prototype, 'bufferSize', { | |
} | ||
}); | ||
|
||
Object.defineProperty(Socket.prototype, kUpdateTimer, { | ||
get: function() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably avoid getters for hot paths like a socket read callback function. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wouldn’t expect it to make a big difference? I’d expect V8 to inline this – if benchmark runs come back okay, is this still an issue? That being said, it might be nice to move over to using |
||
return this._unrefTimer; | ||
} | ||
}); | ||
|
||
|
||
// Just call handle.readStart until we have enough in the buffer | ||
Socket.prototype._read = function(n) { | ||
|
@@ -619,53 +626,9 @@ Socket.prototype._destroy = function(exception, cb) { | |
// buffer, or when there's an error reading. | ||
function onread(nread, buffer) { | ||
var handle = this; | ||
var self = handle.owner; | ||
assert(handle === self._handle, 'handle != self._handle'); | ||
|
||
self._unrefTimer(); | ||
|
||
debug('onread', nread); | ||
|
||
if (nread > 0) { | ||
debug('got data'); | ||
|
||
// read success. | ||
// In theory (and in practice) calling readStop right now | ||
// will prevent this from being called again until _read() gets | ||
// called again. | ||
|
||
// Optimization: emit the original buffer with end points | ||
var ret = self.push(buffer); | ||
|
||
if (handle.reading && !ret) { | ||
handle.reading = false; | ||
debug('readStop'); | ||
var err = handle.readStop(); | ||
if (err) | ||
self.destroy(errnoException(err, 'read')); | ||
} | ||
return; | ||
} | ||
|
||
// if we didn't get any bytes, that doesn't necessarily mean EOF. | ||
// wait for the next one. | ||
if (nread === 0) { | ||
debug('not any data, keep waiting'); | ||
return; | ||
} | ||
|
||
// Error, possibly EOF. | ||
if (nread !== UV_EOF) { | ||
return self.destroy(errnoException(nread, 'read')); | ||
} | ||
|
||
debug('EOF'); | ||
var stream = handle.owner; | ||
|
||
// push a null to signal the end of data. | ||
// Do it before `maybeDestroy` for correct order of events: | ||
// `end` -> `close` | ||
self.push(null); | ||
self.read(0); | ||
_onread(handle, stream, nread, buffer); | ||
} | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is missing in the new implementation