Skip to content

A simple yet powerful abstraction that enables communication between asynchronous tasks.

License

Notifications You must be signed in to change notification settings

cdellacqua/channel.js

Repository files navigation

reactive-channel

A simple yet powerful abstraction that enables communication between asynchronous tasks.

A Channel is an abstraction that enables communication between asynchronous tasks. A channel exposes two objects: tx and rx, which respectively provide methods to transmit and receive data.

Channels can be used and combined in a multitude of ways. The simplest way to use a channel is to create a simplex communication: one task transmits data, another consumes it. A full-duplex communication can be achieved by creating two channels and exchanging the rx and tx objects between two tasks.

It's also possible to create a Multiple Producers Single Consumer (mpsc) scenario by sharing a single channel among several tasks.

NPM Package

npm install reactive-channel

Documentation

Highlights

Channel<T> provides the following properties:

  • tx, an object that exposes methods and properties to interact and query the transmitting end of the channel;
  • rx, an object that exposes methods and properties to interact and query the receiving end of the channel;
  • buffer, a readonly queue that represents the internal buffer containing data waiting to be consumed by the receiving end.

The tx object is a ChannelTx<T>, which in turn exposes the following:

  • capacity, the total capacity of the channel, configured at creation time;
  • send(...), to send data to the receiving end. If the receiving end is not waiting for data, this method will enqueue the message in the channel buffer;
  • sendWait(...), similar to send(...), but let the caller wait for the receiving end to consume the data;
  • close(), to close the channel. This will also reject all pending receiving Promises;
  • closed$, a boolean store that can be used to check if the channel has been closed, i.e. the close() method has been called by either the receiving or transmission end of the channel;
  • canWrite$, a boolean store that can be used to check if the channel is capable of receiving data, that is its buffer is not full and the channel is not closed;
  • availableOutboxSlots$, a store that can be used to check how much space is available in the transmission buffer.

The rx object is a ChannelRx<T>, which in turn exposes the following:

  • capacity, the total capacity of the channel, configured at creation time;
  • recv(...), to receive data. If the transmission buffer contains some data the returned promise resolves immediately, otherwise it will resolve once the transmitting end calls send(...) or sendWait(...);
  • iter()/[Symbol.asyncIterator](), to consume the channel buffer using an async iterator;
  • close(), to close the channel. This will also reject all pending receiving Promises;
  • closed$, a store store that can be used to check if the channel has been closed, i.e. the close() method has been called by either the receiving or transmission end of the channel;
  • canRead$, a boolean store that can be used to check if the channel can be consumed, that is its buffer is not empty and the channel is not closed;
  • filledInboxSlots$, a store that can be used to check how much space is filled in the transmission buffer;
  • pendingRecvPromises$, a store that can be used to check how many recv calls are pending.

This library provides a function called makeChannel to create Channel<T> objects.

Examples

Basics:

import {makeChannel} from 'reactive-channel';

// Create a channel that can be used to pass strings.
const {rx, tx} = makeChannel<string>();
rx.recv().then((message) => console.log(message));
// ...
tx.send('hello!'); // this will resolve the above promise, printing "hello!"

Reactivity:

import {makeChannel} from 'reactive-channel';

// Create a channel that can be used to pass strings.
const {rx, tx} = makeChannel<string>();
tx.availableOutboxSlots$
	// this will immediately print 1024, which is the default channel capacity.
	.subscribe((n) => console.log(`Available slots: ${n}`));

// this will trigger the above subscription, printing 1023,
// because there is no pending `recv` that can consume the data.
tx.send('hello!');
// this will also trigger the subscription, printing 1024 again,
// because `recv()` will consume the channel buffer, emptying it.
rx.recv();

Simple job queue:

import {makeChannel} from 'reactive-channel';

// Create a channel that can be used to pass strings.
const {rx, tx} = makeChannel<{email: string; content: string}>();

async function processEmailQueue() {
	while (!rx.closed$.content()) {
		try {
			const {email, content} = await rx.recv();
			console.log(`Now sending an email to ${email}`);
			// ... your favorite email dispatcher here.
		} catch (err) {
			console.error('unable to send because of the following error:', err);
		}
	}
}

// Somewhere in the startup of your application.
processEmailQueue();

// Somewhere where an email should be sent, e.g. when a user subscribes to your newsletter.
tx.send({
	email: user.email,
	content: 'Subscription activated, you will receive our best deals and coupons!',
});

Abort a blocking recv(...)

You can abort a recv using a simple timeout:

import {makeChannel} from 'reactive-channel';

// Create a channel that can be used to pass strings.
const {rx, tx} = makeChannel<{email: string; content: string}>();
// You can abort a `recv` using a simple timeout:
try {
	const {email, content} = await rx.recv({signal: AbortSignal.timeout(5000)});
} catch (err) {
	if (err instanceof DOMException && err.name === 'TimeoutError') {
		console.warn('No data to consume, timeout expired');
	}
	// ...
}

Or you can abort a recv using some custom logic, e.g. when the user clicks a button:

import {makeChannel} from 'reactive-channel';

// Create a channel that can be used to pass strings.
const {rx, tx} = makeChannel<{email: string; content: string}>();

const abortController = new AbortController();
// Assuming you have a `abortButton` variable
abortButton.addEventListener(
	'click',
	() => abortController.abort(new Error('Action cancelled by the user'))
);

// You can abort a `recv` using an abort signal:
try {
	const {email, content} = await rx.recv({signal: abortController.signal});
} catch (err) {
	// ...
}

About

A simple yet powerful abstraction that enables communication between asynchronous tasks.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published