11import Operator from '../Operator' ;
2- import Observer from '../Observer' ;
32import Scheduler from '../Scheduler' ;
43import Subscriber from '../Subscriber' ;
54import Notification from '../Notification' ;
65import immediate from '../schedulers/immediate' ;
6+ import isDate from '../util/isDate' ;
77
8- export default function delay < T > ( delay : number , scheduler : Scheduler = immediate ) {
9- return this . lift ( new DelayOperator ( delay , scheduler ) ) ;
8+ export default function delay < T > ( delay : number | Date ,
9+ scheduler : Scheduler = immediate ) {
10+ let absoluteDelay = isDate ( delay ) ;
11+ let delayFor = absoluteDelay ? ( + delay - scheduler . now ( ) ) : < number > delay ;
12+ return this . lift ( new DelayOperator ( delayFor , scheduler ) ) ;
1013}
1114
1215class DelayOperator < T , R > implements Operator < T , R > {
13-
14- delay : number ;
15- scheduler : Scheduler ;
16-
17- constructor ( delay : number , scheduler : Scheduler ) {
18- this . delay = delay ;
19- this . scheduler = scheduler ;
16+ constructor ( private delay : number ,
17+ private scheduler : Scheduler ) {
2018 }
2119
2220 call ( subscriber : Subscriber < T > ) : Subscriber < T > {
@@ -25,21 +23,20 @@ class DelayOperator<T, R> implements Operator<T, R> {
2523}
2624
2725class DelaySubscriber < T > extends Subscriber < T > {
26+ private queue : Array < any > = [ ] ;
27+ private active : boolean = false ;
28+ private errored : boolean = false ;
2829
29- protected delay : number ;
30- protected queue : Array < any > = [ ] ;
31- protected scheduler : Scheduler ;
32- protected active : boolean = false ;
33- protected errored : boolean = false ;
34-
35- static dispatch ( state ) {
30+ private static dispatch ( state ) : void {
3631 const source = state . source ;
3732 const queue = source . queue ;
3833 const scheduler = state . scheduler ;
3934 const destination = state . destination ;
35+
4036 while ( queue . length > 0 && ( queue [ 0 ] . time - scheduler . now ( ) ) <= 0 ) {
4137 queue . shift ( ) . notification . observe ( destination ) ;
4238 }
39+
4340 if ( queue . length > 0 ) {
4441 let delay = Math . max ( 0 , queue [ 0 ] . time - scheduler . now ( ) ) ;
4542 ( < any > this ) . schedule ( state , delay ) ;
@@ -48,56 +45,50 @@ class DelaySubscriber<T> extends Subscriber<T> {
4845 }
4946 }
5047
51- constructor ( destination : Subscriber < T > , delay : number , scheduler : Scheduler ) {
48+ constructor ( destination : Subscriber < T > ,
49+ private delay : number ,
50+ private scheduler : Scheduler ) {
5251 super ( destination ) ;
53- this . delay = delay ;
54- this . scheduler = scheduler ;
5552 }
5653
57- _next ( x ) {
58- if ( this . errored ) {
54+ private _schedule ( scheduler : Scheduler ) : void {
55+ this . active = true ;
56+ this . add ( scheduler . schedule ( DelaySubscriber . dispatch , this . delay , {
57+ source : this , destination : this . destination , scheduler : scheduler
58+ } ) ) ;
59+ }
60+
61+ private scheduleNotification ( notification : Notification < any > ) : void {
62+ if ( this . errored === true ) {
5963 return ;
6064 }
65+
6166 const scheduler = this . scheduler ;
62- this . queue . push ( new DelayMessage < T > ( scheduler . now ( ) + this . delay , Notification . createNext ( x ) ) ) ;
67+ let message = new DelayMessage < T > ( scheduler . now ( ) + this . delay , notification ) ;
68+ this . queue . push ( message ) ;
69+
6370 if ( this . active === false ) {
6471 this . _schedule ( scheduler ) ;
6572 }
6673 }
6774
68- _error ( e ) {
69- const scheduler = this . scheduler ;
70- this . errored = true ;
71- this . queue = [ new DelayMessage < T > ( scheduler . now ( ) + this . delay , Notification . createError ( e ) ) ] ;
72- if ( this . active === false ) {
73- this . _schedule ( scheduler ) ;
74- }
75+ _next ( value : T ) {
76+ this . scheduleNotification ( Notification . createNext ( value ) ) ;
7577 }
7678
77- _complete ( ) {
78- if ( this . errored ) {
79- return ;
80- }
81- const scheduler = this . scheduler ;
82- this . queue . push ( new DelayMessage < T > ( scheduler . now ( ) + this . delay , Notification . createComplete ( ) ) ) ;
83- if ( this . active === false ) {
84- this . _schedule ( scheduler ) ;
85- }
79+ _error ( err ) {
80+ this . errored = true ;
81+ this . queue = [ ] ;
82+ this . destination . error ( err ) ;
8683 }
8784
88- _schedule ( scheduler ) {
89- this . active = true ;
90- this . add ( scheduler . schedule ( DelaySubscriber . dispatch , this . delay , {
91- source : this , destination : this . destination , scheduler : scheduler
92- } ) ) ;
85+ _complete ( ) {
86+ this . scheduleNotification ( Notification . createComplete ( ) ) ;
9387 }
9488}
9589
9690class DelayMessage < T > {
97- time : number ;
98- notification : any ;
99- constructor ( time : number , notification : any ) {
100- this . time = time ;
101- this . notification = notification ;
91+ constructor ( private time : number ,
92+ private notification : any ) {
10293 }
10394}
0 commit comments