Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ mtlog is a high-performance structured logging library for Go, inspired by [Seri
- **Elasticsearch sink** for centralized log storage and search
- **Splunk sink** with HEC (HTTP Event Collector) support
- **OpenTelemetry (OTLP) sink** with gRPC/HTTP transport, batching, and trace correlation
- **Conditional sink** for predicate-based routing with zero overhead
- **Router sink** for multi-destination routing with FirstMatch/AllMatch modes
- **Async sink wrapper** for high-throughput scenarios
- **Durable buffering** with persistent storage for reliability

Expand Down Expand Up @@ -656,6 +658,108 @@ mtlog.WithDurable(
)
```

### Event Routing with Conditional and Router Sinks

Route log events to different destinations based on their properties:

#### Conditional Sink

Filter events based on predicates with zero overhead for non-matching events:

```go
// Create a conditional sink for critical alerts
alertSink, _ := sinks.NewFileSink("alerts.log")
criticalAlertSink := sinks.NewConditionalSink(
func(event *core.LogEvent) bool {
return event.Level >= core.ErrorLevel &&
event.Properties["Alert"] != nil
},
alertSink,
)

// Use built-in predicates
auditSink := sinks.NewConditionalSink(
sinks.PropertyPredicate("Audit"),
auditFileSink,
)

// Combine predicates
complexFilter := sinks.NewConditionalSink(
sinks.AndPredicate(
sinks.LevelPredicate(core.ErrorLevel),
sinks.PropertyPredicate("Critical"),
sinks.PropertyValuePredicate("Environment", "production"),
),
targetSink,
)

logger := mtlog.New(
mtlog.WithSink(sinks.NewConsoleSink()),
mtlog.WithSink(criticalAlertSink),
mtlog.WithSink(auditSink),
)

// Only critical errors with Alert property go to alerts.log
logger.With("Alert", true).Error("Database connection lost")
```

#### Router Sink

Advanced routing with multiple destinations and routing modes:

```go
// FirstMatch mode - exclusive routing (stops at first match)
router := sinks.NewRouterSink(sinks.FirstMatch,
sinks.Route{
Name: "errors",
Predicate: sinks.LevelPredicate(core.ErrorLevel),
Sink: errorSink,
},
sinks.Route{
Name: "warnings",
Predicate: sinks.LevelPredicate(core.WarningLevel),
Sink: warningSink,
},
)

// AllMatch mode - broadcast to all matching routes
router := sinks.NewRouterSink(sinks.AllMatch,
sinks.MetricRoute("metrics", metricsSink),
sinks.AuditRoute("audit", auditSink),
sinks.ErrorRoute("errors", errorSink),
)

// With default sink for non-matching events
router := sinks.NewRouterSinkWithDefault(
sinks.FirstMatch,
defaultSink,
routes...,
)

// Dynamic route management at runtime
router.AddRoute(sinks.Route{
Name: "debug",
Predicate: func(e *core.LogEvent) bool {
return e.Level <= core.DebugLevel
},
Sink: debugSink,
})
router.RemoveRoute("debug")

// Fluent route builder API
route := sinks.NewRoute("special-events").
When(func(e *core.LogEvent) bool {
category, _ := e.Properties["Category"].(string)
return category == "Special"
}).
To(specialSink)

logger := mtlog.New(
mtlog.WithSink(router),
mtlog.WithSink(sinks.NewConsoleSink()),
)
```

## Dynamic Level Control

Control logging levels at runtime without restarting your application:
Expand Down Expand Up @@ -1177,6 +1281,7 @@ For comprehensive guides and examples, see the [docs](./docs) directory:
- **[Quick Reference](./docs/quick-reference.md)** - Quick reference for all features
- **[Template Syntax](./docs/template-syntax.md)** - Guide to message template syntaxes
- **[Sinks Guide](./docs/sinks.md)** - Complete guide to all output destinations
- **[Routing Patterns](./docs/routing-patterns.md)** - Advanced event routing patterns and best practices
- **[Dynamic Level Control](./docs/dynamic-levels.md)** - Runtime level management
- **[Type-Safe Generics](./docs/generics.md)** - Compile-time safe logging methods
- **[Configuration](./docs/configuration.md)** - JSON-based configuration
Expand Down
224 changes: 224 additions & 0 deletions configuration/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func NewLoggerBuilder() *LoggerBuilder {
lb.RegisterSink("Splunk", createSplunkSink)
lb.RegisterSink("Async", createAsyncSink)
lb.RegisterSink("Durable", createDurableSink)
lb.RegisterSink("Conditional", createConditionalSink)
lb.RegisterSink("Router", createRouterSink)

// Register default enrichers
lb.RegisterEnricher("WithMachineName", func(args map[string]any) (core.LogEventEnricher, error) {
Expand Down Expand Up @@ -534,3 +536,225 @@ func (e *fixedPropertyEnricher) Enrich(event *core.LogEvent, propertyFactory cor
prop := propertyFactory.CreateProperty(e.propertyName, e.value)
event.Properties[prop.Name] = prop.Value
}

func createConditionalSink(args map[string]any) (core.LogEventSink, error) {
// Validate required fields
if _, hasWhen := args["when"]; !hasWhen {
return nil, fmt.Errorf("conditional sink requires 'when' field to specify predicate type")
}

// Get the wrapped sink configuration
wrappedConfig, ok := args["writeTo"].(map[string]any)
if !ok {
return nil, fmt.Errorf("conditional sink requires 'writeTo' configuration for target sink")
}

// Create wrapped sink
sinkName, ok := wrappedConfig["Name"].(string)
if !ok {
return nil, fmt.Errorf("wrapped sink must have 'Name'")
}

// Get wrapped sink args
wrappedArgs, _ := wrappedConfig["Args"].(map[string]any)

// Use a temporary builder to create the wrapped sink
tempBuilder := NewLoggerBuilder()
wrapped, err := tempBuilder.createSink(SinkConfiguration{
Name: sinkName,
Args: wrappedArgs,
})
if err != nil {
return nil, fmt.Errorf("failed to create wrapped sink: %w", err)
}

// Create predicate based on configuration
predicateType := GetString(args, "when", "")
var predicate func(*core.LogEvent) bool

switch predicateType {
case "level":
levelStr := GetString(args, "minimumLevel", "Error")
level, err := ParseLevel(levelStr)
if err != nil {
return nil, err
}
predicate = sinks.LevelPredicate(level)

case "property":
propertyName := GetString(args, "property", "")
if propertyName == "" {
return nil, fmt.Errorf("conditional sink with 'property' requires 'property' argument")
}
predicate = sinks.PropertyPredicate(propertyName)

case "propertyValue":
propertyName := GetString(args, "property", "")
propertyValue := args["value"]
if propertyName == "" {
return nil, fmt.Errorf("conditional sink with 'propertyValue' requires 'property' argument")
}
predicate = sinks.PropertyValuePredicate(propertyName, propertyValue)

default:
return nil, fmt.Errorf("conditional sink requires 'when' argument (level, property, or propertyValue)")
}

// Optional name for debugging
name := GetString(args, "name", "")
if name != "" {
return sinks.NewNamedConditionalSink(name, predicate, wrapped), nil
}

return sinks.NewConditionalSink(predicate, wrapped), nil
}

func createRouterSink(args map[string]any) (core.LogEventSink, error) {
// Validate has routes
if _, hasRoutes := args["routes"]; !hasRoutes {
return nil, fmt.Errorf("router sink requires 'routes' array")
}

// Get routing mode
modeStr := GetString(args, "mode", "FirstMatch")
var mode sinks.RoutingMode
switch modeStr {
case "FirstMatch":
mode = sinks.FirstMatch
case "AllMatch":
mode = sinks.AllMatch
default:
return nil, fmt.Errorf("unknown routing mode: %s (use 'FirstMatch' or 'AllMatch')", modeStr)
}

// Parse routes
var routes []sinks.Route
if routeConfigs, ok := args["routes"].([]interface{}); ok {
if len(routeConfigs) == 0 {
return nil, fmt.Errorf("router sink requires at least one route")
}
for _, rc := range routeConfigs {
routeConfig, ok := rc.(map[string]interface{})
if !ok {
continue
}

// Get route name
name := ""
if n, ok := routeConfig["name"].(string); ok {
name = n
}

// Validate route has 'when' predicate
if _, hasWhen := routeConfig["when"]; !hasWhen {
return nil, fmt.Errorf("route '%s' requires 'when' field to specify predicate type", name)
}

// Get sink configuration
sinkConfig, ok := routeConfig["writeTo"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("route '%s' requires 'writeTo' configuration for target sink", name)
}

sinkName, ok := sinkConfig["Name"].(string)
if !ok {
return nil, fmt.Errorf("route '%s' sink must have 'Name'", name)
}

// Get sink args
sinkArgs, _ := sinkConfig["Args"].(map[string]interface{})

// Create the sink
tempBuilder := NewLoggerBuilder()
sink, err := tempBuilder.createSink(SinkConfiguration{
Name: sinkName,
Args: sinkArgs,
})
if err != nil {
return nil, fmt.Errorf("failed to create sink for route '%s': %w", name, err)
}

// Create predicate
var predicate func(*core.LogEvent) bool
whenType := ""
if w, ok := routeConfig["when"].(string); ok {
whenType = w
}

switch whenType {
case "level":
levelStr := ""
if l, ok := routeConfig["minimumLevel"].(string); ok {
levelStr = l
}
level, err := ParseLevel(levelStr)
if err != nil {
return nil, err
}
predicate = sinks.LevelPredicate(level)

case "property":
propertyName := ""
if p, ok := routeConfig["property"].(string); ok {
propertyName = p
}
if propertyName == "" {
return nil, fmt.Errorf("route '%s' with 'property' requires 'property' argument", name)
}
predicate = sinks.PropertyPredicate(propertyName)

case "propertyValue":
propertyName := ""
if p, ok := routeConfig["property"].(string); ok {
propertyName = p
}
propertyValue := routeConfig["value"]
if propertyName == "" {
return nil, fmt.Errorf("route '%s' with 'propertyValue' requires 'property' argument", name)
}
predicate = sinks.PropertyValuePredicate(propertyName, propertyValue)

case "error":
predicate = sinks.LevelPredicate(core.ErrorLevel)

case "audit":
predicate = sinks.PropertyPredicate("Audit")

case "metric":
predicate = sinks.PropertyPredicate("Metric")

default:
return nil, fmt.Errorf("route '%s' requires 'when' argument", name)
}

routes = append(routes, sinks.Route{
Name: name,
Predicate: predicate,
Sink: sink,
})
}
}

// Check for default sink
var defaultSink core.LogEventSink
if defaultConfig, ok := args["defaultSink"].(map[string]interface{}); ok {
sinkName, ok := defaultConfig["Name"].(string)
if !ok {
return nil, fmt.Errorf("default sink must have 'Name'")
}

sinkArgs, _ := defaultConfig["Args"].(map[string]interface{})

tempBuilder := NewLoggerBuilder()
defaultSink, _ = tempBuilder.createSink(SinkConfiguration{
Name: sinkName,
Args: sinkArgs,
})
}

if defaultSink != nil {
return sinks.NewRouterSinkWithDefault(mode, defaultSink, routes...), nil
}

return sinks.NewRouterSink(mode, routes...), nil
}
Loading
Loading