Skip to content

Content length buffered sink #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"guzzlehttp/psr7": "^1.0",
"react/socket": "^0.4",
"react/stream": "^0.4",
"evenement/evenement": "^2.0"
"evenement/evenement": "^2.0",
"react/promise": "^2.2"
},
"autoload": {
"psr-4": {
Expand Down
113 changes: 113 additions & 0 deletions src/StreamingBodyParser/ContentLengthBufferedSink.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
<?php

namespace React\Http\StreamingBodyParser;

use React\Http\Request;
use React\Promise\Deferred;
use React\Promise\ExtendedPromiseInterface;

/**
* Buffer the data coming in from a request until the specified length is reached.
* Or until the promise is canceled.
*
* @internal
*/
class ContentLengthBufferedSink
{
/**
* @var Deferred
*/
private $deferred;

/**
* @var Request
*/
private $request;

/**
* @var string
*/
private $buffer = '';

/**
* @var int
*/
private $length;

/**
* @param Request $request
* @param int $length
* @return ExtendedPromiseInterface
*/
public static function createPromise(Request $request, $length)
{
return (new static($request, $length))->getDeferred()->promise();
}

/**
* @param Request $request
* @param int $length
*/
protected function __construct(Request $request, $length)
{
$this->deferred = new Deferred(function (callable $resolve) {
$this->request->removeListener('data', [$this, 'feed']);
$this->request->removeListener('end', [$this, 'finish']);

$resolve($this->buffer);
});
$this->request = $request;
$this->length = (int)$length;
$this->request->on('data', [$this, 'feed']);
$this->request->on('end', [$this, 'finish']);
$this->check();
}

/**
* @param string $data
*
* @internal
*/
public function feed($data)
{
$this->buffer .= $data;

$this->check();
}

/**
* @internal
*/
public function finish()
{
$this->resolve();
}

/**
* Check if we reached the expected length and when so resolve promise
*/
protected function check()
{
if (strlen($this->buffer) >= $this->length) {
$this->resolve();
}
}

protected function resolve()
{
if (strlen($this->buffer) > $this->length) {
$this->buffer = substr($this->buffer, 0, $this->length);
}
$this->request->removeListener('data', [$this, 'feed']);
$this->request->removeListener('end', [$this, 'finish']);
$this->deferred->resolve($this->buffer);
}

/**
* @return Deferred
*/
protected function getDeferred()
{
return $this->deferred;
}
}
64 changes: 64 additions & 0 deletions tests/StreamingBodyParser/ContentLengthBufferedSinkTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

namespace React\Tests\Http\StreamingBodyParser;

use React\Http\StreamingBodyParser\ContentLengthBufferedSink;
use React\Http\Request;
use React\Tests\Http\TestCase;

class ContentLengthBufferedSinkTest extends TestCase
{
public function testCreatePromise()
{
$expectedBuffer = '0123456789';
$catchedBuffer = '';
$length = 10;
$request = new Request('GET', 'http://example.com/');
ContentLengthBufferedSink::createPromise($request, $length)->then(function ($buffer) use (&$catchedBuffer) {
$catchedBuffer = $buffer;
});
$request->emit('data', ['012345678']);
$request->emit('data', ['90123456789']);
$this->assertSame($expectedBuffer, $catchedBuffer);
}

public function testCancelPromise()
{
$expectedBuffer = '012345678';
$catchedBuffer = '';
$length = 10;
$request = new Request('GET', 'http://example.com/');
$promise = ContentLengthBufferedSink::createPromise($request, $length)->then(function ($buffer) use (&$catchedBuffer) {
$catchedBuffer = $buffer;
});
$request->emit('data', ['012345678']);
$promise->cancel();
$request->emit('data', ['90123456789']);
$this->assertSame($expectedBuffer, $catchedBuffer);
}

public function testRequestEnd()
{
$expectedBuffer = '012345678';
$catchedBuffer = '';
$length = 10;
$request = new Request('GET', 'http://example.com/');
ContentLengthBufferedSink::createPromise($request, $length)->then(function ($buffer) use (&$catchedBuffer) {
$catchedBuffer = $buffer;
});
$request->emit('data', ['012345678']);
$request->close();
$request->emit('data', ['90123456789']);
$this->assertSame($expectedBuffer, $catchedBuffer);
}

public function testZeroLengthBuffer()
{
$catchedBuffer = null;
$request = new Request('GET', 'http://example.com/');
ContentLengthBufferedSink::createPromise($request, 0)->then(function ($buffer) use (&$catchedBuffer) {
$catchedBuffer = $buffer;
});
$this->assertSame('', $catchedBuffer);
}
}