Skip to content

Commit 68c0d3b

Browse files
committed
stream: simplify Transform
1 parent d17f233 commit 68c0d3b

8 files changed

+94
-159
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const Transform = require('stream').Transform;
5+
6+
const bench = common.createBenchmark(main, {
7+
n: [2e6],
8+
sync: ['yes', 'no'],
9+
callback: ['yes', 'no']
10+
});
11+
12+
function main({ n, sync, callback }) {
13+
const b = Buffer.allocUnsafe(1024);
14+
const s = new Transform();
15+
sync = sync === 'yes';
16+
17+
const writecb = (cb) => {
18+
if (sync)
19+
cb();
20+
else
21+
process.nextTick(cb);
22+
};
23+
24+
s._transform = (chunk, encoding, cb) => writecb(cb);
25+
26+
const cb = callback === 'yes' ? () => {} : null;
27+
28+
bench.start();
29+
30+
let k = 0;
31+
function run() {
32+
while (k++ < n && s.write(b, cb));
33+
if (k >= n)
34+
bench.end(n);
35+
}
36+
s.on('drain', run);
37+
s.on('data', () => {});
38+
run();
39+
}

lib/_stream_transform.js

Lines changed: 39 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -65,66 +65,25 @@
6565

6666
const {
6767
ObjectSetPrototypeOf,
68+
Symbol
6869
} = primordials;
6970

7071
module.exports = Transform;
7172
const {
72-
ERR_METHOD_NOT_IMPLEMENTED,
73-
ERR_MULTIPLE_CALLBACK,
74-
ERR_TRANSFORM_ALREADY_TRANSFORMING,
75-
ERR_TRANSFORM_WITH_LENGTH_0
73+
ERR_METHOD_NOT_IMPLEMENTED
7674
} = require('internal/errors').codes;
7775
const Duplex = require('_stream_duplex');
7876
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
7977
ObjectSetPrototypeOf(Transform, Duplex);
8078

81-
82-
function afterTransform(er, data) {
83-
const ts = this._transformState;
84-
ts.transforming = false;
85-
86-
const cb = ts.writecb;
87-
88-
if (cb === null) {
89-
return this.emit('error', new ERR_MULTIPLE_CALLBACK());
90-
}
91-
92-
ts.writechunk = null;
93-
ts.writecb = null;
94-
95-
if (data != null) // Single equals check for both `null` and `undefined`
96-
this.push(data);
97-
98-
cb(er);
99-
100-
const rs = this._readableState;
101-
rs.reading = false;
102-
if (rs.needReadable || rs.length < rs.highWaterMark) {
103-
this._read(rs.highWaterMark);
104-
}
105-
}
106-
79+
const kResume = Symbol('kResume');
10780

