Skip to content

Commit

Permalink
Add tests for OTEL ingester (#1252)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Earls <kearls@redhat.com>
  • Loading branch information
kevinearls authored Oct 13, 2020
1 parent 1cedc50 commit 5271b85
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 19 deletions.
3 changes: 2 additions & 1 deletion .ci/run-e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ then
make e2e-tests-streaming
elif [ "${TEST_GROUP}" = "streaming-otel" ]
then
echo "Running Streaming Tests with OTEL collector"
echo "Running Streaming Tests with OTEL collector and ingester"
export USE_OTEL_COLLECTOR=true
export USE_OTEL_INGESTER=true
make e2e-tests-streaming
elif [ "${TEST_GROUP}" = "examples1" ]
then
Expand Down
58 changes: 40 additions & 18 deletions test/e2e/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (suite *StreamingTestSuite) TestStreaming() {
waitForKafkaInstance()

jaegerInstanceName := "simple-streaming"
j := jaegerStreamingDefinition(namespace, jaegerInstanceName, testOtelCollector)
j := jaegerStreamingDefinition(namespace, jaegerInstanceName, testOtelCollector, testOtelIngester)
log.Infof("passing %v", j)
err := fw.Client.Create(context.TODO(), j, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
require.NoError(t, err, "Error deploying jaeger")
Expand All @@ -78,8 +78,9 @@ func (suite *StreamingTestSuite) TestStreaming() {

ProductionSmokeTest(jaegerInstanceName)

// Make sure we were using the correct collector image
// Make sure we were using the correct collector and ingester images
verifyCollectorImage(jaegerInstanceName, namespace, testOtelCollector)
verifyIngesterImage(jaegerInstanceName, namespace, testOtelIngester)
}

func (suite *StreamingTestSuite) TestStreamingWithTLS() {
Expand All @@ -105,7 +106,7 @@ func (suite *StreamingTestSuite) TestStreamingWithTLS() {

// Now create a jaeger instance with TLS enabled -- note it has to be deployed in the same namespace as the kafka instance
jaegerInstanceName := "tls-streaming"
jaegerInstance := jaegerStreamingDefinitionWithTLS(kafkaNamespace, jaegerInstanceName, kafkaUserName, testOtelCollector)
jaegerInstance := jaegerStreamingDefinitionWithTLS(kafkaNamespace, jaegerInstanceName, kafkaUserName, testOtelCollector, testOtelIngester)
err = fw.Client.Create(context.TODO(), jaegerInstance, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
require.NoError(t, err, "Error deploying jaeger")
defer undeployJaegerInstance(jaegerInstance)
Expand All @@ -121,8 +122,9 @@ func (suite *StreamingTestSuite) TestStreamingWithTLS() {

ProductionSmokeTestWithNamespace(jaegerInstanceName, kafkaNamespace)

// Make sure we were using the correct collector image
// Make sure we were using the correct collector and ingester images
verifyCollectorImage(jaegerInstanceName, kafkaNamespace, testOtelCollector)
verifyIngesterImage(jaegerInstanceName, kafkaNamespace, testOtelIngester)
}

func (suite *StreamingTestSuite) TestStreamingWithAutoProvisioning() {
Expand All @@ -132,7 +134,7 @@ func (suite *StreamingTestSuite) TestStreamingWithAutoProvisioning() {
// Now create a jaeger instance which will auto provision a kafka instance
jaegerInstanceName := "auto-provisioned"
jaegerInstanceNamespace := namespace
jaegerInstance := jaegerAutoProvisionedDefinition(jaegerInstanceNamespace, jaegerInstanceName, testOtelCollector)
jaegerInstance := jaegerAutoProvisionedDefinition(jaegerInstanceNamespace, jaegerInstanceName, testOtelCollector, testOtelIngester)
err := fw.Client.Create(context.TODO(), jaegerInstance, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
require.NoError(t, err, "Error deploying jaeger")
defer undeployJaegerInstance(jaegerInstance)
Expand All @@ -152,11 +154,12 @@ func (suite *StreamingTestSuite) TestStreamingWithAutoProvisioning() {

ProductionSmokeTestWithNamespace(jaegerInstanceName, jaegerInstanceNamespace)

// Make sure we were using the correct collector image
// Make sure we were using the correct collector and ingester images
verifyCollectorImage(jaegerInstanceName, namespace, testOtelCollector)
verifyIngesterImage(jaegerInstanceName, namespace, testOtelIngester)
}

func jaegerStreamingDefinition(namespace string, name string, useOtelCollector bool) *v1.Jaeger {
func jaegerStreamingDefinition(namespace string, name string, useOtelCollector, useOtelIngester bool) *v1.Jaeger {
kafkaClusterURL := fmt.Sprintf("my-cluster-kafka-brokers.%s:9092", kafkaNamespace)
ingressEnabled := true
collectorOptions := make(map[string]interface{})
Expand Down Expand Up @@ -207,15 +210,32 @@ func jaegerStreamingDefinition(namespace string, name string, useOtelCollector b
j.Spec.Collector.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14269"))
}

if useOtelIngester {
log.Infof("Using OTEL ingester for %s", name)
j.Spec.Ingester.Image = otelIngesterImage
j.Spec.Ingester.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14270"))
}

return j
}

func jaegerStreamingDefinitionWithTLS(namespace string, name, kafkaUserName string, useOtelCollector bool) *v1.Jaeger {
func jaegerStreamingDefinitionWithTLS(namespace string, name, kafkaUserName string, useOtelCollector, useOtelIngester bool) *v1.Jaeger {
volumes := getTLSVolumes(kafkaUserName)
volumeMounts := getTLSVolumeMounts()
ingressEnabled := true

kafkaClusterURL := fmt.Sprintf("my-cluster-kafka-bootstrap.%s.svc.cluster.local:9093", kafkaNamespace)
ingesterOptions := make(map[string]interface{})
ingesterOptions["kafka.consumer.authentication"] = "tls"
ingesterOptions["kafka.consumer.topic"] = "jaeger-spans"
ingesterOptions["kafka.consumer.brokers"] = kafkaClusterURL
ingesterOptions["kafka.consumer.tls.ca"] = "/var/run/secrets/cluster-ca/ca.crt"
ingesterOptions["kafka.consumer.tls.cert"] = "/var/run/secrets/kafkauser/user.crt"
ingesterOptions["kafka.consumer.tls.key"] = "/var/run/secrets/kafkauser/user.key"
if !useOtelIngester {
ingesterOptions["ingester.deadlockInterval"] = 0
}

j := &v1.Jaeger{
TypeMeta: metav1.TypeMeta{
Kind: "Jaeger",
Expand All @@ -242,15 +262,7 @@ func jaegerStreamingDefinitionWithTLS(namespace string, name, kafkaUserName stri
}),
},
Ingester: v1.JaegerIngesterSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.consumer.authentication": "tls",
"kafka.consumer.topic": "jaeger-spans",
"kafka.consumer.brokers": kafkaClusterURL,
"kafka.consumer.tls.ca": "/var/run/secrets/cluster-ca/ca.crt",
"kafka.consumer.tls.cert": "/var/run/secrets/kafkauser/user.crt",
"kafka.consumer.tls.key": "/var/run/secrets/kafkauser/user.key",
"ingester.deadlockInterval": 0,
}),
Options: v1.NewOptions(ingesterOptions),
},
Storage: v1.JaegerStorageSpec{
Type: "elasticsearch",
Expand All @@ -270,11 +282,16 @@ func jaegerStreamingDefinitionWithTLS(namespace string, name, kafkaUserName stri
j.Spec.Collector.Image = otelCollectorImage
j.Spec.Collector.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14269"))
}
if useOtelIngester {
log.Infof("Using OTEL ingester for %s", name)
j.Spec.Ingester.Image = otelIngesterImage
j.Spec.Ingester.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14270"))
}

return j
}

func jaegerAutoProvisionedDefinition(namespace string, name string, useOtelCollector bool) *v1.Jaeger {
func jaegerAutoProvisionedDefinition(namespace string, name string, useOtelCollector, useOtelIngester bool) *v1.Jaeger {
ingressEnabled := true
jaegerInstance := &v1.Jaeger{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -305,6 +322,11 @@ func jaegerAutoProvisionedDefinition(namespace string, name string, useOtelColle
jaegerInstance.Spec.Collector.Image = otelCollectorImage
jaegerInstance.Spec.Collector.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14269"))
}
if useOtelIngester {
log.Infof("Using OTEL ingester for %s", name)
jaegerInstance.Spec.Ingester.Image = otelIngesterImage
jaegerInstance.Spec.Ingester.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14270"))
}

return jaegerInstance
}
Expand Down
22 changes: 22 additions & 0 deletions test/e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
saveLogs = getBoolEnv("SAVE_LOGS", false)
skipCassandraTests = getBoolEnv("SKIP_CASSANDRA_TESTS", false)
testOtelCollector = getBoolEnv("USE_OTEL_COLLECTOR", false)
testOtelIngester = getBoolEnv("USE_OTEL_INGESTER", false)
testOtelAgent = getBoolEnv("USE_OTEL_AGENT", false)
testOtelAllInOne = getBoolEnv("USE_OTEL_ALL_IN_ONE", false)

Expand All @@ -56,6 +57,7 @@ var (
cassandraKeyspace = "jaeger_v1_datacenter1"
cassandraDatacenter = "datacenter1"
otelCollectorImage = "jaegertracing/jaeger-opentelemetry-collector:latest"
otelIngesterImage = "jaegertracing/jaeger-opentelemetry-ingester:latest"
otelAgentImage = "jaegertracing/jaeger-opentelemetry-agent:latest"
otelAllInOneImage = "jaegertracing/opentelemetry-all-in-one:latest"

Expand Down Expand Up @@ -564,6 +566,26 @@ func deletePersistentVolumeClaims(namespace string) {
}
}

func verifyIngesterImage(jaegerInstanceName, namespace string, expected bool) {
require.Equal(t, expected, wasUsingOtelIngester(jaegerInstanceName, namespace))
}

// Was this Jaeger Instance using the OTEL ingester?
func wasUsingOtelIngester(jaegerInstanceName, namespace string) bool {
deployment, err := fw.KubeClient.AppsV1().Deployments(namespace).Get(context.Background(), jaegerInstanceName+"-ingester", metav1.GetOptions{})
require.NoError(t, err)
containers := deployment.Spec.Template.Spec.Containers
for _, container := range containers {
if container.Name == "jaeger-ingester" {
logrus.Infof("Test %s is using image %s", t.Name(), container.Image)
return strings.Contains(container.Image, "jaeger-opentelemetry-ingester")
}
}

require.Failf(t, "Did not find a collector image for %s in namespace %s", jaegerInstanceName, namespace)
return false
}

func verifyCollectorImage(jaegerInstanceName, namespace string, expected bool) {
require.Equal(t, expected, wasUsingOtelCollector(jaegerInstanceName, namespace))
}
Expand Down

0 comments on commit 5271b85

Please sign in to comment.