Skip to content

Commit

Permalink
Implement partitioning for OTLP metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
SHaaD94 authored and Evgeniy Zuykin committed Feb 22, 2024
1 parent 449b6a4 commit 1380a2a
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 23 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafka-exporter-key-by-metric-resources.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: adds the ability to publish Kafka messages with a message key based on metric resource attributes, enabling metrics to be partitioned in Kafka.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29433, 30666]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
1 change: 1 addition & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The following settings can be optionally configured:
- The following encodings are valid *only* for **logs**.
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
- `partition_metrics_by_resource_attributes` (default = false): configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
- `auth`
- `plain_text`
- `username`: The username to use.
Expand Down
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Config struct {
// trace ID as the message key by default.
PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

// PartitionMetricsByResourceAttributes, when true, sets each metrics
// message's Kafka key to a hash of the sorted resource attributes, so all
// metrics for the same resource are routed to the same partition.
PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata Metadata `mapstructure:"metadata"`
Expand Down
23 changes: 13 additions & 10 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Expand Down Expand Up @@ -109,11 +110,12 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Expand Down Expand Up @@ -165,6 +167,7 @@ func TestLoadConfig(t *testing.T) {
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
ResolveCanonicalBootstrapServersOnly: true,
Expand Down
6 changes: 6 additions & 0 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
if config.PartitionMetricsByResourceAttributes {
if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok {
keyableMarshaler.Key()
}
}

producer, err := newSaramaProducer(config)
if err != nil {
return nil, err
Expand Down
44 changes: 44 additions & 0 deletions exporter/kafkaexporter/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kafkaexporter
import (
"encoding/json"
"fmt"
"go.opentelemetry.io/collector/pdata/pmetric"
"testing"
"time"

Expand Down Expand Up @@ -71,6 +72,49 @@ func TestDefaultLogsMarshalers(t *testing.T) {
}
}

// TestOTLPMetricsJsonMarshaling verifies that the otlp_json metrics marshaler
// produces a single keyless message by default, and — after Key() is called —
// one message per ResourceMetrics, keyed by the resource-attribute hash.
func TestOTLPMetricsJsonMarshaling(t *testing.T) {
	t.Parallel()

	now := time.Unix(1, 0)

	metric := pmetric.NewMetrics()
	r := pcommon.NewResource()
	r.Attributes().PutStr("service.name", "my_service_name")
	r.Attributes().PutStr("service.instance.id", "kek_x_1")
	r.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource())

	rm := metric.ResourceMetrics().At(0)
	rm.SetSchemaUrl(conventions.SchemaURL)

	sm := rm.ScopeMetrics().AppendEmpty()
	m := sm.Metrics().AppendEmpty()
	m.SetEmptyGauge()
	m.Gauge().DataPoints().AppendEmpty().SetStartTimestamp(pcommon.NewTimestampFromTime(now))
	m.Gauge().DataPoints().At(0).Attributes().PutStr("gauge_attribute", "attr")
	m.Gauge().DataPoints().At(0).SetDoubleValue(1.0)

	// A second resource differing only in service.instance.id: the keyed
	// marshaler must emit a second message with a different partition key.
	r1 := pcommon.NewResource()
	r1.Attributes().PutStr("service.name", "my_service_name")
	r1.Attributes().PutStr("service.instance.id", "kek_x_2")
	r1.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource())

	standardMarshaler := metricsMarshalers()["otlp_json"]
	msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX")
	require.NoError(t, err, "Must have marshaled the data without error")
	require.Len(t, msgs, 1, "Expected a single message when partitioning is off")
	require.Nil(t, msgs[0].Key, "Expected no message key when partitioning is off")

	keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler)
	require.True(t, ok, "Must be a KeyableMetricsMarshaler")
	keyableMarshaler.Key()

	msgs, err = keyableMarshaler.Marshal(metric, "KafkaTopicX")
	require.NoError(t, err, "Must have marshaled the data without error")
	require.Len(t, msgs, 2, "Expected one message per ResourceMetrics")

	// Keys are hex SHA-256 digests of each resource's sorted attributes.
	require.Equal(t, sarama.ByteEncoder("90e74a8334a89993bd3f6ad05f9ca02438032a78d4399fb6fecf6c94fcdb13ef"), msgs[0].Key)
	require.Equal(t, sarama.ByteEncoder("55e1113a2eace57b91ef58911d811c28e936365f03ac068e8ce23090d9ea748f"), msgs[1].Key)
}

func TestOTLPTracesJsonMarshaling(t *testing.T) {
t.Parallel()

Expand Down
62 changes: 49 additions & 13 deletions exporter/kafkaexporter/pdata_marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/resourceutil"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
)

type pdataLogsMarshaler struct {
Expand Down Expand Up @@ -42,36 +42,72 @@ func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha
}
}

// KeyableMetricsMarshaler is an extension of the MetricsMarshaler interface intended to provide partition key capabilities
// for metrics messages.
type KeyableMetricsMarshaler interface {
	MetricsMarshaler
	// Key configures the marshaler to set a partition key on the Kafka
	// messages it produces.
	Key()
}

// pdataMetricsMarshaler marshals pmetric.Metrics into Kafka producer
// messages using the configured pdata metrics marshaler.
type pdataMetricsMarshaler struct {
	marshaler pmetric.Marshaler // underlying OTLP (proto/JSON) marshaler
	encoding  string            // encoding name reported by Encoding()
	keyed     bool              // when true, partition messages by resource-attribute hash (set via Key())
}

// Key configures the pdataMetricsMarshaler to set the message key on the kafka messages.
// Note: it uses a pointer receiver so the flag is visible to subsequent
// Marshal calls on the same instance.
func (p *pdataMetricsMarshaler) Key() {
	p.keyed = true
}

