Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 48 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

`@rlanz/bus` is a service bus implementation for Node.js. It is designed to be simple and easy to use.

Currently, it supports the following drivers:
Currently, it supports the following transports:

<p>
👉 <strong>Memory:</strong> A simple in-memory driver for testing purposes.<br />
👉 <strong>Redis:</strong> A Redis driver for production usage.
👉 <strong>Memory:</strong> A simple in-memory transport for testing purposes.<br />
👉 <strong>Redis:</strong> A Redis transport for production usage.
</p>

## Table of Contents
Expand All @@ -46,17 +46,17 @@ The module exposes a manager that can be used to register buses.

```typescript
import { BusManager } from '@rlanz/bus'
import { redis } from "@rlanz/bus/drivers/redis"
import { memory } from "@rlanz/bus/drivers/memory"
import { redis } from '@rlanz/bus/transports/redis'
import { memory } from '@rlanz/bus/transports/memory'

const manager = new BusManager({
default: 'main',
transports: {
main: {
driver: memory(),
transport: memory(),
},
redis: {
driver: redis({
transport: redis({
host: 'localhost',
port: 6379,
}),
Expand All @@ -81,6 +81,26 @@ By default, the bus will use the `default` transport. You can specify different
manager.use('redis').publish('channel', 'Hello world')
```

### Without the manager

If you don't need multiple buses, you can create a single bus directly by importing the transports and the Bus class.

```typescript
import { Bus } from '@rlanz/bus'
import { RedisTransport } from '@rlanz/bus/transports/redis'

const transport = new RedisTransport({
host: 'localhost',
port: 6379,
})

const bus = new Bus(transport, {
retryQueue: {
retryInterval: '100ms'
}
})
```

## Retry Queue

