Skip to content

Add LimitConcurrentRequestsMiddleware to limit how many next handlers can be executed concurrently #272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Event-driven, streaming plaintext HTTP and secure HTTPS server for [ReactPHP](ht
* [Request](#request)
* [Response](#response)
* [Middleware](#middleware)
* [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware)
* [RequestBodyBufferMiddleware](#requestbodybuffermiddleware)
* [RequestBodyParserMiddleware](#requestbodyparsermiddleware)
* [Third-Party Middleware](#third-party-middleware)
Expand Down Expand Up @@ -681,6 +682,59 @@ $server = new StreamingServer(new MiddlewareRunner([
]));
```

#### LimitConcurrentRequestsMiddleware

The `LimitConcurrentRequestsMiddleware` can be used to
limit how many next handlers can be executed concurrently.

If this middleware is invoked, it will check if the number of pending
handlers is below the allowed limit and then simply invoke the next handler
and it will return whatever the next handler returns (or throws).

If the number of pending handlers exceeds the allowed limit, the request will
be queued (and its streaming body will be paused) and it will return a pending
promise.
Once a pending handler returns (or throws), it will pick the oldest request
from this queue and invokes the next handler (and its streaming body will be
resumed).

The following example shows how this middleware can be used to ensure no more
than 10 handlers will be invoked at once:

```php
$server = new StreamingServer(new MiddlewareRunner([
new LimitConcurrentRequestsMiddleware(10),
$handler
]));
```

Similarly, this middleware is often used in combination with the
[`RequestBodyBufferMiddleware`](#requestbodybuffermiddleware) (see below)
to limit the total number of requests that can be buffered at once:

```php
$server = new StreamingServer(new MiddlewareRunner([
new LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
new RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
new RequestBodyParserMiddleware(),
$handler
]));
```

More sophisticated examples include limiting the total number of requests
that can be buffered at once and then ensure the actual request handler only
processes one request after another without any concurrency:

```php
$server = new StreamingServer(new MiddlewareRunner([
new LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
new RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
new RequestBodyParserMiddleware(),
new LimitConcurrentRequestsMiddleware(1), // only execute 1 handler (no concurrency)
$handler
]));
```

#### RequestBodyBufferMiddleware

One of the built-in middleware is the `RequestBodyBufferMiddleware` which
Expand Down Expand Up @@ -714,10 +768,18 @@ Similarly, this will immediately invoke the next middleware handler for requests
that have an empty request body (such as a simple `GET` request) and requests
that are already buffered (such as due to another middleware).

Note that the given buffer size limit is applied to each request individually.
This means that if you allow a 2 MiB limit and then receive 1000 concurrent
requests, up to 2000 MiB may be allocated for these buffers alone.
As such, it's highly recommended to use this along with the
[`LimitConcurrentRequestsMiddleware`](#limitconcurrentrequestsmiddleware) (see above) to limit
the total number of concurrent requests.

Usage:

```php
$middlewares = new MiddlewareRunner([
new LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
new RequestBodyBufferMiddleware(16 * 1024 * 1024), // 16 MiB
function (ServerRequestInterface $request, callable $next) {
// The body from $request->getBody() is now fully available without the need to stream it
Expand Down Expand Up @@ -776,6 +838,7 @@ $handler = function (ServerRequestInterface $request) {
};

$server = new StreamingServer(new MiddlewareRunner([
new LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
new RequestBodyBufferMiddleware(16 * 1024 * 1024), // 16 MiB
new RequestBodyParserMiddleware(),
$handler
Expand Down
2 changes: 2 additions & 0 deletions examples/12-upload.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Psr\Http\Message\UploadedFileInterface;
use React\EventLoop\Factory;
use React\Http\MiddlewareRunner;
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
use React\Http\Middleware\RequestBodyBufferMiddleware;
use React\Http\Middleware\RequestBodyParserMiddleware;
use React\Http\Response;
Expand Down Expand Up @@ -121,6 +122,7 @@

// buffer and parse HTTP request body before running our request handler
$server = new StreamingServer(new MiddlewareRunner(array(
new LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers, queue otherwise
new RequestBodyBufferMiddleware(8 * 1024 * 1024), // 8 MiB max, ignore body otherwise
new RequestBodyParserMiddleware(100 * 1024, 1), // 1 file with 100 KiB max, reject upload otherwise
$handler
Expand Down
188 changes: 188 additions & 0 deletions src/Io/PauseBufferStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
<?php

namespace React\Http\Io;

use Evenement\EventEmitter;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

/**
* [Internal] Pauses a given stream and buffers all events while paused
*
* This class is used to buffer all events that happen on a given stream while
* it is paused. This allows you to pause a stream and no longer watch for any
* of its events. Once the stream is resumed, all buffered events will be
* emitted. Explicitly closing the resulting stream clears all buffers.
*
* Note that this is an internal class only and nothing you should usually care
* about.
*
* @see ReadableStreamInterface
* @internal
*/
class PauseBufferStream extends EventEmitter implements ReadableStreamInterface
{
private $input;
private $closed = false;
private $paused = false;
private $dataPaused = '';
private $endPaused = false;
private $closePaused = false;
private $errorPaused = null;
private $implicit = false;

public function __construct(ReadableStreamInterface $input)
{
$this->input = $input;

$this->input->on('data', array($this, 'handleData'));
$this->input->on('end', array($this, 'handleEnd'));
$this->input->on('error', array($this, 'handleError'));
$this->input->on('close', array($this, 'handleClose'));
}

/**
* pause and remember this was not explicitly from user control
*
* @internal
*/
public function pauseImplicit()
{
$this->pause();
$this->implicit = true;
}

/**
* resume only if this was previously paused implicitly and not explicitly from user control
*
* @internal
*/
public function resumeImplicit()
{
if ($this->implicit) {
$this->resume();
}
}

public function isReadable()
{
return !$this->closed;
}

public function pause()
{
if ($this->closed) {
return;
}

$this->input->pause();
$this->paused = true;
$this->implicit = false;
}

public function resume()
{
if ($this->closed) {
return;
}

$this->paused = false;
$this->implicit = false;

if ($this->dataPaused !== '') {
$this->emit('data', array($this->dataPaused));
$this->dataPaused = '';
}

if ($this->errorPaused) {
$this->emit('error', array($this->errorPaused));
return $this->close();
}

if ($this->endPaused) {
$this->endPaused = false;
$this->emit('end');
return $this->close();
}

if ($this->closePaused) {
$this->closePaused = false;
return $this->close();
}

$this->input->resume();
}

public function pipe(WritableStreamInterface $dest, array $options = array())
{
Util::pipe($this, $dest, $options);

return $dest;
}

public function close()
{
if ($this->closed) {
return;
}

$this->closed = true;
$this->dataPaused = '';
$this->endPaused = $this->closePaused = false;
$this->errorPaused = null;

$this->input->close();

$this->emit('close');
$this->removeAllListeners();
}

/** @internal */
public function handleData($data)
{
if ($this->paused) {
$this->dataPaused .= $data;
return;
}

$this->emit('data', array($data));
}

/** @internal */
public function handleError(\Exception $e)
{
if ($this->paused) {
$this->errorPaused = $e;
return;
}

$this->emit('error', array($e));
$this->close();
}

/** @internal */
public function handleEnd()
{
if ($this->paused) {
$this->endPaused = true;
return;
}

if (!$this->closed) {
$this->emit('end');
$this->close();
}
}

/** @internal */
public function handleClose()
{
if ($this->paused) {
$this->closePaused = true;
return;
}

$this->close();
}
}
Loading