Skip to content

modfin/delta

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

11 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Delta

goreportcard.com PkgGoDev

Delta is a lightweight, SQLite-backed message queue for Go. It provides Pub/Sub, load-balanced queues, request/reply, and historical replay with SQLite persistence.

Inspired by NATS.io but designed for single-instance deployments with built-in persistence.

Table of Contents

Features

  • Persistence: All messages stored in SQLite with WAL mode for durability and performance
  • Pub/Sub: Broadcast messages to multiple subscribers (one-to-many)
  • Queue: Load-balanced message distribution across consumer groups (one-to-one)
  • Request/Reply: Synchronous request/response pattern with inbox routing
  • Multiple Streams: Isolated namespaces within a single database
  • Historical Replay: Subscribe from a specific point in time to re-process messages
  • Glob Patterns: Flexible topic matching with * (single level) and ** (multi-level) wildcards
  • Configurable Vacuuming: Automatic cleanup with age-based or count-based retention
  • Async Publishing: Non-blocking publish operations for high-throughput scenarios

Installation

go get github.com/modfin/delta

Note: Delta requires CGo due to the github.com/mattn/go-sqlite3 dependency. Ensure you have a C compiler installed.

Quick Start

package main

import (
	"fmt"
	"github.com/modfin/delta"
)

func main() {
	// Create a temporary MQ (removed on close)
	mq, err := delta.New(delta.URITemp(), delta.DBRemoveOnClose())
	if err != nil {
		panic(err)
	}
	defer mq.Close()

	// Subscribe to a topic
	sub, err := mq.Subscribe("hello.world")
	if err != nil {
		panic(err)
	}
	defer sub.Unsubscribe()

	// Publish a message
	_, err = mq.Publish("hello.world", []byte("Hello, Delta!"))
	if err != nil {
		panic(err)
	}

	// Receive the message
	msg, ok := sub.Next()
	if !ok {
		panic("subscription closed")
	}

	fmt.Printf("Received: %s = %s\n", msg.Topic, string(msg.Payload))
	// Output: Received: hello.world = Hello, Delta!
}

Core Concepts

Topics

Topics are dot-separated strings (e.g., orders.created, users.123.profile). They support glob patterns:

  • * matches a single level: orders.* matches orders.created but not orders.123.processed
  • ** matches multiple levels: orders.** matches orders.created, orders.123.processed, etc.

Streams

Streams provide isolated namespaces within a single database. Each stream has its own message table:

// Default stream
mq, _ := delta.New("file:myapp.db")

// Create a separate "audit" stream
auditMQ, _ := mq.Stream("audit")
auditMQ.Publish("user.login", []byte("alice logged in"))

Message Delivery Guarantees

  • Pub/Sub: Messages are delivered to matching active subscribers
  • Queue: Messages are load-balanced among active subscribers in the same group key
  • Request/Reply: A request creates a reply subscription and forwards a reply when available

Replay behavior is retention-dependent: SubscribeFrom can only replay messages that are still present on disk.

API Reference

Constructor

New(uri string, ops ...Op) (*MQ, error)

Creates a new message queue instance. The uri parameter accepts SQLite URI formats:

  • file:path/to/db.sqlite - File-based database
  • file::memory: or file::memory:?cache=shared - In-memory database

Options (via Op functional options):

Option Description
DBSyncOff() Sets synchronous = off for better write performance (sacrifices durability)
DBRemoveOnClose() Removes database files when MQ is closed (useful for temp databases)
WithLogger(log *slog.Logger) Sets a custom logger (uses discard logger if nil)
WithVacuum(fn VacuumFunc, interval) Configures automatic message cleanup

Database URI Helpers

URITemp() string

Creates a temporary database in the system temp directory. Returns a file: URI with ?tmp=true suffix.

mq, err := delta.New(delta.URITemp(), delta.DBRemoveOnClose())

URIFromPath(path string) (string, error)

