@@ -9,47 +9,158 @@ import S2A from "./stream-to-async-iterator";
99describe ( "StreamToAsyncIterator" , function ( ) {
1010 const filePath = path . join ( __dirname , "test/lorem-ipsum.txt" ) ;
1111
12+ function assertClosed ( stream : Readable , iter : S2A ) {
13+ expect ( stream ) . to . have . property ( "destroyed" , true ) ;
14+ expect ( stream . listenerCount ( "readable" ) ) . to . equal ( 0 ) ;
15+ expect ( stream . listenerCount ( "end" ) ) . to . equal ( 0 ) ;
16+ expect ( stream . listenerCount ( "error" ) ) . to . equal ( 0 ) ;
17+ expect ( iter ) . to . have . property ( "closed" , true ) ;
18+ }
19+
1220 it ( "should iterate on an object mode stream" , async function ( ) {
1321 type Obj = { id : number } ;
1422 const data : Obj [ ] = [ { id : 1 } , { id : 2 } , { id : 3 } ] ;
1523 const objStream = Readable . from ( data ) ;
24+ const iter = new S2A < Obj > ( objStream ) ;
1625 const buff : Obj [ ] = [ ] ;
1726
18- for await ( const value of new S2A < Obj > ( objStream ) ) {
27+ for await ( const value of iter ) {
1928 buff . push ( value ) ;
2029 }
2130
31+ assertClosed ( objStream , iter ) ;
2232 expect ( buff ) . to . have . lengthOf ( 3 ) ;
2333 } ) ;
2434
35+ it ( "should iterate on an empty stream" , async function ( ) {
36+ type Obj = { id : number } ;
37+ const data : Obj [ ] = [ ] ;
38+ const objStream = Readable . from ( data ) ;
39+ const iter = new S2A < Obj > ( objStream ) ;
40+ const buff : Obj [ ] = [ ] ;
41+
42+ for await ( const value of iter ) {
43+ buff . push ( value ) ;
44+ }
45+
46+ assertClosed ( objStream , iter ) ;
47+ expect ( buff ) . to . have . lengthOf ( 0 ) ;
48+ } ) ;
49+
50+ it ( "should handle unstable streams" , async function ( ) {
51+ type Obj = { id : number } ;
52+ const data : Obj [ ] = [ ] ;
53+ const objStream = Readable . from ( data ) ;
54+ const iter = new S2A < Obj > ( objStream ) ;
55+ const buff : Obj [ ] = [ ] ;
56+
57+ objStream . read ( ) ;
58+ for await ( const value of iter ) {
59+ buff . push ( value ) ;
60+ }
61+
62+ assertClosed ( objStream , iter ) ;
63+ expect ( buff ) . to . have . lengthOf ( 0 ) ;
64+ } ) ;
65+
66+ it ( "should handle premature loop break" , async function ( ) {
67+ type Obj = { id : number } ;
68+ const data : Obj [ ] = [ { id : 1 } , { id : 2 } , { id : 3 } ] ;
69+ const objStream = Readable . from ( data ) ;
70+ const iter = new S2A < Obj > ( objStream ) ;
71+ const buff : Obj [ ] = [ ] ;
72+
73+ let count = 0 ;
74+ for await ( const value of iter ) {
75+ if ( count >= 1 ) {
76+ break ;
77+ }
78+ count += 1 ;
79+ buff . push ( value ) ;
80+ }
81+
82+ assertClosed ( objStream , iter ) ;
83+ expect ( buff ) . to . have . lengthOf ( 1 ) ;
84+ } ) ;
85+
86+ it ( "should handle stream errors" , async function ( ) {
87+ type Obj = { id : number } ;
88+ const data : Obj [ ] = [ { id : 1 } , { id : 2 } , { id : 3 } ] ;
89+ const objStream = Readable . from ( data ) ;
90+ const iter = new S2A < Obj > ( objStream ) ;
91+ const buff : Obj [ ] = [ ] ;
92+
93+ const errMessage = "test throw" ;
94+ await expect (
95+ ( async ( ) => {
96+ let count = 0 ;
97+ for await ( const value of iter ) {
98+ count += 1 ;
99+ buff . push ( value ) ;
100+ if ( count >= 1 ) {
101+ objStream . emit ( "error" , new Error ( errMessage ) ) ;
102+ }
103+ }
104+ } ) ( )
105+ ) . to . eventually . be . rejectedWith ( errMessage ) ;
106+
107+ assertClosed ( objStream , iter ) ;
108+ expect ( buff ) . to . have . lengthOf ( 1 ) ;
109+ } ) ;
110+
111+ it ( "should handle manual throws" , async function ( ) {
112+ type Obj = { id : number } ;
113+ const data : Obj [ ] = [ { id : 1 } , { id : 2 } , { id : 3 } ] ;
114+ const objStream = Readable . from ( data ) ;
115+ const iter = new S2A < Obj > ( objStream ) ;
116+ const buff : Obj [ ] = [ ] ;
117+
118+ const errMessage = "test throw" ;
119+ await expect (
120+ ( async ( ) => {
121+ let count = 0 ;
122+ for await ( const value of iter ) {
123+ if ( count >= 1 ) {
124+ await iter . throw ( new Error ( errMessage ) ) ;
125+ }
126+ count += 1 ;
127+ buff . push ( value ) ;
128+ }
129+ } ) ( )
130+ ) . to . eventually . be . rejectedWith ( errMessage ) ;
131+
132+ assertClosed ( objStream , iter ) ;
133+ expect ( buff ) . to . have . lengthOf ( 1 ) ;
134+ } ) ;
135+
25136 it ( "should iterate on a node stream with string encoding" , async function ( ) {
26137 const fileStream = fs . createReadStream ( filePath , { encoding : "utf8" } ) ;
138+ const iter = new S2A < string > ( fileStream ) ;
27139 const buff : string [ ] = [ ] ;
28140
29- for await ( const value of new S2A < string > ( fileStream ) ) {
141+ for await ( const value of iter ) {
30142 buff . push ( value ) ;
31143 }
32144
33145 const content = buff . join ( "" ) ;
146+
147+ assertClosed ( fileStream , iter ) ;
34148 expect ( content ) . to . have . lengthOf ( 1502 ) ;
35149 } ) ;
36150
37151 it ( "should iterate on a node stream with a size with string encoding" , async function ( ) {
38152 const fileStream = fs . createReadStream ( filePath , { encoding : "utf8" } ) ;
153+ const iter = new S2A < string > ( fileStream , { size : 16 } ) ;
39154 const buff : string [ ] = [ ] ;
40155
41- for await ( const value of new S2A < string > ( fileStream , { size : 16 } ) ) {
156+ for await ( const value of iter ) {
42157 buff . push ( value ) ;
43158 }
44159
45160 const content = buff . join ( "" ) ;
161+
162+ assertClosed ( fileStream , iter ) ;
46163 expect ( buff ) . to . have . lengthOf ( 94 ) ;
47164 expect ( content ) . to . have . lengthOf ( 1502 ) ;
48165 } ) ;
49-
50- it ( "should clean up all stream events when stream ends" ) ;
51-
52- it ( "should clean up all stream events when stream errors" ) ;
53-
54- it ( "should handle a stream error in middle of iteration with a rejection" ) ;
55166} ) ;
0 commit comments