Skip to content

Commit 844083c

Browse files
authored
Merge pull request #83 from clue-labs/readable
Add ReadableResourceStream
2 parents 05f3367 + fd4f2a3 commit 844083c

File tree

7 files changed

+515
-3
lines changed

7 files changed

+515
-3
lines changed

README.md

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ descriptor based implementation with an in-memory write buffer.
3434
* [end()](#end)
3535
* [close()](#close-1)
3636
* [DuplexStreamInterface](#duplexstreaminterface)
37+
* [ReadableResourceStream](#readableresourcestream)
3738
* [Usage](#usage)
3839
* [Install](#install)
3940
* [Tests](#tests)
@@ -693,11 +694,78 @@ on the `ReadbleStreamInterface` and `WritableStreamInterface`.
693694
See also [`ReadableStreamInterface`](#readablestreaminterface) and
694695
[`WritableStreamInterface`](#writablestreaminterface) for more details.
695696

697+
### ReadableResourceStream
698+
699+
The `ReadableResourceStream` is a concrete implementation of the
700+
[`ReadableStreamInterface`](#readablestreaminterface) for PHP's stream resources.
701+
702+
This can be used to represent a read-only resource like a file stream opened in
703+
readable mode or a stream such as `STDIN`:
704+
705+
```php
706+
$stream = new ReadableResourceStream(STDIN, $loop);
707+
$stream->on('data', function ($chunk) {
708+
echo $chunk;
709+
});
710+
$stream->on('end', function () {
711+
echo 'END';
712+
});
713+
```
714+
715+
See also [`ReadableStreamInterface`](#readablestreaminterface) for more details.
716+
717+
The first parameter given to the constructor MUST be a valid stream resource
718+
that is opened in reading mode (e.g. `fopen()` mode `r`).
719+
Otherwise, it will throw an `InvalidArgumentException`:
720+
721+
```php
722+
// throws InvalidArgumentException
723+
$stream = new ReadableResourceStream(false, $loop);
724+
```
725+
726+
Internally, this class tries to enable non-blocking mode on the stream resource
727+
which may not be supported for all stream resources.
728+
Most notably, this is not supported by pipes on Windows (STDIN etc.).
729+
If this fails, it will throw a `RuntimeException`:
730+
731+
```php
732+
// throws RuntimeException on Windows
733+
$stream = new ReadableResourceStream(STDIN, $loop);
734+
```
735+
736+
Once the constructor is called with a valid stream resource, this class will
737+
take care of the underlying stream resource.
738+
You SHOULD only use its public API and SHOULD NOT interfere with the underlying
739+
stream resource manually.
740+
Should you need to access the underlying stream resource, you can use the public
741+
`$stream` property like this:
742+
743+
```php
744+
var_dump(stream_get_meta_data($stream->stream));
745+
```
746+
747+
The `$bufferSize` property controls the maximum buffer size in bytes to read
748+
at once from the stream.
749+
This value SHOULD NOT be changed unless you know what you're doing.
750+
This can be a positive number which means that up to X bytes will be read
751+
at once from the underlying stream resource. Note that the actual number
752+
of bytes read may be lower if the stream resource has less than X bytes
753+
currently available.
754+
This can be `null` which means "read everything available" from the
755+
underlying stream resource.
756+
This should read until the stream resource is not readable anymore
757+
(i.e. underlying buffer drained), note that this does not neccessarily
758+
mean it reached EOF.
759+
760+
```php
761+
$stream->bufferSize = 8192;
762+
```
763+
696764
## Usage
697765
```php
698766
$loop = React\EventLoop\Factory::create();
699767

700-
$source = new React\Stream\Stream(fopen('omg.txt', 'r'), $loop);
768+
$source = new React\Stream\ReadableResourceStream(fopen('omg.txt', 'r'), $loop);
701769
$dest = new React\Stream\Stream(fopen('wtf.txt', 'w'), $loop);
702770

703771
$source->pipe($dest);

examples/benchmark-throughput.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
$info->write('piping from ' . $if . ' to ' . $of . ' (for max ' . $t . ' second(s)) ...'. PHP_EOL);
2323

2424
// setup input and output streams and pipe inbetween
25-
$in = new React\Stream\Stream(fopen($if, 'r'), $loop);
25+
$in = new React\Stream\ReadableResourceStream(fopen($if, 'r'), $loop);
2626
$out = new React\Stream\Stream(fopen($of, 'w'), $loop);
2727
$out->pause();
2828
$in->pipe($out);

examples/cat.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
use React\EventLoop\Factory;
44
use React\Stream\Stream;
5+
use React\Stream\ReadableResourceStream;
56

67
require __DIR__ . '/../vendor/autoload.php';
78

@@ -10,7 +11,7 @@
1011
$stdout = new Stream(STDOUT, $loop);
1112
$stdout->pause();
1213

13-
$stdin = new Stream(STDIN, $loop);
14+
$stdin = new ReadableResourceStream(STDIN, $loop);
1415
$stdin->pipe($stdout);
1516

1617
$loop->run();

src/ReadableResourceStream.php

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
<?php
2+
3+
namespace React\Stream;
4+
5+
use Evenement\EventEmitter;
6+
use React\EventLoop\LoopInterface;
7+
use InvalidArgumentException;
8+
9+
class ReadableResourceStream extends EventEmitter implements ReadableStreamInterface
10+
{
11+
/**
12+
* Controls the maximum buffer size in bytes to read at once from the stream.
13+
*
14+
* This value SHOULD NOT be changed unless you know what you're doing.
15+
*
16+
* This can be a positive number which means that up to X bytes will be read
17+
* at once from the underlying stream resource. Note that the actual number
18+
* of bytes read may be lower if the stream resource has less than X bytes
19+
* currently available.
20+
*
21+
* This can be `null` which means read everything available from the
22+
* underlying stream resource.
23+
* This should read until the stream resource is not readable anymore
24+
* (i.e. underlying buffer drained), note that this does not neccessarily
25+
* mean it reached EOF.
26+
*
27+
* @var int|null
28+
*/
29+
public $bufferSize = 65536;
30+
31+
/**
32+
* @var resource
33+
*/
34+
public $stream;
35+
36+
private $closed = false;
37+
private $loop;
38+
39+
public function __construct($stream, LoopInterface $loop)
40+
{
41+
if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
42+
throw new InvalidArgumentException('First parameter must be a valid stream resource');
43+
}
44+
45+
// ensure resource is opened for reading (fopen mode must contain "r" or "+")
46+
$meta = stream_get_meta_data($stream);
47+
if (isset($meta['mode']) && strpos($meta['mode'], 'r') === strpos($meta['mode'], '+')) {
48+
throw new InvalidArgumentException('Given stream resource is not opened in read mode');
49+
}
50+
51+
// this class relies on non-blocking I/O in order to not interrupt the event loop
52+
// e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
53+
if (stream_set_blocking($stream, 0) !== true) {
54+
throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
55+
}
56+
57+
// Use unbuffered read operations on the underlying stream resource.
58+
// Reading chunks from the stream may otherwise leave unread bytes in
59+
// PHP's stream buffers which some event loop implementations do not
60+
// trigger events on (edge triggered).
61+
// This does not affect the default event loop implementation (level
62+
// triggered), so we can ignore platforms not supporting this (HHVM).
63+
// Pipe streams (such as STDIN) do not seem to require this and legacy
64+
// PHP < 5.4 causes SEGFAULTs on unbuffered pipe streams, so skip this.
65+
if (function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
66+
stream_set_read_buffer($stream, 0);
67+
}
68+
69+
$this->stream = $stream;
70+
$this->loop = $loop;
71+
72+
$this->resume();
73+
}
74+
75+
public function isReadable()
76+
{
77+
return !$this->closed;
78+
}
79+
80+
public function pause()
81+
{
82+
$this->loop->removeReadStream($this->stream);
83+
}
84+
85+
public function resume()
86+
{
87+
if (!$this->closed) {
88+
$this->loop->addReadStream($this->stream, array($this, 'handleData'));
89+
}
90+
}
91+
92+
public function pipe(WritableStreamInterface $dest, array $options = array())
93+
{
94+
return Util::pipe($this, $dest, $options);
95+
}
96+
97+
public function close()
98+
{
99+
if ($this->closed) {
100+
return;
101+
}
102+
103+
$this->closed = true;
104+
105+
$this->emit('close');
106+
$this->loop->removeStream($this->stream);
107+
$this->removeAllListeners();
108+
109+
$this->handleClose();
110+
}
111+
112+
/** @internal */
113+
public function handleData()
114+
{
115+
$error = null;
116+
set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
117+
$error = new \ErrorException(
118+
$errstr,
119+
0,
120+
$errno,
121+
$errfile,
122+
$errline
123+
);
124+
});
125+
126+
$data = stream_get_contents($this->stream, $this->bufferSize === null ? -1 : $this->bufferSize);
127+
128+
restore_error_handler();
129+
130+
if ($error !== null) {
131+
$this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
132+
$this->close();
133+
return;
134+
}
135+
136+
if ($data !== '') {
137+
$this->emit('data', array($data));
138+
} else{
139+
// no data read => we reached the end and close the stream
140+
$this->emit('end');
141+
$this->close();
142+
}
143+
}
144+
145+
/** @internal */
146+
public function handleClose()
147+
{
148+
if (is_resource($this->stream)) {
149+
fclose($this->stream);
150+
}
151+
}
152+
153+
/**
154+
* Returns whether this is a pipe resource in a legacy environment
155+
*
156+
* @param resource $resource
157+
* @return bool
158+
*
159+
* @codeCoverageIgnore
160+
*/
161+
private function isLegacyPipe($resource)
162+
{
163+
if (PHP_VERSION_ID < 50400) {
164+
$meta = stream_get_meta_data($resource);
165+
166+
if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
167+
return true;
168+
}
169+
}
170+
return false;
171+
}
172+
}

src/Stream.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ public function getBuffer()
201201
*
202202
* @param resource $resource
203203
* @return bool
204+
*
205+
* @codeCoverageIgnore
204206
*/
205207
private function isLegacyPipe($resource)
206208
{

tests/EnforceBlockingWrapper.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ public function stream_cast($cast_as)
1919
return false;
2020
}
2121

22+
public function stream_eof()
23+
{
24+
return false;
25+
}
26+
2227
public function stream_set_option($option, $arg1, $arg2)
2328
{
2429
if ($option === STREAM_OPTION_BLOCKING) {

0 commit comments

Comments
 (0)