Skip to content

Commit 0bd5595

Browse files
committed
stream: simplify Transform stream implementation
Significantly simplified Transform stream implementation by using mostly standard stream code. PR-URL: #32763 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent f22a9ca commit 0bd5595

File tree

4 files changed

+56
-114
lines changed

4 files changed

+56
-114
lines changed

lib/_stream_transform.js

Lines changed: 52 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -65,66 +65,32 @@
6565

6666
const {
6767
ObjectSetPrototypeOf,
68+
Symbol
6869
} = primordials;
6970

7071
module.exports = Transform;
7172
const {
72-
ERR_METHOD_NOT_IMPLEMENTED,
73-
ERR_MULTIPLE_CALLBACK,
74-
ERR_TRANSFORM_ALREADY_TRANSFORMING,
75-
ERR_TRANSFORM_WITH_LENGTH_0
73+
ERR_METHOD_NOT_IMPLEMENTED
7674
} = require('internal/errors').codes;
7775
const Duplex = require('_stream_duplex');
7876
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
7977
ObjectSetPrototypeOf(Transform, Duplex);
8078

81-
82-
function afterTransform(er, data) {
83-
const ts = this._transformState;
84-
ts.transforming = false;
85-
86-
const cb = ts.writecb;
87-
88-
if (cb === null) {
89-
return this.emit('error', new ERR_MULTIPLE_CALLBACK());
90-
}
91-
92-
ts.writechunk = null;
93-
ts.writecb = null;
94-
95-
if (data != null) // Single equals check for both `null` and `undefined`
96-
this.push(data);
97-
98-
cb(er);
99-
100-
const rs = this._readableState;
101-
rs.reading = false;
102-
if (rs.needReadable || rs.length < rs.highWaterMark) {
103-
this._read(rs.highWaterMark);
104-
}
105-
}
106-
79+
const kCallback = Symbol('kCallback');
10780

