Skip to content

Commit

Permalink
Another turn of the "socket timeout" crank (#169)
Browse files Browse the repository at this point in the history
This PR reworks the last couple days' worth of effort to do something
useful when connected sockets time out.

Notably, during the course of this work, I ran into another case of the
`null...finishWrite` exception. Existing Node bugs (the latter was filed
by me):

* <nodejs/node#35695>
* <nodejs/node#46094>
  • Loading branch information
danfuzz authored Mar 21, 2023
2 parents 684b559 + 735ffa1 commit 13f8e67
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 83 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changelog
=========

### v0.5.6 -- 2023-03-21

Notable changes:

* A more holistic attempt at doing socket timeouts.

### v0.5.5 -- 2023-03-20

Notable changes:
Expand Down
96 changes: 89 additions & 7 deletions src/builtin-services/private/RateLimitedStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,50 @@ export class RateLimitedStream {
* @returns {Duplex|Writable} The wrapper.
*/
#createWrapper() {
const inner = this.#innerStream;
const hasReader = inner instanceof Readable;
const inner = this.#innerStream;
const isReadable = inner instanceof Readable;
const isSocket = inner instanceof Socket;

inner.on('close', () => this.#writableOnClose());
inner.on('error', (error) => this.#onError(error));

if (hasReader) {
// Note: Adding the `readable` listener causes the stream to become
// "paused" (that is, it won't spontaneously emit `data` events).
if (isReadable) {
// Note: Adding a listener for the `readable` event (as is done here)
// causes the stream to become "paused" (that is, it won't spontaneously
// emit `data` events).
inner.on('end', () => this.#readableOnEnd());
inner.on('readable', () => this.#readableOnReadable());
}

if (inner instanceof Socket) {
if (isSocket) {
inner.on('timeout', () => {
this.#logger?.timedOut();
this.#outerStream.emit('timeout');
});
}

if (isSocket) {
return new RateLimitedStream.#SocketWrapper(this);
} else if (inner instanceof Readable) {
} else if (isReadable) {
return new RateLimitedStream.#DuplexWrapper(this);
} else {
return new RateLimitedStream.#WritableWrapper(this);
}
}

/**
* Processes a `_destroy()` call (indicator that the instance has been
* "destroyed") from the outer stream.
*
* @param {?Error} error Optional error.
* @param {function(Error)} callback Callback to call when this method is
* finished.
*/
#destroy(error, callback) {
this.#innerStream.destroy(error);
callback();
}

/**
* Handles an `error` event from the inner stream.
*
Expand Down Expand Up @@ -296,6 +318,11 @@ export class RateLimitedStream {
callback();
}

/** @override */
_destroy(...args) {
this.#outerThis.#destroy(...args);
}

/** @override */
_read(...args) {
this.#outerThis.#read(...args);
Expand Down Expand Up @@ -338,6 +365,61 @@ export class RateLimitedStream {
get remotePort() {
return this.#outerThis.#innerStream.remotePort;
}

/**
* @returns {number} The idle-timeout time, in msec (`Socket` interface).
* `0` indicates that timeout is disabled.
*/
get timeout() {
return this.#outerThis.#innerStream.timeout;
}

/**
* @param {number} timeoutMsec The new idle-timeout time, in msec. `0`
* indicates that timeout is disabled.
*/
set timeout(timeoutMsec) {
this.setTimeout(timeoutMsec);
}

/**
* Passthrough of same-named method to the underlying socket.
*/
destroySoon() {
if (!this.destroyed) {
if (this.closed) {
// This wrapper has already been closed, just not destroyed. The only
// thing to do is destroy it.
this.destroy();
} else {
// The wrapper hasn't yet been closed, so recapitulate the expected
// behavior from `Socket`, namely to `end()` the stream and then
// `destroy()` it.
this.end(() => {
this.destroy();
});
}
}
}

/**
* Sets a new value for the socket timeout, and optionally adds a `timeout`
* listener.
*
* @param {number} timeoutMsec The new idle-timeout time, in msec. `0`
* indicates that timeout is disabled.
* @param {?function()} [callback = null] Optional callback function.
*/
setTimeout(timeoutMsec, callback = null) {
MustBe.number(timeoutMsec, { finite: true, minInclusive: 0 });
this.#outerThis.#innerStream.setTimeout(timeoutMsec);

if (callback) {
// Note: The `timeout` event gets plumbed through from the inner socket
// to this instance in `createWrapper()`, above.
this.on('timeout', callback);
}
}
};

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main-lactoserv/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@this/main-lactoserv",
"version": "0.5.5",
"version": "0.5.6",
"type": "module",
"private": true,
"license": "Apache-2.0",
Expand Down
51 changes: 27 additions & 24 deletions src/network-protocol/private/Http2Wrangler.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,6 @@ export class Http2Wrangler extends TcpWrangler {

this.#protocolServer = http2.createSecureServer(serverOptions);
this.#protocolServer.on('session', (session) => this.#addSession(session));

// Explicitly set the default socket timeout, as doing this _might_
// mitigate a memory leak as noted in
// <https://github.com/nodejs/node/issues/42710>. As of this writing, there
// _is_ a memory leak of some sort in this project, and the working
// hypothesis is that setting this timeout will suffice as a fix /
// workaround (depending on one's perspective).
// **Note:** `server.setTimeout(msec)` and `server.timeout = msec` do the
// same thing (though the former can be used with an extra argument to
// set up a callback at the same time).
this.#protocolServer.timeout = Http2Wrangler.#SOCKET_TIMEOUT_MSEC;

// TODO: Either remove this entirely, if it turns out that the server
// timeout is useless (for us), or add something useful here.
this.#protocolServer.on('timeout', () => {
this.#logger?.serverTimeout();
});
}

/** @override */
Expand Down Expand Up @@ -133,9 +116,35 @@ export class Http2Wrangler extends TcpWrangler {
session.on('frameError', removeSession);
session.on('goaway', removeSession);

// What's going on: If the underlying socket was closed and we didn't do
// anything here (that is, if this event handler weren't added), the HTTP2
// session-handling code wouldn't fully notice by itself. Later, the session
// would do its idle timeout, and the callback here (below) would try to
// close the session. At that point, the HTTP2 system would get confused and
// end up throwing an unhandleable error (method call on the internal socket
// reference, except the reference had gotten `null`ed out). So, with that
// as context, if -- as we do here -- we tell the session to close as soon
// as we see the underlying socket go away, there's no internal HTTP2 error.
// Salient issues in Node:
// * <https://github.com/nodejs/node/issues/35695>
// * <https://github.com/nodejs/node/issues/46094>
ctx.socket.on('close', () => {
if (!session.closed) {
session.close();

// When we're in this situation, the HTTP2 library doesn't seem to emit
// the expected `close` event by itself.
session.emit('close');
}
});

session.setTimeout(Http2Wrangler.#SESSION_TIMEOUT_MSEC, () => {
ctx.sessionLogger?.idleTimeout();
session.close();
if (session.closed) {
ctx.sessionLogger?.alreadyClosed();
} else {
session.close();
}
});
}

Expand Down Expand Up @@ -254,10 +263,4 @@ export class Http2Wrangler extends TcpWrangler {
* before considering it "timed out" and telling it to close.
*/
static #SESSION_TIMEOUT_MSEC = 1 * 60 * 1000; // One minute.

/**
* @type {number} How long in msec to wait before considering a socket
* "timed out."
*/
static #SOCKET_TIMEOUT_MSEC = 3 * 60 * 1000; // Three minutes.
}
23 changes: 0 additions & 23 deletions src/network-protocol/private/HttpWrangler.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,6 @@ export class HttpWrangler extends TcpWrangler {
this.#logger = options.logger?.http ?? null;
this.#application = express();
this.#protocolServer = http.createServer();

// Explicitly set the default socket timeout, as doing this _might_ help
// prevent memory leaks. See the longer comment in the `Http2Wrangler`
// constructor for details. The bug noted there is HTTP2-specific, but the
// possibility of socket leakage seems like it could easily happen here too.
this.#protocolServer.timeout = HttpWrangler.#SOCKET_TIMEOUT_MSEC;

// TODO: Either remove this entirely, if it turns out that the server
// timeout is useless (for us), or add something useful here.
this.#protocolServer.on('timeout', () => {
this.#logger?.serverTimeout();
});
}

/** @override */
Expand All @@ -71,15 +59,4 @@ export class HttpWrangler extends TcpWrangler {
_impl_server() {
return this.#protocolServer;
}


//
// Static members
//

/**
* @type {number} How long in msec to wait before considering a socket
* "timed out."
*/
static #SOCKET_TIMEOUT_MSEC = 3 * 60 * 1000; // Three minutes.
}
23 changes: 0 additions & 23 deletions src/network-protocol/private/HttpsWrangler.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,6 @@ export class HttpsWrangler extends TcpWrangler {
this.#logger = options.logger?.https ?? null;
this.#application = express();
this.#protocolServer = https.createServer(options.hosts);

// Explicitly set the default socket timeout, as doing this _might_ help
// prevent memory leaks. See the longer comment in the `Http2Wrangler`
// constructor for details. The bug noted there is HTTP2-specific, but the
// possibility of socket leakage seems like it could easily happen here too.
this.#protocolServer.timeout = HttpsWrangler.#SOCKET_TIMEOUT_MSEC;

// TODO: Either remove this entirely, if it turns out that the server
// timeout is useless (for us), or add something useful here.
this.#protocolServer.on('timeout', () => {
this.#logger?.serverTimeout();
});
}

/** @override */
Expand All @@ -71,15 +59,4 @@ export class HttpsWrangler extends TcpWrangler {
_impl_server() {
return this.#protocolServer;
}


//
// Static members
//

/**
* @type {number} How long in msec to wait before considering a socket
* "timed out."
*/
static #SOCKET_TIMEOUT_MSEC = 3 * 60 * 1000; // Three minutes.
}
Loading

0 comments on commit 13f8e67

Please sign in to comment.