Skip to content

Commit

Permalink
Separate smaller core
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed Sep 8, 2024
1 parent b3e23ab commit c60bb52
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 98 deletions.
1 change: 0 additions & 1 deletion source/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {stripVTControlCharacters} from 'node:util';
export const getContext = raw => ({
start: process.hrtime.bigint(),
command: raw.map(part => getCommandPart(stripVTControlCharacters(part))).join(' '),
state: {stdout: '', stderr: '', output: ''},
});

const getCommandPart = part => /[^\w./-]/.test(part)
Expand Down
19 changes: 16 additions & 3 deletions source/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {getContext} from './context.js';
import {getOptions} from './options.js';
import {spawnSubprocess} from './spawn.js';
import {handleArguments} from './spawn.js';
import picoSpawn from './pico-spawn.js';
import {getResult} from './result.js';
import {handlePipe} from './pipe.js';
import {lineIterator, combineAsyncIterators} from './iterable.js';
Expand All @@ -9,8 +10,9 @@ export default function spawn(file, second, third, previous) {
const [commandArguments = [], options = {}] = Array.isArray(second) ? [second, third] : [[], second];
const context = getContext([file, ...commandArguments]);
const spawnOptions = getOptions(options);
const nodeChildProcess = spawnSubprocess(file, commandArguments, spawnOptions, context);
let subprocess = getResult(nodeChildProcess, spawnOptions, context);
const picoPromise = getPicoSubprocess(file, commandArguments, spawnOptions, context);
const nodeChildProcess = getNodeChildProcess(picoPromise);
let subprocess = getResult(picoPromise, nodeChildProcess, context, spawnOptions);
Object.assign(subprocess, {nodeChildProcess});
subprocess = previous ? handlePipe([previous, subprocess]) : subprocess;

Expand All @@ -24,3 +26,14 @@ export default function spawn(file, second, third, previous) {
pipe: (file, second, third) => spawn(file, second, third, subprocess),
});
}

const getPicoSubprocess = async (file, commandArguments, spawnOptions, context) => {
const spawnArguments = await handleArguments(file, commandArguments, spawnOptions, context);
const picoSubprocess = picoSpawn(...spawnArguments);
return {picoSubprocess};
};

