Skip to content

Automatically close response stream once client connection closes #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,15 @@ This is just a example you could use of the streaming,
you could also send a big amount of data via little chunks
or use it for body data that needs to calculated.

If the request handler resolves with a response stream that is already closed,
it will simply send an empty response body.
If the client closes the connection while the stream is still open, the
response stream will automatically be closed.
If a promise is resolved with a streaming body after the client closes, the
response stream will automatically be closed.
The `close` event can be used to clean up any pending resources allocated
in this case (if applicable).

If the response body is a `string`, a `Content-Length` header will be added
automatically.
If the response body is a ReactPHP `ReadableStreamInterface` and you do not
Expand Down
4 changes: 1 addition & 3 deletions src/ChunkedEncoder.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,9 @@ public function close()
}

$this->closed = true;

$this->readable = false;
$this->input->close();

$this->emit('close');

$this->removeAllListeners();
}

Expand Down
30 changes: 25 additions & 5 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -339,15 +339,35 @@ private function handleResponseBody(ResponseInterface $response, ConnectionInter
return $connection->end();
}

$body = $response->getBody();
$stream = $body;
$stream = $response->getBody();

if ($response->getHeaderLine('Transfer-Encoding') === 'chunked') {
$stream = new ChunkedEncoder($body);
// close response stream if connection is already closed
if (!$connection->isWritable()) {
return $stream->close();
}

$connection->write(Psr7Implementation\str($response));
$stream->pipe($connection);

if ($stream->isReadable()) {
if ($response->getHeaderLine('Transfer-Encoding') === 'chunked') {
$stream = new ChunkedEncoder($stream);
}

// Close response stream once connection closes.
// Note that this TCP/IP close detection may take some time,
// in particular this may only fire on a later read/write attempt
// because we stop/pause reading from the connection once the
// request has been processed.
$connection->on('close', array($stream, 'close'));

$stream->pipe($connection);
} else {
if ($response->getHeaderLine('Transfer-Encoding') === 'chunked') {
$connection->write("0\r\n\r\n");
}

$connection->end();
}
}

