Skip to content

Do not pause connection stream to detect closed connections immediately #315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ successfully, i.e. it was read until its expected end.
The `error` event will be emitted in case the request stream contains invalid
chunked data or the connection closes before the complete request stream has
been received.
The server will automatically `pause()` the connection instead of closing it.
The server will automatically stop reading from the connection and discard all
incoming data instead of closing it.
A response message can still be sent (unless the connection is already closed).

A `close` event will be emitted after an `error` or `end` event.
Expand Down
14 changes: 10 additions & 4 deletions examples/08-stream-response.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,26 @@

// Note how this example still uses `Server` instead of `StreamingServer`.
// The `StreamingServer` is only required for streaming *incoming* requests.
$server = new Server($loop,function (ServerRequestInterface $request) use ($loop) {
$server = new Server(function (ServerRequestInterface $request) use ($loop) {
if ($request->getMethod() !== 'GET' || $request->getUri()->getPath() !== '/') {
return new Response(404);
}

$stream = new ThroughStream();

// send some data every once in a while with periodic timer
$timer = $loop->addPeriodicTimer(0.5, function () use ($stream) {
$stream->emit('data', array(microtime(true) . PHP_EOL));
$stream->write(microtime(true) . PHP_EOL);
});

$loop->addTimer(5, function() use ($loop, $timer, $stream) {
// demo for ending stream after a few seconds
$loop->addTimer(5.0, function() use ($stream) {
$stream->end();
});

// stop timer if stream is closed (such as when connection is closed)
$stream->on('close', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
$stream->emit('end');
});

return new Response(
Expand Down
21 changes: 13 additions & 8 deletions src/Io/CloseProtectionStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use React\Stream\WritableStreamInterface;

/**
* [Internal] Protects a given stream from actually closing and only pauses it instead.
* [Internal] Protects a given stream from actually closing and only discards its incoming data instead.
*
* This is used internally to prevent the underlying connection from closing, so
* that we can still send back a response over the same stream.
Expand All @@ -19,9 +19,10 @@ class CloseProtectionStream extends EventEmitter implements ReadableStreamInterf
{
private $input;
private $closed = false;
private $paused = false;

/**
* @param ReadableStreamInterface $input stream that will be paused instead of closed on an 'close' event.
* @param ReadableStreamInterface $input stream that will be discarded instead of closing it on an 'close' event.
*/
public function __construct(ReadableStreamInterface $input)
{
Expand All @@ -44,6 +45,7 @@ public function pause()
return;
}

$this->paused = true;
$this->input->pause();
}

Expand All @@ -53,6 +55,7 @@ public function resume()
return;
}

$this->paused = false;
$this->input->resume();
}

Expand All @@ -71,16 +74,19 @@ public function close()

$this->closed = true;

$this->emit('close');

// 'pause' the stream avoids additional traffic transferred by this stream
$this->input->pause();

// stop listening for incoming events
$this->input->removeListener('data', array($this, 'handleData'));
$this->input->removeListener('error', array($this, 'handleError'));
$this->input->removeListener('end', array($this, 'handleEnd'));
$this->input->removeListener('close', array($this, 'close'));

// resume the stream to ensure we discard everything from incoming connection
if ($this->paused) {
$this->paused = false;
$this->input->resume();
}

$this->emit('close');
$this->removeAllListeners();
}

Expand All @@ -102,5 +108,4 @@ public function handleError(\Exception $e)
{
$this->emit('error', array($e));
}

}
4 changes: 1 addition & 3 deletions src/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt

// Close response stream once connection closes.
// Note that this TCP/IP close detection may take some time,
// in particular this may only fire on a later read/write attempt
// because we stop/pause reading from the connection once the
// request has been processed.
// in particular this may only fire on a later read/write attempt.
$connection->on('close', array($body, 'close'));

$body->pipe($connection);
Expand Down
30 changes: 11 additions & 19 deletions tests/FunctionalServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
use React\Http\Middleware\RequestBodyBufferMiddleware;
use React\Http\Response;
use React\Http\StreamingServer;
use React\Socket\Server as Socket;
use React\EventLoop\Factory;
use React\Http\StreamingServer;
use Psr\Http\Message\RequestInterface;
use React\Socket\Connector;
use React\Socket\ConnectionInterface;
use Clue\React\Block;
use React\Http\Response;
use React\Socket\SecureServer;
use React\Promise;
use React\Promise\Stream;
Expand Down Expand Up @@ -498,7 +498,7 @@ public function testRequestHandlerWillReceiveCloseEventIfConnectionClosesWhileSe
$socket->close();
}

