Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for OTEL ingester #1252

Merged
merged 1 commit into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .ci/run-e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ then
make e2e-tests-streaming
elif [ "${TEST_GROUP}" = "streaming-otel" ]
then
echo "Running Streaming Tests with OTEL collector"
echo "Running Streaming Tests with OTEL collector and ingester"
export USE_OTEL_COLLECTOR=true
export USE_OTEL_INGESTER=true
make e2e-tests-streaming
elif [ "${TEST_GROUP}" = "examples1" ]
then
Expand Down
58 changes: 40 additions & 18 deletions test/e2e/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (suite *StreamingTestSuite) TestStreaming() {
waitForKafkaInstance()

jaegerInstanceName := "simple-streaming"
j := jaegerStreamingDefinition(namespace, jaegerInstanceName, testOtelCollector)
j := jaegerStreamingDefinition(namespace, jaegerInstanceName, testOtelCollector, testOtelIngester)
log.Infof("passing %v", j)
err := fw.Client.Create(context.TODO(), j, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
require.NoError(t, err, "Error deploying jaeger")
Expand All @@ -78,8 +78,9 @@ func (suite *StreamingTestSuite) TestStreaming() {

ProductionSmokeTest(jaegerInstanceName)

// Make sure we were using the correct collector image
// Make sure we were using the correct collector and ingester images
verifyCollectorImage(jaegerInstanceName, namespace, testOtelCollector)
verifyIngesterImage(jaegerInstanceName, namespace, testOtelIngester)
}

func (suite *StreamingTestSuite) TestStreamingWithTLS() {
Expand All @@ -105,7 +106,7 @@ func (suite *StreamingTestSuite) TestStreamingWithTLS() {

// Now create a jaeger instance with TLS enabled -- note it has to be deployed in the same namespace as the kafka instance
jaegerInstanceName := "tls-streaming"
jaegerInstance := jaegerStreamingDefinitionWithTLS(kafkaNamespace, jaegerInstanceName, kafkaUserName, testOtelCollector)
jaegerInstance := jaegerStreamingDefinitionWithTLS(kafkaNamespace, jaegerInstanceName, kafkaUserName, testOtelCollector, testOtelIngester)
err = fw.Client.Create(context.TODO(), jaegerInstance, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
require.NoError(t, err, "Error deploying jaeger")
defer undeployJaegerInstance(jaegerInstance)
Expand All @@ -121,8 +122,9 @@ func (suite *StreamingTestSuite) TestStreamingWithTLS() {

ProductionSmokeTestWithNamespace(jaegerInstanceName, kafkaNamespace)

// Make sure we were using the correct collector image
// Make sure we were using the correct collector and ingester images
verifyCollectorImage(jaegerInstanceName, kafkaNamespace, testOtelCollector)
verifyIngesterImage(jaegerInstanceName, kafkaNamespace, testOtelIngester)
}

func (suite *StreamingTestSuite) TestStreamingWithAutoProvisioning() {
Expand All @@ -132,7 +134,7 @@ func (suite *StreamingTestSuite) TestStreamingWithAutoProvisioning() {
// Now create a jaeger instance which will auto provision a kafka instance
jaegerInstanceName := "auto-provisioned"
jaegerInstanceNamespace := namespace
jaegerInstance := jaegerAutoProvisionedDefinition(jaegerInstanceNamespace, jaegerInstanceName, testOtelCollector)
jaegerInstance := jaegerAutoProvisionedDefinition(jaegerInstanceNamespace, jaegerInstanceName, testOtelCollector, testOtelIngester)
err := fw.Client.Create(context.TODO(), jaegerInstance, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
require.NoError(t, err, "Error deploying jaeger")
defer undeployJaegerInstance(jaegerInstance)
Expand All @@ -152,11 +154,12 @@ func (suite *StreamingTestSuite) TestStreamingWithAutoProvisioning() {

ProductionSmokeTestWithNamespace(jaegerInstanceName, jaegerInstanceNamespace)

// Make sure we were using the correct collector image
// Make sure we were using the correct collector and ingester images
verifyCollectorImage(jaegerInstanceName, namespace, testOtelCollector)
verifyIngesterImage(jaegerInstanceName, namespace, testOtelIngester)
}

func jaegerStreamingDefinition(namespace string, name string, useOtelCollector bool) *v1.Jaeger {
func jaegerStreamingDefinition(namespace string, name string, useOtelCollector, useOtelIngester bool) *v1.Jaeger {
kafkaClusterURL := fmt.Sprintf("my-cluster-kafka-brokers.%s:9092", kafkaNamespace)
ingressEnabled := true
collectorOptions := make(map[string]interface{})
Expand Down Expand Up @@ -207,15 +210,32 @@ func jaegerStreamingDefinition(namespace string, name string, useOtelCollector b
j.Spec.Collector.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14269"))
}

if useOtelIngester {
log.Infof("Using OTEL ingester for %s", name)
j.Spec.Ingester.Image = otelIngesterImage
j.Spec.Ingester.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14270"))
}

return j
}

func jaegerStreamingDefinitionWithTLS(namespace string, name, kafkaUserName string, useOtelCollector bool) *v1.Jaeger {
func jaegerStreamingDefinitionWithTLS(namespace string, name, kafkaUserName string, useOtelCollector, useOtelIngester bool) *v1.Jaeger {
volumes := getTLSVolumes(kafkaUserName)
volumeMounts := getTLSVolumeMounts()
ingressEnabled := true

kafkaClusterURL := fmt.Sprintf("my-cluster-kafka-bootstrap.%s.svc.cluster.local:9093", kafkaNamespace)
ingesterOptions := make(map[string]interface{})
ingesterOptions["kafka.consumer.authentication"] = "tls"
ingesterOptions["kafka.consumer.topic"] = "jaeger-spans"
ingesterOptions["kafka.consumer.brokers"] = kafkaClusterURL
ingesterOptions["kafka.consumer.tls.ca"] = "/var/run/secrets/cluster-ca/ca.crt"
ingesterOptions["kafka.consumer.tls.cert"] = "/var/run/secrets/kafkauser/user.crt"
ingesterOptions["kafka.consumer.tls.key"] = "/var/run/secrets/kafkauser/user.key"
if !useOtelIngester {
ingesterOptions["ingester.deadlockInterval"] = 0
}

j := &v1.Jaeger{
TypeMeta: metav1.TypeMeta{
Kind: "Jaeger",
Expand All @@ -242,15 +262,7 @@ func jaegerStreamingDefinitionWithTLS(namespace string, name, kafkaUserName stri
}),
},
Ingester: v1.JaegerIngesterSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.consumer.authentication": "tls",
"kafka.consumer.topic": "jaeger-spans",
"kafka.consumer.brokers": kafkaClusterURL,
"kafka.consumer.tls.ca": "/var/run/secrets/cluster-ca/ca.crt",
"kafka.consumer.tls.cert": "/var/run/secrets/kafkauser/user.crt",
"kafka.consumer.tls.key": "/var/run/secrets/kafkauser/user.key",
"ingester.deadlockInterval": 0,
}),
Options: v1.NewOptions(ingesterOptions),
},
Storage: v1.JaegerStorageSpec{
Type: "elasticsearch",
Expand All @@ -270,11 +282,16 @@ func jaegerStreamingDefinitionWithTLS(namespace string, name, kafkaUserName stri
j.Spec.Collector.Image = otelCollectorImage
j.Spec.Collector.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14269"))
}
if useOtelIngester {
log.Infof("Using OTEL ingester for %s", name)
j.Spec.Ingester.Image = otelIngesterImage
j.Spec.Ingester.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14270"))
}

return j
}

func jaegerAutoProvisionedDefinition(namespace string, name string, useOtelCollector bool) *v1.Jaeger {
func jaegerAutoProvisionedDefinition(namespace string, name string, useOtelCollector, useOtelIngester bool) *v1.Jaeger {
ingressEnabled := true
jaegerInstance := &v1.Jaeger{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -305,6 +322,11 @@ func jaegerAutoProvisionedDefinition(namespace string, name string, useOtelColle
jaegerInstance.Spec.Collector.Image = otelCollectorImage
jaegerInstance.Spec.Collector.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14269"))
}
if useOtelIngester {
log.Infof("Using OTEL ingester for %s", name)
jaegerInstance.Spec.Ingester.Image = otelIngesterImage
jaegerInstance.Spec.Ingester.Config = v1.NewFreeForm(getOtelConfigForHealthCheckPort("14270"))
}

return jaegerInstance
}
Expand Down
22 changes: 22 additions & 0 deletions test/e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
saveLogs = getBoolEnv("SAVE_LOGS", false)
skipCassandraTests = getBoolEnv("SKIP_CASSANDRA_TESTS", false)
testOtelCollector = getBoolEnv("USE_OTEL_COLLECTOR", false)
testOtelIngester = getBoolEnv("USE_OTEL_INGESTER", false)
testOtelAgent = getBoolEnv("USE_OTEL_AGENT", false)
testOtelAllInOne = getBoolEnv("USE_OTEL_ALL_IN_ONE", false)

Expand All @@ -56,6 +57,7 @@ var (
cassandraKeyspace = "jaeger_v1_datacenter1"
cassandraDatacenter = "datacenter1"
otelCollectorImage = "jaegertracing/jaeger-opentelemetry-collector:latest"
otelIngesterImage = "jaegertracing/jaeger-opentelemetry-ingester:latest"
otelAgentImage = "jaegertracing/jaeger-opentelemetry-agent:latest"
otelAllInOneImage = "jaegertracing/opentelemetry-all-in-one:latest"

Expand Down Expand Up @@ -564,6 +566,26 @@ func deletePersistentVolumeClaims(namespace string) {
}
}

func verifyIngesterImage(jaegerInstanceName, namespace string, expected bool) {
require.Equal(t, expected, wasUsingOtelIngester(jaegerInstanceName, namespace))
}

// Was this Jaeger Instance using the OTEL ingester?
func wasUsingOtelIngester(jaegerInstanceName, namespace string) bool {
deployment, err := fw.KubeClient.AppsV1().Deployments(namespace).Get(context.Background(), jaegerInstanceName+"-ingester", metav1.GetOptions{})
require.NoError(t, err)
containers := deployment.Spec.Template.Spec.Containers
for _, container := range containers {
if container.Name == "jaeger-ingester" {
logrus.Infof("Test %s is using image %s", t.Name(), container.Image)
return strings.Contains(container.Image, "jaeger-opentelemetry-ingester")
}
}

require.Failf(t, "Did not find a collector image for %s in namespace %s", jaegerInstanceName, namespace)
return false
}

func verifyCollectorImage(jaegerInstanceName, namespace string, expected bool) {
require.Equal(t, expected, wasUsingOtelCollector(jaegerInstanceName, namespace))
}
Expand Down