const getNodeChildProcess = async picoPromise => {
const {picoSubprocess} = await picoPromise;
return picoSubprocess.nodeChildProcess;
};
6 changes: 3 additions & 3 deletions source/iterable.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
export const lineIterator = async function * (subprocess, {state}, streamName) {
export const lineIterator = async function * (subprocess, context, streamName) {
// Prevent buffering when iterating.
// This would defeat one of the main goals of iterating: low memory consumption.
if (state.isIterating === false) {
if (context.isIterating === false) {
throw new Error(`The subprocess must be iterated right away, for example:
for await (const line of spawn(...)) { ... }`);
}

state.isIterating = true;
context.isIterating = true;

try {
const {[streamName]: stream} = await subprocess.nodeChildProcess;
Expand Down
90 changes: 90 additions & 0 deletions source/pico-spawn.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import {spawn} from 'node:child_process';
import {once, on} from 'node:events';

export default function picoSpawn(file, second, third) {
const [commandArguments = [], options = {}] = Array.isArray(second) ? [second, third] : [[], second];
const state = {
stdout: '',
stderr: '',
output: '',
command: [file, ...commandArguments].join(' '),
};
const nodeChildProcess = spawnSubprocess(file, commandArguments, options, state);
return Object.assign(getResult(nodeChildProcess, state), {nodeChildProcess});
}

const spawnSubprocess = async (file, commandArguments, options, state) => {
try {
const instance = spawn(file, commandArguments, options);
bufferOutput(instance.stdout, 'stdout', options, state);
bufferOutput(instance.stderr, 'stderr', options, state);

// The `error` event is caught by `once(instance, 'spawn')` and `once(instance, 'close')`.
// But it creates an uncaught exception if it happens exactly one tick after 'spawn'.
// This prevents that.
instance.once('error', () => {});

await once(instance, 'spawn');
return instance;
} catch (error) {
throw getSubprocessError(error, {}, state);
}
};

const bufferOutput = (stream, streamName, {buffer = true}, state) => {
if (stream) {
stream.setEncoding('utf8');
if (buffer) {
stream.on('data', chunk => {
state[streamName] += chunk;
state.output += chunk;
});
}
}
};

const getResult = async (nodeChildProcess, state) => {
const instance = await nodeChildProcess;
const onClose = once(instance, 'close');

try {
await Promise.race([
onClose,
...instance.stdio.filter(Boolean).map(stream => onStreamError(stream)),
]);
checkFailure(instance, state);
return state;
} catch (error) {
await Promise.allSettled([onClose]);
throw getSubprocessError(error, instance, state);
}
};

const onStreamError = async stream => {
for await (const [error] of on(stream, 'error')) {
// Ignore errors that are due to closing errors when the subprocesses exit normally, or due to piping
if (!['ERR_STREAM_PREMATURE_CLOSE', 'EPIPE'].includes(error?.code)) {
throw error;
}
}
};

const checkFailure = ({exitCode, signalCode}, {command}) => {
if (signalCode) {
throw new Error(`Command was terminated with ${signalCode}: ${command}`);
}

if (exitCode >= 1) {
throw new Error(`Command failed with exit code ${exitCode}: ${command}`);
}
};

const getSubprocessError = (error, {exitCode, signalCode}, state) => Object.assign(
error?.message?.includes('Command ')
? error
: new Error(`Command failed: ${state.command}`, {cause: error}),
// `exitCode` can be a negative number (`errno`) when the `error` event is emitted on the `instance`
exitCode >= 1 ? {exitCode} : {},
signalCode ? {signalName: signalCode} : {},
state,
);
67 changes: 14 additions & 53 deletions source/result.js
Original file line number Diff line number Diff line change
@@ -1,66 +1,27 @@
import {once, on} from 'node:events';
import process from 'node:process';

export const getResult = async (nodeChildProcess, {input}, context) => {
const instance = await nodeChildProcess;
if (input !== undefined) {
instance.stdin.end(input);
}

const onClose = once(instance, 'close');

export const getResult = async (picoPromise, nodeChildProcess, context, options) => {
try {
await Promise.race([
onClose,
...instance.stdio.filter(Boolean).map(stream => onStreamError(stream)),
]);
checkFailure(context, getErrorOutput(instance));
return getOutputs(context);
const {picoSubprocess} = await picoPromise;
const [result] = await Promise.all([picoSubprocess, nodeChildProcess, handleInput(picoSubprocess, options)]);
return updateResult(result, context);
} catch (error) {
await Promise.allSettled([onClose]);
throw getResultError(error, instance, context);
}
};

const onStreamError = async stream => {
for await (const [error] of on(stream, 'error')) {
// Ignore errors that are due to closing errors when the subprocesses exit normally, or due to piping
if (!['ERR_STREAM_PREMATURE_CLOSE', 'EPIPE'].includes(error?.code)) {
throw error;
}
error.message = error.message.replaceAll(error.command, context.command);
throw updateResult(error, context);
}
};

const checkFailure = ({command}, {exitCode, signalName}) => {
if (signalName !== undefined) {
throw new Error(`Command was terminated with ${signalName}: ${command}`);
}

if (exitCode !== undefined) {
throw new Error(`Command failed with exit code ${exitCode}: ${command}`);
const handleInput = async ({nodeChildProcess}, {input}) => {
if (input !== undefined) {
const {stdin} = await nodeChildProcess;
stdin.end(input);
}
};

export const getResultError = (error, instance, context) => Object.assign(
getErrorInstance(error, context),
getErrorOutput(instance),
getOutputs(context),
);

const getErrorInstance = (error, {command}) => error?.message.startsWith('Command ')
? error
: new Error(`Command failed: ${command}`, {cause: error});

const getErrorOutput = ({exitCode, signalCode}) => ({
// `exitCode` can be a negative number (`errno`) when the `error` event is emitted on the `instance`
...(exitCode < 1 ? {} : {exitCode}),
...(signalCode === null ? {} : {signalName: signalCode}),
});

const getOutputs = ({state: {stdout, stderr, output}, command, start}) => ({
stdout: getOutput(stdout),
stderr: getOutput(stderr),
output: getOutput(output),
const updateResult = (result, {command, start}) => Object.assign(result, {
stdout: getOutput(result.stdout),
stderr: getOutput(result.stderr),
output: getOutput(result.output),
command,
durationMs: Number(process.hrtime.bigint() - start) / 1e6,
});
Expand Down
49 changes: 11 additions & 38 deletions source/spawn.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,16 @@
import {spawn} from 'node:child_process';
import {once} from 'node:events';
import process from 'node:process';
import {applyForceShell} from './windows.js';
import {getResultError} from './result.js';

export const spawnSubprocess = async (file, commandArguments, options, context) => {
try {
// When running `node`, keep the current Node version and CLI flags.
// Not applied with file paths to `.../node` since those indicate a clear intent to use a specific Node version.
// Does not work with shebangs, but those don't work cross-platform anyway.
[file, commandArguments] = ['node', 'node.exe'].includes(file.toLowerCase())
? [process.execPath, [...process.execArgv.filter(flag => !flag.startsWith('--inspect')), ...commandArguments]]
: [file, commandArguments];
export const handleArguments = async (file, commandArguments, options, context) => {
// When running `node`, keep the current Node version and CLI flags.
// Not applied with file paths to `.../node` since those indicate a clear intent to use a specific Node version.
// This also provides a way to opting out, e.g. using `process.execPath` instead of `node` to discard current CLI flags.
// Does not work with shebangs, but those don't work cross-platform anyway.
[file, commandArguments] = ['node', 'node.exe'].includes(file.toLowerCase())
? [process.execPath, [...process.execArgv.filter(flag => !flag.startsWith('--inspect')), ...commandArguments]]
: [file, commandArguments];

const instance = spawn(...await applyForceShell(file, commandArguments, options));
bufferOutput(instance.stdout, context, 'stdout');
bufferOutput(instance.stderr, context, 'stderr');

// The `error` event is caught by `once(instance, 'spawn')` and `once(instance, 'close')`.
// But it creates an uncaught exception if it happens exactly one tick after 'spawn'.
// This prevents that.
instance.once('error', () => {});

await once(instance, 'spawn');
return instance;
} catch (error) {
throw getResultError(error, {}, context);
}
};

const bufferOutput = (stream, {state}, streamName) => {
if (stream) {
stream.setEncoding('utf8');
if (!state.isIterating) {
state.isIterating = false;
stream.on('data', chunk => {
state[streamName] += chunk;
state.output += chunk;
});
}
}
[file, commandArguments, options] = await applyForceShell(file, commandArguments, options);
context.isIterating ??= false;
return [file, commandArguments, {...options, buffer: !context.isIterating}];
};

0 comments on commit c60bb52

Please sign in to comment.