Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src/UnwrapReadableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function __construct(PromiseInterface $promise)

$this->promise = $promise->then(
function ($stream) {
if (!($stream instanceof ReadableStreamInterface)) {
if (!$stream instanceof ReadableStreamInterface) {
throw new InvalidArgumentException('Not a readable stream');
}
return $stream;
Expand Down Expand Up @@ -80,6 +80,9 @@ function ($e) use ($out, &$closed) {
$out->emit('error', array($e, $out));
$out->close();
}

// resume() and pause() may attach to this promise, so ensure we actually reject here
throw $e;
}
);
}
Expand All @@ -91,16 +94,20 @@ public function isReadable()

public function pause()
{
$this->promise->then(function (ReadableStreamInterface $stream) {
$stream->pause();
});
if ($this->promise !== null) {
$this->promise->then(function (ReadableStreamInterface $stream) {
$stream->pause();
});
}
}

public function resume()
{
$this->promise->then(function (ReadableStreamInterface $stream) {
$stream->resume();
});
if ($this->promise !== null) {
$this->promise->then(function (ReadableStreamInterface $stream) {
$stream->resume();
});
}
}

public function pipe(WritableStreamInterface $dest, array $options = array())
Expand All @@ -122,7 +129,9 @@ public function close()
if ($this->promise instanceof CancellablePromiseInterface) {
$this->promise->cancel();
}
$this->promise = null;

$this->emit('close', array($this));
$this->emit('close');
$this->removeAllListeners();
}
}
6 changes: 4 additions & 2 deletions src/UnwrapWritableStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function __construct(PromiseInterface $promise)

$this->promise = $promise->then(
function ($stream) {
if (!($stream instanceof WritableStreamInterface)) {
if (!$stream instanceof WritableStreamInterface) {
throw new InvalidArgumentException('Not a writable stream');
}
return $stream;
Expand Down Expand Up @@ -156,7 +156,9 @@ public function close()
if ($this->promise instanceof CancellablePromiseInterface) {
$this->promise->cancel();
}
$this->promise = $this->stream = null;

$this->emit('close', array($this));
$this->emit('close');
$this->removeAllListeners();
}
}
63 changes: 63 additions & 0 deletions tests/UnwrapReadableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,30 @@ public function testForwardsPauseToInputStream()
$stream->pause();
}

/**
* @doesNotPerformAssertions
*/
public function testPauseAfterCloseHasNoEffect()
{
$promise = new \React\Promise\Promise(function () { });
$stream = Stream\unwrapReadable($promise);

$stream->close();
$stream->pause();
}


/**
* @doesNotPerformAssertions
*/
public function testPauseAfterErrorDueToInvalidInputHasNoEffect()
{
$promise = \React\Promise\reject(new \RuntimeException());
$stream = Stream\unwrapReadable($promise);

$stream->pause();
}

public function testForwardsResumeToInputStream()
{
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
Expand All @@ -211,6 +235,18 @@ public function testForwardsResumeToInputStream()
$stream->resume();
}

/**
* @doesNotPerformAssertions
*/
public function testResumeAfterCloseHasNoEffect()
{
$promise = new \React\Promise\Promise(function () { });
$stream = Stream\unwrapReadable($promise);

$stream->close();
$stream->resume();
}

public function testPipingStreamWillForwardDataEvents()
{
$input = new ThroughStream();
Expand Down Expand Up @@ -279,4 +315,31 @@ public function testClosingStreamWillCloseStreamFromCancellationHandler()

$this->assertFalse($input->isReadable());
}

public function testCloseShouldRemoveAllListenersAfterCloseEvent()
{
$promise = new \React\Promise\Promise(function () { });
$stream = Stream\unwrapReadable($promise);

$stream->on('close', $this->expectCallableOnce());
$this->assertCount(1, $stream->listeners('close'));

$stream->close();

$this->assertCount(0, $stream->listeners('close'));
}

public function testCloseShouldRemoveReferenceToPromiseToAvoidGarbageReferences()
{
$promise = new \React\Promise\Promise(function () { });
$stream = Stream\unwrapReadable($promise);

$stream->close();

$ref = new \ReflectionProperty($stream, 'promise');
$ref->setAccessible(true);
$value = $ref->getValue($stream);

$this->assertNull($value);
}
}
33 changes: 33 additions & 0 deletions tests/UnwrapWritableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -352,4 +352,37 @@ public function testClosingStreamWillCloseStreamFromCancellationHandler()

$this->assertFalse($input->isWritable());
}

public function testCloseShouldRemoveAllListenersAfterCloseEvent()
{
$promise = new \React\Promise\Promise(function () { });
$stream = Stream\unwrapWritable($promise);

$stream->on('close', $this->expectCallableOnce());
$this->assertCount(1, $stream->listeners('close'));

$stream->close();

$this->assertCount(0, $stream->listeners('close'));
}

public function testCloseShouldRemoveReferenceToPromiseAndStreamToAvoidGarbageReferences()
{
$promise = \React\Promise\resolve(new ThroughStream());
$stream = Stream\unwrapWritable($promise);

$stream->close();

$ref = new \ReflectionProperty($stream, 'promise');
$ref->setAccessible(true);
$value = $ref->getValue($stream);

$this->assertNull($value);

$ref = new \ReflectionProperty($stream, 'stream');
$ref->setAccessible(true);
$value = $ref->getValue($stream);

$this->assertNull($value);
}
}