Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector/routing] Supports matching the statement only once. #28888

Merged
merged 10 commits into from
Dec 14, 2023
27 changes: 27 additions & 0 deletions .chloggen/feat_routing_match_once.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: routingconnector supports matching the statement only once

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26353]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
12 changes: 12 additions & 0 deletions connector/routingconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The following settings are available:
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
- `default_pipelines (optional)`: contains the list of pipelines to use when a record does not meet any of specified conditions.
- `error_mode (optional)`: determines how errors returned from OTTL statements are handled. Valid values are `ignore` and `propagate`. If `ignored` is used and a statement's condition has an error then the payload will be routed to the default pipelines. If not supplied, `propagate` is used.
- `match_once (optional, default: false)`: determines whether the connector matches multiple statements or not. If enabled, the payload will be routed to the first pipeline in the `table` whose routing condition is met.

Example:

Expand All @@ -55,12 +56,23 @@ connectors:
routing:
default_pipelines: [traces/jaeger]
error_mode: ignore
match_once: false
table:
- statement: route() where attributes["X-Tenant"] == "acme"
pipelines: [traces/jaeger-acme]
- statement: delete_key(attributes, "X-Tenant") where IsMatch(attributes["X-Tenant"], ".*corp")
pipelines: [traces/jaeger-ecorp]

routing/match_once:
default_pipelines: [traces/jaeger]
error_mode: ignore
match_once: true
table:
- statement: route() where attributes["X-Tenant"] == "acme"
pipelines: [traces/jaeger-acme]
- statement: route() where attributes["X-Tenant"] == ".*acme"
pipelines: [traces/jaeger-ecorp]

service:
pipelines:
traces/in:
Expand Down
4 changes: 4 additions & 0 deletions connector/routingconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Config struct {
// Table contains the routing table for this processor.
// Required.
Table []RoutingTableItem `mapstructure:"table"`

// MatchOnce determines whether the connector matches multiple statements.
// Optional.
MatchOnce bool `mapstructure:"match_once"`
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
}

// Validate checks if the processor configuration is valid.
Expand Down
5 changes: 4 additions & 1 deletion connector/routingconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
rtx := ottlresource.NewTransformContext(rlogs.Resource())

noRoutesMatch := true
for _, route := range c.router.routes {
for _, route := range c.router.routeSlice {
_, isMatch, err := route.statement.Execute(ctx, rtx)
if err != nil {
if c.config.ErrorMode == ottl.PropagateError {
Expand All @@ -84,6 +84,9 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if isMatch {
noRoutesMatch = false
c.group(groups, route.consumer, rlogs)
if c.config.MatchOnce {
break
}
}

}
Expand Down
152 changes: 152 additions & 0 deletions connector/routingconnector/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,158 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
})
}

func TestLogsAreCorrectlyMatchOnceWithOTTL(t *testing.T) {
logsDefault := component.NewIDWithName(component.DataTypeLogs, "default")
logs0 := component.NewIDWithName(component.DataTypeLogs, "0")
logs1 := component.NewIDWithName(component.DataTypeLogs, "1")

cfg := &Config{
DefaultPipelines: []component.ID{logsDefault},
Table: []RoutingTableItem{
{
Statement: `route() where IsMatch(attributes["X-Tenant"], ".*acme") == true`,
Pipelines: []component.ID{logs0},
},
{
Statement: `route() where IsMatch(attributes["X-Tenant"], "_acme") == true`,
Pipelines: []component.ID{logs1},
},
{
Statement: `route() where attributes["X-Tenant"] == "ecorp"`,
Pipelines: []component.ID{logsDefault, logs0},
},
},
MatchOnce: true,
}

var defaultSink, sink0, sink1 consumertest.LogsSink

router := connectortest.NewLogsRouter(
connectortest.WithLogsSink(logsDefault, &defaultSink),
connectortest.WithLogsSink(logs0, &sink0),
connectortest.WithLogsSink(logs1, &sink1),
)

resetSinks := func() {
defaultSink.Reset()
sink0.Reset()
sink1.Reset()
}

factory := NewFactory()
conn, err := factory.CreateLogsToLogs(
context.Background(),
connectortest.NewNopCreateSettings(),
cfg,
router.(consumer.Logs),
)

require.NoError(t, err)
require.NotNil(t, conn)
require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

t.Run("logs matched by no expressions", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()
rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "something-else")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 1)
assert.Len(t, sink0.AllLogs(), 0)
assert.Len(t, sink1.AllLogs(), 0)
})

t.Run("logs matched one expression", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "xacme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 0)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)
})

t.Run("logs matched by two expressions, but sinks to one", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "x_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

rl = l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 0)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)

assert.Equal(t, sink0.AllLogs()[0].LogRecordCount(), 2)
})

t.Run("one log matched by multiple expressions, other matched none", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

rl = l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "something-else")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 1)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)

rlog := defaultSink.AllLogs()[0].ResourceLogs().At(0)
attr, ok := rlog.Resource().Attributes().Get("X-Tenant")
assert.True(t, ok, "routing attribute must exists")
assert.Equal(t, attr.AsString(), "something-else")
})

t.Run("logs matched by one expression, multiple pipelines", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "ecorp")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 1)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)

assert.Equal(t, defaultSink.AllLogs()[0].LogRecordCount(), 1)
assert.Equal(t, sink0.AllLogs()[0].LogRecordCount(), 1)
assert.Equal(t, defaultSink.AllLogs(), sink0.AllLogs())
})
}

func TestLogsResourceAttributeDroppedByOTTL(t *testing.T) {
logsDefault := component.NewIDWithName(component.DataTypeLogs, "default")
logsOther := component.NewIDWithName(component.DataTypeLogs, "other")
Expand Down
5 changes: 4 additions & 1 deletion connector/routingconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
rtx := ottlresource.NewTransformContext(rmetrics.Resource())

noRoutesMatch := true
for _, route := range c.router.routes {
for _, route := range c.router.routeSlice {
_, isMatch, err := route.statement.Execute(ctx, rtx)
if err != nil {
if c.config.ErrorMode == ottl.PropagateError {
Expand All @@ -84,6 +84,9 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
if isMatch {
noRoutesMatch = false
c.group(groups, route.consumer, rmetrics)
if c.config.MatchOnce {
break
}
}

}
Expand Down
Loading
Loading