gostage is a workflow orchestration and state management library for Go that enables you to build multi-stage stateful workflows with runtime modification capabilities. It provides a framework for organizing complex processes into manageable stages and actions with rich metadata support.
gostage provides a structured approach to workflow management with these core components:
- Workflows - The top-level container representing an entire process
- Stages - Sequential phases within a workflow, each containing multiple actions
- Actions - Individual units of work that implement specific tasks
- State Store - A type-safe key-value store for workflow data
- Metadata - Rich tagging and property system for organization and querying
- Sequential Execution - Workflows execute stages and actions in defined order
- Dynamic Modification - Add or modify workflow components during execution
- Tag-Based Organization - Categorize and filter components for better organization
- Type-Safe Store - Store and retrieve data with type checking and TTL support
- Conditional Execution - Enable/disable specific components at runtime
- Rich Metadata - Associate tags and properties with workflow components
- Serializable State - Store workflow state during and between executions
- Extensible Middleware - Add cross-cutting concerns like logging, error handling, and retry logic
- Hierarchical Middleware - Customize workflow behavior at multiple levels
- Process Spawning - Execute workflows in separate child processes with full IPC
- Context Messaging - Rich metadata and targeted message handling for debugging and monitoring
- gRPC-Based IPC - Type-safe inter-process communication using Protocol Buffers
- Advanced Middleware System - Transform messages and manage process lifecycle
go get github.com/davidroman0O/gostage
Here's a simple example of creating and executing a workflow:
package main
import (
"context"
"fmt"
"github.com/davidroman0O/gostage"
)
// Define a custom action by embedding BaseAction
type GreetingAction struct {
gostage.BaseAction
}
// Implement the Execute method required by the Action interface
func (a GreetingAction) Execute(ctx *gostage.ActionContext) error {
name, err := gostage.ContextGetOrDefault(ctx, "user.name", "World")
if err != nil {
return err
}
ctx.Logger.Info("Hello, %s!", name)
return nil
}
func main() {
// Create a new workflow
wf := gostage.NewWorkflow(
"hello-world",
"Hello World Workflow",
"A simple introductory workflow",
)
// Create a stage
stage := gostage.NewStage(
"greeting",
"Greeting Stage",
"Demonstrates a simple greeting",
)
// Add actions to the stage
stage.AddAction(&GreetingAction{
BaseAction: gostage.NewBaseAction("greet", "Greeting Action"),
})
// Add the stage to the workflow
wf.AddStage(stage)
// Set up a logger
logger := gostage.NewDefaultLogger()
// Create a runner
runner := gostage.NewRunner()
// Add middleware for logging, error handling, etc.
runner.Use(gostage.LoggingMiddleware())
// Execute the workflow
if err := runner.Execute(context.Background(), wf, logger); err != nil {
fmt.Printf("Error executing workflow: %v\n", err)
return
}
fmt.Println("Workflow completed successfully!")
}
- Workflows - The top-level container representing an entire process
- Stages - Sequential phases within a workflow, each containing multiple actions
- Actions - Individual units of work that implement specific tasks
- State Store - A type-safe key-value store for workflow data
- Context Messaging - Rich metadata and targeted message handling system
- Middleware - Customizable hooks that wrap execution at different levels
Actions are the building blocks of a workflow:
type Action interface {
// Name returns the action's name
Name() string
// Description returns a human-readable description
Description() string
// Tags returns the action's tags for organization and filtering
Tags() []string
// Execute performs the action's work
Execute(ctx *ActionContext) error
}
Stages are containers for actions that execute sequentially:
// Create a stage with tags
stage := gostage.NewStageWithTags(
"validation",
"Order Validation",
"Validates incoming orders",
[]string{"critical", "input"},
)
// Add actions to the stage
stage.AddAction(myAction)
// Set initial data for the stage (replaces direct InitialStore access)
stage.SetInitialData("key", value)
Note: In earlier versions, stages exposed an InitialStore
field directly. This has been changed to use the SetInitialData()
method for better encapsulation:
// Before (deprecated)
stage.InitialStore.Put("key", value)
// After (current)
stage.SetInitialData("key", value)
Workflows manage the execution of stages:
// Create a workflow
wf := gostage.NewWorkflow(
"process-orders",
"Order Processing",
"Handles end-to-end order processing",
)
// Add stages to the workflow
wf.AddStage(stage1)
wf.AddStage(stage2)
Runners execute workflows and can be customized with middleware:
// Create a runner
runner := gostage.NewRunner()
// Add middleware for logging, error handling, etc.
runner.Use(gostage.LoggingMiddleware())
// Execute the workflow
runner.Execute(context.Background(), wf, logger)
Runners can also be extended to create domain-specific workflow executors.
Middleware provides a powerful way to intercept and enhance workflow execution with cross-cutting concerns. Each middleware wraps the execution flow, allowing you to perform actions before and after workflow execution.
// Create a runner with multiple middleware components
runner := gostage.NewRunner(
gostage.WithMiddleware(
gostage.LoggingMiddleware(), // Built-in logging
ErrorHandlingMiddleware([]string{"non-critical"}), // Custom error handling
TimingMiddleware(), // Performance monitoring
RetryMiddleware(3, 100*time.Millisecond), // Automatic retries
),
)
Key characteristics of middleware:
- Pre/Post Execution - Run code before and after workflow execution
- Error Handling - Catch, transform, or recover from errors
- Context Modification - Add values to or modify the execution context
- Chain Execution - Multiple middleware components work together in a chain
Creating your own middleware is straightforward:
func MyCustomMiddleware() gostage.Middleware {
return func(next gostage.RunnerFunc) gostage.RunnerFunc {
return func(ctx context.Context, wf *gostage.Workflow, logger gostage.Logger) error {
// Pre-execution logic
logger.Info("Starting workflow execution with custom middleware")
// Execute the next middleware in the chain (or the workflow itself)
err := next(ctx, wf, logger)
// Post-execution logic
logger.Info("Workflow execution completed with result: %v", err == nil)
// Optionally transform or handle the error
return err
}
}
}
The library includes examples of several middleware patterns:
- Error Handling Middleware - Catch and recover from specific errors
- Retry Middleware - Automatically retry workflows that fail
- Timing Middleware - Measure and record execution time
- Validation Middleware - Ensure workflows meet certain criteria
- State Injection Middleware - Add initial state to workflows
- Tracing Middleware - Add distributed tracing capabilities
- Audit Middleware - Record workflow execution for compliance
See the examples/middleware
directory for complete implementations of these patterns.
The order of middleware registration is important. Middleware is applied in reverse order, so the last middleware registered is the first to execute and the closest to the actual workflow execution.
runner.Use(
middlewareA, // Applied third (outer layer)
middlewareB, // Applied second (middle layer)
middlewareC, // Applied first (inner layer, closest to workflow)
)
This structure allows outer middleware to take action based on the results of inner middleware.
gostage includes a key-value store with type safety:
// Store data
ctx.Store().Put("order.id", "ORD-12345")
// Retrieve data with type safety
orderId, err := store.Get[string](ctx.Store(), "order.id")
// Store with TTL (time-to-live)
ctx.Store().PutWithTTL("session.token", token, 24*time.Hour)
// Store with metadata
metadata := store.NewMetadata()
metadata.AddTag("sensitive")
metadata.SetProperty("source", "external-api")
ctx.Store().PutWithMetadata("customer.data", customerData, metadata)
Note: The ActionContext
provides store access through a Store()
method rather than a direct field. This ensures all actions operate on the same store instance:
// Before (deprecated)
ctx.Store.Put("key", value)
data, err := store.Get[MyType](ctx.Store, "other-key")
// After (current)
ctx.Store().Put("key", value)
data, err := store.Get[MyType](ctx.Store(), "other-key")
gostage provides rich context messaging that automatically includes metadata about message sources and enables targeted message handling. This works for both normal workflows and spawned child processes.
// Works with both normal workflows and spawned workflows
runner := gostage.NewRunner() // or WithGRPCTransport for spawning
// Register global context handler
runner.Broker.RegisterHandlerWithContext(gostage.MessageTypeStorePut,
func(msgType gostage.MessageType, payload json.RawMessage, context gostage.MessageContext) error {
fmt.Printf("📨 Message from %s->%s->%s (PID: %d)\n",
context.WorkflowID, context.StageID, context.ActionName, context.ProcessID)
return nil
})
// Register workflow-specific handler
runner.Broker.RegisterWorkflowHandler(gostage.MessageTypeLog, "critical-workflow",
func(msgType gostage.MessageType, payload json.RawMessage, context gostage.MessageContext) error {
// Only receives messages from "critical-workflow"
return nil
})
Every message automatically includes comprehensive metadata:
type MessageContext struct {
WorkflowID string // Which workflow sent this message
StageID string // Which stage sent this message
ActionName string // Which action sent this message
ProcessID int32 // PID of the sending process
IsChildProcess bool // Whether from spawned child process
ActionIndex int32 // Position of action within stage
IsLastAction bool // Whether this is the last action in stage
SessionID string // Unique session ID for workflow execution
SequenceNumber int64 // Message sequence number for ordering
}
Actions use the same ctx.Send()
API - context metadata is added automatically:
func (a *MyAction) Execute(ctx *gostage.ActionContext) error {
// Same API as before - context metadata added automatically
ctx.Send(gostage.MessageTypeStorePut, map[string]interface{}{
"key": "processing_status",
"value": "completed",
})
return nil
}
Context messaging enables sophisticated debugging, monitoring, and message routing.
gostage provides powerful process spawning capabilities that allow workflows to execute in separate child processes with full inter-process communication (IPC). This enables isolation, fault tolerance, and distributed execution.
Execute workflows in separate child processes:
// Create a runner with spawn capability
runner := gostage.NewRunner()
// Define a workflow to run in a child process
workflowDef := gostage.SubWorkflowDef{
ID: "child-workflow",
Name: "Child Process Workflow",
Description: "Runs in a separate process",
Stages: []gostage.StageDef{
{
ID: "main-stage",
Actions: []gostage.ActionDef{
{ID: "process-info"},
},
},
},
}
// Spawn the workflow in a child process
err := runner.Spawn(context.Background(), workflowDef)
The spawn functionality creates actual operating system processes with different PIDs:
type ProcessInfoAction struct {
gostage.BaseAction
}
func (a *ProcessInfoAction) Execute(ctx *gostage.ActionContext) error {
processID := os.Getpid()
parentPID := os.Getppid()
ctx.Logger.Info("Child Process ID: %d", processID)
ctx.Logger.Info("Parent Process ID: %d", parentPID)
// Child can send data back to parent via IPC
ctx.Send(gostage.MessageTypeStorePut, map[string]interface{}{
"key": "child_pid",
"value": processID,
})
return nil
}
In child processes, create runners with brokers using the convenient constructor:
func childMain() {
// Set up broker for parent communication
broker := gostage.NewRunnerBroker(os.Stdout)
// Create runner with broker - clean and simple!
runner := gostage.NewRunnerWithBroker(broker)
// Or use the option-based approach
runner := gostage.NewRunner(gostage.WithBroker(broker))
// Now ctx.Send() will work properly in actions
// ... rest of child process logic
}
Note: The NewRunnerWithBroker()
constructor is a convenience method that's particularly useful for child processes, replacing the previous pattern of manual broker assignment.
Set up message handlers in the parent to receive data from child processes:
runner := gostage.NewRunner()
// Handle log messages from child processes
runner.Broker.RegisterHandler(gostage.MessageTypeLog, func(msgType gostage.MessageType, payload json.RawMessage) error {
var logData map[string]string
json.Unmarshal(payload, &logData)
level := logData["level"]
message := logData["message"]
fmt.Printf("[CHILD-%s] %s\n", level, message)
return nil
})
// Handle store updates from child processes
runner.Broker.RegisterHandler(gostage.MessageTypeStorePut, func(msgType gostage.MessageType, payload json.RawMessage) error {
var data map[string]interface{}
json.Unmarshal(payload, &data)
key := data["key"].(string)
value := data["value"]
fmt.Printf("📦 Received from child: %s = %v\n", key, value)
return nil
})
Transform and enhance messages between parent and child processes:
// Message transformation middleware
func MessageTransformMiddleware() gostage.IPCMiddlewareFunc {
return gostage.IPCMiddlewareFunc{
ProcessOutboundFunc: func(msgType gostage.MessageType, payload interface{}) (gostage.MessageType, interface{}, error) {
if msgType == gostage.MessageTypeLog {
if logData, ok := payload.(map[string]string); ok {
// Add timestamp and prefix to all log messages
logData["message"] = "[ENHANCED] " + logData["message"]
logData["timestamp"] = time.Now().Format("15:04:05.000")
}
}
return msgType, payload, nil
},
}
}
// Add middleware to runner
runner.AddIPCMiddleware(MessageTransformMiddleware())
Hook into the process lifecycle for monitoring and management:
// Process lifecycle middleware
func ProcessLifecycleMiddleware() gostage.SpawnMiddlewareFunc {
return gostage.SpawnMiddlewareFunc{
BeforeSpawnFunc: func(ctx context.Context, def gostage.SubWorkflowDef) (context.Context, gostage.SubWorkflowDef, error) {
fmt.Printf("🚀 About to spawn child process for workflow: %s\n", def.ID)
// Add context values
enhancedCtx := context.WithValue(ctx, "spawn_time", time.Now())
return enhancedCtx, def, nil
},
AfterSpawnFunc: func(ctx context.Context, def gostage.SubWorkflowDef, err error) error {
if spawnTime, ok := ctx.Value("spawn_time").(time.Time); ok {
duration := time.Since(spawnTime)
fmt.Printf("⏱️ Child process completed in %v\n", duration)
}
return nil
},
OnChildMessageFunc: func(msgType gostage.MessageType, payload json.RawMessage) error {
fmt.Printf("📨 Received message type: %s\n", msgType)
return nil
},
}
}
// Add spawn middleware to runner
runner.UseSpawnMiddleware(ProcessLifecycleMiddleware())
The IPC system works seamlessly across all platforms:
- Windows: Uses named pipes and
CreateProcess()
- Linux: Uses POSIX pipes and
fork()/exec()
- macOS: Uses POSIX pipes and
fork()/exec()
- Other platforms: Uses Go's standard cross-platform mechanisms
// Cross-platform file operations in child process
filename := filepath.Join(os.TempDir(), fmt.Sprintf("child_process_%d.txt", os.Getpid()))
content := fmt.Sprintf("Created by child process %d on %s/%s\n",
os.Getpid(), runtime.GOOS, runtime.GOARCH)
err := os.WriteFile(filename, []byte(content), 0644)
func MessageEncryptionMiddleware() gostage.IPCMiddlewareFunc {
return gostage.IPCMiddlewareFunc{
ProcessOutboundFunc: func(msgType gostage.MessageType, payload interface{}) (gostage.MessageType, interface{}, error) {
if msgType == gostage.MessageTypeStorePut {
if storeData, ok := payload.(map[string]interface{}); ok {
if key, exists := storeData["key"]; exists {
if keyStr, ok := key.(string); ok && strings.Contains(keyStr, "sensitive") {
storeData["encrypted"] = true
storeData["key"] = "encrypted_" + keyStr
}
}
}
}
return msgType, payload, nil
},
ProcessInboundFunc: func(msgType gostage.MessageType, payload json.RawMessage) (gostage.MessageType, json.RawMessage, error) {
// Decrypt messages on the receiving end
if msgType == gostage.MessageTypeStorePut {
var storeData map[string]interface{}
if err := json.Unmarshal(payload, &storeData); err == nil {
if encrypted, exists := storeData["encrypted"]; exists && encrypted == true {
if key, exists := storeData["key"]; exists {
if keyStr, ok := key.(string); ok && strings.HasPrefix(keyStr, "encrypted_") {
storeData["key"] = strings.TrimPrefix(keyStr, "encrypted_")
delete(storeData, "encrypted")
if newPayload, err := json.Marshal(storeData); err == nil {
payload = json.RawMessage(newPayload)
}
}
}
}
}
}
return msgType, payload, nil
},
}
}
// Middleware that implements both IPC and Spawn interfaces
type MetricsMiddleware struct {
MessageCount map[gostage.MessageType]int
TotalBytes int
}
func (m *MetricsMiddleware) ProcessOutbound(msgType gostage.MessageType, payload interface{}) (gostage.MessageType, interface{}, error) {
m.MessageCount[msgType]++
if data, err := json.Marshal(payload); err == nil {
m.TotalBytes += len(data)
}
return msgType, payload, nil
}
func (m *MetricsMiddleware) AfterSpawn(ctx context.Context, def gostage.SubWorkflowDef, err error) error {
fmt.Println("📊 Communication Statistics:")
for msgType, count := range m.MessageCount {
fmt.Printf(" %s: %d messages\n", msgType, count)
}
fmt.Printf(" Total bytes: %d\n", m.TotalBytes)
return nil
}
// Add to runner (implements both interfaces)
metrics := NewMetricsMiddleware()
runner.AddIPCMiddleware(metrics)
runner.UseSpawnMiddleware(metrics)
The IPC system uses structured JSON messages for communication between parent and child processes:
MessageTypeLog
- Log messages from child to parentMessageTypeStorePut
- Store updates from child to parentMessageTypeStoreDelete
- Store deletions from child to parentMessageTypeWorkflowStart
- Initial message from parent to child to start executionMessageTypeWorkflowResult
- Final message from child to parent with outcome
Parent Process Child Process
┌─────────────────┐ ┌─────────────────┐
│ │ stdin │ │
│ Runner with │─────────────→ │ Child Runner │
│ Message │ WorkflowStart │ with Actions │
│ Handlers │ │ │
│ │ stdout │ │
│ │←───────────── │ │
│ │ Log/Store Msgs│ │
└─────────────────┘ └─────────────────┘
// Register custom message handlers
runner.Broker.RegisterHandler("custom-message-type", func(msgType gostage.MessageType, payload json.RawMessage) error {
var customData MyCustomData
if err := json.Unmarshal(payload, &customData); err != nil {
return err
}
// Process custom message
fmt.Printf("Received custom data: %+v\n", customData)
return nil
})
// Send custom messages from child process
ctx.Send("custom-message-type", MyCustomData{
Field1: "value1",
Field2: 42,
})
GoStage provides a powerful hierarchical middleware system that allows you to customize behavior at different levels of execution:
- Runner Middleware: Wraps the entire workflow execution
- Workflow Middleware: Wraps individual stage executions
- Stage Middleware: Wraps all actions within a stage
The execution flow with middleware follows a nested pattern:
Runner Middleware (start)
Workflow (start)
Workflow Middleware for Stage 1 (start)
Stage 1 Middleware (start)
Actions in Stage 1
Stage 1 Middleware (end)
Workflow Middleware for Stage 1 (end)
Workflow Middleware for Stage 2 (start)
Stage 2 Middleware (start)
Actions in Stage 2
Stage 2 Middleware (end)
Workflow Middleware for Stage 2 (end)
Workflow (end)
Runner Middleware (end)
Runner middleware wraps the execution of an entire workflow:
runner := gostage.NewRunner()
// Add logging middleware
runner.Use(func(next gostage.RunnerFunc) gostage.RunnerFunc {
return func(ctx context.Context, w *gostage.Workflow, logger gostage.Logger) error {
logger.Info("Starting workflow: %s", w.Name)
err := next(ctx, w, logger)
logger.Info("Completed workflow: %s", w.Name)
return err
}
})
// Execute the workflow
runner.Execute(context.Background(), workflow, logger)
Workflow middleware wraps the execution of each stage within a workflow:
workflow := gostage.NewWorkflow("example", "Example Workflow", "A workflow with middleware")
// Add stage notification middleware
workflow.Use(func(next gostage.WorkflowStageRunnerFunc) gostage.WorkflowStageRunnerFunc {
return func(ctx context.Context, s *gostage.Stage, w *gostage.Workflow, logger gostage.Logger) error {
logger.Info("Starting stage: %s", s.Name)
err := next(ctx, s, w, logger)
logger.Info("Completed stage: %s", s.Name)
return err
}
})
// Add stages and actions...
Stage middleware wraps the execution of all actions within a stage:
stage := gostage.NewStage("container-stage", "Container Stage", "A stage that runs in a container")
// Add container middleware
stage.Use(func(next gostage.StageRunnerFunc) gostage.StageRunnerFunc {
return func(ctx context.Context, s *gostage.Stage, w *gostage.Workflow, logger gostage.Logger) error {
// Start container
logger.Info("Starting container for stage: %s", s.Name)
// Execute all actions in the container
err := next(ctx, s, w, logger)
// Always stop container
logger.Info("Stopping container for stage: %s", s.Name)
return err
}
})
// Add actions that will run in the container...
The middleware system allows you to create various utility middleware functions. Here are examples of middleware you could build with the system:
// Example logging middleware for runners
func LoggingMiddleware() gostage.Middleware {
return func(next gostage.RunnerFunc) gostage.RunnerFunc {
return func(ctx context.Context, w *gostage.Workflow, logger gostage.Logger) error {
start := time.Now()
logger.Info("Starting workflow: %s", w.Name)
err := next(ctx, w, logger)
elapsed := time.Since(start)
logger.Info("Completed workflow: %s (in %v)", w.Name, elapsed)
return err
}
}
}
// Example time limit middleware
func TimeLimitMiddleware(duration time.Duration) gostage.Middleware {
return func(next gostage.RunnerFunc) gostage.RunnerFunc {
return func(ctx context.Context, w *gostage.Workflow, logger gostage.Logger) error {
ctx, cancel := context.WithTimeout(ctx, duration)
defer cancel()
return next(ctx, w, logger)
}
}
}
// Example stage notification middleware
func StageNotificationMiddleware(beforeFn, afterFn func(stageName string)) gostage.WorkflowMiddleware {
return func(next gostage.WorkflowStageRunnerFunc) gostage.WorkflowStageRunnerFunc {
return func(ctx context.Context, s *gostage.Stage, w *gostage.Workflow, logger gostage.Logger) error {
if beforeFn != nil {
beforeFn(s.Name)
}
err := next(ctx, s, w, logger)
if afterFn != nil {
afterFn(s.Name)
}
return err
}
}
}
// Example container middleware for stages
func ContainerStageMiddleware(image, name string) gostage.StageMiddleware {
return func(next gostage.StageRunnerFunc) gostage.StageRunnerFunc {
return func(ctx context.Context, s *gostage.Stage, w *gostage.Workflow, logger gostage.Logger) error {
// Start container (pseudocode)
logger.Info("Starting container %s with image %s", name, image)
// Run all actions in the container
err := next(ctx, s, w, logger)
// Always stop container
logger.Info("Stopping container %s", name)
return err
}
}
}
Actions can dynamically generate additional actions during execution:
func (a DynamicAction) Execute(ctx *gostage.ActionContext) error {
// Create a new action dynamically
newAction := &CustomAction{
BaseAction: gostage.NewBaseAction("dynamic-action", "Dynamically Created Action"),
}
// Add it to be executed after this action
ctx.AddDynamicAction(newAction)
return nil
}
Stages can be created dynamically during workflow execution:
func (a StageGeneratorAction) Execute(ctx *gostage.ActionContext) error {
// Create a new stage dynamically
newStage := gostage.NewStage(
"dynamic-stage",
"Dynamic Stage",
"Created based on runtime conditions",
)
// Add actions to the stage
newStage.AddAction(newAction)
// Add it to be executed after the current stage
ctx.AddDynamicStage(newStage)
return nil
}
Actions and stages can be conditionally enabled or disabled:
// Disable a specific action
ctx.DisableAction("resource-intensive-action")
// Disable actions by tag
ctx.DisableActionsByTag("optional")
// Disable a specific stage
ctx.DisableStage("cleanup-stage")
// Enable/disable based on conditions
if !ctx.Store().HasTag("important-resource", "protected") {
ctx.EnableStage("cleanup-stage")
}
Find workflow components using advanced filtering:
// Find actions by tag
criticalActions := ctx.FindActionsByTag("critical")
// Find actions by multiple tags
backupActions := ctx.FindActionsByTags([]string{"backup", "database"})
// Find stages by description substring
reportStages := ctx.FindStagesByDescription("report")
// Find actions by type
uploadActions := ctx.FindActionsByType((*UploadAction)(nil))
// Custom filtering
complexActions := ctx.FilterActions(func(a gostage.Action) bool {
return strings.Contains(a.Description(), "complex")
})
The Runner can be extended to create domain-specific workflow execution environments:
// Create a custom runner by embedding the base Runner
type ExtendedRunner struct {
// Embed the base runner
*gostage.Runner
// Add domain-specific components
configProvider ConfigProvider
resourceManager ResourceManager
// Add custom settings
defaultEnvironment string
setupTimeout time.Duration
}
// Override Execute to add custom preparation logic
func (r *ExtendedRunner) Execute(ctx context.Context, wf *Workflow, logger Logger) error {
// Add preparation logic
if err := r.prepareWorkflow(wf); err != nil {
return err
}
// Call the base implementation
return r.Runner.Execute(ctx, wf, logger)
}
This pattern is helpful when you need to:
- Provide domain-specific initialization - Set up resources, load configuration
- Share common resources - Make services, clients, or tools available to all actions
- Create a specialized execution environment - Add middleware specific to your domain
- Simplify workflow creation - Pre-configure workflows with standard components
The examples/extended_runner
directory shows a complete example of this pattern, including:
- How to properly store and retrieve domain objects in the workflow store
- How to provide helper functions for accessing typed resources
- How to create a fluent configuration API for your extended runner
- Sequential execution of stages and actions
- Dynamic modification of workflows during execution
- Tag-based organization and filtering
- Type-safe state storage with support for any Go type
- Conditional execution based on runtime state
- Rich metadata for traceability and organization
- Serializable workflow state for persistence
- Hierarchical middleware system
The repository includes several examples demonstrating different features:
- Basic Usage - Workflow creation and execution fundamentals
- Dynamic Generation - Dynamic stage and action creation during runtime
- File Operations - File processing workflows with state management
- Action Wrapping - Advanced action composition patterns
- Conditional Execution - Enable/disable components based on runtime conditions
- Extended Runner - Domain-specific workflow execution environments
- Middleware Patterns - Cross-cutting concerns implementation
- Error Handling - Recovery strategies and fault tolerance
- Context Messaging (
examples/context_messaging/
) - Rich metadata and targeted message handling - Process Spawning (
examples/spawn_process/
) - Child process execution with gRPC IPC - Spawn Middleware (
examples/spawn_middleware/
) - Advanced IPC and spawn middleware system
The spawn examples demonstrate real child process execution with gRPC-based IPC:
cd examples/spawn_process
go run main.go
Features demonstrated:
- Real child processes with different PIDs
- gRPC-based inter-process communication
- Context messaging with rich metadata
- File operations proving process isolation
- Store synchronization between parent and child
- Cross-platform compatibility
cd examples/spawn_middleware
go run main.go
Features demonstrated:
- IPC middleware for message transformation
- Spawn middleware for process lifecycle management
- Message encryption simulation
- Communication metrics collection
- Enhanced logging with timestamps and prefixes
Check the examples/
directory for complete examples.
See the examples directory for more usage examples.
Contributions are welcome! Please feel free to submit a Pull Request.