Skip to content

Commit

Permalink
stream: simplify prefinish
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Oct 16, 2023
1 parent f09a50c commit 49bfa18
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 41 deletions.
2 changes: 1 addition & 1 deletion benchmark/streams/creation.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const {
} = require('stream');

const bench = common.createBenchmark(main, {
n: [50e6],
n: [10e6],
kind: ['duplex', 'readable', 'transform', 'writable'],
});

Expand Down
73 changes: 33 additions & 40 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -865,52 +865,45 @@ function needFinish(state) {
)) === (kEnding | kConstructed) && state.length === 0);
}

function callFinal(stream, state) {
let called = false;

function onFinish(err) {
if (called) {
errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK());
return;
}
called = true;

state.pendingcb--;
if (err) {
callFinishedCallbacks(state, err);
errorOrDestroy(stream, err, (state[kState] & kSync) !== 0);
} else if (needFinish(state)) {
state[kState] |= kPrefinished;
stream.emit('prefinish');
// Backwards compat. Don't check state.sync here.
// Some streams assume 'finish' will be emitted
// asynchronously relative to _final callback.
state.pendingcb++;
process.nextTick(finish, stream, state);
}
function onFinish(stream, state, err) {
if ((state[kState] & kPrefinished) !== 0) {
errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK());
return;
}

state[kState] |= kSync;
state.pendingcb++;

try {
stream._final(onFinish);
} catch (err) {
onFinish(err);
state.pendingcb--;
if (err) {
callFinishedCallbacks(state, err);
errorOrDestroy(stream, err, (state[kState] & kSync) !== 0);
} else if (needFinish(state)) {
state[kState] |= kPrefinished;
stream.emit('prefinish');
// Backwards compat. Don't check state.sync here.
// Some streams assume 'finish' will be emitted
// asynchronously relative to _final callback.
state.pendingcb++;
process.nextTick(finish, stream, state);
}

state[kState] &= ~kSync;
}

function prefinish(stream, state) {
if ((state[kState] & (kPrefinished | kFinalCalled)) === 0) {
if (typeof stream._final === 'function' && (state[kState] & kDestroyed) === 0) {
state[kState] |= kFinalCalled;
callFinal(stream, state);
} else {
state[kState] |= kPrefinished;
stream.emit('prefinish');
if ((state[kState] & (kPrefinished | kFinalCalled)) !== 0) {
return
}

if (typeof stream._final === 'function' && (state[kState] & kDestroyed) === 0) {
state[kState] |= kFinalCalled | kSync;
state.pendingcb++;

try {
stream._final(err => onFinish(stream, state, err));
} catch (err) {
onFinish(stream, state, err);
}

state[kState] &= ~kSync;
} else {
state[kState] |= kFinalCalled | kPrefinished;
stream.emit('prefinish');
}
}

Expand Down

0 comments on commit 49bfa18

Please sign in to comment.