|
6 | 6 | use Rx\Notification\OnCompletedNotification; |
7 | 7 | use Rx\Notification\OnErrorNotification; |
8 | 8 | use Rx\Notification\OnNextNotification; |
| 9 | +use Rx\Notification\OnNextObservableNotification; |
| 10 | +use Rx\Observable; |
9 | 11 | use Rx\ObserverInterface; |
10 | 12 |
|
11 | 13 | /** |
12 | 14 | * Mock observer that records all messages. |
13 | 15 | */ |
14 | 16 | class MockObserver implements ObserverInterface |
15 | 17 | { |
| 18 | + /** @var TestScheduler */ |
16 | 19 | private $scheduler; |
| 20 | + |
| 21 | + /** @var Recorded[] */ |
17 | 22 | private $messages = []; |
18 | 23 |
|
19 | | - public function __construct($scheduler) |
| 24 | + private $startTime = 0; |
| 25 | + |
| 26 | + public function __construct($scheduler, $startTime = 0) |
20 | 27 | { |
21 | 28 | $this->scheduler = $scheduler; |
| 29 | + $this->startTime = $startTime; |
22 | 30 | } |
23 | 31 |
|
24 | 32 | public function onNext($value) |
25 | 33 | { |
| 34 | + if ($value instanceof Observable) { |
| 35 | + $notification = new OnNextObservableNotification($value, $this->scheduler); |
| 36 | + } else { |
| 37 | + $notification = new OnNextNotification($value); |
| 38 | + } |
| 39 | + |
26 | 40 | $this->messages[] = new Recorded( |
27 | | - $this->scheduler->getClock(), |
28 | | - new OnNextNotification($value) |
| 41 | + $this->scheduler->getClock() - $this->startTime, |
| 42 | + $notification |
29 | 43 | ); |
30 | 44 | } |
31 | 45 |
|
32 | 46 | public function onError(Exception $error) |
33 | 47 | { |
34 | 48 | $this->messages[] = new Recorded( |
35 | | - $this->scheduler->getClock(), |
| 49 | + $this->scheduler->getClock() - $this->startTime, |
36 | 50 | new OnErrorNotification($error) |
37 | 51 | ); |
38 | 52 | } |
39 | 53 |
|
40 | 54 | public function onCompleted() |
41 | 55 | { |
42 | 56 | $this->messages[] = new Recorded( |
43 | | - $this->scheduler->getClock(), |
| 57 | + $this->scheduler->getClock() - $this->startTime, |
44 | 58 | new OnCompletedNotification() |
45 | 59 | ); |
46 | 60 | } |
|
0 commit comments