Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change how Kafka is configured for collector and ingester #390

Merged
merged 2 commits into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,18 @@ metadata:
name: simple-streaming
spec:
strategy: streaming
collector:
options:
kafka: # <1>
producer:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the strongest argument in favor of strong typing for CLI options, IMO

topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
ingester:
options:
kafka: # <1>
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
consumer:
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
ingester:
deadlockInterval: 0 # <2>
storage:
Expand All @@ -262,6 +269,8 @@ spec:
<1> Identifies the kafka configuration used by the collector, to produce the messages, and the ingester to consume the messages
<2> The deadlock interval can be disabled to avoid the ingester being terminated when no messages arrive within the default 1 minute period

TIP: A Kafka environment can be configured using link:https://strimzi.io/[Strimzi's Kafka operator].

== Elasticsearch storage

Under some circumstances, the Jaeger Operator can make use of the link:https://github.com/openshift/elasticsearch-operator[Elasticsearch Operator] to provision a suitable Elasticsearch cluster.
Expand Down
11 changes: 9 additions & 2 deletions deploy/examples/simple-streaming.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ metadata:
name: simple-streaming
spec:
strategy: streaming
collector:
options:
kafka:
producer:
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
ingester:
options:
kafka:
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
consumer:
topic: jaeger-spans
brokers: my-cluster-kafka-brokers.kafka:9092
ingester:
deadlockInterval: 0
storage:
Expand Down
3 changes: 1 addition & 2 deletions pkg/deployment/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ func (c *Collector) Get() *appsv1.Deployment {
storageType = "kafka"
}
options := allArgs(c.jaeger.Spec.Collector.Options,
c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(storageType)),
c.jaeger.Spec.Ingester.Options.Filter(storage.OptionsPrefix(storageType)))
c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(storageType)))

sampling.Update(c.jaeger, commonSpec, &options)

Expand Down
14 changes: 7 additions & 7 deletions pkg/deployment/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,23 +350,23 @@ func TestCollectorWithDirectStorageType(t *testing.T) {
assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[0])
}

func TestCollectorWithIngesterStorageType(t *testing.T) {
func TestCollectorWithKafkaStorageType(t *testing.T) {
jaeger := &v1.Jaeger{
ObjectMeta: metav1.ObjectMeta{
Name: "TestCollectorWithIngesterStorageType",
},
Spec: v1.JaegerSpec{
Strategy: "streaming",
Ingester: v1.JaegerIngesterSpec{
Collector: v1.JaegerCollectorSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.topic": "mytopic",
"kafka.producer.topic": "mytopic",
}),
},
Storage: v1.JaegerStorageSpec{
Type: "elasticsearch",
Options: v1.NewOptions(map[string]interface{}{
"kafka.brokers": "http://brokers",
"es.server-urls": "http://somewhere",
"kafka.producer.brokers": "http://brokers",
"es.server-urls": "http://somewhere",
}),
},
},
Expand All @@ -386,8 +386,8 @@ func TestCollectorWithIngesterStorageType(t *testing.T) {
}
assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env)
assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3)
assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0])
assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[1])
assert.Equal(t, "--kafka.producer.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0])
assert.Equal(t, "--kafka.producer.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[1])
}

func TestCollectorWithIngesterNoOptionsStorageType(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/deployment/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ func (i *Ingester) Get() *appsv1.Deployment {
}

options := allArgs(i.jaeger.Spec.Ingester.Options,
i.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(i.jaeger.Spec.Storage.Type)),
i.jaeger.Spec.Storage.Options.Filter("kafka"))
i.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(i.jaeger.Spec.Storage.Type)))

// ensure we have a consistent order of the arguments
// see https://github.com/jaegertracing/jaeger-operator/issues/334
Expand Down
8 changes: 4 additions & 4 deletions pkg/deployment/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ func TestIngesterWithStorageType(t *testing.T) {
Strategy: "streaming",
Ingester: v1.JaegerIngesterSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.topic": "mytopic",
"kafka.consumer.topic": "mytopic",
"kafka.consumer.brokers": "http://brokers",
}),
},
Storage: v1.JaegerStorageSpec{
Type: "elasticsearch",
Options: v1.NewOptions(map[string]interface{}{
"kafka.brokers": "http://brokers",
"es.server-urls": "http://somewhere",
}),
},
Expand All @@ -336,8 +336,8 @@ func TestIngesterWithStorageType(t *testing.T) {
assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env)
assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3)
assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[0])
assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[1])
assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[2])
assert.Equal(t, "--kafka.consumer.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[1])
assert.Equal(t, "--kafka.consumer.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[2])
}

func TestIngesterLabels(t *testing.T) {
Expand Down
10 changes: 8 additions & 2 deletions pkg/strategy/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,15 @@ func TestStreamingOptionsArePassed(t *testing.T) {
},
Spec: v1.JaegerSpec{
Strategy: "streaming",
Collector: v1.JaegerCollectorSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.producer.topic": "mytopic",
}),
},
Ingester: v1.JaegerIngesterSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.topic": "mytopic",
"kafka.consumer.topic": "mytopic",
"kafka.consumer.group-id": "mygroup",
}),
},
Storage: v1.JaegerStorageSpec{
Expand Down Expand Up @@ -107,7 +113,7 @@ func TestStreamingOptionsArePassed(t *testing.T) {
assert.Equal(t, 0, escount)
} else if strings.Contains(dep.Name, "ingester") {
// Including parameters for ES and kafka topic
assert.Len(t, args, 4)
assert.Len(t, args, 5)
assert.Equal(t, 3, escount)

} else {
Expand Down