Skip to content

Fast, reliable, and scalable channels implementation based on Redis streams.

License

Notifications You must be signed in to change notification settings

hearit-io/redis-channels

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

28 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

@hearit-io/redis-channels

Contributor Covenant js-standard-style NPM License NPM Downloads

It is a fast, reliable, and scalable message broker implementation based on Redis streams.

Suitable for IoT applications with massive network traffic, pub/sub-use cases, or any implementation with multiple producers/consumers.

Allows simple integration with SSE(Server-Sent Event)/WebSocket implementation. See our SSE Fastify based example.

See a live SSE frontend implementation used in our HEARIT.IO project here

Implements a possibility to scale the message processing across different consumers, without single consumers having to process all the messages. A group of consumers, working as a team, can cooperate and consume a separate portion of those messages from the same channel.

It can be used with a single Redis instance and later updated easily to a cluster configuration without the need for any application change. Under the hood ioredis is used as a client.

Simple integration in web frameworks, already available plug-in fastify-redis-channels for our favorite framework Fastify. See all step-by-step examples there.

The implementation uses native Promises.

Do you want your project to grow? Then start right from the begging.

Table of Contents

Install

$ npm install @hearit-io/redis-channels --save

Usage example

Requires running Redis server on a host localhost and a port 6379.

Every consumer processes all messages

'use strict'

const {RedisChannels} = require('@hearit-io/redis-channels')

// A consumer implementation
async function consume(tunnel, channels) {
  for await (const messages of channels.consume(tunnel)) {
    for (const i in messages) {
      // Do something with messages
      console.log(messages[i].data)
    }
  }
}


async function main () {
  try {
  
    const channels = new RedisChannels()
    const tunnelProducer = await channels.use('room')
    
    const tunnelConsumerOne = await channels.use('room')
    const tunnelConsumerTwo = await channels.use('room')
    
    // Subscribe consumer tunnels
    await channels.subscribe(tunnelConsumerOne)
    await channels.subscribe(tunnelConsumerTwo)

    // Start consuming 
    consume(tunnelConsumerOne).catch((error) => {
      console.error(error)
    })
    consume(tunnelConsumerTwo).catch((error) => {
      console.error(error)
    })

    // Produce messages
    await channels.produce(tunnelProducer, 'Hello')
    await channels.produce(tunnelProducer, 'Wold')

    // Unsubscribe all consumers
    await channels.unsubscribe(tunnelConsumerOne)
    await channels.unsubscribe(tunnelConsumerTwo)

    // Delete a channel related to the 'room'.
    await channels.delete('room')
  }
  catch (error) {
    console.error(error)
  }
}
main()

The result on the console is:

Hello
World
Hello
World

Consumer's team

In order to create a group of consumers (a team) which cooperates consuming a different portion of the messages form the same channel, just add a team parameter in the subscribemethod like this:

 
 // Subscribe consumer tunnels
 await channels.subscribe(tunnelConsumerOne, 'team')
 await channels.subscribe(tunnelConsumerTwo, 'team')
 

The result on the console is:

Hello
World

API Documentation

new RedisChannels([options])

Creates an instance of a RedisChannels class. It uses ioredis to manage connections to a Redis.

Options:

Property Type Default Description
channels Object Options related to channels
channels.application string 'app' Application name, used as a prefix part in the Redis key
channels.version number 1 Version number, used as a prefix part in the Redis key
channels.schema string 'channel' Schema name, used as a prefix part in the Redis key
channels.overflow number 100 Trims the channel size (approximately), but never less than the specified number of elements. You must choose a value suitable for your use case and available memory
channels.slots number 32 The number of pre-sharded slots. Possible values are 32 or 64
channels.sharded boolean false Whether the channels will be spread even over the shards. It makes sense if your application connects to a Redis cluster
channels.log Object A logger instance, by default logging is disabled, we use abstract-logging for this purpose
redis Object Options for a Redis clients
redis.nodes Array An array of nodes in the cluster, [{ port: number, host: string }]. If nodes are defined a connection to a cluster will be tried.
redis.url string A URL string to a single Redis server, for example 'redis://user:password@redis-service.com:6379/'
redis.host string 'localhost' A host name of a single Redis server
redis.port number 6379 A port number to connect to a single Redis server
redis.... Rest properites Any other options will be passed to the Redis or the Cluster client

