Skip to content

Commit

Permalink
Add a streaming test which uses TLS (#689)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Earls <kearls@redhat.com>
  • Loading branch information
kevinearls authored and jpkrohling committed Oct 11, 2019
1 parent ead4e22 commit 5bc4d73
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 8 deletions.
13 changes: 11 additions & 2 deletions test/e2e/smoketest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,29 @@ func AllInOneSmokeTest(resourceName string) {

// ProductionSmokeTest should be used if query and collector are in separate pods
func ProductionSmokeTest(resourceName string) {
productionSmokeTest(resourceName, namespace)
}

// ProductionSmokeTestWithNamespace is the same as ProductionSmokeTest but for when you can't use the default namespace
func ProductionSmokeTestWithNamespace(resourceName, smokeTestNamespace string) {
productionSmokeTest(resourceName, smokeTestNamespace)
}

func productionSmokeTest(resourceName, smokeTestNamespace string) {
queryPodImageName := "jaeger-query"
collectorPodImageName := "jaeger-collector"
queryPodPrefix := resourceName + "-query"
collectorPodPrefix := resourceName + "-collector"

queryPort := randomPortNumber()
queryPorts := []string{queryPort + ":16686"}
portForw, closeChan := CreatePortForward(namespace, queryPodPrefix, queryPodImageName, queryPorts, fw.KubeConfig)
portForw, closeChan := CreatePortForward(smokeTestNamespace, queryPodPrefix, queryPodImageName, queryPorts, fw.KubeConfig)
defer portForw.Close()
defer close(closeChan)

collectorPort := randomPortNumber()
collectorPorts := []string{collectorPort + ":14268"}
portForwColl, closeChanColl := CreatePortForward(namespace, collectorPodPrefix, collectorPodImageName, collectorPorts, fw.KubeConfig)
portForwColl, closeChanColl := CreatePortForward(smokeTestNamespace, collectorPodPrefix, collectorPodImageName, collectorPorts, fw.KubeConfig)
defer portForwColl.Close()
defer close(closeChanColl)

Expand Down
182 changes: 176 additions & 6 deletions test/e2e/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1"
)
Expand Down Expand Up @@ -54,15 +56,12 @@ func (suite *StreamingTestSuite) AfterTest(suiteName, testName string) {
}

func (suite *StreamingTestSuite) TestStreaming() {
err := WaitForStatefulset(t, fw.KubeClient, storageNamespace, "elasticsearch", retryInterval, timeout)
require.NoError(t, err, "Error waiting for elasticsearch")

err = WaitForStatefulset(t, fw.KubeClient, kafkaNamespace, "my-cluster-kafka", retryInterval, timeout)
require.NoError(t, err, "Error waiting for my-cluster-kafka")
waitForElasticSearch()
waitForKafkaInstance()

j := jaegerStreamingDefinition(namespace, "simple-streaming")
log.Infof("passing %v", j)
err = fw.Client.Create(context.TODO(), j, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
err := fw.Client.Create(context.TODO(), j, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
require.NoError(t, err, "Error deploying jaeger")
defer undeployJaegerInstance(j)

Expand All @@ -78,6 +77,46 @@ func (suite *StreamingTestSuite) TestStreaming() {
ProductionSmokeTest("simple-streaming")
}

func (suite *StreamingTestSuite) TestStreamingWithTLS() {
if !usingOLM {
t.Skip("This test should only run when using OLM")
}
// Make sure ES and the kafka instance are available
waitForElasticSearch()
waitForKafkaInstance()

kafkaUserName := "my-user"
kafkaUser := getKafkaUser(kafkaUserName, kafkaNamespace)
err := fw.Client.Create(context.Background(), kafkaUser, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
require.NoError(t, err, "Error deploying kafkauser")
WaitForSecret(kafkaUserName, kafkaNamespace)

defer func() {
if !debugMode || !t.Failed() {
err = fw.Client.Delete(context.TODO(), kafkaUser)
require.NoError(t, err)
}
}()

// Now create a jaeger instance with TLS enabled -- note it has to be deployed in the same namespace as the kafka instance
jaegerInstanceName := "tls-streaming"
jaegerInstance := jaegerStreamingDefinitionWithTLS(kafkaNamespace, jaegerInstanceName, kafkaUserName)
err = fw.Client.Create(context.TODO(), jaegerInstance, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval})
require.NoError(t, err, "Error deploying jaeger")
defer undeployJaegerInstance(jaegerInstance)

err = WaitForDeployment(t, fw.KubeClient, kafkaNamespace, jaegerInstanceName+"-ingester", 1, retryInterval, timeout)
require.NoError(t, err, "Error waiting for ingester deployment")

err = WaitForDeployment(t, fw.KubeClient, kafkaNamespace, jaegerInstanceName+"-collector", 1, retryInterval, timeout)
require.NoError(t, err, "Error waiting for collector deployment")

err = WaitForDeployment(t, fw.KubeClient, kafkaNamespace, jaegerInstanceName+"-query", 1, retryInterval, timeout)
require.NoError(t, err, "Error waiting for query deployment")

ProductionSmokeTestWithNamespace(jaegerInstanceName, kafkaNamespace)
}

func jaegerStreamingDefinition(namespace string, name string) *v1.Jaeger {
kafkaClusterURL := fmt.Sprintf("my-cluster-kafka-brokers.%s:9092", kafkaNamespace)
j := &v1.Jaeger{
Expand Down Expand Up @@ -113,3 +152,134 @@ func jaegerStreamingDefinition(namespace string, name string) *v1.Jaeger {
}
return j
}

func jaegerStreamingDefinitionWithTLS(namespace string, name, kafkaUserName string) *v1.Jaeger {
volumes := getTLSVolumes(kafkaUserName)
volumeMounts := getTLSVolumeMounts()

kafkaClusterURL := fmt.Sprintf("my-cluster-kafka-bootstrap.%s.svc.cluster.local:9093", kafkaNamespace)
j := &v1.Jaeger{
TypeMeta: metav1.TypeMeta{
Kind: "Jaeger",
APIVersion: "jaegertracing.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: v1.JaegerSpec{
Strategy: "streaming",
Collector: v1.JaegerCollectorSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.producer.authentication": "tls",
"kafka.producer.topic": "jaeger-spans",
"kafka.producer.brokers": kafkaClusterURL,
"kafka.producer.tls.ca": "/var/run/secrets/cluster-ca/ca.crt",
"kafka.producer.tls.cert": "/var/run/secrets/kafkauser/user.crt",
"kafka.producer.tls.key": "/var/run/secrets/kafkauser/user.key",
}),
},
Ingester: v1.JaegerIngesterSpec{
Options: v1.NewOptions(map[string]interface{}{
"kafka.consumer.authentication": "tls",
"kafka.consumer.topic": "jaeger-spans",
"kafka.consumer.brokers": kafkaClusterURL,
"kafka.consumer.tls.ca": "/var/run/secrets/cluster-ca/ca.crt",
"kafka.consumer.tls.cert": "/var/run/secrets/kafkauser/user.crt",
"kafka.consumer.tls.key": "/var/run/secrets/kafkauser/user.key",
"ingester.deadlockInterval": 0,
}),
},
Storage: v1.JaegerStorageSpec{
Type: "elasticsearch",
Options: v1.NewOptions(map[string]interface{}{
"es.server-urls": esServerUrls,
}),
},
JaegerCommonSpec: v1.JaegerCommonSpec{
Volumes: volumes,
VolumeMounts: volumeMounts,
},
},
}
return j
}

func getKafkaUser(name, namespace string) *unstructured.Unstructured {
kafkaUser := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "kafka.strimzi.io/v1beta1",
"kind": "KafkaUser",
"metadata": map[string]interface{}{
"name": name,
"namespace": namespace,
"labels": map[string]interface{}{
"strimzi.io/cluster": "my-cluster",
},
},
"spec": map[string]interface{}{
"authentication": map[string]interface{}{
"type": "tls",
},
},
},
}

return kafkaUser
}

func getTLSVolumeMounts() []corev1.VolumeMount {
kafkaUserVolumeMount := corev1.VolumeMount{
Name: "kafkauser",
MountPath: "/var/run/secrets/kafkauser",
}
clusterCaVolumeMount := corev1.VolumeMount{
Name: "cluster-ca",
MountPath: "/var/run/secrets/cluster-ca",
}

volumeMounts := []corev1.VolumeMount{
kafkaUserVolumeMount, clusterCaVolumeMount,
}

return volumeMounts
}

func getTLSVolumes(kafkaUserName string) []corev1.Volume {
kafkaUserSecretName := corev1.SecretVolumeSource{
SecretName: kafkaUserName,
}
clusterCaSecretName := corev1.SecretVolumeSource{
SecretName: "my-cluster-cluster-ca-cert",
}

kafkaUserVolume := corev1.Volume{
Name: "kafkauser",
VolumeSource: corev1.VolumeSource{
Secret: &kafkaUserSecretName,
},
}
clusterCaVolume := corev1.Volume{
Name: "cluster-ca",
VolumeSource: corev1.VolumeSource{
Secret: &clusterCaSecretName,
},
}

volumes := []corev1.Volume{
kafkaUserVolume,
clusterCaVolume,
}

return volumes
}

func waitForKafkaInstance() {
err := WaitForStatefulset(t, fw.KubeClient, kafkaNamespace, "my-cluster-kafka", retryInterval, timeout)
require.NoError(t, err, "Error waiting for my-cluster-kafka")
}

func waitForElasticSearch() {
err := WaitForStatefulset(t, fw.KubeClient, storageNamespace, "elasticsearch", retryInterval, timeout)
require.NoError(t, err, "Error waiting for elasticsearch")
}
18 changes: 18 additions & 0 deletions test/e2e/wait_util.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package e2e

import (
"strings"
"testing"
"time"

"github.com/operator-framework/operator-sdk/pkg/test/e2eutil"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"k8s.io/api/extensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -192,3 +194,19 @@ func WaitForDeployment(t *testing.T, kubeclient kubernetes.Interface, namespace,
logrus.Infof("Deployment of %s in namespace %s took %s\n", name, namespace, elapsed)
return err
}

// WaitForSecret waits for a secret to be available
func WaitForSecret(secretName, secretNamespace string) {
err := wait.Poll(retryInterval, timeout, func() (done bool, err error) {
secret, err := fw.KubeClient.CoreV1().Secrets(secretNamespace).Get(secretName, metav1.GetOptions{IncludeUninitialized: false})
if err == nil {
logrus.Debugf("Found secret %s\n", secret.Name)
return true, nil
} else if err != nil && strings.Contains(err.Error(), "not found") {
return false, nil
} else {
return false, err
}
})
require.NoError(t, err)
}

0 comments on commit 5bc4d73

Please sign in to comment.