@@ -27,17 +27,8 @@ export type Operator<T, R> = UnaryFunction<Subscribable<T>, Subscribable<R>>
27
27
*/
28
28
export type Subscribable < T > = {
29
29
subscribe : ( observer : Partial < Observer < T > > ) => Unsubscribe
30
- readonly lastValue : T | undefined
31
- pipe < A > ( op1 : Operator < T , A > ) : Subscribable < A > ;
32
- pipe < A , B > ( op1 : Operator < T , A > , op2 : Operator < A , B > ) : Subscribable < B > ;
33
- pipe < A , B , C > ( op1 : Operator < T , A > , op2 : Operator < A , B > , op3 : Operator < B , C > ) : Subscribable < C > ;
34
- pipe < A , B , C , D > ( op1 : Operator < T , A > , op2 : Operator < A , B > , op3 : Operator < B , C > , op4 : Operator < C , D > ) : Subscribable < D > ;
35
- pipe < A , B , C , D , E > ( op1 : Operator < T , A > , op2 : Operator < A , B > , op3 : Operator < B , C > , op4 : Operator < C , D > , op5 : Operator < D , E > ) : Subscribable < E > ;
36
- pipe < A , B , C , D , E , F > ( op1 : Operator < T , A > , op2 : Operator < A , B > , op3 : Operator < B , C > , op4 : Operator < C , D > , op5 : Operator < D , E > , op6 : Operator < E , F > ) : Subscribable < F > ;
37
- pipe < A , B , C , D , E , F , G > ( op1 : Operator < T , A > , op2 : Operator < A , B > , op3 : Operator < B , C > , op4 : Operator < C , D > , op5 : Operator < D , E > , op6 : Operator < E , F > , op7 : Operator < F , G > ) : Subscribable < G > ;
38
- pipe < A , B , C , D , E , F , G , H > ( op1 : Operator < T , A > , op2 : Operator < A , B > , op3 : Operator < B , C > , op4 : Operator < C , D > , op5 : Operator < D , E > , op6 : Operator < E , F > , op7 : Operator < F , G > , op8 : Operator < G , H > ) : Subscribable < H > ;
39
- pipe < A , B , C , D , E , F , G , H , I > ( op1 : Operator < T , A > , op2 : Operator < A , B > , op3 : Operator < B , C > , op4 : Operator < C , D > , op5 : Operator < D , E > , op6 : Operator < E , F > , op7 : Operator < F , G > , op8 : Operator < G , H > , op9 : Operator < H , I > ) : Subscribable < I > ;
40
- pipe < A , B , C , D , E , F , G , H , I , J > ( op1 : Operator < T , A > , op2 : Operator < A , B > , op3 : Operator < B , C > , op4 : Operator < C , D > , op5 : Operator < D , E > , op6 : Operator < E , F > , op7 : Operator < F , G > , op8 : Operator < G , H > , op9 : Operator < H , I > , op10 : Operator < I , J > ) : Subscribable < J > ;
30
+ readonly get : T | undefined
31
+ readonly getState : State < T >
41
32
}
42
33
43
34
/**
@@ -70,14 +61,19 @@ export type Subscriber<T> = {
70
61
complete : ( ) => void
71
62
}
72
63
64
+ type ControllerBase < T > = Subscriber < T > & {
65
+ readonly get : ( ) => T | undefined
66
+ readonly getState : ( ) => State < T >
67
+ }
68
+
73
69
/** Utility type to force tuple Subscriber and controller */
74
70
export type ControllableObservable < T , V > = readonly [ Subscribable < T > , V ]
75
71
76
72
/**
77
73
* Represents a push-based observable with a subscriber.
78
74
* @template T The type of the value emitted by the observable.
79
75
*/
80
- export type PushObservable < T > = ControllableObservable < T , Subscriber < T > >
76
+ export type PushObservable < T > = ControllableObservable < T , ControllerBase < T > >
81
77
82
78
type SubjectInit < T > = {
83
79
kind : 'init'
@@ -87,7 +83,7 @@ type SubjectInit<T> = {
87
83
type SubjectActive < T > = {
88
84
kind : 'active'
89
85
subscribers : Set < Partial < Observer < T > > >
90
- lastValue ? : T
86
+ lastValue : T
91
87
}
92
88
93
89
type SubjectError = {
@@ -100,6 +96,9 @@ type SubjectComplete = {
100
96
}
101
97
102
98
type Subject < T > = SubjectInit < T > | SubjectActive < T > | SubjectError | SubjectComplete
99
+ type State < T > =
100
+ | { kind : 'active' , value : T }
101
+ | { kind : 'inactive' , value : undefined }
103
102
104
103
const noops : Unsubscribe = ( ) => { }
105
104
@@ -109,7 +108,7 @@ const noops: Unsubscribe = () => { }
109
108
* @param {T } [initialValue] - The initial value to emit.
110
109
* @returns {PushObservable<T> } The created push-based observable.
111
110
*/
112
- export function pushObservable < T > ( initialValue ?: T ) : PushObservable < T > {
111
+ export function createObservable < T > ( initialValue ?: T ) : PushObservable < T > {
113
112
let subject : Subject < T > = initialValue === undefined ? {
114
113
kind : 'init' ,
115
114
subscribers : new Set ( )
@@ -125,12 +124,11 @@ export function pushObservable<T>(initialValue?: T): PushObservable<T> {
125
124
if ( subject . kind === 'init' ) {
126
125
subject = {
127
126
kind : 'active' ,
128
- subscribers : subject . subscribers
127
+ subscribers : subject . subscribers ,
128
+ lastValue : value
129
129
} ;
130
130
}
131
131
132
- ( subject as SubjectActive < T > ) . lastValue = value
133
-
134
132
for ( const sub of subject . subscribers ) {
135
133
sub . next ?.( value ) ;
136
134
}
@@ -154,10 +152,20 @@ export function pushObservable<T>(initialValue?: T): PushObservable<T> {
154
152
}
155
153
}
156
154
157
- const subscriber : Subscriber < T > = {
155
+ const controller : ControllerBase < T > = {
158
156
next : handleNext ,
159
157
error : handleError ,
160
- complete : handleComplete
158
+ complete : handleComplete ,
159
+ get : ( ) => {
160
+ if ( subject . kind === 'active' ) {
161
+ return subject . lastValue ;
162
+ }
163
+ } ,
164
+ getState : ( ) => {
165
+ return subject . kind === 'active'
166
+ ? { kind : 'active' , value : subject . lastValue }
167
+ : { kind : 'inactive' , value : undefined }
168
+ }
161
169
} ;
162
170
163
171
const observable : Subscribable < T > = {
@@ -176,9 +184,7 @@ export function pushObservable<T>(initialValue?: T): PushObservable<T> {
176
184
complete : obs . complete ?? noops
177
185
} ) ;
178
186
179
- if ( subject . lastValue !== undefined ) {
180
- obs . next ?.( subject . lastValue )
181
- }
187
+ obs . next ?.( subject . lastValue )
182
188
183
189
break
184
190
}
@@ -198,50 +204,17 @@ export function pushObservable<T>(initialValue?: T): PushObservable<T> {
198
204
}
199
205
} ;
200
206
} ,
201
- pipe : ( ( ...ops : Operator < unknown , unknown > [ ] ) : Subscribable < unknown > => {
202
- return pipe ( observable , ...ops ) ;
203
- // biome-ignore lint/suspicious/noExplicitAny: <explanation>
204
- } ) as any ,
205
- get lastValue ( ) {
207
+ get get ( ) {
206
208
return subject . kind === 'active' ? subject . lastValue : undefined ;
209
+ } ,
210
+ get getState ( ) {
211
+ return subject . kind === 'active'
212
+ ? { kind : 'active' , value : subject . lastValue } as const
213
+ : { kind : 'inactive' , value : undefined } as const
207
214
}
208
215
}
209
216
210
- return [ observable , subscriber ] ;
211
- }
212
-
213
- /**
214
- * Also known as cold observable, the Input will be called and cleaned as a new subscriber subscribes
215
- * @param observer
216
- * @returns
217
- */
218
- /**
219
- * Creates a pull-based observable.
220
- * @template T The type of the value emitted by the observable.
221
- * @param {ObservableInput<T> } observer - The observer function to call for each subscription.
222
- * @returns {Subscribable<T> } The created pull-based observable.
223
- */
224
- export function pullObservable < T > ( observer : ObservableInput < T > ) : Subscribable < T > {
225
- const observable = {
226
- subscribe : ( subscriber ) => {
227
- const unsub = observer ( {
228
- next : subscriber . next ?. bind ( subscriber ) ,
229
- error : subscriber . error ?. bind ( subscriber ) ,
230
- complete : ( ) => {
231
- subscriber . complete ?.( ) ;
232
- unsub ( ) ; // auto cleanup on complete
233
- }
234
- } ) ;
235
- return unsub ;
236
- } ,
237
- pipe : ( ( ...ops : Operator < unknown , unknown > [ ] ) : Subscribable < unknown > => {
238
- return pipe ( observable , ...ops ) ;
239
- // biome-ignore lint/suspicious/noExplicitAny: <explanation>
240
- } ) as any ,
241
- lastValue : undefined
242
- } satisfies Subscribable < T > ;
243
-
244
- return observable ;
217
+ return [ observable , controller ] ;
245
218
}
246
219
247
220
/**
@@ -289,7 +262,7 @@ export const operators = {
289
262
*/
290
263
map < T , R > ( fn : ( value : T ) => R ) : Operator < T , R > {
291
264
return ( source ) => {
292
- const [ observable , subscriber ] = pushObservable < R > ( ) ;
265
+ const [ observable , subscriber ] = createObservable < R > ( ) ;
293
266
const unsub = source . subscribe ( {
294
267
next : ( val ) => {
295
268
try {
@@ -321,7 +294,7 @@ export const operators = {
321
294
*/
322
295
filter < T > ( predicate : ( value : T ) => boolean ) : Operator < T , T > {
323
296
return ( source ) => {
324
- const [ observable , subscriber ] = pushObservable < T > ( ) ;
297
+ const [ observable , subscriber ] = createObservable < T > ( ) ;
325
298
const unsub = source . subscribe ( {
326
299
next : ( val ) => {
327
300
try {
@@ -348,23 +321,18 @@ export const operators = {
348
321
/**
349
322
* Performs side effects for each value emitted by the source observable.
350
323
* @template T The type of the input value.
351
- * @param {object } handlers - The side effect handlers.
352
- * @param {function(T): void } [handlers.onNext] - The side effect function to apply to each value.
353
- * @param {function(unknown): void } [handlers.onError] - The side effect function to apply on error.
354
- * @param {function(): void } [handlers.onComplete] - The side effect function to apply on completion.
324
+ * @param {function(T): void } onNext - The side effect function to apply to each value.
325
+ * @param {function(unknown): void } [onError] - The side effect function to apply on error.
326
+ * @param {function(): void } [onComplete] - The side effect function to apply on completion.
355
327
* @returns {Operator<T, T> } The operator that performs the side effects.
356
328
*/
357
- tap < T > ( {
358
- onNext,
359
- onError,
360
- onComplete,
361
- } : {
362
- onNext ?: ( value : T ) => void ;
363
- onError ?: ( error : unknown ) => void ;
364
- onComplete ?: ( ) => void ;
365
- } ) : Operator < T , T > {
329
+ tap < T > (
330
+ onNext ?: ( value : T ) => void ,
331
+ onError ?: ( error : unknown ) => void ,
332
+ onComplete ?: ( ) => void
333
+ ) : Operator < T , T > {
366
334
return ( source ) => {
367
- const [ observable , subscriber ] = pushObservable < T > ( ) ;
335
+ const [ observable , subscriber ] = createObservable < T > ( ) ;
368
336
const unsub = source . subscribe ( {
369
337
next : ( val ) => {
370
338
try {
@@ -416,7 +384,7 @@ export const operators = {
416
384
*/
417
385
latest < T > ( ) : Operator < T , T > {
418
386
return ( source ) => {
419
- const [ observable , subscriber ] = pushObservable < T > ( ) ;
387
+ const [ observable , subscriber ] = createObservable < T > ( ) ;
420
388
let lastValue : T | undefined ;
421
389
let hasValue = false ;
422
390
const unsub = source . subscribe ( {
@@ -459,7 +427,7 @@ export const operators = {
459
427
const clone = options ?. clone ?? structuredClone ;
460
428
461
429
return ( source ) => {
462
- const [ observable , subscriber ] = pushObservable < T > ( ) ;
430
+ const [ observable , subscriber ] = createObservable < T > ( ) ;
463
431
let last : T | undefined ;
464
432
let hasValue = false ;
465
433
const unsub = source . subscribe ( {
@@ -501,7 +469,7 @@ export const operators = {
501
469
*/
502
470
reduce < T , R > ( accumulator : ( acc : R , value : T ) => R , seed : R ) : Operator < T , R > {
503
471
return ( source ) => {
504
- const [ observable , subscriber ] = pushObservable < R > ( ) ;
472
+ const [ observable , subscriber ] = createObservable < R > ( ) ;
505
473
let accumulated = seed ;
506
474
const unsub = source . subscribe ( {
507
475
next : ( val ) => {
@@ -540,7 +508,7 @@ export const operators = {
540
508
...sources : R
541
509
) : Operator < T , [ T , ...{ [ K in keyof R ] : R [ K ] extends Subscribable < infer U > ? U : R [ K ] extends ControllableObservable < infer U , unknown > ? U : never } ] > {
542
510
return ( source ) => {
543
- const [ observable , subscriber ] = pushObservable < [ T , ...{ [ K in keyof R ] : R [ K ] extends Subscribable < infer U > ? U : R [ K ] extends ControllableObservable < infer U , unknown > ? U : never } ] > ( ) ;
511
+ const [ observable , subscriber ] = createObservable < [ T , ...{ [ K in keyof R ] : R [ K ] extends Subscribable < infer U > ? U : R [ K ] extends ControllableObservable < infer U , unknown > ? U : never } ] > ( ) ;
544
512
const values = new Array ( sources . length ) as { [ K in keyof R ] : R [ K ] extends Subscribable < infer U > ? U : R [ K ] extends ControllableObservable < infer U , unknown > ? U : never } [ ] ;
545
513
546
514
const hasValue = Array ( sources . length ) . fill ( false ) ;
@@ -596,14 +564,7 @@ export const observables = {
596
564
* @param {T } [initialValue] - The initial value to emit.
597
565
* @returns {PushObservable<T> } The created push-based observable.
598
566
*/
599
- pushObservable,
600
- /**
601
- * Creates a pull-based observable.
602
- * @template T The type of the value emitted by the observable.
603
- * @param {ObservableInput<T> } observer - The observer function to call for each subscription.
604
- * @returns {Subscribable<T> } The created pull-based observable.
605
- */
606
- pullObservable,
567
+ create : createObservable ,
607
568
/**
608
569
* Pipes multiple operators together.
609
570
* @template S The type of the source value.
@@ -633,9 +594,9 @@ export const observables = {
633
594
initialValue : T ,
634
595
fn : ( subscriber : Subscriber < T > , get : ( ) => T ) => C
635
596
) : readonly [ Subscribable < T > , C ] => {
636
- const [ observable , subscriber ] = pushObservable < T > ( initialValue ) ;
597
+ const [ observable , subscriber ] = createObservable < T > ( initialValue ) ;
637
598
// biome-ignore lint/style/noNonNullAssertion: <explanation>
638
- const get = ( ) => observable . lastValue ! ;
599
+ const get = ( ) => observable . get ! ;
639
600
640
601
const controller = fn ( subscriber , get ) ;
641
602
return [ observable , controller ] as const ;
@@ -654,7 +615,7 @@ export const observables = {
654
615
655
616
// only emit where every up streams have emitted
656
617
const keys = Object . keys ( sources ) as ( keyof T ) [ ] ;
657
- const [ observable , subscriber ] = pushObservable < T > ( ) ;
618
+ const [ observable , subscriber ] = createObservable < T > ( ) ;
658
619
const values = { } as T ;
659
620
660
621
// value can be Subscriable or PushObservable, distinguish them
0 commit comments