Examples

Channels will use a Redis cluster connected to three nodes with enabled offline queue in a sharded mode.

'use strict'

const {RedisChannels} = require('@hearit-io/redis-channels')

const options = {
  channels: {
    sharded: true
  },
  redis: {
    nodes: [
      { host: '127.0.0.1', port: 6380 },
      { host: '127.0.0.1', port: 6381 },
      { host: '127.0.0.1', port: 6382 }
    ],
    options: {
      enableOfflineQueue: true
    }
  }
}
const channels = new RedisChannels(options)

Channels will use a single Redis connection defined with a URL and keep alive set to 10 seconds.

'use strict'

const {RedisChannels} = require('@hearit-io/redis-channels')

const options = {
  redis: {
    url: 'redis://10.101.201.100:3333',
    keepAlive: 10000
  }
}
const channels = new RedisChannels(options)

The same as above but defined with a host and port.

'use strict'

const {RedisChannels} = require('@hearit-io/redis-channels')

const options = {
  redis: {
    host: '10.101.201.100',
    port: 3333,
    keepAlive: 10000
  }
}
const channels = new RedisChannels(options)

RedisChannels methods

channels.use(group)

Creates a tunnel Object required to do any further operation with the channel associated with the group.

Parameter Type Default Description
group string A name of a channel group (i.e. chat room etc.)

Returns a Promise to a tunnel Object.

channels.produce(tunnel, message[, type = 'all'])

Produces a message to a channel.

Parameter Type Default Description
tunnel Object A tunnel required to peform any operation with the channel
message string A message to produce
type string 'all' Identifies a message source (origination)

Returns a Promise.

channels.subscribe(tunnel[, team, consumer])

Subscribes a tunnel to make possible a consume operation.

Parameter Type Default Description
tunnel Object A tunnel required to perform any operation with the channel
team string the same as a consumer A name of a consumer team. If specified, every consumer within a team will receive different part of the messages arrived in the channels
consumer string a generated v4 uuid A unique consumer name within a team

Returns a Promise.

channels.consume(tunnel[, type = 'all', count = 100, timeout = 10000, fromId = time_in_milisecounds-sequence, messageOnTimeOut = false])

It is an asynchronous iterator which returns an array of messages.

Every message is a couple of { id: <string>, data: <string> }

Parameter Type Default Description
tunnel Object A tunnel required to peform any operation with the channel
type string 'all' Identifies a message source (origination) to consume
count number 100 Defines a maximum number of messages consumed per iteration
timeout number 10000 A blocking timeout in milliseconds
fromId string '>' or '*' Starts consuming messages newer then a given id. Default value is set to > or * whether if it is consumed in a team or not. This means staring form messages that were never delivered to any other consumer. The format is time_in_milisecounds-sequence or only the miliseconds part of the id.
messageOnTimeOut boolean false If set, in a case of a timeout a message array [{id: data: null}] will be returned to indecate it. If there were no consumed messages the id value will be undefined. This feature is usefull if you want to build in your consumer an additional functionality - for example sending to an SSE(Service Send Event)/websocket ping messages on every consumer timeout.

Returns a Promise to an Array of Objects.

channels.unsubscribe(tunnel)

Unsubscribes form a tunnel. This causes the consume method to finish with the iteration and cleans up the associated consumer.

Parameter Type Default Description
tunnel Object A tunnel required to peform any operation with the channel. Use only subscribed tunnels, otherways an exception will throw.

Returns a Promise.

channels.delete(group)

Deletes all data in the Redis database associated with the group.

Parameter Type Default Description
group string A name of a channel group (i.e. chat room etc.)

Returns a Promise .

channels.cleanup()

Closes all redis clients and deletes all consumers/consumer groups. Useful in agraceful application shutdown.

Returns a Promise .

Error handling

In an error all methods throw an instance of RedisChannelsError class.

It has following properties:

Property Type Default Description
message string An error description
error Object An error object caused the exception

A usage example of an error handling:

'use strict'
const {RedisChannels, RedisChannelsError} = require('@hearit-io/redis-channels')

try {
   // Your code
} 
catch (exception) {
   if (exception instanceof RedisChannelsError) {
      // Handle the error
      console.error(exception.message)
      console.error (exception.error)
   }
}

