65
65
66
66
const {
67
67
ObjectSetPrototypeOf,
68
+ Symbol
68
69
} = primordials ;
69
70
70
71
module . exports = Transform ;
71
72
const {
72
- ERR_METHOD_NOT_IMPLEMENTED ,
73
- ERR_MULTIPLE_CALLBACK ,
74
- ERR_TRANSFORM_ALREADY_TRANSFORMING ,
75
- ERR_TRANSFORM_WITH_LENGTH_0
73
+ ERR_METHOD_NOT_IMPLEMENTED
76
74
} = require ( 'internal/errors' ) . codes ;
77
75
const Duplex = require ( '_stream_duplex' ) ;
78
76
ObjectSetPrototypeOf ( Transform . prototype , Duplex . prototype ) ;
79
77
ObjectSetPrototypeOf ( Transform , Duplex ) ;
80
78
81
-
82
- function afterTransform ( er , data ) {
83
- const ts = this . _transformState ;
84
- ts . transforming = false ;
85
-
86
- const cb = ts . writecb ;
87
-
88
- if ( cb === null ) {
89
- return this . emit ( 'error' , new ERR_MULTIPLE_CALLBACK ( ) ) ;
90
- }
91
-
92
- ts . writechunk = null ;
93
- ts . writecb = null ;
94
-
95
- if ( data != null ) // Single equals check for both `null` and `undefined`
96
- this . push ( data ) ;
97
-
98
- cb ( er ) ;
99
-
100
- const rs = this . _readableState ;
101
- rs . reading = false ;
102
- if ( rs . needReadable || rs . length < rs . highWaterMark ) {
103
- this . _read ( rs . highWaterMark ) ;
104
- }
105
- }
106
-
79
+ const kCallback = Symbol ( 'kCallback' ) ;
107
80
108
81
function Transform ( options ) {
109
82
if ( ! ( this instanceof Transform ) )
110
83
return new Transform ( options ) ;
111
84
112
85
Duplex . call ( this , options ) ;
113
86
114
- this . _transformState = {
115
- afterTransform : afterTransform . bind ( this ) ,
116
- needTransform : false ,
117
- transforming : false ,
118
- writecb : null ,
119
- writechunk : null ,
120
- writeencoding : null
121
- } ;
122
-
123
87
// We have implemented the _read method, and done the other things
124
88
// that Readable wants before the first _read call, so unset the
125
89
// sync guard flag.
126
90
this . _readableState . sync = false ;
127
91
92
+ this [ kCallback ] = null ;
93
+
128
94
if ( options ) {
129
95
if ( typeof options . transform === 'function' )
130
96
this . _transform = options . transform ;
@@ -133,89 +99,67 @@ function Transform(options) {
133
99
this . _flush = options . flush ;
134
100
}
135
101
136
- // When the writable side finishes, then flush out anything remaining.
102
+ // TODO(ronag): Unfortunately _final is invoked asynchronously.
103
+ // Use `prefinish` hack. `prefinish` is emitted synchronously when
104
+ // and only when `_final` is not defined. Implementing `_final`
105
+ // to a Transform should be an error.
137
106
this . on ( 'prefinish' , prefinish ) ;
138
107
}
139
108
140
109
function prefinish ( ) {
141
- if ( typeof this . _flush === 'function' && ! this . _readableState . destroyed ) {
110
+ if ( typeof this . _flush === 'function' && ! this . destroyed ) {
142
111
this . _flush ( ( er , data ) => {
143
- done ( this , er , data ) ;
112
+ if ( er ) {
113
+ this . destroy ( er ) ;
114
+ return ;
115
+ }
116
+
117
+ if ( data != null ) {
118
+ this . push ( data ) ;
119
+ }
120
+ this . push ( null ) ;
144
121
} ) ;
145
122
} else {
146
- done ( this , null , null ) ;
123
+ this . push ( null ) ;
147
124
}
148
125
}
149
126
150
- Transform . prototype . push = function ( chunk , encoding ) {
151
- this . _transformState . needTransform = false ;
152
- return Duplex . prototype . push . call ( this , chunk , encoding ) ;
153
- } ;
154
-
155
- // This is the part where you do stuff!
156
- // override this function in implementation classes.
157
- // 'chunk' is an input chunk.
158
- //
159
- // Call `push(newChunk)` to pass along transformed output
160
- // to the readable side. You may call 'push' zero or more times.
161
- //
162
- // Call `cb(err)` when you are done with this chunk. If you pass
163
- // an error, then that'll put the hurt on the whole operation. If you
164
- // never call cb(), then you'll never get another chunk.
165
- Transform . prototype . _transform = function ( chunk , encoding , cb ) {
127
+ Transform . prototype . _transform = function ( chunk , encoding , callback ) {
166
128
throw new ERR_METHOD_NOT_IMPLEMENTED ( '_transform()' ) ;
167
129
} ;
168
130
169
- Transform . prototype . _write = function ( chunk , encoding , cb ) {
170
- const ts = this . _transformState ;
171
- ts . writecb = cb ;
172
- ts . writechunk = chunk ;
173
- ts . writeencoding = encoding ;
174
- if ( ! ts . transforming ) {
175
- const rs = this . _readableState ;
176
- if ( ts . needTransform ||
177
- rs . needReadable ||
178
- rs . length < rs . highWaterMark )
179
- this . _read ( rs . highWaterMark ) ;
180
- }
131
+ Transform . prototype . _write = function ( chunk , encoding , callback ) {
132
+ const rState = this . _readableState ;
133
+ const wState = this . _writableState ;
134
+ const length = rState . length ;
135
+
136
+ this . _transform ( chunk , encoding , ( err , val ) => {
137
+ if ( err ) {
138
+ callback ( err ) ;
139
+ return ;
140
+ }
141
+
142
+ if ( val != null ) {
143
+ this . push ( val ) ;
144
+ }
145
+
146
+ if (
147
+ wState . ended || // Backwards compat.
148
+ length === rState . length || // Backwards compat.
149
+ rState . length < rState . highWaterMark ||
150
+ rState . length === 0
151
+ ) {
152
+ callback ( ) ;
153
+ } else {
154
+ this [ kCallback ] = callback ;
155
+ }
156
+ } ) ;
181
157
} ;
182
158
183
- // Doesn't matter what the args are here.
184
- // _transform does all the work.
185
- // That we got here means that the readable side wants more data.
186
- Transform . prototype . _read = function ( n ) {
187
- const ts = this . _transformState ;
188
-
189
- if ( ts . writechunk !== null && ! ts . transforming ) {
190
- ts . transforming = true ;
191
- this . _transform ( ts . writechunk , ts . writeencoding , ts . afterTransform ) ;
192
- } else {
193
- // Mark that we need a transform, so that any data that comes in
194
- // will get processed, now that we've asked for it.
195
- ts . needTransform = true ;
159
+ Transform . prototype . _read = function ( ) {
160
+ if ( this [ kCallback ] ) {
161
+ const callback = this [ kCallback ] ;
162
+ this [ kCallback ] = null ;
163
+ callback ( ) ;
196
164
}
197
165
} ;
198
-
199
-
200
- Transform . prototype . _destroy = function ( err , cb ) {
201
- Duplex . prototype . _destroy . call ( this , err , ( err2 ) => {
202
- cb ( err2 ) ;
203
- } ) ;
204
- } ;
205
-
206
-
207
- function done ( stream , er , data ) {
208
- if ( er )
209
- return stream . emit ( 'error' , er ) ;
210
-
211
- if ( data != null ) // Single equals check for both `null` and `undefined`
212
- stream . push ( data ) ;
213
-
214
- // These two error cases are coherence checks that can likely not be tested.
215
- if ( stream . _writableState . length )
216
- throw new ERR_TRANSFORM_WITH_LENGTH_0 ( ) ;
217
-
218
- if ( stream . _transformState . transforming )
219
- throw new ERR_TRANSFORM_ALREADY_TRANSFORMING ( ) ;
220
- return stream . push ( null ) ;
221
- }
0 commit comments