Skip to content

Commit

Permalink
Change how Kafka is configured for collector and ingester (#390)
Browse files Browse the repository at this point in the history
* Change how Kafka is configured for collector and ingester

Signed-off-by: Gary Brown <gary@brownuk.com>

* Fix test and change NOTE to TIP

Signed-off-by: Gary Brown <gary@brownuk.com>
  • Loading branch information
objectiser authored May 3, 2019
1 parent 945569d commit fc63d2b
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 21 deletions.
13 changes: 11 additions & 2 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,18 @@ metadata:
name: simple-streaming
spec:
strategy: streaming
collector:
options:
kafka: # <1>
producer:
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
ingester:
options:
kafka: # <1>
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
consumer:
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
ingester:
deadlockInterval: 0 # <2>
storage:
Expand All @@ -262,6 +269,8 @@ spec:
<1> Identifies the kafka configuration used by the collector, to produce the messages, and the ingester to consume the messages
<2> The deadlock interval can be disabled to avoid the ingester being terminated when no messages arrive within the default 1 minute period

TIP: A Kafka environment can be configured using link:https://strimzi.io/[Strimzi's Kafka operator].

== Elasticsearch storage

Under some circumstances, the Jaeger Operator can make use of the link:https://github.com/openshift/elasticsearch-operator[Elasticsearch Operator] to provision a suitable Elasticsearch cluster.
Expand Down
11 changes: 9 additions & 2 deletions deploy/examples/simple-streaming.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ metadata:
name: simple-streaming
spec:
strategy: streaming
collector:
options:
kafka:
producer:
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
ingester:
options:
kafka:
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
consumer:
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
ingester:
deadlockInterval: 0
storage:
Expand Down
3 changes: 1 addition & 2 deletions pkg/deployment/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ func (c *Collector) Get() *appsv1.Deployment {
storageType = "kafka"
}
options := allArgs(c.jaeger.Spec.Collector.Options,
c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(storageType)),
c.jaeger.Spec.Ingester.Options.Filter(storage.OptionsPrefix(storageType)))
c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(storageType)))

sampling.Update(c.jaeger, commonSpec, &options)

Expand Down
14 changes: 7 additions & 7 deletions pkg/deployment/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,23 +350,23 @@ func TestCollectorWithDirectStorageType(t *testing.T) {
assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[0])
}

func TestCollectorWithIngesterStorageType(t *testing.T) {
func TestCollectorWithKafkaStorageType(t *testing.T) {
jaeger := &v1.Jaeger{
ObjectMeta: metav1.ObjectMeta{
Name: "TestCollectorWithIngesterStorageType",
},
Spec: v1.JaegerSpec{
Strategy: "streaming",
Ingester: v1.JaegerIngesterSpec{
Collector: v1.JaegerCollectorSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.topic": "mytopic",
"kafka.producer.topic": "mytopic",
}),
},
Storage: v1.JaegerStorageSpec{
Type: "elasticsearch",
Options: v1.NewOptions(map[string]interface{}{
"kafka.brokers": "http://brokers",
"es.server-urls": "http://somewhere",
"kafka.producer.brokers": "http://brokers",
"es.server-urls": "http://somewhere",
}),
},
},
Expand All @@ -386,8 +386,8 @@ func TestCollectorWithIngesterStorageType(t *testing.T) {
}
assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env)
assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3)
assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0])
assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[1])
assert.Equal(t, "--kafka.producer.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0])
assert.Equal(t, "--kafka.producer.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[1])
}

func TestCollectorWithIngesterNoOptionsStorageType(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/deployment/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ func (i *Ingester) Get() *appsv1.Deployment {
}

options := allArgs(i.jaeger.Spec.Ingester.Options,
i.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(i.jaeger.Spec.Storage.Type)),
i.jaeger.Spec.Storage.Options.Filter("kafka"))
i.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(i.jaeger.Spec.Storage.Type)))

// ensure we have a consistent order of the arguments
// see https://github.com/jaegertracing/jaeger-operator/issues/334
Expand Down
8 changes: 4 additions & 4 deletions pkg/deployment/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ func TestIngesterWithStorageType(t *testing.T) {
Strategy: "streaming",
Ingester: v1.JaegerIngesterSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.topic": "mytopic",
"kafka.consumer.topic": "mytopic",
"kafka.consumer.brokers": "http://brokers",
}),
},
Storage: v1.JaegerStorageSpec{
Type: "elasticsearch",
Options: v1.NewOptions(map[string]interface{}{
"kafka.brokers": "http://brokers",
"es.server-urls": "http://somewhere",
}),
},
Expand All @@ -336,8 +336,8 @@ func TestIngesterWithStorageType(t *testing.T) {
assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env)
assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3)
assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[0])
assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[1])
assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[2])
assert.Equal(t, "--kafka.consumer.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[1])
assert.Equal(t, "--kafka.consumer.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[2])
}

func TestIngesterLabels(t *testing.T) {
Expand Down
10 changes: 8 additions & 2 deletions pkg/strategy/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,15 @@ func TestStreamingOptionsArePassed(t *testing.T) {
},
Spec: v1.JaegerSpec{
Strategy: "streaming",
Collector: v1.JaegerCollectorSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.producer.topic": "mytopic",
}),
},
Ingester: v1.JaegerIngesterSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.topic": "mytopic",
"kafka.consumer.topic": "mytopic",
"kafka.consumer.group-id": "mygroup",
}),
},
Storage: v1.JaegerStorageSpec{
Expand Down Expand Up @@ -107,7 +113,7 @@ func TestStreamingOptionsArePassed(t *testing.T) {
assert.Equal(t, 0, escount)
} else if strings.Contains(dep.Name, "ingester") {
// Including parameters for ES and kafka topic
assert.Len(t, args, 4)
assert.Len(t, args, 5)
assert.Equal(t, 3, escount)

} else {
Expand Down

0 comments on commit fc63d2b

Please sign in to comment.