-
-
Notifications
You must be signed in to change notification settings - Fork 34.7k
Closed
Labels
http2Issues or PRs related to the http2 subsystem.Issues or PRs related to the http2 subsystem.
Description
Version: v9.2.0
Platform: Linux develop 4.9.0-4-amd64 SMP Debian 4.9.51-1 (2017-09-28) x86_64 GNU/Linux
Subsystem: http2
Sorry for my english.
I try to do some action (read then unshift data back) with stream on "connection" event but before http2 connectionListener do its job (make session etc) ... and i`m not happy: after read, stream is no longer handled properly by http2 system and i must pack it into ugly Duplex stream.
My question: how to read data from stream, push it back and keep stream usable for http2 module?
Or maybe its a bug - https and http module handle properly such streams.
In this example i do pseudo alpn negotiation but i really need this to support PROXY protocol.
"use strict";
const http2 = require("http2");
const { Duplex } = require("stream");
// ...
const Http2Server = http2.createServer().constructor;
// HTTP2 preface from node-spdy
const PREFACE_BUFFER = Buffer. from ("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
const PREFACE_BUFFER_LENGTH = PREFACE_BUFFER.length;
// Ugly Duplex Proxy
const kWait = Symbol("wait");
const kNread = Symbol("nread");
const kSocket = Symbol("socket");
const kBuffer = Symbol("buffer");
class SocketProxy extends Duplex {
constructor(socket, buffer) {
super({
allowHalfOpen: true,
decodeStrings: false
});
this[kWait] = true;
this[kNread] = -1;
this[kSocket] = socket;
this[kBuffer] = buffer;
socket.on("error", err => this.emit("error", err));
socket.on("data", chunk => this.addChunk(chunk));
socket.once("end", () => {
this[kWait] = false;
this.tryRead();
this.emit("end");
this.destroy();
});
}
_write(data, encoding, cb) {
try {
this[kSocket].write(data, encoding);
cb();
} catch (err) {
cb(err);
}
}
_writev(chunks, cb) {
try {
this[kSocket].writev(chunks);
cb();
} catch (err) {
cb(err);
}
}
_destroy(e, cb) {
try {
this[kSocket].destroy(e);
cb();
} catch (err) {
cb(err);
}
delete this[kSocket];
delete this[kBuffer];
}
_final(cb) {
try {
this[kSocket].final();
cb();
} catch (err) {
cb(err);
}
}
_read(nread) {
if (this[kBuffer].length > 0) {
const data = this[kBuffer].slice(0, nread);
this[kBuffer] = this[kBuffer].slice(nread);
return this.push(data);
} else if (this[kWait]) {
this[kNread] = nread;
} else {
return this.push(null);
}
}
addChunk(chunk) {
this[kBuffer] = Buffer.concat([this[kBuffer], chunk]);
this.tryRead();
}
tryRead() {
const nread = this[kNread];
if (nread !== -1) {
this[kNread] = -1;
this._read(nread);
}
}
get remoteAddress() {
return this[kSocket].remoteAddress;
}
get remotePort() {
return this[kSocket].remotePort;
}
}
// new connection listener - read, unshift or create ugly proxy
function connectionListener(socket) {
const onReadable = () => {
// at this point socket is somehow "broken" for http2
socket.removeListener("readable", onReadable);
let buffer;
let chunk = socket.read();
while (null !== chunk) {
buffer = buffer ? Buffer.concat([buffer, chunk]) : chunk;
let isH2 = true;
const bufferLength = buffer.length;
if (bufferLength >= PREFACE_BUFFER_LENGTH) {
isH2 = PREFACE_BUFFER.equals(buffer.slice(0, PREFACE_BUFFER_LENGTH));
} else {
isH2 = buffer.equals(PREFACE_BUFFER.slice(0, bufferLength));
}
if (!isH2 || bufferLength >= PREFACE_BUFFER_LENGTH) {
if (!isH2) {
// ... pseudo alpn negotiation
Object.defineProperty(socket, "alpnProtocol", {
value: false
});
this.emit("postAlpnConnection", socket);
// httpConnectionListener support readed socekt
socket.unshift(buffer);
} else {
// socket is broken so make proxy...
const proxy = new SocketProxy(socket, buffer);
this.emit("postAlpnConnection", proxy);
}
return;
}
chunk = socket.read();
}
this.emit("postAlpnConnection", socket);
socket.destroy("No data");
};
socket.on("readable", onReadable);
}
// handle session shutdown...
let shutdownWrapper;
function getShutdownWrapper(session) {
if (!shutdownWrapper) {
const origShutdown = session.shutdown;
shutdownWrapper = function wrapper(options, callback) {
// callback === stream.destroy, ignore err object not recognised by JSStreamWrap(?)
origShutdown.call(this, options, callback ? (/*err*/) => callback() : undefined);
};
}
return shutdownWrapper;
}
class Server extends Http2Server {
constructor(options, handler) {
super(options, handler);
this.listeners("connection").forEach(listener => this.on("postAlpnConnection", listener));
this.removeAllListeners("connection");
this.addListener("connection", connectionListener);
this.addListener("session", session => session.shutdown = getShutdownWrapper(session));
}
}
const createServer = (options, handler) => {
if (typeof options === "function") {
handler = options;
options = Object.create(null);
}
return new Server(options, handler);
};
module.exports = {
Server,
createServer
};PS. This exemple works but probably make memory leaks.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
http2Issues or PRs related to the http2 subsystem.Issues or PRs related to the http2 subsystem.