Creates a file: URI from a filesystem path, creating parent directories as needed.

uri, err := delta.URIFromPath("/data/myapp/messages.db")
if err != nil {
    panic(err)
}
mq, err := delta.New(uri)

RemoveStore(uri string, logger *slog.Logger) error

Removes database files (including -shm and -wal files) for a file-based URI.

err := delta.RemoveStore("file:/data/myapp.db", slog.Default())

MQ Methods

Lifecycle

Method Description
Close() error Closes the MQ and all its streams
CurrentStream() string Returns the current stream name

Stream Management

Method Description
Stream(name string, ops ...Op) (*MQ, error) Creates or returns an existing named stream

Publishing

Method Description
Publish(topic string, payload []byte) (*Publication, error) Synchronously publishes a message
PublishAsync(topic string, payload []byte) *Publication Asynchronously publishes a message (non-blocking)

Subscribing

Method Description
Subscribe(topic string) (*Subscription, error) Creates a Pub/Sub subscription (supports glob patterns)
SubscribeFrom(topic string, from time.Time) (*Subscription, error) Subscribes and replays messages from a specific time
Queue(topic string, key string) (*Subscription, error) Creates a load-balanced queue subscription
Request(ctx context.Context, topic string, payload []byte) (*Subscription, error) Publishes a request and returns a subscription used to receive a reply

Message Types

Msg

Represents a message in the queue:

type Msg struct {
    MessageId uint64    // Unique message identifier
    Topic     string    // Topic the message was published to
    Payload   []byte    // Message data
    At        time.Time // Timestamp when message was published
}

Methods:

Method Description
Reply(payload []byte) (Msg, error) Replies to this message (publishes to _inbox.{MessageId})

Publication

Returned by Publish and PublishAsync:

type Publication struct {
    Msg               // Embedded message with ID, topic, payload, timestamp
    Err  error        // Set if publish failed (only for async)
}

Methods:

Method Description
Done() <-chan struct{} Channel that closes when publish completes (useful for async)

Subscription

Represents a subscription to a topic:

Methods:

Method Description
Topic() string Returns the subscription's topic pattern
Id() string Returns the unique subscription ID
Chan() <-chan Msg Returns the message channel for range loops
Next() (Msg, bool) Blocks until next message (returns false if closed)

Fields:

Field Description
Unsubscribe func() Call to close the subscription

Vacuum Strategies

Vacuum functions clean up old messages. Configure with WithVacuum(fn, interval).

Function Description
VacuumOnAge(maxAge time.Duration) VacuumFunc Removes messages older than the specified duration
VacuumKeepN(n int) VacuumFunc Keeps only the N most recent messages

Notes:

  • Retention affects replay. If a message is vacuumed, SubscribeFrom cannot read it.

Constants

Constant Value Description
DEFAULT_STREAM "default" Default stream name
OptimizeLatency 0 Optimization mode for low latency
OptimizeThroughput 1 Optimization mode for high throughput

Examples

Pub/Sub (Broadcast)

Multiple subscribers receive the same message:

mq, _ := delta.New(delta.URITemp(), delta.DBRemoveOnClose())
defer mq.Close()

// Multiple subscribers on the same topic
sub1, _ := mq.Subscribe("notifications")
sub2, _ := mq.Subscribe("notifications")

go func() {
    for msg := range sub1.Chan() {
        fmt.Printf("Subscriber 1: %s\n", string(msg.Payload))
    }
}()

go func() {
    for msg := range sub2.Chan() {
        fmt.Printf("Subscriber 2: %s\n", string(msg.Payload))
    }
}()

// Both subscribers receive this message
mq.Publish("notifications", []byte("New alert!"))

Queue (Load Balancing)

Only one subscriber in a group receives each message:

mq, _ := delta.New(delta.URITemp(), delta.DBRemoveOnClose())
defer mq.Close()