public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesWhileSendingBody()
public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesWhileSendingRequestBody()
{
$loop = Factory::create();
$connector = new Connector($loop);
Expand Down Expand Up @@ -528,13 +528,12 @@ public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesWhileS
$this->assertNull($ret);
}

public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesButWillOnlyBeDetectedOnNextWrite()
public function testStreamFromRequestHandlerWillBeClosedIfConnectionCloses()
{
$loop = Factory::create();
$connector = new Connector($loop);

$stream = new ThroughStream();
$stream->on('close', $this->expectCallableOnce());

$server = new StreamingServer(function (RequestInterface $request) use ($stream) {
return new Response(200, array(), $stream);
Expand All @@ -543,27 +542,20 @@ public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesButWil
$socket = new Socket(0, $loop);
$server->listen($socket);

$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) use ($loop) {
$connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) use ($loop) {
$conn->write("GET / HTTP/1.0\r\n\r\n");

$loop->addTimer(0.1, function() use ($conn) {
$conn->end();
$loop->addTimer(0.1, function () use ($conn) {
$conn->close();
});

return Stream\buffer($conn);
});

$response = Block\await($result, $loop, 1.0);

$stream->write('nope');
Block\sleep(0.1, $loop);
$stream->write('nope');
Block\sleep(0.1, $loop);

$this->assertStringStartsWith("HTTP/1.0 200 OK", $response);
$this->assertStringEndsWith("\r\n\r\n", $response);
// await response stream to be closed
$ret = Block\await(Stream\first($stream, 'close'), $loop, 1.0);

$socket->close();

$this->assertNull($ret);
}

public function testUpgradeWithThroughStreamReturnsDataAsGiven()
Expand Down
21 changes: 17 additions & 4 deletions tests/Io/CloseProtectionStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@

class CloseProtectionStreamTest extends TestCase
{
public function testClosePausesTheInputStreamInsteadOfClosing()
public function testCloseDoesNotCloseTheInputStream()
{
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->disableOriginalConstructor()->getMock();
$input->expects($this->once())->method('pause');
$input->expects($this->never())->method('pause');
$input->expects($this->never())->method('resume');
$input->expects($this->never())->method('close');

$protection = new CloseProtectionStream($input);
Expand Down Expand Up @@ -43,6 +44,17 @@ public function testResumeStreamWillResumeInputStream()
$protection->resume();
}

public function testCloseResumesInputStreamIfItWasPreviouslyPaused()
{
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
$input->expects($this->once())->method('pause');
$input->expects($this->once())->method('resume');

$protection = new CloseProtectionStream($input);
$protection->pause();
$protection->close();
}

public function testInputStreamIsNotReadableAfterClose()
{
$input = new ThroughStream();
Expand Down Expand Up @@ -121,7 +133,8 @@ public function testEndWontBeEmittedAfterClose()
public function testPauseAfterCloseHasNoEffect()
{
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
$input->expects($this->once())->method('pause');
$input->expects($this->never())->method('pause');
$input->expects($this->never())->method('resume');

$protection = new CloseProtectionStream($input);
$protection->on('data', $this->expectCallableNever());
Expand All @@ -134,7 +147,7 @@ public function testPauseAfterCloseHasNoEffect()
public function testResumeAfterCloseHasNoEffect()
{
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
$input->expects($this->once())->method('pause');
$input->expects($this->never())->method('pause');
$input->expects($this->never())->method('resume');

$protection = new CloseProtectionStream($input);
Expand Down
19 changes: 7 additions & 12 deletions tests/StreamingServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ public function testRequestOptionsAbsoluteEvent()
$this->assertSame('example.com', $requestAssertion->getHeaderLine('Host'));
}

public function testRequestPauseWillbeForwardedToConnection()
public function testRequestPauseWillBeForwardedToConnection()
{
$server = new StreamingServer(function (ServerRequestInterface $request) {
$request->getBody()->pause();
Expand All @@ -517,7 +517,7 @@ public function testRequestPauseWillbeForwardedToConnection()
$this->connection->emit('data', array($data));
}

public function testRequestResumeWillbeForwardedToConnection()
public function testRequestResumeWillBeForwardedToConnection()
{
$server = new StreamingServer(function (ServerRequestInterface $request) {
$request->getBody()->resume();
Expand All @@ -532,13 +532,13 @@ public function testRequestResumeWillbeForwardedToConnection()
$this->connection->emit('data', array($data));
}

public function testRequestCloseWillPauseConnection()
public function testRequestCloseWillNotCloseConnection()
{
$server = new StreamingServer(function (ServerRequestInterface $request) {
$request->getBody()->close();
});

$this->connection->expects($this->once())->method('pause');
$this->connection->expects($this->never())->method('close');

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));
Expand All @@ -554,7 +554,8 @@ public function testRequestPauseAfterCloseWillNotBeForwarded()
$request->getBody()->pause();
});

$this->connection->expects($this->once())->method('pause');
$this->connection->expects($this->never())->method('close');
$this->connection->expects($this->never())->method('pause');

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));
Expand All @@ -570,7 +571,7 @@ public function testRequestResumeAfterCloseWillNotBeForwarded()
$request->getBody()->resume();
});

$this->connection->expects($this->once())->method('pause');
$this->connection->expects($this->never())->method('close');
$this->connection->expects($this->never())->method('resume');

$server->listen($this->socket);
Expand Down Expand Up @@ -1964,7 +1965,6 @@ public function testRequestInvalidChunkHeaderTooLongWillEmitErrorOnRequestStream
});

