Skip to content

Commit

Permalink
feat(routingprocessor): route on resource attributes (open-telemetry#…
Browse files Browse the repository at this point in the history
…5694)

**Description:** Add a feature to `routingprocessor` to route traces on resource attributes and make it configurable (retaining previous default to read from context).

**Link to tracking Issue:** Fixes open-telemetry#5538

**Testing:** Added unit tests.

**Documentation:** Changed README.

---

Huge shout-out to @astencel-sumo for this patch! 🙇
  • Loading branch information
pmalek authored Oct 15, 2021
1 parent 63af0bf commit 3bf6a49
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 5 deletions.
11 changes: 8 additions & 3 deletions processor/routingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@

Routes traces to specific exporters.

This processor will read a header from the incoming HTTP request (gRPC or plain HTTP) and direct the trace information to specific exporters based on the attribute's value.
This processor will either read a header from the incoming HTTP request (gRPC or plain HTTP), or it will read a resource attribute, and direct the trace information to specific exporters based on the value read.

This processor *does not* let traces to continue through the pipeline and will emit a warning in case other processor(s) are defined after this one. Similarly, exporters defined as part of the pipeline are not authoritative: if you add an exporter to the pipeline, make sure you add it to this processor *as well*, otherwise it won't be used at all. All exporters defined as part of this processor *must also* be defined as part of the pipeline's exporters.

Given that this processor depends on information provided by the client via HTTP headers, processors that aggregate data like `batch` or `groupbytrace` should not be used when this processor is part of the pipeline.
Given that this processor depends on information provided by the client via HTTP headers or resource attributes, caution must be taken when processors that aggregate data like `batch` or `groupbytrace` are used as part of the pipeline.

The following settings are required:

- `from_attribute`: contains the HTTP header name to look up the route's value. Only the OTLP exporter has been tested in connection with the OTLP gRPC Receiver, but any other gRPC receiver should work fine, as long as the client sends the specified HTTP header.
- `from_attribute`: contains the HTTP header name or the resource attribute name to look up the route's value. Only the OTLP exporter has been tested in connection with the OTLP gRPC Receiver, but any other gRPC receiver should work fine, as long as the client sends the specified HTTP header.
- `table`: the routing table for this processor.
- `table.value`: a possible value for the attribute specified under FromAttribute.
- `table.exporters`: the list of exporters to use when the value from the FromAttribute field matches this table item.

The following settings can be optionally configured:

- `attribute_source` defines where to look for the attribute in `from_attribute`. The allowed values are:
- `context` (the default) - to search the [context][context_docs], which includes HTTP headers
- `resource` - to search the resource attributes.
- `default_exporters` contains the list of exporters to use when a more specific record can't be found in the routing table.

