Skip to content

Commit 9756162

Browse files
authored
Merge pull request #123 from mbonneau/2.x_tests
More tests and removed some unreachable code
2 parents 850f70f + b46de82 commit 9756162

16 files changed

+474
-45
lines changed

src/Observer/ScheduledObserver.php

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use Rx\ObserverInterface;
77
use Rx\SchedulerInterface;
88

9-
class ScheduledObserver extends AbstractObserver
9+
final class ScheduledObserver extends AbstractObserver
1010
{
1111
/** @var SchedulerInterface */
1212
private $scheduler;
@@ -15,13 +15,13 @@ class ScheduledObserver extends AbstractObserver
1515
private $observer;
1616

1717
/** @var bool */
18-
public $isAcquired = false;
18+
private $isAcquired = false;
1919

2020
/** @var bool */
2121
private $hasFaulted = false;
2222

2323
/** @var \Closure[] */
24-
public $queue = [];
24+
private $queue = [];
2525

2626
/** @var SerialDisposable */
2727
private $disposable;
@@ -84,17 +84,12 @@ function ($recurse) {
8484
return;
8585
}
8686
try {
87-
if (!is_callable($work)) {
88-
throw new \Exception('work is not callable');
89-
}
90-
$res = $work();
87+
$work();
9188
} catch (\Throwable $e) {
92-
$res = $e;
93-
}
94-
if ($res instanceof \Throwable) {
9589
$parent->queue = [];
9690
$parent->hasFaulted = true;
97-
throw $res;
91+
92+
throw $e;
9893
}
9994
$recurse($parent);
10095
}

src/Operator/ConcatAllOperator.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,9 @@
66
use Rx\Disposable\EmptyDisposable;
77
use Rx\Disposable\SerialDisposable;
88
use Rx\DisposableInterface;
9-
use Rx\Observable;
109
use Rx\ObservableInterface;
1110
use Rx\Observer\CallbackObserver;
1211
use Rx\ObserverInterface;
13-
use Rx\SchedulerInterface;
1412