The bus also supports a retry queue. When a message fails to be published, it will be moved to the retry queue.
Expand All @@ -92,7 +112,7 @@ const manager = new BusManager({
default: 'main',
transports: {
main: {
driver: redis({
transport: redis({
host: 'localhost',
port: 6379,
}),
Expand Down Expand Up @@ -126,6 +146,26 @@ export interface RetryQueueOptions {
}
```

## Test helpers

The module also provides some test helpers to make it easier to test the code that relies on the bus. First, you can use the `MemoryTransport` to create a bus that uses an in-memory transport.

You can also use the `ChaosTransport` to simulate a transport that fails randomly, in order to test the resilience of your code.

```ts
import { Bus } from '@rlanz/bus'
import { ChaosTransport } from '@rlanz/bus/test_helpers'

const buggyTransport = new ChaosTransport(new MemoryTransport())
const bus = new Bus(buggyTransport)

/**
* Now, every time you will try to publish a message, the transport
* will throw an error.
*/
buggyTransport.alwaysThrow()
```

[gh-workflow-image]: https://img.shields.io/github/actions/workflow/status/romainlanz/bus/test.yml?branch=main&style=for-the-badge
[gh-workflow-url]: https://github.com/romainlanz/bus/actions/workflows/test.yml
[npm-image]: https://img.shields.io/npm/v/@rlanz/bus.svg?style=for-the-badge&logo=npm
Expand Down
12 changes: 0 additions & 12 deletions drivers/memory.ts

This file was deleted.

13 changes: 0 additions & 13 deletions drivers/redis.ts

This file was deleted.

12 changes: 11 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
],
"exports": {
".": "./build/index.js",
"./drivers/*": "./build/drivers/*.js",
"./transports/*": "./build/src/transports/*.js",
"./test_helpers": "./build/src/test_helpers/index.js",
"./types/*": "./build/src/types/*.js"
},
"scripts": {
Expand All @@ -25,6 +26,14 @@
"update:toc": "npx doctoc README.md",
"test": "c8 node --loader ts-node/esm --enable-source-maps bin/test.ts"
},
"peerDependencies": {
"ioredis": "^5.0.0"
},
"peerDependenciesMeta": {
"ioredis": {
"optional": true
}
},
"dependencies": {
"@paralleldrive/cuid2": "^2.2.2",
"@poppinss/utils": "^6.7.2",
Expand All @@ -39,6 +48,7 @@
"@japa/runner": "^3.1.1",
"@swc/core": "^1.4.0",
"@testcontainers/redis": "^10.7.1",
"@types/node": "^20.12.5",
"@types/object-hash": "^3.0.6",
"c8": "^9.1.0",
"del-cli": "^5.1.0",
Expand Down
18 changes: 9 additions & 9 deletions src/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import debug from './debug.js'
import type { RetryQueueOptions, Serializable, SubscribeHandler, Transport } from './types/main.js'

export class Bus {
readonly #driver: Transport
readonly #transport: Transport
readonly #busId: string
readonly #errorRetryQueue: RetryQueue
readonly #retryQueueInterval: NodeJS.Timeout | undefined

constructor(driver: Transport, options?: { retryQueue?: RetryQueueOptions }) {
this.#driver = driver
constructor(transport: Transport, options?: { retryQueue?: RetryQueueOptions }) {
this.#transport = transport
this.#busId = createId()
this.#errorRetryQueue = new RetryQueue(options?.retryQueue)

Expand All @@ -33,7 +33,7 @@ export class Bus {
}, intervalValue)
}

driver.setId(this.#busId).onReconnect(() => this.#onReconnect())
transport.setId(this.#busId).onReconnect(() => this.#onReconnect())
}

getRetryQueue() {
Expand All @@ -50,15 +50,15 @@ export class Bus {
}

async #onReconnect() {
debug(`bus driver ${this.#driver.constructor.name} reconnected`)
debug(`bus transport ${this.#transport.constructor.name} reconnected`)

await this.processErrorRetryQueue()
}

subscribe<T extends Serializable>(channel: string, handler: SubscribeHandler<T>) {
debug(`subscribing to channel ${channel}`)

return this.#driver.subscribe(channel, async (message) => {
return this.#transport.subscribe(channel, async (message) => {
debug('received message %j from bus', message)
// @ts-expect-error - TODO: Weird typing issue
handler(message)
Expand All @@ -69,7 +69,7 @@ export class Bus {
try {
debug('publishing message "%j" to channel "%s"', message, channel)

await this.#driver.publish(channel, message)
await this.#transport.publish(channel, message)

return true
} catch (error) {
Expand All @@ -92,10 +92,10 @@ export class Bus {
clearInterval(this.#retryQueueInterval)
}

return this.#driver.disconnect()
return this.#transport.disconnect()
}

unsubscribe(channel: string) {
return this.#driver.unsubscribe(channel)
return this.#transport.unsubscribe(channel)
}
}
6 changes: 3 additions & 3 deletions src/bus_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ export class BusManager<KnownTransports extends Record<string, TransportConfig>>
return cachedTransport
}

const driverConfig = this.#transports[transportToUse]
const transportConfig = this.#transports[transportToUse]

debug('creating new transport instance for %s', transportToUse)
const transportInstance = new Bus(driverConfig.driver(), {
retryQueue: driverConfig.retryQueue,
const transportInstance = new Bus(transportConfig.transport(), {
retryQueue: transportConfig.retryQueue,
})
this.#transportsCache[transportToUse] = transportInstance

Expand Down
8 changes: 8 additions & 0 deletions src/test_helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
* @rlanz/bus
*
* @license MIT
* @copyright Romain Lanz <romain.lanz@pm.me>
*/

export { ChaosTransport } from "../test_helpers/chaos_transport.js";
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

import type { Transport, Serializable, SubscribeHandler } from '../types/main.js'

export function memory() {
return () => new MemoryTransport()
}

export class MemoryTransport implements Transport {
#id!: string

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ import type {
TransportMessage,
Serializable,
SubscribeHandler,
RedisTransportConfig,
} from '../types/main.js'

export function redis(config: RedisTransportConfig, encoder?: TransportEncoder) {
return () => new RedisTransport(config, encoder)
}

export class RedisTransport implements Transport {
readonly #publisher: Redis
readonly #subscriber: Redis
Expand Down
2 changes: 1 addition & 1 deletion src/types/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export interface ManagerConfig<KnownTransports extends Record<string, TransportC
}

export interface TransportConfig {
driver: TransportFactory
transport: TransportFactory
retryQueue?: RetryQueueOptions
}

Expand Down
2 changes: 1 addition & 1 deletion test_helpers/chaos_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { Transport, Serializable, SubscribeHandler } from '../src/types/mai

export class ChaosTransport implements Transport {
/**
* The inner transport driver that is wrapped
* The inner transport that is wrapped
*/
readonly #innerTransport: Transport

Expand Down
2 changes: 1 addition & 1 deletion tests/bus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import { setTimeout } from 'node:timers/promises'
import { test } from '@japa/runner'
import { Bus } from '../src/bus.js'
import { MemoryTransport } from '../src/drivers/memory_transport.js'
import { ChaosTransport } from '../test_helpers/chaos_transport.js'
import { MemoryTransport } from '../src/transports/memory_transport.js'

const kTestingChannel = 'testing-channel'

Expand Down
26 changes: 13 additions & 13 deletions tests/bus_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
import { test } from '@japa/runner'
import { Bus } from '../src/bus.js'
import { BusManager } from '../src/bus_manager.js'
import { MemoryTransport } from '../src/drivers/memory_transport.js'
import { MemoryTransport } from '../src/transports/memory_transport.js'

test.group('Bus Manager', () => {
test('create bus instance from the manager', ({ assert, expectTypeOf }) => {
const manager = new BusManager({
default: 'memory',
transports: {
memory: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
},
})
Expand All @@ -32,10 +32,10 @@ test.group('Bus Manager', () => {
default: 'memory',
transports: {
memory: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
memory1: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
},
})
Expand All @@ -53,7 +53,7 @@ test.group('Bus Manager', () => {
default: 'memory',
transports: {
memory: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
},
})
Expand All @@ -65,7 +65,7 @@ test.group('Bus Manager', () => {
const manager = new BusManager({
transports: {
memory: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
},
})
Expand All @@ -81,7 +81,7 @@ test.group('Bus Manager', () => {
default: 'memory',
transports: {
memory: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
retryQueue: {
enabled: false,
maxSize: 100,
Expand All @@ -104,10 +104,10 @@ test.group('Bus Manager', () => {
default: 'memory1',
transports: {
memory1: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
memory2: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
},
})
Expand All @@ -128,10 +128,10 @@ test.group('Bus Manager', () => {
default: 'memory1',
transports: {
memory1: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
memory2: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
},
})
Expand All @@ -152,10 +152,10 @@ test.group('Bus Manager', () => {
default: 'memory1',
transports: {
memory1: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
memory2: {
driver: () => new MemoryTransport(),
transport: () => new MemoryTransport(),
},
},
})
Expand Down
Loading