@@ -519,22 +519,20 @@ export function consumeStream<T, R = T>(stream: ReadableStreamEvents<T>, reducer
519
519
return new Promise ( ( resolve , reject ) => {
520
520
const chunks : T [ ] = [ ] ;
521
521
522
- const l = listenStream ( stream , {
522
+ listenStream ( stream , {
523
523
onData : chunk => {
524
524
if ( reducer ) {
525
525
chunks . push ( chunk ) ;
526
526
}
527
527
} ,
528
528
onError : error => {
529
- l . dispose ( ) ;
530
529
if ( reducer ) {
531
530
reject ( error ) ;
532
531
} else {
533
532
resolve ( undefined ) ;
534
533
}
535
534
} ,
536
535
onEnd : ( ) => {
537
- l . dispose ( ) ;
538
536
if ( reducer ) {
539
537
resolve ( reducer ( chunks ) ) ;
540
538
} else {
@@ -572,18 +570,15 @@ export interface IStreamListener<T> {
572
570
export function listenStream < T > ( stream : ReadableStreamEvents < T > , listener : IStreamListener < T > ) : IDisposable {
573
571
let destroyed = false ;
574
572
575
- // error and end events are in the next microtask so that a stream that is
576
- // closed synchronously (e.g from a memory `toStream`) can get its disposable
577
- // and destroy the stream.
578
573
stream . on ( 'error' , error => {
579
574
if ( ! destroyed ) {
580
- queueMicrotask ( ( ) => listener . onError ( error ) ) ;
575
+ listener . onError ( error ) ;
581
576
}
582
577
} ) ;
583
578
584
579
stream . on ( 'end' , ( ) => {
585
580
if ( ! destroyed ) {
586
- queueMicrotask ( ( ) => listener . onEnd ( ) ) ;
581
+ listener . onEnd ( ) ;
587
582
}
588
583
} ) ;
589
584
@@ -697,16 +692,10 @@ export function toReadable<T>(t: T): Readable<T> {
697
692
export function transform < Original , Transformed > ( stream : ReadableStreamEvents < Original > , transformer : ITransformer < Original , Transformed > , reducer : IReducer < Transformed > ) : ReadableStream < Transformed > {
698
693
const target = newWriteableStream < Transformed > ( reducer ) ;
699
694
700
- const l = listenStream ( stream , {
695
+ listenStream ( stream , {
701
696
onData : data => target . write ( transformer . data ( data ) ) ,
702
- onError : error => {
703
- l . dispose ( ) ;
704
- target . error ( transformer . error ? transformer . error ( error ) : error ) ;
705
- } ,
706
- onEnd : ( ) => {
707
- l . dispose ( ) ;
708
- target . end ( ) ;
709
- }
697
+ onError : error => target . error ( transformer . error ? transformer . error ( error ) : error ) ,
698
+ onEnd : ( ) => target . end ( )
710
699
} ) ;
711
700
712
701
return target ;
@@ -751,7 +740,7 @@ export function prefixedStream<T>(prefix: T, stream: ReadableStream<T>, reducer:
751
740
752
741
const target = newWriteableStream < T > ( reducer ) ;
753
742
754
- const l = listenStream ( stream , {
743
+ listenStream ( stream , {
755
744
onData : data => {
756
745
757
746
// Handle prefix only once
@@ -763,12 +752,8 @@ export function prefixedStream<T>(prefix: T, stream: ReadableStream<T>, reducer:
763
752
764
753
return target . write ( data ) ;
765
754
} ,
766
- onError : error => {
767
- l . dispose ( ) ;
768
- target . error ( error ) ;
769
- } ,
755
+ onError : error => target . error ( error ) ,
770
756
onEnd : ( ) => {
771
- l . dispose ( ) ;
772
757
773
758
// Handle prefix only once
774
759
if ( ! prefixHandled ) {
0 commit comments