1513
final class ConcatAllOperator implements OperatorInterface
1614
{

src/Operator/GroupByUntilOperator.php

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -82,21 +82,11 @@ function ($value) use (&$map, $keySelector, $elementSelector, $durationSelector,
8282

8383
$fireNewMapEntry = false;
8484

85-
try {
86-
if (!isset($map[$serializedKey])) {
87-
$map[$serializedKey] = new Subject();
88-
$fireNewMapEntry = true;
89-
}
90-
$writer = $map[$serializedKey];
91-
92-
} catch (\Throwable $e) {
93-
foreach ($map as $groupObserver) {
94-
$groupObserver->onError($e);
95-
}
96-
$observer->onError($e);
97-
98-
return;
85+
if (!isset($map[$serializedKey])) {
86+
$map[$serializedKey] = new Subject();
87+
$fireNewMapEntry = true;
9988
}
89+
$writer = $map[$serializedKey];
10090

10191
if ($fireNewMapEntry) {
10292
$group = new GroupedObservable($key, $writer, $refCountDisposable);

src/Operator/RetryWhenOperator.php

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,7 @@ public function __invoke(ObservableInterface $observable, ObserverInterface $obs
3434
return new EmptyDisposable();
3535
}
3636

37-
$subscribeToSource = function () use (
38-
$observer,
39-
$disposable,
40-
$observable,
41-
&$sourceError,
42-
$errors,
43-
&$sourceDisposable,
44-
&$innerCompleted
45-
) {
37+
$subscribeToSource = function () use ($observer, $disposable, $observable, &$sourceError, $errors, &$sourceDisposable, &$innerCompleted) {
4638
$sourceError = false;
4739
$sourceDisposable = $observable->subscribe(new CallbackObserver(
4840
[$observer, 'onNext'],

src/Scheduler/ImmediateScheduler.php

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,6 @@ public function schedule(callable $action, $delay = 0): DisposableInterface
2323

2424
public function scheduleRecursive(callable $action): DisposableInterface
2525
{
26-
27-
if (!is_callable($action)) {
28-
throw new InvalidArgumentException('Action should be a callable.');
29-
}
30-
3126
$goAgain = true;
3227
$disposable = new CompositeDisposable();
3328

src/Scheduler/VirtualTimeScheduler.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,6 @@ public function schedule(callable $action, $delay = 0): DisposableInterface
3838

3939
public function scheduleRecursive(callable $action): DisposableInterface
4040
{
41-
if (!is_callable($action)) {
42-
throw new \InvalidArgumentException('Action should be a callable.');
43-
}
44-
4541
$goAgain = true;
4642
$disposable = new SerialDisposable();
4743

test/Rx/Functional/Operator/ConcatAllTest.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,31 @@ public function concatAll_timer_missing_item()
3232
onCompleted(216)
3333
], $results->getMessages());
3434
}
35+
36+
/**
37+
* @test
38+
*/
39+
public function concatAll_errors_when_exception_during_inner_subscribe()
40+
{
41+
$o1 = Observable::create(function () {
42+
throw new \Exception("Exception in inner subscribe");
43+
});
44+
45+
$xs = $this->createHotObservable([
46+
onNext(300, $o1),
47+
onCompleted(400)
48+
]);
49+
50+
$result = $this->scheduler->startWithCreate(function () use ($xs) {
51+
return $xs->concatAll();
52+
});
53+
54+
$this->assertMessages([
55+
onError(300, new \Exception())
56+
], $result->getMessages());
57+
58+
$this->assertSubscriptions([
59+
subscribe(200, 300)
60+
], $xs->getSubscriptions());
61+
}
3562
}

test/Rx/Functional/Operator/GroupByUntilTest.php

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,80 @@ function (Observable $g) {
8080
$this->assertEquals(12, $keyInvoked);
8181
}
8282

83+
/**
84+
* @test
85+
*/
86+
public function groupByUntilWithKeyComparerDefaultDurationSelector()
87+
{
88+
$keyInvoked = 0;
89+
90+
$xs = $this->createHotObservable(
91+
[
92+
onNext(90, new \Exception()),
93+
onNext(110, new \Exception()),
94+
onNext(130, new \Exception()),
95+
onNext(220, ' foo'),
96+
onNext(240, ' FoO '),
97+
onNext(270, 'baR '),
98+
onNext(310, 'foO '),
99+
onNext(350, ' Baz '),
100+
onNext(360, ' qux '),
101+
onNext(390, ' bar'),
102+
onNext(420, ' BAR '),
103+
onNext(470, 'FOO '),
104+
onNext(480, 'baz '),
105+
onNext(510, ' bAZ '),
106+
onNext(530, ' fOo '),
107+
onCompleted(570),
108+
onNext(580, new \Exception()),
109+
onCompleted(600),
110+
onError(650, new \Exception())
111+
]
112+
);
113+
114+
$results = $this->scheduler->startWithCreate(function () use ($xs, &$keyInvoked) {
115+
return $xs->groupByUntil(
116+
function ($x) use (&$keyInvoked) {
117+
$keyInvoked++;
118+
return trim(strtolower($x));
119+
},
120+
function ($x) {
121+
return $x;
122+
}
123+
)->map(function (GroupedObservable $x) {
124+
return $x->getKey();
125+
});
126+
});
127+
128+
$this->assertMessages(
129+
[
130+
onNext(220, 'foo'),
131+
onNext(240, 'foo'),
132+
onNext(270, 'bar'),
133+
onNext(310, 'foo'),
134+
onNext(350, 'baz'),
135+
onNext(360, 'qux'),
136+
onNext(390, 'bar'),
137+
onNext(420, 'bar'),
138+
onNext(470, 'foo'),
139+
onNext(480, 'baz'),
140+
onNext(510, 'baz'),
141+
onNext(530, 'foo'),
142+
onCompleted(570)
143+
],
144+
$results->getMessages()
145+
);
146+
147+
$this->assertSubscriptions(
148+
[
149+
subscribe(200, 570)
150+
],
151+
$xs->getSubscriptions()
152+
);
153+
154+
$this->assertEquals(12, $keyInvoked);
155+
}
156+
83157
/**
84158
* @test
85159
*/

test/Rx/Functional/Operator/RepeatTest.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,4 +416,27 @@ public function repeat_Observable_repeat_count_throws_4()
416416

417417
$xss->subscribe(new CallbackObserver());
418418
}
419+
420+
/**
421+
* @test
422+
*/
423+
public function repeat_returns_empty_when_count_is_zero()
424+
{
425+
$xs = $this->createColdObservable([
426+
onNext(5, 1),
427+
onNext(10, 2),
428+
onNext(15, 3),
429+
onCompleted(20)
430+
]);
431+
432+
$result = $this->scheduler->startWithCreate(function () use ($xs) {
433+
return $xs->repeat(0);
434+
});
435+
436+
$this->assertMessages([
437+
onCompleted(200)
438+
], $result->getMessages());
439+
440+
$this->assertSubscriptions([], $xs->getSubscriptions());
441+
}
419442
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
namespace Rx\Observable;
4+
5+
use Rx\DisposableInterface;
6+
use Rx\Observable;
7+
use Rx\Subject\Subject;
8+
use Rx\TestCase;
9+
10+
class RefCountObservableTest extends TestCase
11+
{
12+
public function testRefCountDisposableOnlyDisposesOnce()
13+
{
14+
$source = $this->createMock(ConnectableObservable::class);
15+
16+
$innerDisp = $this->createMock(DisposableInterface::class);
17+
18+
$source
19+
->expects($this->once())
20+
->method('subscribe')
21+
->willReturn($innerDisp);
22+
23+
$innerDisp
24+
->expects($this->once())
25+
->method('dispose');
26+
27+
$observable = new RefCountObservable($source);
28+
29+
$subscription = $observable->subscribe();
30+
31+
$subscription->dispose();
32+
$subscription->dispose();
33+
34+
35+
}
36+
37+
public function testDisposesWhenRefcountReachesZero()
38+
{
39+
40+
}
41+
}

0 commit comments

Comments
 (0)