A message broker toolkit for Effect.
- π Effectful wrappers for AMQP Connection and Channel
- π Auto-reconnect functionality when the connection is lost
- π§ Seamless consumption continuation after reconnection
- π Distributed tracing support (spans propagate from publishers to subscribers)
Warning
This project is currently under development. Please note that future releases might introduce breaking changes.
First, you need to establish a connection to your AMQP server:
import { AMQPConnection } from "@effect-messaging/amqp"
import { Effect } from "effect"
const program = Effect.gen(function* (_) {
// Your application logic that requires an AMQP connection
const connection = yield* AMQPConnection.AMQPConnection
const props = yield* connection.serverProperties
yield* Effect.logInfo(`connected to ${props.hostname}:${props.port}`)
})
const runnable = program.pipe(
// provide the AMQP Connection dependency
Effect.provide(
AMQPConnection.layer({
hostname: "localhost",
port: 5672,
username: "guest",
password: "guest",
heartbeat: 10
})
)
)
// Run the program
Effect.runPromise(runnable)To send messages, create a publisher:
import {
AMQPChannel,
AMQPConnection,
AMQPPublisher
} from "@effect-messaging/amqp"
import { Context, Effect } from "effect"
class MyPublisher extends Context.Tag("MyPublisher")<
MyPublisher,
AMQPPublisher.AMQPPublisher
>() {}
const program = Effect.gen(function* (_) {
const publisher = yield* MyPublisher
yield* publisher.publish({
exchange: "my-exchange",
routingKey: "my-routing-key",
content: Buffer.from('{ "hello": "world" }'),
options: {
persistent: true,
contentType: "application/json",
expiration: 60000,
headers: {
"x-custom-header": "custom-value"
}
}
})
})
const runnable = program.pipe(
Effect.provideServiceEffect(MyPublisher, AMQPPublisher.make()),
// provide the AMQP Channel dependency
Effect.provide(AMQPChannel.layer),
// provide the AMQP Connection dependency
Effect.provide(
AMQPConnection.layer({
hostname: "localhost",
port: 5672,
username: "guest",
password: "guest",
heartbeat: 10
})
)
)
// Run the program
Effect.runPromise(runnable)To receive messages, create a subscriber:
import {
AMQPChannel,
AMQPConnection,
AMQPConsumeMessage,
AMQPSubscriber
} from "@effect-messaging/amqp"
import { Effect } from "effect"
const messageHandler = Effect.gen(function* (_) {
const message = yield* AMQPConsumeMessage.AMQPConsumeMessage
// You can add your message processing logic here
yield* Effect.logInfo(`Received message: ${message.content.toString()}`)
})
const program = Effect.gen(function* (_) {
const subscriber = yield* AMQPSubscriber.make("my-queue")
// The subscriber will automatically handle message ack and nack
// based on the success or failure of the message handler
yield* subscriber.subscribe(messageHandler)
})
const runnable = program.pipe(
// provide the AMQP Channel dependency
Effect.provide(AMQPChannel.layer),
// provide the AMQP Connection dependency
Effect.provide(
AMQPConnection.layer({
hostname: "localhost",
port: 5672,
username: "guest",
password: "guest",
heartbeat: 10
})
)
)
// Run the program
Effect.runPromise(runnable)Basic abstractions:
- Add a
Publisherinterface - Add a
Subscriberinterface
Application-level API for consumer apps:
- Add support for routing based on the topic / subject
- Add support for middlewares
Higher-level declarative API:
- Add declarative API to define messages schemas
- Generate publisher based on message definitions
- Generate consumer app based on message definitions
- AsyncAPI specification generation
- Effect wrappers for AMQP Connection & AMQP Channel
- Implement publisher and subscriber
- Integration tests
- Add examples & documentation
- Kafka
- NATS
- Google PubSub