@@ -519,20 +519,22 @@ export function consumeStream<T, R = T>(stream: ReadableStreamEvents<T>, reducer
519
519
return new Promise ( ( resolve , reject ) => {
520
520
const chunks : T [ ] = [ ] ;
521
521
522
- listenStream ( stream , {
522
+ const l = listenStream ( stream , {
523
523
onData : chunk => {
524
524
if ( reducer ) {
525
525
chunks . push ( chunk ) ;
526
526
}
527
527
} ,
528
528
onError : error => {
529
+ l . dispose ( ) ;
529
530
if ( reducer ) {
530
531
reject ( error ) ;
531
532
} else {
532
533
resolve ( undefined ) ;
533
534
}
534
535
} ,
535
536
onEnd : ( ) => {
537
+ l . dispose ( ) ;
536
538
if ( reducer ) {
537
539
resolve ( reducer ( chunks ) ) ;
538
540
} else {
@@ -570,15 +572,18 @@ export interface IStreamListener<T> {
570
572
export function listenStream < T > ( stream : ReadableStreamEvents < T > , listener : IStreamListener < T > ) : IDisposable {
571
573
let destroyed = false ;
572
574
575
+ // error and end events are in the next microtask so that a stream that is
576
+ // closed synchronously (e.g from a memory `toStream`) can get its disposable
577
+ // and destroy the stream.
573
578
stream . on ( 'error' , error => {
574
579
if ( ! destroyed ) {
575
- listener . onError ( error ) ;
580
+ queueMicrotask ( ( ) => listener . onError ( error ) ) ;
576
581
}
577
582
} ) ;
578
583
579
584
stream . on ( 'end' , ( ) => {
580
585
if ( ! destroyed ) {
581
- listener . onEnd ( ) ;
586
+ queueMicrotask ( ( ) => listener . onEnd ( ) ) ;
582
587
}
583
588
} ) ;
584
589
@@ -692,10 +697,16 @@ export function toReadable<T>(t: T): Readable<T> {
692
697
export function transform < Original , Transformed > ( stream : ReadableStreamEvents < Original > , transformer : ITransformer < Original , Transformed > , reducer : IReducer < Transformed > ) : ReadableStream < Transformed > {
693
698
const target = newWriteableStream < Transformed > ( reducer ) ;
694
699
695
- listenStream ( stream , {
700
+ const l = listenStream ( stream , {
696
701
onData : data => target . write ( transformer . data ( data ) ) ,
697
- onError : error => target . error ( transformer . error ? transformer . error ( error ) : error ) ,
698
- onEnd : ( ) => target . end ( )
702
+ onError : error => {
703
+ l . dispose ( ) ;
704
+ target . error ( transformer . error ? transformer . error ( error ) : error ) ;
705
+ } ,
706
+ onEnd : ( ) => {
707
+ l . dispose ( ) ;
708
+ target . end ( ) ;
709
+ }
699
710
} ) ;
700
711
701
712
return target ;
@@ -740,7 +751,7 @@ export function prefixedStream<T>(prefix: T, stream: ReadableStream<T>, reducer:
740
751
741
752
const target = newWriteableStream < T > ( reducer ) ;
742
753
743
- listenStream ( stream , {
754
+ const l = listenStream ( stream , {
744
755
onData : data => {
745
756
746
757
// Handle prefix only once
@@ -752,8 +763,12 @@ export function prefixedStream<T>(prefix: T, stream: ReadableStream<T>, reducer:
752
763
753
764
return target . write ( data ) ;
754
765
} ,
755
- onError : error => target . error ( error ) ,
766
+ onError : error => {
767
+ l . dispose ( ) ;
768
+ target . error ( error ) ;
769
+ } ,
756
770
onEnd : ( ) => {
771
+ l . dispose ( ) ;
757
772
758
773
// Handle prefix only once
759
774
if ( ! prefixHandled ) {
0 commit comments