10881
function Transform(options) {
10982
if (!(this instanceof Transform))
11083
return new Transform(options);
11184

11285
Duplex.call(this, options);
11386

114-
this._transformState = {
115-
afterTransform: afterTransform.bind(this),
116-
needTransform: false,
117-
transforming: false,
118-
writecb: null,
119-
writechunk: null,
120-
writeencoding: null
121-
};
122-
12387
// We have implemented the _read method, and done the other things
12488
// that Readable wants before the first _read call, so unset the
12589
// sync guard flag.
12690
this._readableState.sync = false;
12791

92+
this[kCallback] = null;
93+
12894
if (options) {
12995
if (typeof options.transform === 'function')
13096
this._transform = options.transform;
@@ -133,89 +99,67 @@ function Transform(options) {
13399
this._flush = options.flush;
134100
}
135101

136-
// When the writable side finishes, then flush out anything remaining.
102+
// TODO(ronag): Unfortunately _final is invoked asynchronously.
103+
// Use `prefinish` hack. `prefinish` is emitted synchronously when
104+
// and only when `_final` is not defined. Implementing `_final`
105+
// to a Transform should be an error.
137106
this.on('prefinish', prefinish);
138107
}
139108

140109
function prefinish() {
141-
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
110+
if (typeof this._flush === 'function' && !this.destroyed) {
142111
this._flush((er, data) => {
143-
done(this, er, data);
112+
if (er) {
113+
this.destroy(er);
114+
return;
115+
}
116+
117+
if (data != null) {
118+
this.push(data);
119+
}
120+
this.push(null);
144121
});
145122
} else {
146-
done(this, null, null);
123+
this.push(null);
147124
}
148125
}
149126

150-
Transform.prototype.push = function(chunk, encoding) {
151-
this._transformState.needTransform = false;
152-
return Duplex.prototype.push.call(this, chunk, encoding);
153-
};
154-
155-
// This is the part where you do stuff!
156-
// override this function in implementation classes.
157-
// 'chunk' is an input chunk.
158-
//
159-
// Call `push(newChunk)` to pass along transformed output
160-
// to the readable side. You may call 'push' zero or more times.
161-
//
162-
// Call `cb(err)` when you are done with this chunk. If you pass
163-
// an error, then that'll put the hurt on the whole operation. If you
164-
// never call cb(), then you'll never get another chunk.
165-
Transform.prototype._transform = function(chunk, encoding, cb) {
127+
Transform.prototype._transform = function(chunk, encoding, callback) {
166128
throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
167129
};
168130

169-
Transform.prototype._write = function(chunk, encoding, cb) {
170-
const ts = this._transformState;
171-
ts.writecb = cb;
172-
ts.writechunk = chunk;
173-
ts.writeencoding = encoding;
174-
if (!ts.transforming) {
175-
const rs = this._readableState;
176-
if (ts.needTransform ||
177-
rs.needReadable ||
178-
rs.length < rs.highWaterMark)
179-
this._read(rs.highWaterMark);
180-
}
131+
Transform.prototype._write = function(chunk, encoding, callback) {
132+
const rState = this._readableState;
133+
const wState = this._writableState;
134+
const length = rState.length;
135+
136+
this._transform(chunk, encoding, (err, val) => {
137+
if (err) {
138+
callback(err);
139+
return;
140+
}
141+
142+
if (val != null) {
143+
this.push(val);
144+
}
145+
146+
if (
147+
wState.ended || // Backwards compat.
148+
length === rState.length || // Backwards compat.
149+
rState.length < rState.highWaterMark ||
150+
rState.length === 0
151+
) {
152+
callback();
153+
} else {
154+
this[kCallback] = callback;
155+
}
156+
});
181157
};
182158

183-
// Doesn't matter what the args are here.
184-
// _transform does all the work.
185-
// That we got here means that the readable side wants more data.
186-
Transform.prototype._read = function(n) {
187-
const ts = this._transformState;
188-
189-
if (ts.writechunk !== null && !ts.transforming) {
190-
ts.transforming = true;
191-
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
192-
} else {
193-
// Mark that we need a transform, so that any data that comes in
194-
// will get processed, now that we've asked for it.
195-
ts.needTransform = true;
159+
Transform.prototype._read = function() {
160+
if (this[kCallback]) {
161+
const callback = this[kCallback];
162+
this[kCallback] = null;
163+
callback();
196164
}
197165
};
198-
199-
200-
Transform.prototype._destroy = function(err, cb) {
201-
Duplex.prototype._destroy.call(this, err, (err2) => {
202-
cb(err2);
203-
});
204-
};
205-
206-
207-
function done(stream, er, data) {
208-
if (er)
209-
return stream.emit('error', er);
210-
211-
if (data != null) // Single equals check for both `null` and `undefined`
212-
stream.push(data);
213-
214-
// These two error cases are coherence checks that can likely not be tested.
215-
if (stream._writableState.length)
216-
throw new ERR_TRANSFORM_WITH_LENGTH_0();
217-
218-
if (stream._transformState.transforming)
219-
throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
220-
return stream.push(null);
221-
}

lib/internal/errors.js

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,12 +1363,8 @@ E('ERR_TLS_SNI_FROM_SERVER',
13631363
E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED',
13641364
'At least one category is required', TypeError);
13651365
E('ERR_TRACE_EVENTS_UNAVAILABLE', 'Trace events are unavailable', Error);
1366-
E('ERR_TRANSFORM_ALREADY_TRANSFORMING',
1367-
'Calling transform done when still transforming', Error);
13681366

13691367
// This should probably be a `RangeError`.
1370-
E('ERR_TRANSFORM_WITH_LENGTH_0',
1371-
'Calling transform done when writableState.length != 0', Error);
13721368
E('ERR_TTY_INIT_FAILED', 'TTY initialization failed', SystemError);
13731369
E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET',
13741370
'`process.setupUncaughtExceptionCapture()` was called while a capture ' +

test/parallel/test-stream2-transform.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ const Transform = require('_stream_transform');
4545

4646
assert.strictEqual(tx.readableLength, 10);
4747
assert.strictEqual(transformed, 10);
48-
assert.strictEqual(tx._transformState.writechunk.length, 5);
4948
assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
5049
return c.chunk.length;
51-
}), [6, 7, 8, 9, 10]);
50+
}), [5, 6, 7, 8, 9, 10]);
5251
}
5352

5453
{

test/parallel/test-zlib-flush-drain.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ const ws = deflater._writableState;
2828
const beforeFlush = ws.needDrain;
2929
let afterFlush = ws.needDrain;
3030

31+
deflater.on('data', () => {
32+
});
33+
3134
deflater.flush(function(err) {
3235
afterFlush = ws.needDrain;
3336
});

0 commit comments

Comments
 (0)