/**
Expand Down
95 changes: 95 additions & 0 deletions tests/FunctionalServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use React\Promise\Promise;
use React\Promise\PromiseInterface;
use React\Promise\Stream;
use React\Stream\ThroughStream;

class FunctionalServerTest extends TestCase
{
Expand Down Expand Up @@ -351,6 +352,100 @@ public function testSecureHttpsOnHttpStandardPortReturnsUriWithPort()

$socket->close();
}

public function testClosedStreamFromRequestHandlerWillSendEmptyBody()
{
$loop = Factory::create();
$socket = new Socket(0, $loop);
$connector = new Connector($loop);

$stream = new ThroughStream();
$stream->close();

$server = new Server($socket, function (RequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
});

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) use ($loop) {
$conn->write("GET / HTTP/1.0\r\n\r\n");

return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);

$this->assertStringStartsWith("HTTP/1.0 200 OK", $response);
$this->assertStringEndsWith("\r\n\r\n", $response);

$socket->close();
}

public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesWhileSendingBody()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed tests by increasing test timeouts :shipit:

{
$loop = Factory::create();
$socket = new Socket(0, $loop);
$connector = new Connector($loop);

$stream = new ThroughStream();
$stream->on('close', $this->expectCallableOnce());

$server = new Server($socket, function (RequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
});

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) use ($loop) {
$conn->write("GET / HTTP/1.0\r\nContent-Length: 100\r\n\r\n");

$loop->addTimer(0.1, function() use ($conn) {
$conn->end();
});

return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);

$this->assertStringStartsWith("HTTP/1.0 200 OK", $response);
$this->assertStringEndsWith("\r\n\r\n", $response);

$socket->close();
}

public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesButWillOnlyBeDetectedOnNextWrite()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed tests by increasing test timeouts :shipit:

{
$loop = Factory::create();
$socket = new Socket(0, $loop);
$connector = new Connector($loop);

$stream = new ThroughStream();
$stream->on('close', $this->expectCallableOnce());

$server = new Server($socket, function (RequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
});

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) use ($loop) {
$conn->write("GET / HTTP/1.0\r\n\r\n");

$loop->addTimer(0.1, function() use ($conn) {
$conn->end();
});

return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);

$stream->write('nope');
Block\sleep(0.1, $loop);
$stream->write('nope');
Block\sleep(0.1, $loop);

$this->assertStringStartsWith("HTTP/1.0 200 OK", $response);
$this->assertStringEndsWith("\r\n\r\n", $response);

$socket->close();
}
}

function noScheme($uri)
Expand Down
160 changes: 160 additions & 0 deletions tests/ServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,166 @@ function ($data) use (&$buffer) {
$this->assertEquals('', $buffer);
}

public function testStreamAlreadyClosedWillSendEmptyBodyChunkedEncoded()
{
$stream = new ThroughStream();
$stream->close();

$server = new Server($this->socket, function (ServerRequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
});

$buffer = '';

$this->connection
->expects($this->any())
->method('write')
->will(
$this->returnCallback(
function ($data) use (&$buffer) {
$buffer .= $data;
}
)
);

$this->socket->emit('connection', array($this->connection));

$data = "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n";
$this->connection->emit('data', array($data));

$this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer);
$this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer);
}

public function testResponseStreamEndingWillSendEmptyBodyChunkedEncoded()
{
$stream = new ThroughStream();

$server = new Server($this->socket, function (ServerRequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
});

$buffer = '';

$this->connection
->expects($this->any())
->method('write')
->will(
$this->returnCallback(
function ($data) use (&$buffer) {
$buffer .= $data;
}
)
);

$this->socket->emit('connection', array($this->connection));

$data = "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n";
$this->connection->emit('data', array($data));

$stream->end();

$this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer);
$this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer);
}

public function testResponseStreamAlreadyClosedWillSendEmptyBodyPlainHttp10()
{
$stream = new ThroughStream();
$stream->close();

$server = new Server($this->socket, function (ServerRequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
});

$buffer = '';

$this->connection
->expects($this->any())
->method('write')
->will(
$this->returnCallback(
function ($data) use (&$buffer) {
$buffer .= $data;
}
)
);

$this->socket->emit('connection', array($this->connection));

$data = "GET / HTTP/1.0\r\nHost: localhost\r\n\r\n";
$this->connection->emit('data', array($data));

$this->assertStringStartsWith("HTTP/1.0 200 OK\r\n", $buffer);
$this->assertStringEndsWith("\r\n\r\n", $buffer);
}

public function testResponseStreamWillBeClosedIfConnectionIsAlreadyClosed()
{
$stream = new ThroughStream();
$stream->on('close', $this->expectCallableOnce());

$server = new Server($this->socket, function (ServerRequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
});

$buffer = '';

$this->connection
->expects($this->any())
->method('write')
->will(
$this->returnCallback(
function ($data) use (&$buffer) {
$buffer .= $data;
}
)
);

$this->connection = $this->getMockBuilder('React\Socket\Connection')
->disableOriginalConstructor()
->setMethods(
array(
'write',
'end',
'close',
'pause',
'resume',
'isReadable',
'isWritable',
'getRemoteAddress',
'getLocalAddress',
'pipe'
)
)
->getMock();

$this->connection->expects($this->once())->method('isWritable')->willReturn(false);
$this->connection->expects($this->never())->method('write');
$this->connection->expects($this->never())->method('write');

$this->socket->emit('connection', array($this->connection));

$data = $this->createGetRequest();
$this->connection->emit('data', array($data));
}

public function testResponseStreamWillBeClosedIfConnectionEmitsCloseEvent()
{
$stream = new ThroughStream();
$stream->on('close', $this->expectCallableOnce());

$server = new Server($this->socket, function (ServerRequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
});

$this->socket->emit('connection', array($this->connection));

$data = $this->createGetRequest();
$this->connection->emit('data', array($data));
$this->connection->emit('close');
}

public function testResponseContainsSameRequestProtocolVersionAndChunkedBodyForHttp11()
{
$server = new Server($this->socket, function (ServerRequestInterface $request) {
Expand Down