Production-ready Go library for building concurrent applications with rate limiting, worker pools, task scheduling, and functional streaming primitives.

goflow

A Go library for concurrent applications with rate limiting, task scheduling, and streaming.

Features

Rate Limiting (pkg/ratelimit)

  • Token bucket and leaky bucket algorithms
  • Concurrency limiting
  • Prometheus metrics

Task Scheduling (pkg/scheduling)

  • Worker pools with graceful shutdown
  • Cron-based scheduling
  • Multi-stage pipelines
  • Context-aware timeouts

Streaming (pkg/streaming)

  • Functional stream operations
  • Background buffering
  • Backpressure control
  • Channel utilities

Installation

go get github.com/vnykmshr/goflow

Usage

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/vnykmshr/goflow/pkg/ratelimit/bucket"
    "github.com/vnykmshr/goflow/pkg/scheduling/workerpool"
)

func main() {
    // Token bucket limiter: 10 requests per second, burst of 20.
    limiter, err := bucket.NewSafe(10, 20)
    if err != nil {
        log.Fatal(err)
    }

    // Worker pool: 5 workers, a queue of up to 100 tasks, 30s per-task timeout.
    pool, err := workerpool.NewWithConfigSafe(workerpool.Config{
        WorkerCount: 5,
        QueueSize:   100,
        TaskTimeout: 30 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    // Block until the pool signals that shutdown has completed.
    defer func() { <-pool.Shutdown() }()

    // Submit work only if the limiter currently permits a request.
    if limiter.Allow() {
        task := workerpool.TaskFunc(func(ctx context.Context) error {
            fmt.Println("Processing request...")
            return nil
        })

        if err := pool.Submit(task); err != nil {
            log.Printf("Failed to submit task: %v", err)
        }
    }
}

Components

Rate Limiters

  • bucket.NewSafe(rate, burst) - Token bucket with burst capacity
  • leakybucket.New(rate) - Smooth rate limiting
  • concurrency.NewSafe(limit) - Concurrent operations control

Scheduling

  • workerpool.NewSafe(workers, queueSize) - Background task processing
  • scheduler.New() - Cron and interval scheduling
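
As a rough picture of what a worker pool with graceful shutdown does, the following standard-library sketch starts a fixed number of workers, feeds them from a bounded queue, and drains the queue before shutdown returns. It is not goflow's `workerpool`: the names `pool`, `newPool`, `submit`, `shutdown`, and `runTasks` are hypothetical and exist only for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical minimal worker pool for illustration only.
type pool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

// newPool starts the given number of workers reading from a bounded queue.
func newPool(workers, queueSize int) *pool {
	p := &pool{tasks: make(chan func(), queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// The loop ends once tasks is closed and fully drained.
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

// submit enqueues a task, blocking while the queue is full.
// It must not be called after shutdown.
func (p *pool) submit(task func()) { p.tasks <- task }

// shutdown stops accepting tasks and waits for queued tasks to finish.
func (p *pool) shutdown() {
	close(p.tasks)
	p.wg.Wait()
}

// runTasks submits n counting tasks and returns how many completed.
func runTasks(workers, queueSize, n int) int64 {
	var done int64
	p := newPool(workers, queueSize)
	for i := 0; i < n; i++ {
		p.submit(func() { atomic.AddInt64(&done, 1) })
	}
	p.shutdown()
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println("tasks completed:", runTasks(5, 100, 50))
}
```

Closing the queue and then waiting on the `WaitGroup` is what makes the shutdown graceful in this sketch: tasks that were already queued still run before `shutdown` returns.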

Streaming

  • stream.FromSlice(data) - Functional data processing
  • writer.New(config) - Async buffered writing
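
For readers unfamiliar with the functional style, here is a small sketch of filter and map operations over a slice using Go generics (Go 1.18 or later). It does not use goflow's `stream` package: `filter`, `mapTo`, and `evenSquares` are hypothetical helpers written only to show the idea of composing operations over data.

```go
package main

import "fmt"

// filter returns the elements of in for which keep returns true.
func filter[T any](in []T, keep func(T) bool) []T {
	out := make([]T, 0, len(in))
	for _, v := range in {
		if keep(v) {
			out = append(out, v)
		}
	}
	return out
}

// mapTo applies f to every element of in and returns the results.
func mapTo[T, U any](in []T, f func(T) U) []U {
	out := make([]U, 0, len(in))
	for _, v := range in {
		out = append(out, f(v))
	}
	return out
}

// evenSquares keeps the even numbers and squares them.
func evenSquares(nums []int) []int {
	evens := filter(nums, func(n int) bool { return n%2 == 0 })
	return mapTo(evens, func(n int) int { return n * n })
}

func main() {
	fmt.Println(evenSquares([]int{1, 2, 3, 4, 5, 6})) // prints [4 16 36]
}
```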

Documentation

Development

make install-hooks  # Install pre-commit hook
make test           # Run tests with race detection
make lint           # Run linter
make benchmark      # Run performance benchmarks

The pre-commit hook automatically:

  • Checks for potential secrets
  • Formats Go code with goimports and gofmt
  • Runs golangci-lint on changed files
  • Verifies the build succeeds

Contributing

See Contributing for contribution guidelines.

License

MIT License - see LICENSE for details.