// Create 3 workers in the same queue group
for i := 0; i < 3; i++ {
    worker := i
    sub, _ := mq.Queue("tasks", "worker-pool")

    go func() {
        for msg := range sub.Chan() {
            fmt.Printf("Worker %d processing: %s\n", worker, string(msg.Payload))
        }
    }()
}

// Each message goes to exactly one worker
for i := 0; i < 10; i++ {
    mq.Publish("tasks", []byte(fmt.Sprintf("task-%d", i)))
}

Request/Reply

Synchronous request-response pattern:

mq, _ := delta.New(delta.URITemp(), delta.DBRemoveOnClose())
defer mq.Close()

// Create a service that responds to requests
service, _ := mq.Queue("greet.*", "greeting-service")
go func() {
    for msg := range service.Chan() {
        // Extract name from topic (greet.{name})
        parts := strings.Split(msg.Topic, ".")
        name := parts[1]

        // Reply to the request
        msg.Reply([]byte(fmt.Sprintf("Hello, %s!", name)))
    }
}()

// Send a request and wait for response
ctx := context.Background()
replySub, _ := mq.Request(ctx, "greet.alice", nil)

msg, ok := replySub.Next()
if ok {
    fmt.Printf("Response: %s\n", string(msg.Payload))
    // Output: Response: Hello, alice!
}

Historical Replay

Subscribe from a specific point in time:

mq, _ := delta.New(delta.URITemp(), delta.DBRemoveOnClose())
defer mq.Close()

// Publish some messages now
for i := 0; i < 5; i++ {
    mq.Publish("events", []byte(fmt.Sprintf("msg-%d", i)))
}

// Record the time
from := time.Now()

// Publish more messages
for i := 5; i < 10; i++ {
    mq.Publish("events", []byte(fmt.Sprintf("msg-%d", i)))
}

// Subscribe from the recorded time (replays msg-5 through msg-9)
sub, _ := mq.SubscribeFrom("events", from)
for msg := range sub.Chan() {
    fmt.Printf("Replayed: %s\n", string(msg.Payload))
}

Async Publishing

Non-blocking publish for high throughput:

mq, _ := delta.New(delta.URITemp(), delta.DBRemoveOnClose())
defer mq.Close()

// Start async publishes
var publications []*delta.Publication
for i := 0; i < 1000; i++ {
    pub := mq.PublishAsync("metrics", []byte(fmt.Sprintf("metric-%d", i)))
    publications = append(publications, pub)
}

// Wait for all to complete
for _, pub := range publications {
    <-pub.Done()
    if pub.Err != nil {
        log.Printf("Publish failed: %v", pub.Err)
    }
}

Streams (Namespaces)

Isolated streams within one database:

mq, _ := delta.New("file:myapp.db")
defer mq.Close()

// Default stream for application events
mq.Publish("user.signup", []byte(`{"user": "alice"}`))

// Create separate streams
auditMQ, _ := mq.Stream("audit")
metricsMQ, _ := mq.Stream("metrics")

// Each stream is isolated
eventsMQ, _ := mq.Stream("events")
eventsMQ.Publish("order.created", []byte(`{"order": 123}`))

// Messages don't cross streams
sub, _ := mq.Subscribe("order.created")     // Won't see eventsMQ messages
eventsSub, _ := eventsMQ.Subscribe("order.created") // Will see them

Vacuum Strategies

Automatic message cleanup:

// Age-based: Remove messages older than 24 hours
mq, _ := delta.New(
    "file:myapp.db",
    delta.WithVacuum(delta.VacuumOnAge(24*time.Hour), time.Hour),
)

// Count-based: Keep only the 10000 most recent messages
mq, _ := delta.New(
    "file:myapp.db",
    delta.WithVacuum(delta.VacuumKeepN(10000), 30*time.Minute),
)

Configuration

Database Options

// High durability (default)
mq, _ := delta.New("file:myapp.db")

// High performance (synchronous=off)
mq, _ := delta.New("file:myapp.db", delta.DBSyncOff())