Example:
Expand All @@ -37,3 +40,5 @@ exporters:
```
The full list of settings exposed for this processor are documented [here](./config.go) with detailed sample configuration [here](./testdata/config.yaml).
[context_docs]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/context/context.md
8 changes: 8 additions & 0 deletions processor/routingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ type Config struct {
// Optional.
DefaultExporters []string `mapstructure:"default_exporters"`

// AttributeSource defines where the attribute defined in `from_attribute` is searched for.
// The allowed values are:
// - "context" - the attribute must exist in the incoming context
// - "resource" - the attribute must exist in resource attributes
// The default value is "context".
// Optional.
AttributeSource string `mapstructure:"attribute_source"`

// FromAttribute contains the attribute name to look up the route value. This attribute should be part of the context propagated
// down from the previous receivers and/or processors. If all the receivers and processors are propagating the entire context correctly,
// this could be the HTTP/gRPC header from the original request/RPC. Typically, aggregation processors (batch, groupbytrace)
Expand Down
1 change: 1 addition & 0 deletions processor/routingprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestLoadConfig(t *testing.T) {
&Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
DefaultExporters: []string{"otlp"},
AttributeSource: "context",
FromAttribute: "X-Tenant",
Table: []RoutingTableItem{
{
Expand Down
6 changes: 6 additions & 0 deletions processor/routingprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
const (
// The value of "type" key in configuration.
typeStr = "routing"

contextAttributeSource = "context"
resourceAttributeSource = "resource"

defaultAttributeSource = contextAttributeSource
)

// NewFactory creates a factory for the routing processor.
Expand All @@ -40,6 +45,7 @@ func NewFactory() component.ProcessorFactory {
func createDefaultConfig() config.Processor {
return &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
AttributeSource: defaultAttributeSource,
}
}

Expand Down
22 changes: 21 additions & 1 deletion processor/routingprocessor/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ func (e *processorImp) Shutdown(context.Context) error {
}

func (e *processorImp) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
value := e.extractValueFromContext(ctx)
var value string
if e.config.AttributeSource == resourceAttributeSource {
value = e.extractValueFromResource(td)
} else {
value = e.extractValueFromContext(ctx)
}

if len(value) == 0 {
// the attribute's value hasn't been found, send data to the default exporter
return e.pushDataToExporters(ctx, td, e.defaultTracesExporters)
Expand Down Expand Up @@ -161,6 +167,20 @@ func (e *processorImp) pushDataToExporters(ctx context.Context, td pdata.Traces,
return nil
}

func (e *processorImp) extractValueFromResource(traces pdata.Traces) string {
if traces.ResourceSpans().Len() == 0 {
return ""
}

firstResourceAttributes := traces.ResourceSpans().At(0).Resource().Attributes()
routingAttribute, found := firstResourceAttributes.Get(e.config.FromAttribute)
if !found {
return ""
}

return routingAttribute.AsString()
}

func (e *processorImp) extractValueFromContext(ctx context.Context) string {
// right now, we only support looking up attributes from requests that have gone through the gRPC server
// in that case, it will add the HTTP headers as context metadata
Expand Down
45 changes: 44 additions & 1 deletion processor/routingprocessor/routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestInvalidExporter(t *testing.T) {
assert.Error(t, err)
}

func TestValueFromExistingGRPCAttribute(t *testing.T) {
func TestValueFromExistingContextAttribute(t *testing.T) {
// prepare
exp, err := newProcessor(zap.NewNop(), &Config{
DefaultExporters: []string{"otlp"},
Expand All @@ -270,6 +270,49 @@ func TestValueFromExistingGRPCAttribute(t *testing.T) {
assert.Equal(t, "acme", val)
}

func TestValueFromExistingResourceAttribute(t *testing.T) {
// prepare
wg := &sync.WaitGroup{}
wg.Add(1)

exp := &processorImp{
config: Config{
AttributeSource: resourceAttributeSource,
FromAttribute: "k8s.namespace.name",
},
logger: zap.NewNop(),
defaultTracesExporters: []component.TracesExporter{
&mockExporter{
ConsumeTracesFunc: func(context.Context, pdata.Traces) error {
assert.Fail(t, "Should not route to default exporters.")
wg.Done()
return nil
},
},
},
traceExporters: map[string][]component.TracesExporter{
"namespace-1": {
&mockExporter{
ConsumeTracesFunc: func(context.Context, pdata.Traces) error {
wg.Done()
return nil
},
},
},
},
}
traces := pdata.NewTraces()
traces.ResourceSpans().AppendEmpty()
traces.ResourceSpans().At(0).Resource().Attributes().InsertString("k8s.namespace.name", "namespace-1")

// test
err := exp.ConsumeTraces(context.Background(), traces)

// verify
wg.Wait() // ensure that the exporter has been called
assert.NoError(t, err)
}

func TestMultipleValuesFromExistingGRPCAttribute(t *testing.T) {
// prepare
exp, err := newProcessor(zap.NewNop(), &Config{
Expand Down
1 change: 1 addition & 0 deletions processor/routingprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ processors:
routing:
default_exporters:
- otlp
attribute_source: context
from_attribute: X-Tenant
table:
- value: acme
Expand Down

0 comments on commit 3bf6a49

Please sign in to comment.