Skip to content

soroosh-tanzadeh/taskrunner

Repository files navigation

TaskRunner

TaskRunner is a high-performance Go library for distributed, reliable task processing built on Redis Streams. It provides horizontal scalability, leader election, delayed scheduling, unique jobs, and rich timing metrics with a simple API.

Highlights

  • Distributed & Scalable: Add instances to scale throughput horizontally.
  • Reliable: Visibility timeout with heartbeats prevents double-processing; pending reclaims; retries.
  • Simple API: Register tasks and dispatch payloads with minimal boilerplate.
  • Delayed Scheduling: Schedule jobs for future execution via Redis ZSET + Streams.
  • Unique Jobs: Enforce de-duplication across a time window via distributed locks.
  • Leader Election: Cooperative leadership for scheduling and maintenance loops.
  • Observability: Per-task timing metrics and queue statistics.

Installation

go get github.com/soroosh-tanzadeh/taskrunner

Requires Go 1.22+ and Redis 6+. For local development and tests, the project uses miniredis to simulate Redis in-memory.

Quickstart (Task Queue)

package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/soroosh-tanzadeh/taskrunner/redisstream"
	"github.com/soroosh-tanzadeh/taskrunner/runner"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379", DB: 0, PoolSize: 50})
	queue := redisstream.NewRedisStreamMessageQueueWithOptions(
		rdb,
		redisstream.WithPrefix("example"),
		redisstream.WithQueue("tasks"),
		redisstream.WithReClaimDelay(30*time.Second),
		redisstream.WithDeleteOnAck(true),
	)

	tr := runner.NewTaskRunner(runner.TaskRunnerConfig{
		BatchSize:       10,
		ConsumerGroup:   "example",
		ConsumersPrefix: "default",
		NumWorkers:      8,
		NumFetchers:     4,
		LongQueueHook: func(s runner.Stats) { fmt.Printf("%+v\n", s) },
		LongQueueThreshold: 30 * time.Second,
	}, rdb, queue)

	tr.RegisterTask(&runner.Task{
		Name:     "exampletask",
		MaxRetry: 5,
		Action: func(ctx context.Context, payload any) error {
			fmt.Printf("Hello from example task %v\n", payload)
			return nil
		},
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() { defer wg.Done(); _ = tr.Start(ctx) }()

	for i := 0; i < 100; i++ {
		_ = tr.Dispatch(context.Background(), "exampletask", strconv.Itoa(i))
	}

	time.Sleep(2 * time.Second)
	cancel()
	wg.Wait()
}

Delayed Task Scheduler

Run a cooperative scheduler that enqueues due jobs from a ZSET into the stream. The scheduler ticks every 5 seconds; delays shorter than this will be rounded up to the next tick.

ctx, cancel := context.WithCancel(context.Background())
// Start workers
go tr.Start(ctx)
// Start scheduler with a batch size for due jobs
go tr.StartDelayedSchedule(ctx, 1000)

// Dispatch delayed jobs
_ = tr.DispatchDelayed(context.Background(), "exampletask", "I run in ~5s", 5*time.Second)
_ = tr.DispatchDelayed(context.Background(), "exampletask", "I run in ~10s", 10*time.Second)

// ... later
cancel()

Notes:

  • The scheduler respects leader election; only the leader instance enqueues due jobs.
  • Use ScheduleFor(ctx, taskName, payload, time.Time) to schedule for an absolute time.

Unique Jobs

Prevent duplicate enqueues of the same job (optionally scoped by a custom key) for a specified window.

tr.RegisterTask(&runner.Task{
	Name:     "sendEmail",
	MaxRetry: 3,
	Unique:   true,
	UniqueFor: 60, // seconds
	UniqueKey: func(payload any) string { return payload.(string) }, // e.g., email ID
	Action: func(ctx context.Context, payload any) error { return nil },
})

// First dispatch succeeds
_ = tr.Dispatch(context.Background(), "sendEmail", "order-123")
// Second dispatch within 60s will fail with runner.ErrTaskAlreadyDispatched
err := tr.Dispatch(context.Background(), "sendEmail", "order-123")

Configuration

TaskRunner is configured via runner.TaskRunnerConfig:

  • Host: Optional, defaults to hostname; used in metrics and identity.
  • BatchSize: Number of messages fetched per read per fetcher.
  • ConsumerGroup: Redis Streams consumer group name.
  • ConsumersPrefix: Prefix for consumer names.
  • NumWorkers: Concurrent workers processing messages.
  • NumFetchers: Concurrent fetchers reading from the stream (each reads BatchSize).
  • FailedTaskHandler: Callback when a task exhausts retries.
  • LongQueueHook: Periodic timing/queue stats callback; frequency set by LongQueueThreshold.
  • LongQueueThreshold: Duration that influences the cadence of timing aggregation.
  • BlockDuration: Stream read block duration (defaults to 5s).
  • MetricsResetInterval: Interval to reset timing metrics (default 24h; set 0 to disable).
  • ReplicationFactor: Deprecated; maintained for backward compatibility only.

Redis Stream queue configuration via options on NewRedisStreamMessageQueueWithOptions:

  • WithPrefix(prefix string)
  • WithQueue(queue string)
  • WithReClaimDelay(d time.Duration) – reclaim pending messages after d.
  • WithDeleteOnAck(enabled bool)
  • WithRedisVersion(version string) – override auto-detected version if needed.

Examples

See runnable examples and tests under examples/:

  • examples/simple: Basic queue usage
  • examples/scheduler: Delayed tasks scheduler
  • examples/unique: Unique jobs

Testing

Run the full test suite:

go test ./...

The examples are covered by tests using miniredis so they run without a real Redis server.

Contributing

Contributions are welcome! Please:

  • Open an issue to discuss substantial changes.
  • Write tests for new features and ensure go test ./... passes.
  • Feature branches name must be in this format: feature/{feature-name}
  • Follow idiomatic Go style and keep APIs small and focused.

License

GPL-3.0. See LICENSE for details.

Contributors 4

  •  
  •  
  •  
  •  

Languages