77use Rx \Notification \OnCompletedNotification ;
88use Rx \Notification \OnErrorNotification ;
99use Rx \Notification \OnNextNotification ;
10+ use Rx \Notification \OnNextObservableNotification ;
11+ use Rx \Observable ;
1012use Rx \ObserverInterface ;
1113
1214/**
@@ -16,32 +18,40 @@ class MockObserver implements ObserverInterface
1618{
1719 private $ scheduler ;
1820 private $ messages = [];
21+ private $ startTime = 0 ;
1922
20- public function __construct (TestScheduler $ scheduler )
23+ public function __construct (TestScheduler $ scheduler, int $ startTime = 0 )
2124 {
2225 $ this ->scheduler = $ scheduler ;
26+ $ this ->startTime = $ startTime ;
2327 }
2428
2529 public function onNext ($ value )
2630 {
31+ if ($ value instanceof Observable) {
32+ $ notification = new OnNextObservableNotification ($ value , $ this ->scheduler );
33+ } else {
34+ $ notification = new OnNextNotification ($ value );
35+ }
36+
2737 $ this ->messages [] = new Recorded (
28- $ this ->scheduler ->getClock (),
29- new OnNextNotification ( $ value )
38+ $ this ->scheduler ->getClock () - $ this -> startTime ,
39+ $ notification
3040 );
3141 }
3242
3343 public function onError (\Throwable $ error )
3444 {
3545 $ this ->messages [] = new Recorded (
36- $ this ->scheduler ->getClock (),
46+ $ this ->scheduler ->getClock () - $ this -> startTime ,
3747 new OnErrorNotification ($ error )
3848 );
3949 }
4050
4151 public function onCompleted ()
4252 {
4353 $ this ->messages [] = new Recorded (
44- $ this ->scheduler ->getClock (),
54+ $ this ->scheduler ->getClock () - $ this -> startTime ,
4555 new OnCompletedNotification ()
4656 );
4757 }
0 commit comments