Skip to content

Commit a58b6d1

Browse files
jbguerraz (Jean-Baptiste Guerraz)
and
Jean-Baptiste Guerraz
authored
feat: implement templated topic (#43)
Co-authored-by: Jean-Baptiste Guerraz <jbguerraz@skilld.cloud>
1 parent 96a7c89 commit a58b6d1

File tree

6 files changed

+86
-34
lines changed

6 files changed

+86
-34
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ There is a docker image `telefonica/prometheus-kafka-adapter:1.6.0` [available o
3939
Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends them to Kafka. This behaviour can be configured with the following environment variables:
4040

4141
- `KAFKA_BROKER_LIST`: defines kafka endpoint and port, defaults to `kafka:9092`.
42-
- `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`.
42+
- `KAFKA_TOPIC`: defines the Kafka topic to be used; defaults to `metrics`. It can be a Go template, to which the metric's labels are passed as a map — e.g. `metrics.{{ index . "__name__" }}` to use a per-metric topic. Two template functions are available: replace (`{{ index . "__name__" | replace "message" "msg" }}`) and substring (`{{ index . "__name__" | substring 0 5 }}`)
4343
- `KAFKA_COMPRESSION`: defines the compression type to be used, defaults to `none`.
4444
- `KAFKA_BATCH_NUM_MESSAGES`: defines the number of messages to batch write, defaults to `10000`.
4545
- `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, defaults to `json`.

config.go

+34-15
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,19 @@ package main
1616

1717
import (
1818
"os"
19+
"strings"
20+
"text/template"
1921

20-
"github.com/confluentinc/confluent-kafka-go/kafka"
2122
"github.com/sirupsen/logrus"
2223
)
2324

2425
var (
25-
kafkaBrokerList = "kafka:9092"
26-
kafkaTopic = "metrics"
27-
basicauth = false
28-
basicauthUsername = ""
29-
basicauthPassword = ""
30-
kafkaPartition = kafka.TopicPartition{
31-
Topic: &kafkaTopic,
32-
Partition: kafka.PartitionAny,
33-
}
26+
kafkaBrokerList = "kafka:9092"
27+
kafkaTopic = "metrics"
28+
topicTemplate *template.Template
29+
basicauth = false
30+
basicauthUsername = ""
31+
basicauthPassword = ""
3432
kafkaCompression = "none"
3533
kafkaBatchNumMessages = "10000"
3634
kafkaSslClientCertFile = ""
@@ -55,11 +53,6 @@ func init() {
5553

5654
if value := os.Getenv("KAFKA_TOPIC"); value != "" {
5755
kafkaTopic = value
58-
59-
kafkaPartition = kafka.TopicPartition{
60-
Topic: &kafkaTopic,
61-
Partition: kafka.PartitionAny,
62-
}
6356
}
6457

6558
if value := os.Getenv("BASIC_AUTH_USERNAME"); value != "" {
@@ -100,6 +93,11 @@ func init() {
10093
if err != nil {
10194
logrus.WithError(err).Fatalln("couldn't create a metrics serializer")
10295
}
96+
97+
topicTemplate, err = parseTopicTemplate(kafkaTopic)
98+
if err != nil {
99+
logrus.WithError(err).Fatalln("couldn't parse the topic template")
100+
}
103101
}
104102

105103
func parseLogLevel(value string) logrus.Level {
@@ -124,3 +122,24 @@ func parseSerializationFormat(value string) (Serializer, error) {
124122
return NewJSONSerializer()
125123
}
126124
}
125+
126+
func parseTopicTemplate(tpl string) (*template.Template, error) {
127+
funcMap := template.FuncMap{
128+
"replace": func(old, new, src string) string {
129+
return strings.Replace(src, old, new, -1)
130+
},
131+
"substring": func(start, end int, s string) string {
132+
if start < 0 {
133+
start = 0
134+
}
135+
if end < 0 || end > len(s) {
136+
end = len(s)
137+
}
138+
if start >= end {
139+
panic("template function - substring: start is bigger (or equal) than end. That will produce an empty string.")
140+
}
141+
return s[start:end]
142+
},
143+
}
144+
return template.New("topic").Funcs(funcMap).Parse(tpl)
145+
}

handlers.go

+17-10
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,30 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
5454
return
5555
}
5656

57-
metrics, err := processWriteRequest(&req)
57+
metricsPerTopic, err := processWriteRequest(&req)
5858
if err != nil {
5959
c.AbortWithStatus(http.StatusInternalServerError)
6060
logrus.WithError(err).Error("couldn't process write request")
6161
return
6262
}
6363

64-
for _, metric := range metrics {
65-
err := producer.Produce(&kafka.Message{
66-
TopicPartition: kafkaPartition,
67-
Value: metric,
68-
}, nil)
64+
for topic, metrics := range metricsPerTopic {
65+
t := topic
66+
part := kafka.TopicPartition{
67+
Partition: kafka.PartitionAny,
68+
Topic: &t,
69+
}
70+
for _, metric := range metrics {
71+
err := producer.Produce(&kafka.Message{
72+
TopicPartition: part,
73+
Value: metric,
74+
}, nil)
6975

70-
if err != nil {
71-
c.AbortWithStatus(http.StatusInternalServerError)
72-
logrus.WithError(err).Error("couldn't produce message in kafka")
73-
return
76+
if err != nil {
77+
c.AbortWithStatus(http.StatusInternalServerError)
78+
logrus.WithError(err).Error("couldn't produce message in kafka")
79+
return
80+
}
7481
}
7582
}
7683

prometheus.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"github.com/sirupsen/logrus"
2020
)
2121

22-
func processWriteRequest(req *prompb.WriteRequest) ([][]byte, error) {
22+
func processWriteRequest(req *prompb.WriteRequest) (map[string][][]byte, error) {
2323
logrus.WithField("var", req).Debugln()
2424
return Serialize(serializer, req)
2525
}

serializers.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package main
1616

1717
import (
18+
"bytes"
1819
"encoding/json"
1920
"io/ioutil"
2021
"strconv"
@@ -33,8 +34,8 @@ type Serializer interface {
3334
}
3435

3536
// Serialize generates the JSON representation for a given Prometheus metric.
36-
func Serialize(s Serializer, req *prompb.WriteRequest) ([][]byte, error) {
37-
result := [][]byte{}
37+
func Serialize(s Serializer, req *prompb.WriteRequest) (map[string][][]byte, error) {
38+
result := make(map[string][][]byte)
3839

3940
for _, ts := range req.Timeseries {
4041
labels := make(map[string]string, len(ts.Labels))
@@ -43,6 +44,8 @@ func Serialize(s Serializer, req *prompb.WriteRequest) ([][]byte, error) {
4344
labels[string(model.LabelName(l.Name))] = string(model.LabelValue(l.Value))
4445
}
4546

47+
t := topic(labels)
48+
4649
for _, sample := range ts.Samples {
4750
epoch := time.Unix(sample.Timestamp/1000, 0).UTC()
4851

@@ -58,7 +61,7 @@ func Serialize(s Serializer, req *prompb.WriteRequest) ([][]byte, error) {
5861
logrus.WithError(err).Errorln("couldn't marshal timeseries")
5962
}
6063

61-
result = append(result, data)
64+
result[t] = append(result[t], data)
6265
}
6366
}
6467

@@ -104,3 +107,11 @@ func NewAvroJSONSerializer(schemaPath string) (*AvroJSONSerializer, error) {
104107
codec: codec,
105108
}, nil
106109
}
110+
111+
func topic(labels map[string]string) string {
112+
var buf bytes.Buffer
113+
if err := topicTemplate.Execute(&buf, labels); err != nil {
114+
return ""
115+
}
116+
return buf.String()
117+
}

serializers_test.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,15 @@ func TestSerializeToJSON(t *testing.T) {
4242

4343
writeRequest := NewWriteRequest()
4444
output, err := Serialize(serializer, writeRequest)
45-
assert.Len(t, output, 2)
45+
assert.Len(t, output["metrics"], 2)
4646
assert.Nil(t, err)
4747

4848
expectedSamples := []string{
4949
"{\"value\":\"456\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}",
5050
"{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}",
5151
}
5252

53-
for i, metric := range output {
53+
for i, metric := range output["metrics"] {
5454
assert.JSONEqf(t, expectedSamples[i], string(metric[:]), "wrong json serialization found")
5555
}
5656
}
@@ -72,19 +72,34 @@ func TestSerializeToAvro(t *testing.T) {
7272

7373
writeRequest := NewWriteRequest()
7474
output, err := Serialize(serializer, writeRequest)
75-
assert.Len(t, output, 2)
75+
assert.Len(t, output["metrics"], 2)
7676
assert.Nil(t, err)
7777

7878
expectedSamples := []string{
7979
"{\"value\":\"456\",\"timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}",
8080
"{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}",
8181
}
8282

83-
for i, metric := range output {
83+
for i, metric := range output["metrics"] {
8484
assert.JSONEqf(t, expectedSamples[i], string(metric[:]), "wrong json serialization found")
8585
}
8686
}
8787

88+
func TestTemplatedTopic(t *testing.T) {
89+
var err error
90+
topicTemplate, err = parseTopicTemplate("{{ index . \"labelfoo\" | replace \"bar\" \"foo\" | substring 6 -1 }}")
91+
assert.Nil(t, err)
92+
serializer, err := NewJSONSerializer()
93+
assert.Nil(t, err)
94+
95+
writeRequest := NewWriteRequest()
96+
output, err := Serialize(serializer, writeRequest)
97+
98+
for k, _ := range output {
99+
assert.Equal(t, "foo", k, "templated topic failed")
100+
}
101+
}
102+
88103
func BenchmarkSerializeToAvroJSON(b *testing.B) {
89104
serializer, _ := NewAvroJSONSerializer("schemas/metric.avsc")
90105
writeRequest := NewWriteRequest()

0 commit comments

Comments (0)