From fc63d2b6513305dd9d28336b1a22e00c07651f55 Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Fri, 3 May 2019 14:08:16 +0100 Subject: [PATCH] Change how Kafka is configured for collector and ingester (#390) * Change how Kafka is configured for collector and ingester Signed-off-by: Gary Brown * Fix test and change NOTE to TIP Signed-off-by: Gary Brown --- README.adoc | 13 +++++++++++-- deploy/examples/simple-streaming.yaml | 11 +++++++++-- pkg/deployment/collector.go | 3 +-- pkg/deployment/collector_test.go | 14 +++++++------- pkg/deployment/ingester.go | 3 +-- pkg/deployment/ingester_test.go | 8 ++++---- pkg/strategy/streaming_test.go | 10 ++++++++-- 7 files changed, 41 insertions(+), 21 deletions(-) diff --git a/README.adoc b/README.adoc index ba0844ca6..41ee42ca8 100644 --- a/README.adoc +++ b/README.adoc @@ -246,11 +246,18 @@ metadata: name: simple-streaming spec: strategy: streaming + collector: + options: + kafka: # <1> + producer: + topic: jaeger-spans + brokers: my-cluster-kafka-brokers.kafka:9092 ingester: options: kafka: # <1> - topic: jaeger-spans - brokers: my-cluster-kafka-brokers.kafka:9092 + consumer: + topic: jaeger-spans + brokers: my-cluster-kafka-brokers.kafka:9092 ingester: deadlockInterval: 0 # <2> storage: @@ -262,6 +269,8 @@ spec: <1> Identifies the kafka configuration used by the collector, to produce the messages, and the ingester to consume the messages <2> The deadlock interval can be disabled to avoid the ingester being terminated when no messages arrive within the default 1 minute period +TIP: A Kafka environment can be configured using link:https://strimzi.io/[Strimzi's Kafka operator]. + == Elasticsearch storage Under some circumstances, the Jaeger Operator can make use of the link:https://github.com/openshift/elasticsearch-operator[Elasticsearch Operator] to provision a suitable Elasticsearch cluster. diff --git a/deploy/examples/simple-streaming.yaml b/deploy/examples/simple-streaming.yaml index f6b5b31df..52d3b45d2 100644 --- a/deploy/examples/simple-streaming.yaml +++ b/deploy/examples/simple-streaming.yaml @@ -6,11 +6,18 @@ metadata: name: simple-streaming spec: strategy: streaming + collector: + options: + kafka: + producer: + topic: jaeger-spans + brokers: my-cluster-kafka-brokers.kafka:9092 ingester: options: kafka: - topic: jaeger-spans - brokers: my-cluster-kafka-brokers.kafka:9092 + consumer: + topic: jaeger-spans + brokers: my-cluster-kafka-brokers.kafka:9092 ingester: deadlockInterval: 0 storage: diff --git a/pkg/deployment/collector.go b/pkg/deployment/collector.go index c2397d134..3453c37a5 100644 --- a/pkg/deployment/collector.go +++ b/pkg/deployment/collector.go @@ -78,8 +78,7 @@ func (c *Collector) Get() *appsv1.Deployment { storageType = "kafka" } options := allArgs(c.jaeger.Spec.Collector.Options, - c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(storageType)), - c.jaeger.Spec.Ingester.Options.Filter(storage.OptionsPrefix(storageType))) + c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(storageType))) sampling.Update(c.jaeger, commonSpec, &options) diff --git a/pkg/deployment/collector_test.go b/pkg/deployment/collector_test.go index 84c55cc0a..e1d75d480 100644 --- a/pkg/deployment/collector_test.go +++ b/pkg/deployment/collector_test.go @@ -350,23 +350,23 @@ func TestCollectorWithDirectStorageType(t *testing.T) { assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[0]) } -func TestCollectorWithIngesterStorageType(t *testing.T) { +func TestCollectorWithKafkaStorageType(t *testing.T) { jaeger := &v1.Jaeger{ ObjectMeta: metav1.ObjectMeta{ Name: "TestCollectorWithIngesterStorageType", }, Spec: v1.JaegerSpec{ Strategy: "streaming", - Ingester: v1.JaegerIngesterSpec{ + Collector: v1.JaegerCollectorSpec{ Options: v1.NewOptions(map[string]interface{}{ - "kafka.topic": "mytopic", + "kafka.producer.topic": "mytopic", }), }, Storage: v1.JaegerStorageSpec{ Type: "elasticsearch", Options: v1.NewOptions(map[string]interface{}{ - "kafka.brokers": "http://brokers", - "es.server-urls": "http://somewhere", + "kafka.producer.brokers": "http://brokers", + "es.server-urls": "http://somewhere", }), }, }, @@ -386,8 +386,8 @@ func TestCollectorWithIngesterStorageType(t *testing.T) { } assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3) - assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0]) - assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[1]) + assert.Equal(t, "--kafka.producer.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0]) + assert.Equal(t, "--kafka.producer.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[1]) } func TestCollectorWithIngesterNoOptionsStorageType(t *testing.T) { diff --git a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go index f6f03dc6a..fb8aadb1c 100644 --- a/pkg/deployment/ingester.go +++ b/pkg/deployment/ingester.go @@ -74,8 +74,7 @@ func (i *Ingester) Get() *appsv1.Deployment { } options := allArgs(i.jaeger.Spec.Ingester.Options, - i.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(i.jaeger.Spec.Storage.Type)), - i.jaeger.Spec.Storage.Options.Filter("kafka")) + i.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(i.jaeger.Spec.Storage.Type))) // ensure we have a consistent order of the arguments // see https://github.com/jaegertracing/jaeger-operator/issues/334 diff --git a/pkg/deployment/ingester_test.go b/pkg/deployment/ingester_test.go index b6b0924aa..d0164f9bc 100644 --- a/pkg/deployment/ingester_test.go +++ b/pkg/deployment/ingester_test.go @@ -312,13 +312,13 @@ func TestIngesterWithStorageType(t *testing.T) { Strategy: "streaming", Ingester: v1.JaegerIngesterSpec{ Options: v1.NewOptions(map[string]interface{}{ - "kafka.topic": "mytopic", + "kafka.consumer.topic": "mytopic", + "kafka.consumer.brokers": "http://brokers", }), }, Storage: v1.JaegerStorageSpec{ Type: "elasticsearch", Options: v1.NewOptions(map[string]interface{}{ - "kafka.brokers": "http://brokers", "es.server-urls": "http://somewhere", }), }, @@ -336,8 +336,8 @@ func TestIngesterWithStorageType(t *testing.T) { assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3) assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[0]) - assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[1]) - assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[2]) + assert.Equal(t, "--kafka.consumer.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[1]) + assert.Equal(t, "--kafka.consumer.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[2]) } func TestIngesterLabels(t *testing.T) { diff --git a/pkg/strategy/streaming_test.go b/pkg/strategy/streaming_test.go index a7bdf2fb5..6996fe4bd 100644 --- a/pkg/strategy/streaming_test.go +++ b/pkg/strategy/streaming_test.go @@ -74,9 +74,15 @@ func TestStreamingOptionsArePassed(t *testing.T) { }, Spec: v1.JaegerSpec{ Strategy: "streaming", + Collector: v1.JaegerCollectorSpec{ + Options: v1.NewOptions(map[string]interface{}{ + "kafka.producer.topic": "mytopic", + }), + }, Ingester: v1.JaegerIngesterSpec{ Options: v1.NewOptions(map[string]interface{}{ - "kafka.topic": "mytopic", + "kafka.consumer.topic": "mytopic", + "kafka.consumer.group-id": "mygroup", }), }, Storage: v1.JaegerStorageSpec{ @@ -107,7 +113,7 @@ func TestStreamingOptionsArePassed(t *testing.T) { assert.Equal(t, 0, escount) } else if strings.Contains(dep.Name, "ingester") { // Including parameters for ES and kafka topic - assert.Len(t, args, 4) + assert.Len(t, args, 5) assert.Equal(t, 3, escount) } else {