Skip to content

Use duplex stream response body for CONNECT requests #189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,6 @@ Allowed).
Note that if you want to handle this method, the client MAY send a different
request-target than the `Host` header value (such as removing default ports)
and the request-target MUST take precendence when forwarding.
The HTTP specs define an opaque "tunneling mode" for this method and make no
use of the message body.
For consistency reasons, this library uses the message body of the request and
response for tunneled application data.
This implies that that a `2xx` (Successful) response to a `CONNECT` request
can in fact use a streaming response body for the tunneled application data.
See also [example #21](examples) for more details.

The `getCookieParams(): string[]` method can be used to
get all cookies sent with the current request.
Expand Down Expand Up @@ -562,14 +555,35 @@ Modified) status code MAY include these headers even though
the message does not contain a response body, because these header would apply
to the message if the same request would have used an (unconditional) `GET`.

> Note that special care has to be taken if you use a body stream instance that
implements ReactPHP's
[`DuplexStreamInterface`](https://github.com/reactphp/stream#duplexstreaminterface)
(such as the `ThroughStream` in the above example).

> For *most* cases, this will simply only consume its readable side and forward
(send) any data that is emitted by the stream, thus entirely ignoring the
writable side of the stream.
If however this is a `2xx` (Successful) response to a `CONNECT` method, it
will also *write* data to the writable side of the stream.
This can be avoided by either rejecting all requests with the `CONNECT`
method (which is what most *normal* origin HTTP servers would likely do) or
or ensuring that only ever an instance of `ReadableStreamInterface` is
used.

> The `CONNECT` method is useful in a tunneling setup (HTTPS proxy) and not
something most HTTP servers would want to care about.
something most origin HTTP servers would want to care about.
The HTTP specs define an opaque "tunneling mode" for this method and make no
use of the message body.
For consistency reasons, this library uses the message body of the request and
response for tunneled application data.
For consistency reasons, this library uses a `DuplexStreamInterface` in the
response body for tunneled application data.
This implies that that a `2xx` (Successful) response to a `CONNECT` request
can in fact use a streaming response body for the tunneled application data.
can in fact use a streaming response body for the tunneled application data,
so that any raw data the client sends over the connection will be piped
through the writable stream for consumption.
Note that while the HTTP specs make no use of the request body for `CONNECT`
requests, one may still be present. Normal request body processing applies
here and the connection will only turn to "tunneling mode" after the request
body has been processed (which should be empty in most cases).
See also [example #22](examples) for more details.

A `Date` header will be automatically added with the system date and time if none is given.
Expand Down
4 changes: 4 additions & 0 deletions examples/08-stream-response.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
$socket = new Server(isset($argv[1]) ? $argv[1] : '0.0.0.0:0', $loop);

$server = new \React\Http\Server($socket, function (ServerRequestInterface $request) use ($loop) {
if ($request->getMethod() !== 'GET' || $request->getUri()->getPath() !== '/') {
return new Response(404);
}

$stream = new ThroughStream();

$timer = $loop->addPeriodicTimer(0.5, function () use ($stream) {
Expand Down
19 changes: 1 addition & 18 deletions examples/22-connect-proxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,10 @@
);
}

// pause consuming request body
$body = $request->getBody();
$body->pause();

$buffer = '';
$body->on('data', function ($chunk) use (&$buffer) {
$buffer .= $chunk;
});

// try to connect to given target host
return $connector->connect($request->getRequestTarget())->then(
function (ConnectionInterface $remote) use ($body, &$buffer) {
function (ConnectionInterface $remote) {
// connection established => forward data
$body->pipe($remote);
$body->resume();

if ($buffer !== '') {
$remote->write($buffer);
$buffer = '';
}

return new Response(
200,
array(),
Expand Down
2 changes: 1 addition & 1 deletion src/HttpBodyStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
class HttpBodyStream extends EventEmitter implements StreamInterface, ReadableStreamInterface
{
private $input;
public $input;
private $closed = false;
private $size;

Expand Down
2 changes: 2 additions & 0 deletions src/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ private function parseRequest($data)
$originalTarget = $parts[1];
$parts[1] = '/';
$headers = implode(' ', $parts);
} else {
throw new \InvalidArgumentException('CONNECT method MUST use authority-form request target');
}
}

Expand Down
39 changes: 20 additions & 19 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use RingCentral\Psr7 as Psr7Implementation;
use Psr\Http\Message\ServerRequestInterface;
use React\Promise\CancellablePromiseInterface;
use React\Stream\WritableStreamInterface;

/**
* The `Server` class is responsible for handling incoming connections and then
Expand Down Expand Up @@ -161,25 +162,7 @@ public function handleRequest(ConnectionInterface $conn, ServerRequestInterface
{
$contentLength = 0;
$stream = new CloseProtectionStream($conn);
if ($request->getMethod() === 'CONNECT') {
// CONNECT method MUST use authority-form request target
$parts = parse_url('tcp://' . $request->getRequestTarget());
if (!isset($parts['scheme'], $parts['host'], $parts['port']) || count($parts) !== 3) {
$this->emit('error', array(new \InvalidArgumentException('CONNECT method MUST use authority-form request target')));
return $this->writeError($conn, 400);
}

// CONNECT uses undelimited body until connection closes
$request = $request->withoutHeader('Transfer-Encoding');
$request = $request->withoutHeader('Content-Length');
$contentLength = null;

// emit end event before the actual close event
$stream->on('close', function () use ($stream) {
$stream->emit('end');
});
} else if ($request->hasHeader('Transfer-Encoding')) {

if ($request->hasHeader('Transfer-Encoding')) {
if (strtolower($request->getHeaderLine('Transfer-Encoding')) !== 'chunked') {
$this->emit('error', array(new \InvalidArgumentException('Only chunked-encoding is allowed for Transfer-Encoding')));
return $this->writeError($conn, 501, $request);
Expand Down Expand Up @@ -329,6 +312,24 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
$response = $response->withBody(Psr7Implementation\stream_for(''));
}

// 2xx reponse to CONNECT forwards tunneled application data through duplex stream
$body = $response->getBody();
if ($request->getMethod() === 'CONNECT' && $code >= 200 && $code < 300 && $body instanceof HttpBodyStream && $body->input instanceof WritableStreamInterface) {
if ($request->getBody()->isReadable()) {
// request is still streaming => wait for request close before forwarding following data from connection
$request->getBody()->on('close', function () use ($connection, $body) {
if ($body->input->isWritable()) {
$connection->pipe($body->input);
$connection->resume();
}
});
} elseif ($body->input->isWritable()) {
// request already closed => forward following data from connection
$connection->pipe($body->input);
$connection->resume();
}
}

$this->handleResponseBody($response, $connection);
}

Expand Down
106 changes: 106 additions & 0 deletions tests/FunctionalServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,112 @@ public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesButWil

$socket->close();
}

public function testConnectWithThroughStreamReturnsDataAsGiven()
{
$loop = Factory::create();
$socket = new Socket(0, $loop);
$connector = new Connector($loop);

$server = new Server($socket, function (RequestInterface $request) use ($loop) {
$stream = new ThroughStream();

$loop->addTimer(0.1, function () use ($stream) {
$stream->end();
});

return new Response(200, array(), $stream);
});

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n");

$conn->once('data', function () use ($conn) {
$conn->write('hello');
$conn->write('world');
});

return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);

$this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $response);
$this->assertStringEndsWith("\r\n\r\nhelloworld", $response);

$socket->close();
}

public function testConnectWithThroughStreamReturnedFromPromiseReturnsDataAsGiven()
{
$loop = Factory::create();
$socket = new Socket(0, $loop);
$connector = new Connector($loop);

$server = new Server($socket, function (RequestInterface $request) use ($loop) {
$stream = new ThroughStream();

$loop->addTimer(0.1, function () use ($stream) {
$stream->end();
});

return new Promise(function ($resolve) use ($loop, $stream) {
$loop->addTimer(0.001, function () use ($resolve, $stream) {
$resolve(new Response(200, array(), $stream));
});
});
});

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n");

$conn->once('data', function () use ($conn) {
$conn->write('hello');
$conn->write('world');
});

return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);

$this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $response);
$this->assertStringEndsWith("\r\n\r\nhelloworld", $response);

$socket->close();
}

public function testConnectWithClosedThroughStreamReturnsNoData()
{
$loop = Factory::create();
$socket = new Socket(0, $loop);
$connector = new Connector($loop);

$server = new Server($socket, function (RequestInterface $request) {
$stream = new ThroughStream();
$stream->close();

return new Response(200, array(), $stream);
});

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
$conn->write("CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n");

$conn->once('data', function () use ($conn) {
$conn->write('hello');
$conn->write('world');
});

return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);

$this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $response);
$this->assertStringEndsWith("\r\n\r\n", $response);

$socket->close();
}
}

function noScheme($uri)
Expand Down
16 changes: 16 additions & 0 deletions tests/RequestHeaderParserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,22 @@ public function testInvalidAbsoluteFormWithHostHeaderEmpty()
$this->assertSame('Invalid Host header value', $error->getMessage());
}

public function testInvalidConnectRequestWithNonAuthorityForm()
{
$error = null;

$parser = new RequestHeaderParser();
$parser->on('headers', $this->expectCallableNever());
$parser->on('error', function ($message) use (&$error) {
$error = $message;
});

$parser->feed("CONNECT http://example.com:8080/ HTTP/1.1\r\nHost: example.com:8080\r\n\r\n");

$this->assertInstanceOf('InvalidArgumentException', $error);
$this->assertSame('CONNECT method MUST use authority-form request target', $error->getMessage());
}

public function testInvalidHttpVersion()
{
$error = null;
Expand Down
48 changes: 48 additions & 0 deletions tests/ServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,54 @@ public function testResponseStreamWillBeClosedIfConnectionEmitsCloseEvent()
$this->connection->emit('close');
}

public function testConnectResponseStreamWillPipeDataToConnection()
{
$stream = new ThroughStream();

$server = new Server($this->socket, function (ServerRequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
});

$buffer = '';

$this->connection
->expects($this->any())
->method('write')
->will(
$this->returnCallback(
function ($data) use (&$buffer) {
$buffer .= $data;
}
)
);

$this->socket->emit('connection', array($this->connection));

$data = "CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n";
$this->connection->emit('data', array($data));

$stream->write('hello');
$stream->write('world');

$this->assertStringEndsWith("\r\n\r\nhelloworld", $buffer);
}

public function testConnectResponseStreamWillPipeDataFromConnection()
{
$stream = new ThroughStream();

$server = new Server($this->socket, function (ServerRequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
});

$this->socket->emit('connection', array($this->connection));

$this->connection->expects($this->once())->method('pipe')->with($stream);

$data = "CONNECT example.com:80 HTTP/1.1\r\nHost: example.com:80\r\n\r\n";
$this->connection->emit('data', array($data));
}

public function testResponseContainsSameRequestProtocolVersionAndChunkedBodyForHttp11()
{
$server = new Server($this->socket, function (ServerRequestInterface $request) {
Expand Down