Running tests

We want to deliver the highest possible quality to our valuable open source community by implementing 100 % test coverage for all packages.

To run the test cases a running Redis server is required.

In the environment variable REDIS_NODES you can set a list of Redis nodes separated by space as example:

export REDIS_NODES="127.0.0.1:6380 127.0.0.1:6381 127.0.0.1:6382"

If there are more than one entry in the list a connection to a Redis Cluster will be used for the tests. If REDIS_NODES is not set a connection to 127.0.0.1:6379 will be tried.

Similar to a variable REDIS_NODES, a variable REDIS_DOWN_NODES is used in the test cases covering missing connection to a Redis server. A default value is '127.0.0.1:3333'.

After each test, the data base will be FLUSHED. Please make sure there is no valuable data in the data base before running the tests!

Unit tests

Start the unit tests with:

npm test

Heavy load/reach the limit tests

The purpose of those tests is to verify the behavior of your system in a situation of a massive traffic - go to the limits.

This can be used to estimate the maximum number of consumer/producers, the throughput, and the need of operation system tunning.

You can use following environment variables to configure your tests:

  • NUMBER_OF_GROUPS - defines the number of channel groups (i. e. chat rooms, ...). Default value is 10.
  • NUMBER_OF_MESSAGES - number of messages to produce per group. Default value is 100
  • NUMBER_OF_CONSUMERS - number of consumers per group. Default value is 10
  • SHARDED - If channels sharded mode will be used. Default value is 'false', to enable it set to 'true'
  • GROUP_PREFIX - The prefix for the group name. Default value is 'GROUP'.

Following command starts four node processes:

cd ./test/heavy-load 
./heavy-run.sh

The test results for each process can be found in the ./test/heavy-load/log directory.

To clean-up the data base call:

./heavy-run-cleanup.sh

Make sure there is no valuable data in the data base before running the tests!

Tunning Redis and OS

You may need to adapt maxclients configuration parameter in the Redis server confg file (usually /etc/redis.conf). For more details see the Redis documentation about this topic.

On a Unix based system increase the maximum allowed number of temporary ports.

Check the current system value of local port range with:

cat /proc/sys/net/ipv4/ip_local_port_range  

Increase the value by defining a bigger range, for example with the command:

echo "1024 65535" > /proc/sys/net/ipv4/ip_local_port_range

To make the change permanent add following line in /etc/sysctl.conf file:

net.ipv4.ip_local_port_range = 1024 65535

Check the used TCP and UDP connection with the command:

netstat -an | grep -e tcp -e udp | wc -l

or

ss -s

Benchmarks

We reached follwing values in our benchmark tests on a Linux Debain server with 64 GB memory, single i7-7700 CPU 3.60GHz:

  • Message size - 4 000 bytes.
  • Number of Redis servers - 1.
  • Number of node processes - 4.
  • Number of channels groups (i.e. chat rooms, ...) - 3 000.
  • Number of consumers - 30 000.
  • Number of produced messages - 300 000.
  • Number of consumed messages - 3 000 000.
  • Avarage processed messages per second - 24 200.

TODO

The list of already implemented / planed features:

  • Limit the maximum number of channel elements in the produce method (capped streams).
  • Implement a scenario where all consumers are served with the same messages arrived in the channels.
  • Add Benchmarks.
  • Implement a scenario where consumers are served with the different part of the messages arrived in the channels (a work in a team).
  • Introduce a parameter in the consume method which allows starting message consuming form a given period in the past.
  • Implement consume with acknowledge functionality.
  • Implement a channel monitoring capability.
  • TypeScript support.

Project status

Smart home automatization designed for visually impaired people.

@heart-io/redis-channels is used productive in our progressive web app. You can try it with a user demo@hearit.io and password: 11223344

The package will be updated and maintained regularly.

The main goal of hearit.io is to make accessible the world of IoT to everyone. We created a speaking home suitable for all.

We will be grateful to you if you make awareness to other people of our project.

To finance our idea, we would be glad to support and work on your projects. Contact Emil emil@hearit.io for details and our availability.

Other open-source packages, part of our project, will be available soon. We use Fastify as an application framework.

Authors and acknowledgment

Emil Usunov

hearit.io

License

MIT