# TaskRunner

TaskRunner is a high-performance Go library for distributed, reliable task processing built on Redis Streams. It provides horizontal scalability, leader election, delayed scheduling, unique jobs, and rich timing metrics with a simple API.
## Features

- Distributed & Scalable: Add instances to scale throughput horizontally.
- Reliable: Visibility timeouts with heartbeats prevent double-processing; pending messages are reclaimed and failed tasks are retried.
- Simple API: Register tasks and dispatch payloads with minimal boilerplate.
- Delayed Scheduling: Schedule jobs for future execution via Redis ZSET + Streams.
- Unique Jobs: Enforce de-duplication across a time window via distributed locks.
- Leader Election: Cooperative leadership for scheduling and maintenance loops.
- Observability: Per-task timing metrics and queue statistics.
## Installation

```bash
go get github.com/soroosh-tanzadeh/taskrunner
```

Requires Go 1.22+ and Redis 6+. For local development and tests, the project uses miniredis to simulate Redis in memory.

## Quick Start
```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"

	"github.com/soroosh-tanzadeh/taskrunner/redisstream"
	"github.com/soroosh-tanzadeh/taskrunner/runner"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379", DB: 0, PoolSize: 50})

	queue := redisstream.NewRedisStreamMessageQueueWithOptions(
		rdb,
		redisstream.WithPrefix("example"),
		redisstream.WithQueue("tasks"),
		redisstream.WithReClaimDelay(30*time.Second),
		redisstream.WithDeleteOnAck(true),
	)

	tr := runner.NewTaskRunner(runner.TaskRunnerConfig{
		BatchSize:          10,
		ConsumerGroup:      "example",
		ConsumersPrefix:    "default",
		NumWorkers:         8,
		NumFetchers:        4,
		LongQueueHook:      func(s runner.Stats) { fmt.Printf("%+v\n", s) },
		LongQueueThreshold: 30 * time.Second,
	}, rdb, queue)

	tr.RegisterTask(&runner.Task{
		Name:     "exampletask",
		MaxRetry: 5,
		Action: func(ctx context.Context, payload any) error {
			fmt.Printf("Hello from example task %v\n", payload)
			return nil
		},
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() { defer wg.Done(); _ = tr.Start(ctx) }()

	for i := 0; i < 100; i++ {
		_ = tr.Dispatch(context.Background(), "exampletask", strconv.Itoa(i))
	}

	time.Sleep(2 * time.Second)
	cancel()
	wg.Wait()
}
```

## Delayed Scheduling

Run a cooperative scheduler that enqueues due jobs from a ZSET into the stream. The scheduler ticks every 5 seconds; delays shorter than this are rounded up to the next tick.

```go
ctx, cancel := context.WithCancel(context.Background())
// Start workers
go tr.Start(ctx)
// Start scheduler with a batch size for due jobs
go tr.StartDelayedSchedule(ctx, 1000)
// Dispatch delayed jobs
_ = tr.DispatchDelayed(context.Background(), "exampletask", "I run in ~5s", 5*time.Second)
_ = tr.DispatchDelayed(context.Background(), "exampletask", "I run in ~10s", 10*time.Second)
// ... later
cancel()
```

Notes:
- The scheduler respects leader election; only the leader instance enqueues due jobs.
- Use `ScheduleFor(ctx, taskName, payload, time.Time)` to schedule for an absolute time, as in the sketch below.
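For example, a minimal sketch (the error return is an assumption, mirroring `Dispatch` and `DispatchDelayed`):

```go
// Schedule "exampletask" for an absolute wall-clock time instead of a relative delay.
runAt := time.Now().Add(15 * time.Minute)
if err := tr.ScheduleFor(context.Background(), "exampletask", "absolute-time payload", runAt); err != nil {
	// e.g. Redis unavailable; handle or log the scheduling failure.
	fmt.Println("schedule failed:", err)
}
```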
## Unique Jobs

Prevent duplicate enqueues of the same job (optionally scoped by a custom key) within a specified window.

```go
tr.RegisterTask(&runner.Task{
	Name:      "sendEmail",
	MaxRetry:  3,
	Unique:    true,
	UniqueFor: 60, // seconds
	UniqueKey: func(payload any) string { return payload.(string) }, // e.g., email ID
	Action:    func(ctx context.Context, payload any) error { return nil },
})

// First dispatch succeeds
_ = tr.Dispatch(context.Background(), "sendEmail", "order-123")

// Second dispatch within 60s will fail with runner.ErrTaskAlreadyDispatched
err := tr.Dispatch(context.Background(), "sendEmail", "order-123")
```
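Callers can branch on that sentinel explicitly. A minimal sketch using the standard `errors` and `log` packages:

```go
if err := tr.Dispatch(context.Background(), "sendEmail", "order-123"); err != nil {
	if errors.Is(err, runner.ErrTaskAlreadyDispatched) {
		// Duplicate within the unique window: typically safe to ignore or just log.
		log.Println("sendEmail for order-123 is already queued")
	} else {
		log.Printf("dispatch failed: %v", err)
	}
}
```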
## Configuration

TaskRunner is configured via `runner.TaskRunnerConfig` (a configuration sketch follows this list):

- Host: Optional; defaults to the hostname. Used in metrics and identity.
- BatchSize: Number of messages fetched per read per fetcher.
- ConsumerGroup: Redis Streams consumer group name.
- ConsumersPrefix: Prefix for consumer names.
- NumWorkers: Concurrent workers processing messages.
- NumFetchers: Concurrent fetchers reading from the stream (each reads `BatchSize` messages).
- FailedTaskHandler: Callback invoked when a task exhausts its retries.
- LongQueueHook: Periodic timing/queue stats callback; its frequency is set by `LongQueueThreshold`.
- LongQueueThreshold: Duration that influences the cadence of timing aggregation.
- BlockDuration: Stream read block duration (defaults to 5s).
- MetricsResetInterval: Interval to reset timing metrics (default 24h; set 0 to disable).
- ReplicationFactor: Deprecated; maintained for backward compatibility only.
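For illustration, a fuller configuration sketch (it assumes `BlockDuration` and `MetricsResetInterval` are `time.Duration` fields, consistent with the defaults above; `FailedTaskHandler` is omitted because its signature is not shown here):

```go
tr := runner.NewTaskRunner(runner.TaskRunnerConfig{
	Host:                 "worker-1", // optional; falls back to the machine hostname
	BatchSize:            20,
	ConsumerGroup:        "example",
	ConsumersPrefix:      "default",
	NumWorkers:           16,
	NumFetchers:          4,
	LongQueueHook:        func(s runner.Stats) { fmt.Printf("%+v\n", s) },
	LongQueueThreshold:   30 * time.Second,
	BlockDuration:        5 * time.Second, // stream read block duration
	MetricsResetInterval: 24 * time.Hour,  // set 0 to disable metric resets
}, rdb, queue)
```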
## Queue Options

The Redis Streams queue is configured via options passed to `NewRedisStreamMessageQueueWithOptions` (see the example after this list):

- `WithPrefix(prefix string)`
- `WithQueue(queue string)`
- `WithReClaimDelay(d time.Duration)` – reclaim pending messages after `d`.
- `WithDeleteOnAck(enabled bool)`
- `WithRedisVersion(version string)` – override the auto-detected version if needed.
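For example, combining the options above (reusing the `rdb` client from the Quick Start; the prefix and queue names are placeholders):

```go
queue := redisstream.NewRedisStreamMessageQueueWithOptions(
	rdb,
	redisstream.WithPrefix("myapp"),
	redisstream.WithQueue("emails"),
	redisstream.WithReClaimDelay(time.Minute), // reclaim messages left pending for over a minute
	redisstream.WithDeleteOnAck(true),         // delete messages from the stream once acknowledged
	redisstream.WithRedisVersion("6.2.0"),     // override auto-detection if needed
)
```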
## Examples

See runnable examples and tests under `examples/`:

- `examples/simple`: basic queue usage
- `examples/scheduler`: delayed tasks scheduler
- `examples/unique`: unique jobs
## Testing

Run the full test suite:

```bash
go test ./...
```

The examples are covered by tests using miniredis, so they run without a real Redis server.
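A rough sketch of that pattern for testing your own tasks (illustrative only, not taken from the repo; it assumes the miniredis v2 API, that the constructors above accept a miniredis-backed client, and that pinning the Redis version avoids server auto-detection):

```go
package example_test

import (
	"context"
	"testing"
	"time"

	"github.com/alicebob/miniredis/v2"
	"github.com/redis/go-redis/v9"

	"github.com/soroosh-tanzadeh/taskrunner/redisstream"
	"github.com/soroosh-tanzadeh/taskrunner/runner"
)

func TestDispatchAgainstMiniredis(t *testing.T) {
	// Start an in-memory Redis; no external server required.
	s, err := miniredis.Run()
	if err != nil {
		t.Fatal(err)
	}
	defer s.Close()

	rdb := redis.NewClient(&redis.Options{Addr: s.Addr()})

	queue := redisstream.NewRedisStreamMessageQueueWithOptions(
		rdb,
		redisstream.WithPrefix("test"),
		redisstream.WithQueue("tasks"),
		redisstream.WithRedisVersion("7.0.0"), // assumption: pin the version so no auto-detection is needed
	)

	tr := runner.NewTaskRunner(runner.TaskRunnerConfig{
		BatchSize:          1,
		ConsumerGroup:      "test",
		ConsumersPrefix:    "test",
		NumWorkers:         1,
		NumFetchers:        1,
		LongQueueHook:      func(st runner.Stats) {},
		LongQueueThreshold: time.Second,
	}, rdb, queue)

	tr.RegisterTask(&runner.Task{
		Name:     "noop",
		MaxRetry: 1,
		Action:   func(ctx context.Context, payload any) error { return nil },
	})

	// Dispatch writes to the stream held by miniredis; assumes only stream writes are needed here.
	if err := tr.Dispatch(context.Background(), "noop", "payload"); err != nil {
		t.Fatalf("dispatch: %v", err)
	}
}
```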
## Contributing

Contributions are welcome! Please:

- Open an issue to discuss substantial changes.
- Write tests for new features and make sure `go test ./...` passes.
- Name feature branches in the format `feature/{feature-name}`.
- Follow idiomatic Go style and keep APIs small and focused.
## License

GPL-3.0. See LICENSE for details.