
[WIP] lib: merge onread handlers for http2 streams & net.Socket #20993


Closed · wants to merge 1 commit
31 changes: 6 additions & 25 deletions lib/internal/http2/core.js
@@ -113,7 +113,9 @@ const {
const {
createWriteWrap,
writeGeneric,
writevGeneric
writevGeneric,
_onread,
kUpdateTimer
} = require('internal/stream_base_commons');

const { ShutdownWrap } = process.binding('stream_wrap');
@@ -146,7 +148,6 @@ const kServer = Symbol('server');
const kSession = Symbol('session');
const kState = Symbol('state');
const kType = Symbol('type');
const kUpdateTimer = Symbol('update-timer');

const kDefaultSocketTimeout = 2 * 60 * 1000;

@@ -361,31 +362,10 @@ function onStreamClose(code) {
// Receives a chunk of data for a given stream and forwards it on
// to the Http2Stream Duplex for processing.
function onStreamRead(nread, buf) {
const handle = this;
const stream = this[kOwner];
if (nread >= 0 && !stream.destroyed) {
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: receiving data chunk ` +
`of size ${nread}`);
stream[kUpdateTimer]();
if (!stream.push(buf)) {
if (!stream.destroyed) // we have to check a second time
this.readStop();
}
return;
}

// Last chunk was received. End the readable side.
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
Review comment (Contributor): This [the debug output for ending the readable side] is missing in the new implementation.
`${sessionName(stream[kSession][kType])}]: ending readable.`);

// defer this until we actually emit end
if (stream._readableState.endEmitted) {
stream[kMaybeDestroy]();
} else {
stream.on('end', stream[kMaybeDestroy]);
stream.push(null);
stream.read(0);
}
_onread(handle, stream, nread, buf);
}
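
To address the review note above about the dropped logging, here is a minimal sketch (not part of this diff) of how onStreamRead could keep the http2-specific debug output while still delegating to the shared handler. It assumes the identifiers debug, sessionName, kOwner, kID, kSession and kType that core.js already has in scope:

// Hypothetical variant: log first, then hand off to the shared _onread().
function onStreamRead(nread, buf) {
  const handle = this;
  const stream = this[kOwner];
  if (nread >= 0 && !stream.destroyed) {
    debug(`Http2Stream ${stream[kID]} [Http2Session ` +
          `${sessionName(stream[kSession][kType])}]: receiving data chunk ` +
          `of size ${nread}`);
  } else {
    debug(`Http2Stream ${stream[kID]} [Http2Session ` +
          `${sessionName(stream[kSession][kType])}]: ending readable.`);
  }
  _onread(handle, stream, nread, buf);
}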

// Called when the remote peer settings have been updated.
@@ -2122,6 +2102,7 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
class ServerHttp2Stream extends Http2Stream {
constructor(session, handle, id, options, headers) {
super(session, options);
handle.owner = this;
Review comment (Member): I think it might be better to attach [kOwner] in net.js (and later switching over to always using that symbol over there).
Generally, it would in my opinion be a good idea to have a standardized way to access the JS wrapper object from a C++ handle, and handle[kOwner] might be exactly that.

this[kInit](id, handle);
this[kProtocol] = headers[HTTP2_HEADER_SCHEME];
this[kAuthority] = headers[HTTP2_HEADER_AUTHORITY];
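A rough sketch of the reviewer's suggestion above, assuming a hypothetical kOwner symbol exported alongside _onread from stream_base_commons, so that net.js and http2 could recover the JS wrapper from a handle in the same way:

// Hypothetical: a shared symbol exported next to _onread/kUpdateTimer.
const kOwner = Symbol('owner');

// Wherever a handle is bound to its JS wrapper (net.js Socket,
// http2 ServerHttp2Stream, ...), attach the owner through the symbol
// instead of the ad-hoc `handle.owner = this`:
function bindHandle(wrapper, handle) {
  handle[kOwner] = wrapper;
  return handle;
}

// A read callback can then recover the wrapper uniformly and delegate
// to the shared _onread() introduced by this PR:
function onread(nread, buffer) {
  const handle = this;
  const stream = handle[kOwner];
  _onread(handle, stream, nread, buffer);
}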
40 changes: 36 additions & 4 deletions lib/internal/stream_base_commons.js
@@ -1,10 +1,10 @@
'use strict';

const { Buffer } = require('buffer');
const errors = require('internal/errors');
const { WriteWrap } = process.binding('stream_wrap');

const errnoException = errors.errnoException;
const { UV_EOF } = process.binding('uv');
const { errnoException } = require('internal/errors');
const kUpdateTimer = Symbol('update-timer');

function handleWriteReq(req, data, encoding) {
const { handle } = req;
@@ -81,8 +81,40 @@ function afterWriteDispatched(self, req, err, cb) {
}
}

function _onread(handle, stream, nread, buf) {
stream[kUpdateTimer]();
Review comment (Contributor): Isn't this deviating from http2's original implementation? Previously, for http2, this was only called inside the conditional below.
Review comment (Member): I don't think that's an issue either, and arguably a bit more correct this way.
if (nread > 0 && !stream.destroyed) {
Review comment (Contributor): This is not the same as the previous http2 implementation, which executed the logic within this block when nread >= 0, not just nread > 0.
Review comment (Member): I don't think omitting 0-sized reads is an issue.
const ret = stream.push(buf);
if (handle.reading && !ret) {
handle.reading = false;
const err = handle.readStop();
if (err && stream.destroy)
stream.destroy(errnoException(err, 'read'));
}

return;
}

if (nread === 0) {
return;
}

if (nread !== UV_EOF) {
return stream.destroy(errnoException(nread, 'read'));
}

// push a null to signal the end of data.
// Do it before `maybeDestroy` for correct order of events:
// `end` -> `close`
stream.push(null);
stream.read(0);
}

module.exports = {
createWriteWrap,
writevGeneric,
writeGeneric
writeGeneric,
_onread,
kUpdateTimer
};
61 changes: 12 additions & 49 deletions lib/net.js
@@ -36,8 +36,7 @@ const {
const assert = require('assert');
const {
UV_EADDRINUSE,
UV_EINVAL,
UV_EOF
UV_EINVAL
} = process.binding('uv');

const { Buffer } = require('buffer');
@@ -61,7 +60,9 @@ const {
const {
createWriteWrap,
writevGeneric,
writeGeneric
writeGeneric,
_onread,
kUpdateTimer
} = require('internal/stream_base_commons');
const errors = require('internal/errors');
const {
@@ -513,6 +514,12 @@ Object.defineProperty(Socket.prototype, 'bufferSize', {
}
});

Object.defineProperty(Socket.prototype, kUpdateTimer, {
get: function() {
Review comment (Contributor): We should probably avoid getters for hot paths like a socket read callback function.
Review comment (Member): I wouldn't expect it to make a big difference? I'd expect V8 to inline this – if benchmark runs come back okay, is this still an issue? That being said, it might be nice to move over to using kUpdateTimer in all cases.

return this._unrefTimer;
}
});
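
As a sketch of the alternative raised in the thread above, the getter could be replaced by a plain prototype method (hypothetical, not what this diff does), so the read path calls it directly:

// Hypothetical alternative: a plain method instead of a getter, so that
// stream[kUpdateTimer]() in the shared _onread() invokes it directly.
Socket.prototype[kUpdateTimer] = function() {
  this._unrefTimer();
};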


// Just call handle.readStart until we have enough in the buffer
Socket.prototype._read = function(n) {
@@ -619,53 +626,9 @@ Socket.prototype._destroy = function(exception, cb) {
// buffer, or when there's an error reading.
function onread(nread, buffer) {
var handle = this;
var self = handle.owner;
assert(handle === self._handle, 'handle != self._handle');

self._unrefTimer();

debug('onread', nread);

if (nread > 0) {
debug('got data');

// read success.
// In theory (and in practice) calling readStop right now
// will prevent this from being called again until _read() gets
// called again.

// Optimization: emit the original buffer with end points
var ret = self.push(buffer);

if (handle.reading && !ret) {
handle.reading = false;
debug('readStop');
var err = handle.readStop();
if (err)
self.destroy(errnoException(err, 'read'));
}
return;
}

// if we didn't get any bytes, that doesn't necessarily mean EOF.
// wait for the next one.
if (nread === 0) {
debug('not any data, keep waiting');
return;
}

// Error, possibly EOF.
if (nread !== UV_EOF) {
return self.destroy(errnoException(nread, 'read'));
}

debug('EOF');
var stream = handle.owner;

// push a null to signal the end of data.
// Do it before `maybeDestroy` for correct order of events:
// `end` -> `close`
self.push(null);
self.read(0);
_onread(handle, stream, nread, buffer);
}