// Temporary database (auto-cleanup)
mq, _ := delta.New(delta.URITemp(), delta.DBRemoveOnClose())

// Custom path with auto-created directories
uri, _ := delta.URIFromPath("/var/lib/myapp/data.db")
mq, _ := delta.New(uri)

Logging

// With default logger
mq, _ := delta.New("file:myapp.db", delta.WithLogger(slog.Default()))

// Discard all logs (default behavior)
mq, _ := delta.New("file:myapp.db", delta.WithLogger(nil))

Topic Patterns

Delta supports flexible topic matching with glob patterns:

Pattern Matches Doesn't Match
foo.bar foo.bar foo.baz, foo.bar.baz
foo.* foo.bar, foo.baz foo.bar.baz, foo
foo.** foo.bar, foo.bar.baz, foo.a.b.c bar.foo
*.bar foo.bar, baz.bar foo.baz.bar
orders.*.status orders.123.status, orders.abc.status orders.123.created
events.** events.user.login, events.system.disk.full logs.events

Tip: place ** at the end of a pattern (for example orders.**) for the clearest matching behavior.

Brace Groups

Topics can include brace-enclosed groups for special characters:

// Email addresses with dots
mq.Publish("users.{alice@example.com}.profile", []byte("..."))

// Complex keys
mq.Subscribe("users.{complex-key.with.dots}.events")

Benchmarks

Run on Intel Core Ultra 9 185H (2026-02-24):

Parallel Publishing:

BenchmarkParPub/simple-22           63,416      26,542 ns/op      37,681 msg/s     2,273 B/op      64 allocs/op
BenchmarkParPub/no-sync-22          66,301      16,307 ns/op      61,338 msg/s     1,188 B/op      26 allocs/op

Pub/Sub (Multiple Subscribers):

BenchmarkMultipleSubscribers/1-22                  119,448      26,937 ns/op      36,726 read-msg/s    37,123 write-msg/s    2,540 B/op      71 allocs/op
BenchmarkMultipleSubscribers/4-22                   61,191      24,093 ns/op     106,783 read-msg/s    41,506 write-msg/s    2,298 B/op      57 allocs/op
BenchmarkMultipleSubscribers/num-cpu_(22)-22        92,359      40,103 ns/op     504,170 read-msg/s    24,936 write-msg/s    4,927 B/op      91 allocs/op
BenchmarkMultipleSubscribers/2x_num-cpu_(44)-22    111,342      32,275 ns/op     726,985 read-msg/s    30,983 write-msg/s    4,759 B/op      75 allocs/op

Variable Message Sizes:

BenchmarkMultipleSubscribersSize/_0.1mb-22   5,748     187,816 ns/op  11,668 read-MB/s   116,684 read-msg/s    532.4 write-MB/s   5,324 write-msg/s   218,112 B/op   108 allocs/op
BenchmarkMultipleSubscribersSize/_1.0mb-22     684   1,585,876 ns/op  13,832 read-MB/s    13,832 read-msg/s    630.6 write-MB/s     630.6 write-msg/s 2,017,489 B/op   138 allocs/op
BenchmarkMultipleSubscribersSize/10.0mb-22      68  15,121,519 ns/op  14,333 read-MB/s     1,433 read-msg/s    661.3 write-MB/s      66.13 write-msg/s 19,719,668 B/op   142 allocs/op

UID Generation:

BenchmarkUID-22               3,189,724        360.6 ns/op         144 B/op        8 allocs/op

Performance characteristics:

  • Single-writer throughput: ~37K msg/s (durability) to ~61K msg/s (no-sync)
  • Read throughput scales linearly with subscriber count (up to 727K msg/s with 44 subscribers)
  • Large message handling maintains ~11-14 GB/s aggregate read throughput
  • Low allocation overhead for typical workloads

License

This project is licensed under the MIT License. See the LICENSE file for details.

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Languages