Go Sluice - Batch Processor

go-sluice provides a generic, concurrent batch processing utility for Go, available under the sluice package. It allows collecting individual items (like user IDs for fetching details) and processing them in controlled batches. This is useful for optimizing operations like database queries or API calls by grouping multiple small requests into fewer, larger ones.

The sluice.Batcher is the core component, designed for ease of use and efficient, concurrent batch execution.

Features

  • Generic: Works with any input (T) and output (Q) types using Go 1.18+ generics.
  • Concurrent: Processes multiple batches concurrently using a configurable worker pool.
  • Configurable Batching: Triggers batch processing based on:
    • Maximum batch size.
    • Time interval (batch window) - with per-key timers when sharding is enabled.
  • Smart Sharding: Group items by custom keys (e.g., experiment ID, user type) to control which items are batched together. Each shard gets its own independent timer.
  • Memory Efficient: Automatic cleanup of inactive batch keys to prevent memory leaks when using many different keys over time.
  • ID Correlation: Each submitted item carries a caller-supplied ID; the BatchFunc keys its result map by these IDs, so every caller receives the output for its own input.
  • Synchronous Submission: The SubmitAndAwait method provides a simple way to submit an item and block until its specific result is available.
  • Graceful Shutdown: The Stop() method ensures that any enqueued items are processed before the batcher shuts down its worker goroutines.
  • Panic Safe: Recovers from panics within the user-provided BatchFunc, logging the error and ensuring the batcher itself remains operational (see the sketch after this list).
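
To make the panic-safety guarantee concrete, here is a minimal sketch. The list above does not specify what the waiting caller receives, so this assumes SubmitAndAwait reports a panicked batch back as an error; treat the exact error value as an implementation detail.

package main

import (
	"fmt"
	"time"

	"github.com/karangupta31/go-sluice"
)

func main() {
	// A BatchFunc that panics when it sees a particular input.
	cfg := sluice.BatchProcessorConfig[string, string]{
		Func: func(ids []string, _ ...interface{}) (map[string]string, error) {
			out := make(map[string]string, len(ids))
			for _, id := range ids {
				if id == "boom" {
					panic("unexpected input") // recovered inside the batcher
				}
				out[id] = "ok:" + id
			}
			return out, nil
		},
	}

	batcher := sluice.NewBatcher(cfg, 50*time.Millisecond, 10, 1)
	defer batcher.Stop()

	// Assumption: the recovered panic surfaces here as an error rather
	// than crashing the process.
	if _, err := batcher.SubmitAndAwait("boom", "boom"); err != nil {
		fmt.Println("panic surfaced as error:", err)
	}

	// The batcher itself stays operational for subsequent batches.
	res, err := batcher.SubmitAndAwait("id1", "id1")
	fmt.Println(res, err) // expect "ok:id1" <nil>
}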

Installation

go get github.com/karangupta31/go-sluice

Usage

package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/karangupta31/go-sluice"
)

// User represents the data structure for user details.
type User struct {
	ID   string
	Name string
	// Add more fields like Email, Age, etc.
}

func main() {
	// 1. Define your batch processing function (BatchFunc)
	// This function takes a slice of user IDs (inputs) and returns a map of User objects.
	batchFetchUserDetails := func(userIDs []string, commonArgs ...interface{}) (map[string]User, error) {
		fmt.Printf("BatchFunc: Processing batch of %d user IDs: %v\n", len(userIDs), userIDs)

		// Simulate fetching data from a database or an API.
		// In a real application, you might do:
		// users, err := db.GetUsersByIDs(userIDs)
		time.Sleep(50 * time.Millisecond) // Simulate I/O latency

		results := make(map[string]User)
		for _, id := range userIDs {
			// The key in the 'results' map MUST match the ID provided to SubmitAndAwait.
			results[id] = User{
				ID:   id,
				Name: "User-" + id, // Example: "User-user123"
			}
		}
		// If a critical error occurs for the whole batch (e.g., DB connection lost), return it.
		// return nil, fmt.Errorf("database connection failed")
		return results, nil
	}

	// 2. Create BatchProcessorConfig
	// This holds your BatchFunc and any common arguments (e.g., a database connection pool).
	processorConfig := sluice.BatchProcessorConfig[string, User]{
		Func: batchFetchUserDetails,
		// CommonArgs: []interface{}{dbPool}, // Optional: for passing shared resources
	}

	// 3. Create a new Batcher instance
	batcher := sluice.NewBatcher(
		processorConfig,
		100*time.Millisecond, // Process batch every 100ms if not full (batch window)
		10,                   // Max 10 user IDs per batch (max batch size)
		5,                    // Max 5 concurrent workers processing batches
	)
	// Ensure graceful shutdown when main exits or Batcher is no longer needed.
	defer batcher.Stop()

	// 4. Submit requests and await results
	// Example: Fetching details for a few users.
	userIDsToFetch := []string{"user123", "user456", "user789", "user000"}
	var wg sync.WaitGroup

	fmt.Println("Submitting user IDs for batch fetching:")
	for _, userID := range userIDsToFetch {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			fmt.Printf("Submitting: %s\n", id)
			// SubmitAndAwait blocks until this specific item is processed.
			// The first argument is the ID for correlation, the second is the input data.
			user, err := batcher.SubmitAndAwait(id, id)
			if err != nil {
				log.Printf("Error fetching user %s: %v\n", id, err)
				return
			}
			fmt.Printf("Received details for %s: %+v\n", id, user)
		}(userID)
	}
	wg.Wait() // Wait for all goroutines to complete.

	fmt.Println("\nAll user details fetched.")
	// Brief pause for any final log output; the deferred Stop() then runs
	// and drains any items still enqueued.
	time.Sleep(200 * time.Millisecond)
}
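
The commented-out CommonArgs field above is the hook for sharing resources (a connection pool, an API client) with every batch invocation. A minimal sketch, assuming entries placed in CommonArgs arrive as the variadic commonArgs parameter of the BatchFunc; the pool type is a hypothetical stand-in for a real client.

package main

import (
	"fmt"
	"time"

	"github.com/karangupta31/go-sluice"
)

// pool is a hypothetical stand-in for a shared resource such as a DB pool.
type pool struct{ dsn string }

type User struct {
	ID   string
	Name string
}

func main() {
	shared := &pool{dsn: "postgres://..."}

	cfg := sluice.BatchProcessorConfig[string, User]{
		Func: func(ids []string, commonArgs ...interface{}) (map[string]User, error) {
			// Recover the shared resource passed via CommonArgs.
			p := commonArgs[0].(*pool)
			fmt.Printf("batch of %d using %s\n", len(ids), p.dsn)
			out := make(map[string]User, len(ids))
			for _, id := range ids {
				out[id] = User{ID: id, Name: "User-" + id}
			}
			return out, nil
		},
		CommonArgs: []interface{}{shared},
	}

	batcher := sluice.NewBatcher(cfg, 100*time.Millisecond, 10, 2)
	defer batcher.Stop()

	u, err := batcher.SubmitAndAwait("user123", "user123")
	fmt.Println(u, err)
}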

Advanced: Sharding/Grouping

You can control how items are grouped into batches using a KeyFunc. This is useful when you want to group requests by certain criteria (e.g., experiment ID, user type, etc.) for more efficient processing.

package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/karangupta31/go-sluice"
)

type Request struct {
	ExperimentID string
	UserID       string
}

type Response struct {
	ExperimentID string
	UserID       string  
	Variant      string
}

func main() {
	// BatchFunc that processes requests for the same experiment together
	batchFunc := func(requests []Request, args ...interface{}) (map[string]Response, error) {
		// This will receive requests grouped by experiment ID
		fmt.Printf("Processing %d requests for experiment: %s\n", 
			len(requests), requests[0].ExperimentID)
		
		results := make(map[string]Response)
		for i, req := range requests {
			id := fmt.Sprintf("%s-%s", req.ExperimentID, req.UserID)
			results[id] = Response{
				ExperimentID: req.ExperimentID,
				UserID:       req.UserID,
				Variant:      fmt.Sprintf("variant_%d", i+1),
			}
		}
		return results, nil
	}

	// KeyFunc groups requests by experiment ID
	keyFunc := func(req Request) string {
		return req.ExperimentID // All requests with same experiment ID will be batched together
	}

	config := sluice.BatchProcessorConfig[Request, Response]{
		Func:    batchFunc,
		KeyFunc: keyFunc, // Enable sharding
	}

	batcher := sluice.NewBatcher(config, 100*time.Millisecond, 10, 2)
	defer batcher.Stop()

	// These requests will be automatically grouped by experiment ID
	requests := []Request{
		{"exp_123", "user_1"}, // ┐
		{"exp_123", "user_2"}, // ├─ Batched together
		{"exp_123", "user_3"}, // ┘
		{"exp_456", "user_4"}, // ┐
		{"exp_456", "user_5"}, // ┘ Batched together
	}

	// Submit concurrently: SubmitAndAwait blocks per item, so concurrent
	// submission is what allows same-key requests to share a batch.
	var wg sync.WaitGroup
	for _, req := range requests {
		wg.Add(1)
		go func(r Request) {
			defer wg.Done()
			id := fmt.Sprintf("%s-%s", r.ExperimentID, r.UserID)
			response, err := batcher.SubmitAndAwait(id, r)
			if err != nil {
				log.Printf("request %s failed: %v", id, err)
				return
			}
			fmt.Printf("%s -> %s\n", id, response.Variant)
		}(req)
	}
	wg.Wait()
}

Memory Management for the Sharding Use Case

  • LRU Strategy: Keys that haven't been used recently are eligible for removal
  • Configurable Thresholds:
    • Cleanup only triggers when there are 1000+ keys
    • Keys must be inactive for 10+ minutes before removal
    • Cleanup runs every 5 minutes
  • Only Empty Batches: Only removes keys for batches that are currently empty (no pending items)

This split keeps costs where they belong: non-sharded mode pays no cleanup overhead, while sharded mode gets automatic memory management to prevent leaks when many distinct keys are used over time.
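
There is nothing to call to enable this; it only matters that a long-lived Batcher sees many distinct keys. A sketch of that access pattern using only the API shown above (session keys are illustrative; the cleanup itself is internal and not observable through the API):

package main

import (
	"fmt"
	"time"

	"github.com/karangupta31/go-sluice"
)

func main() {
	cfg := sluice.BatchProcessorConfig[string, string]{
		Func: func(ids []string, _ ...interface{}) (map[string]string, error) {
			out := make(map[string]string, len(ids))
			for _, id := range ids {
				out[id] = "done:" + id
			}
			return out, nil
		},
		// One shard per session ID: over time this creates many keys,
		// which is exactly the case the LRU cleanup exists for.
		KeyFunc: func(sessionID string) string { return sessionID },
	}

	batcher := sluice.NewBatcher(cfg, 100*time.Millisecond, 10, 4)
	defer batcher.Stop()

	// Each new session key creates a shard; shards left inactive are
	// reclaimed automatically once the thresholds above are crossed.
	for i := 0; i < 3; i++ {
		sessionID := fmt.Sprintf("session_%d", i)
		res, err := batcher.SubmitAndAwait(sessionID, sessionID)
		fmt.Println(res, err)
	}
}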

Examples

License

This project is licensed under the MIT License - see the LICENSE file for details.