10881
function Transform(options) {
10982
if (!(this instanceof Transform))
11083
return new Transform(options);
11184

11285
Duplex.call(this, options);
11386

114-
this._transformState = {
115-
afterTransform: afterTransform.bind(this),
116-
needTransform: false,
117-
transforming: false,
118-
writecb: null,
119-
writechunk: null,
120-
writeencoding: null
121-
};
122-
123-
// We have implemented the _read method, and done the other things
124-
// that Readable wants before the first _read call, so unset the
125-
// sync guard flag.
126-
this._readableState.sync = false;
127-
12887
if (options) {
12988
if (typeof options.transform === 'function')
13089
this._transform = options.transform;
@@ -133,89 +92,53 @@ function Transform(options) {
13392
this._flush = options.flush;
13493
}
13594

136-
// When the writable side finishes, then flush out anything remaining.
137-
this.on('prefinish', prefinish);
95+
this._readableState.sync = false;
96+
this[kResume] = null;
13897
}
13998

140-
function prefinish() {
141-
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
142-
this._flush((er, data) => {
143-
done(this, er, data);
99+
Transform.prototype._final = function(cb) {
100+
if (typeof this._flush === 'function') {
101+
this._flush((err) => {
102+
if (err) {
103+
cb(err);
104+
} else {
105+
this.push(null);
106+
cb();
107+
}
144108
});
145109
} else {
146-
done(this, null, null);
147-
}
148-
}
149-
150-
Transform.prototype.push = function(chunk, encoding) {
151-
this._transformState.needTransform = false;
152-
return Duplex.prototype.push.call(this, chunk, encoding);
153-
};
154-
155-
// This is the part where you do stuff!
156-
// override this function in implementation classes.
157-
// 'chunk' is an input chunk.
158-
//
159-
// Call `push(newChunk)` to pass along transformed output
160-
// to the readable side. You may call 'push' zero or more times.
161-
//
162-
// Call `cb(err)` when you are done with this chunk. If you pass
163-
// an error, then that'll put the hurt on the whole operation. If you
164-
// never call cb(), then you'll never get another chunk.
165-
Transform.prototype._transform = function(chunk, encoding, cb) {
166-
cb(new ERR_METHOD_NOT_IMPLEMENTED('_transform()'));
167-
};
168-
169-
Transform.prototype._write = function(chunk, encoding, cb) {
170-
const ts = this._transformState;
171-
ts.writecb = cb;
172-
ts.writechunk = chunk;
173-
ts.writeencoding = encoding;
174-
if (!ts.transforming) {
175-
var rs = this._readableState;
176-
if (ts.needTransform ||
177-
rs.needReadable ||
178-
rs.length < rs.highWaterMark)
179-
this._read(rs.highWaterMark);
110+
this.push(null);
111+
cb();
180112
}
181113
};
182114

183-
// Doesn't matter what the args are here.
184-
// _transform does all the work.
185-
// That we got here means that the readable side wants more data.
186115
Transform.prototype._read = function(n) {
187-
const ts = this._transformState;
188-
189-
if (ts.writechunk !== null && !ts.transforming) {
190-
ts.transforming = true;
191-
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
192-
} else {
193-
// Mark that we need a transform, so that any data that comes in
194-
// will get processed, now that we've asked for it.
195-
ts.needTransform = true;
116+
if (this[kResume]) {
117+
const resume = this[kResume];
118+
this[kResume] = null;
119+
resume();
196120
}
197121
};
198122

199-
200-
Transform.prototype._destroy = function(err, cb) {
201-
Duplex.prototype._destroy.call(this, err, (err2) => {
202-
cb(err2);
123+
Transform.prototype._write = function(chunk, encoding, callback) {
124+
this._transform(chunk, encoding, (err, data) => {
125+
if (err) {
126+
return callback(err);
127+
}
128+
129+
if (data !== undefined) {
130+
this.push(data);
131+
}
132+
133+
const r = this._readableState;
134+
if (r.length < r.highWaterMark || r.length === 0) {
135+
callback();
136+
} else {
137+
this[kResume] = callback;
138+
}
203139
});
204140
};
205141

206-
207-
function done(stream, er, data) {
208-
if (er)
209-
return stream.emit('error', er);
210-
211-
if (data != null) // Single equals check for both `null` and `undefined`
212-
stream.push(data);
213-
214-
// These two error cases are coherence checks that can likely not be tested.
215-
if (stream._writableState.length)
216-
throw new ERR_TRANSFORM_WITH_LENGTH_0();
217-
218-
if (stream._transformState.transforming)
219-
throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
220-
return stream.push(null);
221-
}
142+
Transform.prototype._transform = function(chunk, encoding, cb) {
143+
cb(new ERR_METHOD_NOT_IMPLEMENTED('_transform()'));
144+
};

lib/internal/errors.js

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1297,12 +1297,7 @@ E('ERR_TLS_SNI_FROM_SERVER',
12971297
E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED',
12981298
'At least one category is required', TypeError);
12991299
E('ERR_TRACE_EVENTS_UNAVAILABLE', 'Trace events are unavailable', Error);
1300-
E('ERR_TRANSFORM_ALREADY_TRANSFORMING',
1301-
'Calling transform done when still transforming', Error);
13021300

1303-
// This should probably be a `RangeError`.
1304-
E('ERR_TRANSFORM_WITH_LENGTH_0',
1305-
'Calling transform done when writableState.length != 0', Error);
13061301
E('ERR_TTY_INIT_FAILED', 'TTY initialization failed', SystemError);
13071302
E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET',
13081303
'`process.setupUncaughtExceptionCapture()` was called while a capture ' +

test/parallel/test-stream-transform-constructor-set-methods.js

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,17 @@ const _transform = common.mustCall((chunk, _, next) => {
1818
next();
1919
});
2020

21-
const _final = common.mustCall((next) => {
22-
next();
23-
});
24-
2521
const _flush = common.mustCall((next) => {
2622
next();
2723
});
2824

2925
const t2 = new Transform({
3026
transform: _transform,
31-
flush: _flush,
32-
final: _final
27+
flush: _flush
3328
});
3429

3530
strictEqual(t2._transform, _transform);
3631
strictEqual(t2._flush, _flush);
37-
strictEqual(t2._final, _final);
3832

3933
t2.end(Buffer.from('blerg'));
4034
t2.resume();

test/parallel/test-stream-transform-final-sync.js

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,36 +66,27 @@ const t = new stream.Transform({
6666
assert.strictEqual(++state, chunk + 2);
6767
process.nextTick(next);
6868
}, 3),
69-
final: common.mustCall(function(done) {
70-
state++;
71-
// finalCallback part 1
72-
assert.strictEqual(state, 10);
73-
state++;
74-
// finalCallback part 2
75-
assert.strictEqual(state, 11);
76-
done();
77-
}, 1),
7869
flush: common.mustCall(function(done) {
7970
state++;
8071
// fluchCallback part 1
81-
assert.strictEqual(state, 12);
72+
assert.strictEqual(state, 10);
8273
process.nextTick(function() {
8374
state++;
8475
// fluchCallback part 2
85-
assert.strictEqual(state, 13);
76+
assert.strictEqual(state, 11);
8677
done();
8778
});
8879
}, 1)
8980
});
9081
t.on('finish', common.mustCall(function() {
9182
state++;
9283
// finishListener
93-
assert.strictEqual(state, 14);
84+
assert.strictEqual(state, 12);
9485
}, 1));
9586
t.on('end', common.mustCall(function() {
9687
state++;
9788
// endEvent
98-
assert.strictEqual(state, 16);
89+
assert.strictEqual(state, 14);
9990
}, 1));
10091
t.on('data', common.mustCall(function(d) {
10192
// dataListener
@@ -106,5 +97,5 @@ t.write(4);
10697
t.end(7, common.mustCall(function() {
10798
state++;
10899
// endMethodCallback
109-
assert.strictEqual(state, 15);
100+
assert.strictEqual(state, 13);
110101
}, 1));

test/parallel/test-stream-transform-final.js

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,38 +66,27 @@ const t = new stream.Transform({
6666
assert.strictEqual(++state, chunk + 2);
6767
process.nextTick(next);
6868
}, 3),
69-
final: common.mustCall(function(done) {
70-
state++;
71-
// finalCallback part 1
72-
assert.strictEqual(state, 10);
73-
setTimeout(function() {
74-
state++;
75-
// finalCallback part 2
76-
assert.strictEqual(state, 11);
77-
done();
78-
}, 100);
79-
}, 1),
8069
flush: common.mustCall(function(done) {
8170
state++;
8271
// flushCallback part 1
83-
assert.strictEqual(state, 12);
72+
assert.strictEqual(state, 10);
8473
process.nextTick(function() {
8574
state++;
8675
// flushCallback part 2
87-
assert.strictEqual(state, 15);
76+
assert.strictEqual(state, 11);
8877
done();
8978
});
9079
}, 1)
9180
});
9281
t.on('finish', common.mustCall(function() {
9382
state++;
9483
// finishListener
95-
assert.strictEqual(state, 13);
84+
assert.strictEqual(state, 12);
9685
}, 1));
9786
t.on('end', common.mustCall(function() {
9887
state++;
9988
// end event
100-
assert.strictEqual(state, 16);
89+
assert.strictEqual(state, 14);
10190
}, 1));
10291
t.on('data', common.mustCall(function(d) {
10392
// dataListener
@@ -108,5 +97,5 @@ t.write(4);
10897
t.end(7, common.mustCall(function() {
10998
state++;
11099
// endMethodCallback
111-
assert.strictEqual(state, 14);
100+
assert.strictEqual(state, 13);
112101
}, 1));

test/parallel/test-stream2-transform.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ const Transform = require('_stream_transform');
4545

4646
assert.strictEqual(tx.readableLength, 10);
4747
assert.strictEqual(transformed, 10);
48-
assert.strictEqual(tx._transformState.writechunk.length, 5);
4948
assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
5049
return c.chunk.length;
5150
}), [6, 7, 8, 9, 10]);

test/parallel/test-zlib-flush-drain.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ deflater.on('drain', function() {
3636
drainCount++;
3737
});
3838

39+
deflater.on('data', function() {
40+
// Keep reading data or flush will
41+
// wait until buffer is emptied.
42+
});
43+
3944
process.once('exit', function() {
4045
assert.strictEqual(
4146
beforeFlush, true);

0 commit comments

Comments
 (0)