$this->connection->expects($this->never())->method('close');
$this->connection->expects($this->once())->method('pause');

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));
Expand All @@ -1989,7 +1989,6 @@ public function testRequestInvalidChunkBodyTooLongWillEmitErrorOnRequestStream()
});

$this->connection->expects($this->never())->method('close');
$this->connection->expects($this->once())->method('pause');

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));
Expand All @@ -2012,7 +2011,6 @@ public function testRequestUnexpectedEndOfRequestWithChunkedTransferConnectionWi
});

$this->connection->expects($this->never())->method('close');
$this->connection->expects($this->once())->method('pause');

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));
Expand All @@ -2036,7 +2034,6 @@ public function testRequestInvalidChunkHeaderWillEmitErrorOnRequestStream()
});

$this->connection->expects($this->never())->method('close');
$this->connection->expects($this->once())->method('pause');

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));
Expand All @@ -2059,7 +2056,6 @@ public function testRequestUnexpectedEndOfRequestWithContentLengthWillEmitErrorO
});

$this->connection->expects($this->never())->method('close');
$this->connection->expects($this->once())->method('pause');

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));
Expand Down Expand Up @@ -2089,7 +2085,6 @@ public function testRequestWithoutBodyWillEmitEndOnRequestStream()
$request->getBody()->on('error', $errorEvent);
});

$this->connection->expects($this->once())->method('pause');
$this->connection->expects($this->never())->method('close');

$server->listen($this->socket);
Expand Down