# go-sluice

`go-sluice` provides a generic, concurrent batch-processing utility for Go, available under the `sluice` package. It collects individual items (such as user IDs for fetching details) and processes them in controlled batches. This is useful for optimizing operations like database queries or API calls by grouping many small requests into fewer, larger ones.

The `sluice.Batcher` is the core component, designed for ease of use and efficient, concurrent batch execution.
## Features

- Generic: Works with any input (`T`) and output (`Q`) types using Go 1.18+ generics.
- Concurrent: Processes multiple batches concurrently using a configurable worker pool.
- Configurable Batching: Triggers batch processing based on:
  - Maximum batch size.
  - Time interval (batch window), with per-key timers when sharding is enabled.
- Smart Sharding: Groups items by custom keys (e.g., experiment ID, user type) to control which items are batched together. Each shard gets its own independent timer.
- Memory Efficient: Automatically cleans up inactive batch keys to prevent memory leaks when many different keys are used over time.
- ID Correlation: Each submitted item carries an ID, used by the `BatchFunc` to map results back, ensuring you get the correct output for your input (a sketch of this contract follows the list).
- Synchronous Submission: The `SubmitAndAwait` method provides a simple way to submit an item and block until its specific result is available.
- Graceful Shutdown: The `Stop()` method ensures that any enqueued items are processed before the batcher shuts down its worker goroutines.
- Panic Safe: Recovers from panics within the user-provided `BatchFunc`, logging the error and ensuring the batcher itself remains operational.
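To make the correlation contract concrete before the full example, here is a minimal sketch distilled from the Quick Start below (not additional API): the keys of the map a `BatchFunc` returns must match the IDs passed to `SubmitAndAwait`, and returning an error fails the whole batch.

```go
package main

import (
    "errors"
    "fmt"
)

// fetchNames has the BatchFunc shape used throughout this README: it
// receives the accumulated inputs and returns results keyed by the same
// IDs that callers passed to SubmitAndAwait.
func fetchNames(ids []string, commonArgs ...interface{}) (map[string]string, error) {
    if len(ids) == 0 {
        // Returning an error fails the entire batch; per the Quick Start
        // below, each awaiting SubmitAndAwait call then receives an error.
        return nil, errors.New("empty batch")
    }
    out := make(map[string]string, len(ids))
    for _, id := range ids {
        out[id] = "Name-" + id // the map key MUST be the submitted ID
    }
    return out, nil
}

func main() {
    res, err := fetchNames([]string{"user123", "user456"})
    fmt.Println(res, err)
}
```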
## Installation

```sh
go get github.com/karangupta31/go-sluice
```

## Quick Start
```go
package main

import (
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/karangupta31/go-sluice"
)

// User represents the data structure for user details.
type User struct {
    ID   string
    Name string
    // Add more fields like Email, Age, etc.
}

func main() {
    // 1. Define your batch processing function (BatchFunc).
    // It takes a slice of user IDs (inputs) and returns a map of User objects.
    batchFetchUserDetails := func(userIDs []string, commonArgs ...interface{}) (map[string]User, error) {
        fmt.Printf("BatchFunc: Processing batch of %d user IDs: %v\n", len(userIDs), userIDs)

        // Simulate fetching data from a database or an API.
        // In a real application, you might do:
        // users, err := db.GetUsersByIDs(userIDs)
        time.Sleep(50 * time.Millisecond) // Simulate I/O latency

        results := make(map[string]User)
        for _, id := range userIDs {
            // The key in the 'results' map MUST match the ID provided to SubmitAndAwait.
            results[id] = User{
                ID:   id,
                Name: "User-" + id, // Example: "User-user123"
            }
        }

        // If a critical error occurs for the whole batch (e.g., DB connection lost), return it.
        // return nil, fmt.Errorf("database connection failed")
        return results, nil
    }

    // 2. Create a BatchProcessorConfig.
    // This holds your BatchFunc and any common arguments (e.g., a database connection pool).
    processorConfig := sluice.BatchProcessorConfig[string, User]{
        Func: batchFetchUserDetails,
        // CommonArgs: []interface{}{dbPool}, // Optional: for passing shared resources
    }

    // 3. Create a new Batcher instance.
    batcher := sluice.NewBatcher(
        processorConfig,
        100*time.Millisecond, // Process batch every 100ms if not full (batch window)
        10,                   // Max 10 user IDs per batch (max batch size)
        5,                    // Max 5 concurrent workers processing batches
    )
    // Ensure graceful shutdown when main exits or the Batcher is no longer needed.
    defer batcher.Stop()

    // 4. Submit requests and await results.
    // Example: fetching details for a few users.
    userIDsToFetch := []string{"user123", "user456", "user789", "user000"}
    var wg sync.WaitGroup

    fmt.Println("Submitting user IDs for batch fetching:")
    for _, userID := range userIDsToFetch {
        wg.Add(1)
        go func(id string) {
            defer wg.Done()
            fmt.Printf("Submitting: %s\n", id)
            // SubmitAndAwait blocks until this specific item is processed.
            // The first argument is the ID for correlation, the second is the input data.
            user, err := batcher.SubmitAndAwait(id, id)
            if err != nil {
                log.Printf("Error fetching user %s: %v\n", id, err)
                return
            }
            fmt.Printf("Received details for %s: %+v\n", id, user)
        }(userID)
    }

    wg.Wait() // Wait for all goroutines to complete.
    fmt.Println("\nAll user details fetched.")

    // Allow time for any final log messages before Stop() runs.
    time.Sleep(200 * time.Millisecond)
}
```
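The `CommonArgs` field, shown commented out above, is how shared resources reach your `BatchFunc`. Here is a minimal sketch of wiring one up; the `*sql.DB` pool is purely an illustrative assumption, and any shared value works the same way:

```go
package main

import (
    "database/sql"
    "fmt"
    "time"

    "github.com/karangupta31/go-sluice"
)

type User struct {
    ID   string
    Name string
}

func newConfig(db *sql.DB) sluice.BatchProcessorConfig[string, User] {
    return sluice.BatchProcessorConfig[string, User]{
        Func: func(ids []string, commonArgs ...interface{}) (map[string]User, error) {
            pool := commonArgs[0].(*sql.DB) // recover the shared pool
            _ = pool                        // e.g. pool.Query(...) with an IN clause over ids
            out := make(map[string]User, len(ids))
            for _, id := range ids {
                out[id] = User{ID: id, Name: "User-" + id}
            }
            return out, nil
        },
        CommonArgs: []interface{}{db}, // made available to every Func call
    }
}

func main() {
    var db *sql.DB // normally from sql.Open; a nil stub is fine here since Func never dereferences it
    batcher := sluice.NewBatcher(newConfig(db), 100*time.Millisecond, 10, 2)
    defer batcher.Stop()

    user, err := batcher.SubmitAndAwait("user123", "user123")
    fmt.Println(user, err)
}
```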
## Sharding with KeyFunc

You can control how items are grouped into batches using a `KeyFunc`. This is useful when you want to group requests by certain criteria (e.g., experiment ID or user type) for more efficient processing.
```go
package main

import (
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/karangupta31/go-sluice"
)

type Request struct {
    ExperimentID string
    UserID       string
}

type Response struct {
    ExperimentID string
    UserID       string
    Variant      string
}

func main() {
    // BatchFunc that processes requests for the same experiment together.
    batchFunc := func(requests []Request, args ...interface{}) (map[string]Response, error) {
        // Thanks to the KeyFunc below, every request in this slice shares
        // the same experiment ID.
        fmt.Printf("Processing %d requests for experiment: %s\n",
            len(requests), requests[0].ExperimentID)

        results := make(map[string]Response)
        for i, req := range requests {
            id := fmt.Sprintf("%s-%s", req.ExperimentID, req.UserID)
            results[id] = Response{
                ExperimentID: req.ExperimentID,
                UserID:       req.UserID,
                Variant:      fmt.Sprintf("variant_%d", i+1),
            }
        }
        return results, nil
    }

    // KeyFunc groups requests by experiment ID: all requests with the same
    // experiment ID will be batched together.
    keyFunc := func(req Request) string {
        return req.ExperimentID
    }

    config := sluice.BatchProcessorConfig[Request, Response]{
        Func:    batchFunc,
        KeyFunc: keyFunc, // Enable sharding
    }

    batcher := sluice.NewBatcher(config, 100*time.Millisecond, 10, 2)
    defer batcher.Stop()

    // These requests will be automatically grouped by experiment ID.
    requests := []Request{
        {"exp_123", "user_1"}, // ┐
        {"exp_123", "user_2"}, // ├─ Batched together
        {"exp_123", "user_3"}, // ┘
        {"exp_456", "user_4"}, // ┐
        {"exp_456", "user_5"}, // ┘ Batched together
    }

    // Submit concurrently so requests can actually accumulate into batches
    // (SubmitAndAwait blocks until its result is ready).
    var wg sync.WaitGroup
    for _, req := range requests {
        wg.Add(1)
        go func(r Request) {
            defer wg.Done()
            id := fmt.Sprintf("%s-%s", r.ExperimentID, r.UserID)
            response, err := batcher.SubmitAndAwait(id, r)
            if err != nil {
                log.Printf("Error for %s: %v", id, err)
                return
            }
            fmt.Printf("%s -> %s\n", id, response.Variant)
        }(req)
    }
    wg.Wait()
}
```
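Because each shard keeps its own independent batch window, a quiet key such as `exp_456` flushes on its own timer; one key's traffic never delays another key's batches, and only items that share a key wait on each other.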
## Automatic Key Cleanup

When sharding is enabled, the batcher automatically cleans up inactive keys:

- LRU Strategy: Keys that haven't been used recently are eligible for removal.
- Configurable Thresholds:
  - Cleanup only triggers when there are 1000+ keys.
  - Keys must be inactive for 10+ minutes before removal.
  - Cleanup runs every 5 minutes.
- Only Empty Batches: Only keys whose batches are currently empty (no pending items) are removed.

This design keeps overhead low: non-sharded mode pays no cleanup cost at all, while sharded mode gets automatic memory management to prevent leaks when many different keys are used over time. A conceptual sketch of the policy follows.
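This sketch illustrates the documented policy only; the `shard` type and `cleanup` function are hypothetical stand-ins, not the library's actual implementation.

```go
package main

import (
    "fmt"
    "time"
)

// shard is a hypothetical stand-in for the per-key state the batcher tracks.
type shard struct {
    pending  int       // items waiting in this key's batch
    lastUsed time.Time // last submission seen for this key
}

// cleanup applies the documented policy: once the key count crosses the
// threshold, drop keys that are both empty and inactive for too long.
func cleanup(shards map[string]*shard, now time.Time) {
    if len(shards) < 1000 { // cleanup only triggers at 1000+ keys
        return
    }
    for key, s := range shards {
        if s.pending == 0 && now.Sub(s.lastUsed) >= 10*time.Minute {
            delete(shards, key) // empty and inactive: safe to remove
        }
    }
}

func main() {
    shards := map[string]*shard{
        "exp_123": {pending: 0, lastUsed: time.Now().Add(-15 * time.Minute)},
    }
    cleanup(shards, time.Now()) // no-op here: fewer than 1000 keys
    fmt.Println(len(shards))
}
```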
## Examples

- Basic Usage: simple user-data fetching (see Quick Start above)
- Sharding Example: advanced grouping by experiment ID
## License

This project is licensed under the MIT License; see the LICENSE file for details.