func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) {
bts, err := p.marshaler.MarshalMetrics(ld)
if err != nil {
return nil, err
}
return []*sarama.ProducerMessage{
{
var msgs []*sarama.ProducerMessage
if p.keyed {
metrics := ld.ResourceMetrics()

for i := 0; i < metrics.Len(); i++ {
resourceMetrics := metrics.At(i)
hash := resourceutil.CalculateResourceAttributesHash(resourceMetrics.Resource())

newMetrics := pmetric.NewMetrics()
resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty())

bts, err := p.marshaler.MarshalMetrics(newMetrics)
if err != nil {
return nil, err
}
msgs = append(msgs, &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bts),
Key: sarama.ByteEncoder(hash),
})
}
} else {
bts, err := p.marshaler.MarshalMetrics(ld)
if err != nil {
return nil, err
}
msgs = append(msgs, &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bts),
},
}, nil
})
}

return msgs, nil
}

// Encoding returns the encoding name this marshaler was constructed with.
func (p pdataMetricsMarshaler) Encoding() string {
	return p.encoding
}

func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string) MetricsMarshaler {
return pdataMetricsMarshaler{
return &pdataMetricsMarshaler{
marshaler: marshaler,
encoding: encoding,
}
}

// KeyableTracesMarshaler is an extension of the TracesMarshaler interface inteded to provide partition key capabilities
// KeyableTracesMarshaler is an extension of the TracesMarshaler interface intended to provide partition key capabilities
// for trace messages
type KeyableTracesMarshaler interface {
TracesMarshaler
Expand Down
1 change: 1 addition & 0 deletions exporter/kafkaexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ kafka:
required_acks: -1 # WaitForAll
timeout: 10s
partition_traces_by_id: true
partition_metrics_by_resource_attributes: true
auth:
plain_text:
username: jdoe
Expand Down
32 changes: 32 additions & 0 deletions internal/coreinternal/resourceutil/resourceutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package resourceutil

import (
	"crypto/sha256"
	"encoding/hex"
	"sort"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

// keyValueLabelPair holds one resource attribute as a flat key/value string
// pair so attributes can be sorted deterministically before hashing.
type keyValueLabelPair struct {
	Key string
	Value string
}

// CalculateResourceAttributesHash returns the hex-encoded SHA-256 digest of
// the resource's attributes, computed over the key/value string pairs sorted
// by key, so attribute insertion order does not affect the result.
//
// NOTE(review): keys and values are concatenated with no delimiter, so
// distinct attribute sets can collide (e.g. {"ab":"c"} vs {"a":"bc"}). Likely
// acceptable for Kafka partitioning, but confirm before reusing as identity.
func CalculateResourceAttributesHash(res pcommon.Resource) string {
	pairs := make([]keyValueLabelPair, 0, res.Attributes().Len())
	res.Attributes().Range(func(key string, value pcommon.Value) bool {
		pairs = append(pairs, keyValueLabelPair{Key: key, Value: value.AsString()})
		return true
	})

	// Attribute keys are unique, so a plain (unstable) sort is deterministic.
	sort.Slice(pairs, func(i, j int) bool { return pairs[i].Key < pairs[j].Key })

	digest := sha256.New()
	for _, pair := range pairs {
		digest.Write([]byte(pair.Key))
		digest.Write([]byte(pair.Value))
	}
	return hex.EncodeToString(digest.Sum(nil))
}
47 changes: 47 additions & 0 deletions internal/coreinternal/resourceutil/resourceutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package resourceutil

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

// TestHashEmptyResource verifies that a resource with no attributes hashes to
// the SHA-256 digest of empty input.
func TestHashEmptyResource(t *testing.T) {
	res := pcommon.NewResource()

	got := CalculateResourceAttributesHash(res)
	assert.EqualValues(t, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", got)
}

// TestHashSimpleResource pins the digest for a small fixed attribute set.
func TestHashSimpleResource(t *testing.T) {
	res := pcommon.NewResource()
	attrs := res.Attributes()
	attrs.PutStr("k1", "v1")
	attrs.PutStr("k2", "v2")

	got := CalculateResourceAttributesHash(res)
	assert.EqualValues(t, "3590bbad8f8a328dbbd5d01c35d8a5fab92c3588cf7e468e995c31d45a51cbef", got)
}

// TestHashReorderedAttributes verifies the hash is independent of attribute
// insertion order.
func TestHashReorderedAttributes(t *testing.T) {
	first := pcommon.NewResource()
	first.Attributes().PutStr("k1", "v1")
	first.Attributes().PutStr("k2", "v2")

	second := pcommon.NewResource()
	second.Attributes().PutStr("k2", "v2")
	second.Attributes().PutStr("k1", "v1")

	assert.EqualValues(t, CalculateResourceAttributesHash(first), CalculateResourceAttributesHash(second))
}

// TestHashDifferentAttributeValues pins the digest for a resource covering
// every pcommon attribute value type (bool, double, empty, bytes, map,
// slice, int, string).
func TestHashDifferentAttributeValues(t *testing.T) {
	res := pcommon.NewResource()
	attrs := res.Attributes()
	attrs.PutBool("k1", false)
	attrs.PutDouble("k2", 1.0)
	attrs.PutEmpty("k3")
	attrs.PutEmptyBytes("k4")
	attrs.PutEmptyMap("k5")
	attrs.PutEmptySlice("k6")
	attrs.PutInt("k7", 1)
	attrs.PutStr("k8", "v8")

	assert.EqualValues(t, "46852adab1751045942d67dace7c88665ec0e68b7f4b81a33bb05e5b954a8e57", CalculateResourceAttributesHash(res))
}

0 comments on commit 1380a2a

Please sign in to comment.