Skip to content

Commit 7fe6411

Browse files
committed
CompositeStream starts closed if either side is already closed
1 parent 4a6c128 commit 7fe6411

File tree

3 files changed

+130
-6
lines changed

3 files changed

+130
-6
lines changed

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ descriptor based implementation with an in-memory write buffer.
3838
* [WritableResourceStream](#writableresourcestream)
3939
* [DuplexResourceStream](#duplexresourcestream)
4040
* [ThroughStream](#throughstream)
41+
* [CompositeStream](#compositestream)
4142
* [Usage](#usage)
4243
* [Install](#install)
4344
* [Tests](#tests)
@@ -1010,6 +1011,44 @@ $through->on('data', $this->expectCallableNever()));
10101011
$through->write(2);
10111012
```
10121013

1014+
### CompositeStream
1015+
1016+
The `CompositeStream` implements the
1017+
[`DuplexStreamInterface`](#duplexstreaminterface) and can be used to create a
1018+
single duplex stream from two individual streams implementing
1019+
[`ReadableStreamInterface`](#readablestreaminterface) and
1020+
[`WritableStreamInterface`](#writablestreaminterface) respectively.
1021+
1022+
This is useful for some APIs which may require a single
1023+
[`DuplexStreamInterface`](#duplexstreaminterface) or simply because it's often
1024+
more convenient to work with a single stream instance like this:
1025+
1026+
```php
1027+
$stdin = new ReadableStreamResource(STDIN, $loop);
1028+
$stdout = new WritableStreamResource(STDOUT, $loop);
1029+
1030+
$stdio = new CompositeStream($stdin, $stdout);
1031+
1032+
$stdio->on('data', function ($chunk) use ($stdio) {
1033+
$stdio->write('You said: ' . $chunk);
1034+
});
1035+
```
1036+
1037+
This is a well-behaving stream which forwards all stream events from the
1038+
underlying streams and forwards all streams calls to the underlying streams.
1039+
1040+
If you `write()` to the duplex stream, it will simply `write()` to the
1041+
writable side and return its status.
1042+
1043+
If you `end()` the duplex stream, it will `end()` the writable side and will
1044+
`pause()` the readable side.
1045+
1046+
If you `close()` the duplex stream, both input streams will be closed.
1047+
If either of the two input streams emits a `close` event, the duplex stream
1048+
will also close.
1049+
If either of the two input streams is already closed while constructing the
1050+
duplex stream, it will `close()` the other side and return a closed stream.
1051+
10131052
## Usage
10141053
```php
10151054
$loop = React\EventLoop\Factory::create();

src/CompositeStream.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ public function __construct(ReadableStreamInterface $readable, WritableStreamInt
1515
$this->readable = $readable;
1616
$this->writable = $writable;
1717

18+
if (!$readable->isReadable() || !$writable->isWritable()) {
19+
return $this->close();
20+
}
21+
1822
Util::forwardEvents($this->readable, $this, array('data', 'end', 'error'));
1923
Util::forwardEvents($this->writable, $this, array('drain', 'error', 'pipe'));
2024

tests/CompositeStreamTest.php

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,68 @@
1010
*/
1111
class CompositeStreamTest extends TestCase
1212
{
13+
/** @test */
14+
public function itShouldCloseReadableIfNotWritable()
15+
{
16+
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
17+
$readable
18+
->expects($this->once())
19+
->method('isReadable')
20+
->willReturn(true);
21+
$readable
22+
->expects($this->once())
23+
->method('close');
24+
25+
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
26+
$writable
27+
->expects($this->once())
28+
->method('isWritable')
29+
->willReturn(false);
30+
31+
$composite = new CompositeStream($readable, $writable);
32+
33+
$composite->on('close', $this->expectCallableNever());
34+
$composite->close();
35+
}
36+
37+
/** @test */
38+
public function itShouldCloseWritableIfNotReadable()
39+
{
40+
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
41+
$readable
42+
->expects($this->once())
43+
->method('isReadable')
44+
->willReturn(false);
45+
46+
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
47+
$writable
48+
->expects($this->once())
49+
->method('close');
50+
51+
$composite = new CompositeStream($readable, $writable);
52+
53+
$composite->on('close', $this->expectCallableNever());
54+
$composite->close();
55+
}
56+
1357
/** @test */
1458
public function itShouldForwardWritableCallsToWritableStream()
1559
{
1660
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
61+
$readable
62+
->expects($this->once())
63+
->method('isReadable')
64+
->willReturn(true);
65+
1766
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
1867
$writable
1968
->expects($this->once())
2069
->method('write')
2170
->with('foo');
2271
$writable
23-
->expects($this->once())
24-
->method('isWritable');
72+
->expects($this->exactly(2))
73+
->method('isWritable')
74+
->willReturn(true);
2575

2676
$composite = new CompositeStream($readable, $writable);
2777
$composite->write('foo');
@@ -33,14 +83,16 @@ public function itShouldForwardReadableCallsToReadableStream()
3383
{
3484
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
3585
$readable
36-
->expects($this->once())
37-
->method('isReadable');
86+
->expects($this->exactly(2))
87+
->method('isReadable')
88+
->willReturn(true);
3889
$readable
3990
->expects($this->once())
4091
->method('pause');
4192
$readable
4293
->expects($this->once())
4394
->method('resume');
95+
4496
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
4597
$writable
4698
->expects($this->any())
@@ -57,15 +109,19 @@ public function itShouldForwardReadableCallsToReadableStream()
57109
public function itShouldNotForwardResumeIfStreamIsNotWritable()
58110
{
59111
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
112+
$readable
113+
->expects($this->once())
114+
->method('isReadable')
115+
->willReturn(true);
60116
$readable
61117
->expects($this->never())
62118
->method('resume');
63119

64120
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
65121
$writable
66-
->expects($this->once())
122+
->expects($this->exactly(2))
67123
->method('isWritable')
68-
->willReturn(false);
124+
->willReturn(true, false);
69125

70126
$composite = new CompositeStream($readable, $writable);
71127
$composite->resume();
@@ -75,7 +131,16 @@ public function itShouldNotForwardResumeIfStreamIsNotWritable()
75131
public function endShouldDelegateToWritableWithData()
76132
{
77133
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
134+
$readable
135+
->expects($this->once())
136+
->method('isReadable')
137+
->willReturn(true);
138+
78139
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
140+
$writable
141+
->expects($this->once())
142+
->method('isWritable')
143+
->willReturn(true);
79144
$writable
80145
->expects($this->once())
81146
->method('end')
@@ -89,10 +154,19 @@ public function endShouldDelegateToWritableWithData()
89154
public function closeShouldCloseBothStreams()
90155
{
91156
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
157+
$readable
158+
->expects($this->once())
159+
->method('isReadable')
160+
->willReturn(true);
92161
$readable
93162
->expects($this->once())
94163
->method('close');
164+
95165
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
166+
$writable
167+
->expects($this->once())
168+
->method('isWritable')
169+
->willReturn(true);
96170
$writable
97171
->expects($this->once())
98172
->method('close');
@@ -132,6 +206,11 @@ public function itShouldReceiveForwardedEvents()
132206
public function itShouldHandlePipingCorrectly()
133207
{
134208
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
209+
$readable
210+
->expects($this->once())
211+
->method('isReadable')
212+
->willReturn(true);
213+
135214
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
136215
$writable->expects($this->any())->method('isWritable')->willReturn(True);
137216
$writable
@@ -150,8 +229,10 @@ public function itShouldHandlePipingCorrectly()
150229
public function itShouldForwardPipeCallsToReadableStream()
151230
{
152231
$readable = new ThroughStream();
232+
153233
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
154234
$writable->expects($this->any())->method('isWritable')->willReturn(True);
235+
155236
$composite = new CompositeStream($readable, $writable);
156237

157238
$output = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();

0 commit comments

Comments
 (0)