Open
Description
I love CSP and think it may be a better approach than asynchronous iteration. For filtering channels, I’d prefer an approach that is different than constructor + callback. What do you think? If this makes sense, it could be added as, e.g., Channel.filter()
.
Error handling and the sentinel value END_OF_CHANNEL
feel like work-arounds. Are there ways of handling closing and errors that are more in line with the style of the API?
Example:
import fs from 'fs';
import Channel from 'async-csp';
const END_OF_CHANNEL = Symbol();
function filter(inputChannel, ...filterFuncs) {
for (const filterFunc of filterFuncs) {
const outputChannel = new Channel();
filterFunc(inputChannel, outputChannel);
inputChannel = outputChannel;
}
return inputChannel;
}
function readFile(fileName) {
const channel = new Channel();
const readStream = fs.createReadStream(fileName,
{ encoding: 'utf8', bufferSize: 1024 });
readStream.on('data', buffer => {
const str = buffer.toString('utf8');
channel.put(str);
});
readStream.on('error', err => {
channel.put(err);
});
readStream.on('end', () => {
// Signal end of output sequence
channel.put(END_OF_CHANNEL);
});
return channel;
}
async function splitLines(input, output) {
let previous = '';
while (true) {
const chunk = await input.take();
if (chunk === END_OF_CHANNEL) break;
if (chunk instanceof Error) {
output.put(chunk);
return;
}
previous += chunk;
let eolIndex;
while ((eolIndex = previous.indexOf('\n')) >= 0) {
const line = previous.slice(0, eolIndex);
output.put(line);
previous = previous.slice(eolIndex+1);
}
}
if (previous.length > 0) {
output.put(previous);
}
output.put(END_OF_CHANNEL);
}
async function prefixLines(input, output) {
while (true) {
const line = await input.take();
if (line === END_OF_CHANNEL) {
output.put(END_OF_CHANNEL);
return;
}
if (line instanceof Error) {
output.put(line);
return;
}
output.put('> '+line);
}
}
async function main() {
const fileName = process.argv[2];
const ch = filter(readFile(fileName), splitLines, prefixLines);
while (true) {
const line = await ch.take();
if (line === END_OF_CHANNEL) break;
console.log(line);
}
}
main();