A different approach to filtering? #17

@rauschma

I love CSP and think it may be a better approach than asynchronous iteration. For filtering channels, I’d prefer an approach different from constructor + callback: a helper that chains filter functions, each of which reads from an input channel and writes to an output channel. What do you think? If this makes sense, it could be added as, e.g., Channel.filter().

In the example below, error handling (errors are put on the channel as ordinary values) and the sentinel value END_OF_CHANNEL feel like workarounds. Are there ways of handling closing and errors that are more in line with the style of the API?

Example:

import fs from 'fs';

import Channel from 'async-csp';

const END_OF_CHANNEL = Symbol();

function filter(inputChannel, ...filterFuncs) {
    for (const filterFunc of filterFuncs) {
        const outputChannel = new Channel();
        filterFunc(inputChannel, outputChannel);
        inputChannel = outputChannel;
    }
    return inputChannel;
}

function readFile(fileName) {
    const channel = new Channel();
    const readStream = fs.createReadStream(fileName,
        { encoding: 'utf8', highWaterMark: 1024 });
    readStream.on('data', str => {
        // With an encoding set, chunks arrive as strings, not Buffers
        channel.put(str);
    });
    readStream.on('error', err => {
        channel.put(err);
    });
    readStream.on('end', () => {
        // Signal end of output sequence
        channel.put(END_OF_CHANNEL);
    });
    return channel;
}

async function splitLines(input, output) {
    let previous = '';
    while (true) {
        const chunk = await input.take();
        if (chunk === END_OF_CHANNEL) break;
        if (chunk instanceof Error) {
            output.put(chunk);
            return;
        }
        previous += chunk;
        let eolIndex;
        while ((eolIndex = previous.indexOf('\n')) >= 0) {
            const line = previous.slice(0, eolIndex);
            output.put(line);
            previous = previous.slice(eolIndex+1);
        }
    }
    if (previous.length > 0) {
        output.put(previous);
    }
    output.put(END_OF_CHANNEL);
}

async function prefixLines(input, output) {
    while (true) {
        const line = await input.take();
        if (line === END_OF_CHANNEL) {
            output.put(END_OF_CHANNEL);
            return;
        }
        if (line instanceof Error) {
            output.put(line);
            return;
        }
        output.put('> '+line);
    }
}

async function main() {
    const fileName = process.argv[2];

    const ch = filter(readFile(fileName), splitLines, prefixLines);

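    while (true) {
        const line = await ch.take();
        if (line === END_OF_CHANNEL) break;
        if (line instanceof Error) {
            // Filters forward an error and then stop, so no END_OF_CHANNEL follows
            console.error(line);
            break;
        }
        console.log(line);
    }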
}
main();
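
One way to reduce the repeated sentinel and error checks in each filter is to move the take loop into a single helper. The following is only a sketch under stated assumptions: `MiniChannel` is a hypothetical in-memory stand-in so that the snippet runs without any dependency (it is not the async-csp implementation), and `makeFilter`, `collect` and `demo` are names invented for illustration. Unlike the example above, this helper also sends END_OF_CHANNEL after forwarding an error, so that a consumer always sees the end of the channel.

```javascript
// Hypothetical stand-in for a CSP channel (NOT the async-csp implementation):
// an unbounded queue whose take() waits until a value is available.
class MiniChannel {
    constructor() {
        this.values = []; // values that were put but not yet taken
        this.takers = []; // resolve functions of pending take() calls
    }
    put(value) {
        const taker = this.takers.shift();
        if (taker) taker(value);
        else this.values.push(value);
    }
    take() {
        if (this.values.length > 0) {
            return Promise.resolve(this.values.shift());
        }
        return new Promise(resolve => this.takers.push(resolve));
    }
}

const END_OF_CHANNEL = Symbol('END_OF_CHANNEL');

// Hypothetical helper: owns the take loop, so a filter only supplies
// onValue (called per value) and optionally onEnd (called before the
// sentinel is forwarded). Errors are forwarded, followed by the sentinel.
function makeFilter(onValue, onEnd = () => {}) {
    return async function (input, output) {
        while (true) {
            const value = await input.take();
            if (value instanceof Error) {
                output.put(value);
                output.put(END_OF_CHANNEL);
                return;
            }
            if (value === END_OF_CHANNEL) {
                onEnd(output);
                output.put(END_OF_CHANNEL);
                return;
            }
            onValue(value, output);
        }
    };
}

// Factory, because each splitLines filter needs its own `previous` state.
function splitLines() {
    let previous = '';
    return makeFilter(
        (chunk, output) => {
            previous += chunk;
            const lines = previous.split('\n');
            previous = lines.pop(); // incomplete last line, kept for later
            for (const line of lines) output.put(line);
        },
        output => {
            if (previous.length > 0) output.put(previous);
        }
    );
}

const prefixLines = () =>
    makeFilter((line, output) => output.put('> ' + line));

// Same chaining logic as filter() in the example above.
function filter(inputChannel, ...filterFuncs) {
    for (const filterFunc of filterFuncs) {
        const outputChannel = new MiniChannel();
        filterFunc(inputChannel, outputChannel);
        inputChannel = outputChannel;
    }
    return inputChannel;
}

// Drains a channel into an array; rejects if an Error value arrives.
async function collect(channel) {
    const values = [];
    while (true) {
        const value = await channel.take();
        if (value === END_OF_CHANNEL) return values;
        if (value instanceof Error) throw value;
        values.push(value);
    }
}

function demo() {
    const source = new MiniChannel();
    for (const chunk of ['ab\ncd', 'ef\ng']) source.put(chunk);
    source.put(END_OF_CHANNEL);
    return collect(filter(source, splitLines(), prefixLines()));
}
```

With this sketch, `demo()` should resolve to `['> ab', '> cdef', '> g']`. Errors still travel as in-band values, so this only centralizes the workaround; it does not answer whether closing and errors should be part of the channel API itself.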
