From 367c6237b9c0e9c77d141229748d91429ff31791 Mon Sep 17 00:00:00 2001 From: Rahul Varma Date: Wed, 4 Aug 2021 10:14:41 -0700 Subject: [PATCH] Updated folder structure to reduce code duplication between collector and targetallocator & Minor changes --- .../opentelemetrycollector_webhook.go | 5 + .../opentelemetrycollector_controller.go | 8 +- .../opentelemetrycollector_controller_test.go | 2 +- controllers/targetallocator_controller.go | 30 +- .../targetallocator_controller_test.go | 18 +- main.go | 11 +- pkg/collector/container_test.go | 3 +- pkg/collector/deployment_test.go | 1 - pkg/collector/reconcile/configmap.go | 174 ---------- pkg/collector/reconcile/configmap_test.go | 141 -------- pkg/collector/reconcile/deployment.go | 135 -------- pkg/collector/reconcile/deployment_test.go | 130 ------- pkg/collector/reconcile/service_test.go | 231 ------------- pkg/collector/reconcile/suite_test.go | 156 --------- .../reconcile/configmap.go | 82 ++++- .../reconcile/configmap_test.go | 148 +++++++- pkg/{collector => }/reconcile/daemonset.go | 0 .../reconcile/daemonset_test.go | 2 +- .../reconcile/deployment.go | 69 ++-- .../reconcile/deployment_test.go | 215 ++++++++---- pkg/{targetallocator => }/reconcile/helper.go | 5 - .../reconcile/opentelemetry.go | 0 .../reconcile/opentelemetry_test.go | 4 +- pkg/{collector => }/reconcile/params.go | 0 pkg/{collector => }/reconcile/service.go | 86 ++++- pkg/reconcile/service_test.go | 320 ++++++++++++++++++ .../reconcile/serviceaccount.go | 0 .../reconcile/serviceaccount_test.go | 10 +- pkg/{collector => }/reconcile/statefulset.go | 0 .../reconcile/statefulset_test.go | 2 +- .../reconcile/suite_test.go | 62 +++- .../reconcile/suite_test.yaml | 0 pkg/targetallocator/container_test.go | 3 +- pkg/targetallocator/deployment_test.go | 1 - pkg/targetallocator/reconcile/params.go | 33 -- pkg/targetallocator/reconcile/service.go | 160 --------- pkg/targetallocator/reconcile/service_test.go | 100 ------ 37 files changed, 894 insertions(+), 1453 deletions(-) delete mode 100644 pkg/collector/reconcile/configmap.go delete mode 100644 pkg/collector/reconcile/configmap_test.go delete mode 100644 pkg/collector/reconcile/deployment.go delete mode 100644 pkg/collector/reconcile/deployment_test.go delete mode 100644 pkg/collector/reconcile/service_test.go delete mode 100644 pkg/collector/reconcile/suite_test.go rename pkg/{targetallocator => }/reconcile/configmap.go (70%) rename pkg/{targetallocator => }/reconcile/configmap_test.go (53%) rename pkg/{collector => }/reconcile/daemonset.go (100%) rename pkg/{collector => }/reconcile/daemonset_test.go (99%) rename pkg/{targetallocator => }/reconcile/deployment.go (69%) rename pkg/{targetallocator => }/reconcile/deployment_test.go (58%) rename pkg/{targetallocator => }/reconcile/helper.go (81%) rename pkg/{collector => }/reconcile/opentelemetry.go (100%) rename pkg/{collector => }/reconcile/opentelemetry_test.go (92%) rename pkg/{collector => }/reconcile/params.go (100%) rename pkg/{collector => }/reconcile/service.go (78%) create mode 100644 pkg/reconcile/service_test.go rename pkg/{collector => }/reconcile/serviceaccount.go (100%) rename pkg/{collector => }/reconcile/serviceaccount_test.go (85%) rename pkg/{collector => }/reconcile/statefulset.go (100%) rename pkg/{collector => }/reconcile/statefulset_test.go (99%) rename pkg/{targetallocator => }/reconcile/suite_test.go (79%) rename pkg/{targetallocator => }/reconcile/suite_test.yaml (100%) delete mode 100644 pkg/targetallocator/reconcile/params.go delete mode 100644 pkg/targetallocator/reconcile/service.go delete mode 100644 pkg/targetallocator/reconcile/service_test.go diff --git a/api/v1alpha1/opentelemetrycollector_webhook.go b/api/v1alpha1/opentelemetrycollector_webhook.go index d0ace6187b..e25437d344 100644 --- a/api/v1alpha1/opentelemetrycollector_webhook.go +++ b/api/v1alpha1/opentelemetrycollector_webhook.go @@ -91,5 +91,10 @@ func (r *OpenTelemetryCollector) validateCRDSpec() error { return fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the attribute 'tolerations'", r.Spec.Mode) } + // validate target allocation + if r.Spec.TargetAllocator.Enabled && !(r.Spec.Mode == ModeStatefulSet) { + return fmt.Errorf("the OpenTelemetry Collector mode is set to %s, which does not support the target allocation deployment", r.Spec.Mode) + } + return nil } diff --git a/controllers/opentelemetrycollector_controller.go b/controllers/opentelemetrycollector_controller.go index 27a54ff62a..9a40403605 100644 --- a/controllers/opentelemetrycollector_controller.go +++ b/controllers/opentelemetrycollector_controller.go @@ -30,7 +30,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" "github.com/open-telemetry/opentelemetry-operator/internal/config" - "github.com/open-telemetry/opentelemetry-operator/pkg/collector/reconcile" + "github.com/open-telemetry/opentelemetry-operator/pkg/reconcile" ) // OpenTelemetryCollectorReconciler reconciles a OpenTelemetryCollector object. @@ -66,7 +66,7 @@ func NewReconciler(p Params) *OpenTelemetryCollectorReconciler { p.Tasks = []Task{ { "config maps", - reconcile.ConfigMaps, + reconcile.CollectorConfigMaps, true, }, { @@ -76,12 +76,12 @@ func NewReconciler(p Params) *OpenTelemetryCollectorReconciler { }, { "services", - reconcile.Services, + reconcile.CollectorServices, true, }, { "deployments", - reconcile.Deployments, + reconcile.CollectorDeployments, true, }, { diff --git a/controllers/opentelemetrycollector_controller_test.go b/controllers/opentelemetrycollector_controller_test.go index 7f2d203900..4cc14ec5fe 100644 --- a/controllers/opentelemetrycollector_controller_test.go +++ b/controllers/opentelemetrycollector_controller_test.go @@ -36,7 +36,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" "github.com/open-telemetry/opentelemetry-operator/controllers" "github.com/open-telemetry/opentelemetry-operator/internal/config" - "github.com/open-telemetry/opentelemetry-operator/pkg/collector/reconcile" + "github.com/open-telemetry/opentelemetry-operator/pkg/reconcile" ) var logger = logf.Log.WithName("unit-tests") diff --git a/controllers/targetallocator_controller.go b/controllers/targetallocator_controller.go index 3f7d5e92c0..9a73fc691b 100644 --- a/controllers/targetallocator_controller.go +++ b/controllers/targetallocator_controller.go @@ -29,7 +29,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" "github.com/open-telemetry/opentelemetry-operator/internal/config" - "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator/reconcile" + "github.com/open-telemetry/opentelemetry-operator/pkg/reconcile" ) // OpenTelemetryTargetAllocatorReconciler reconciles a OpenTelemetryTargetAllocator object. @@ -38,42 +38,26 @@ type OpenTelemetryTargetAllocatorReconciler struct { log logr.Logger scheme *runtime.Scheme config config.Config - tasks []TgAlTask -} - -// TgAlTask represents a reconciliation task to be executed by the reconciler. -type TgAlTask struct { - Name string - Do func(context.Context, reconcile.Params) error - BailOnError bool -} - -// TgAlParams is the set of options to build a new OpenTelemetryTargetAllocatorReconciler. -type TgAlParams struct { - client.Client - Log logr.Logger - Scheme *runtime.Scheme - Config config.Config - Tasks []TgAlTask + tasks []Task } // NewTargetAlllocatorReconciler creates a new reconciler for OpenTelemetryTargetAllocator objects. -func NewTargetAllocatorReconciler(p TgAlParams) *OpenTelemetryTargetAllocatorReconciler { +func NewTargetAllocatorReconciler(p Params) *OpenTelemetryTargetAllocatorReconciler { if len(p.Tasks) == 0 { - p.Tasks = []TgAlTask{ + p.Tasks = []Task{ { "config maps", - reconcile.ConfigMaps, + reconcile.TAConfigMaps, true, }, { "deployments", - reconcile.Deployments, + reconcile.TADeployments, true, }, { "services", - reconcile.Services, + reconcile.TAServices, true, }, } diff --git a/controllers/targetallocator_controller_test.go b/controllers/targetallocator_controller_test.go index b79b569a00..ea136455dd 100644 --- a/controllers/targetallocator_controller_test.go +++ b/controllers/targetallocator_controller_test.go @@ -34,17 +34,17 @@ import ( "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" "github.com/open-telemetry/opentelemetry-operator/controllers" "github.com/open-telemetry/opentelemetry-operator/internal/config" - "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator/reconcile" + "github.com/open-telemetry/opentelemetry-operator/pkg/reconcile" ) func TestNewObjectsOnTargetAllocatorReconciliation(t *testing.T) { // prepare cfg := config.New() - configYAML, err := ioutil.ReadFile("../pkg/targetallocator/reconcile/suite_test.yaml") + configYAML, err := ioutil.ReadFile("../pkg/reconcile/suite_test.yaml") require.NoError(t, err) nsn := types.NamespacedName{Name: "my-instance", Namespace: "default"} - reconciler := controllers.NewTargetAllocatorReconciler(controllers.TgAlParams{ + reconciler := controllers.NewTargetAllocatorReconciler(controllers.Params{ Client: k8sClient, Log: logger, Scheme: testScheme, @@ -113,9 +113,9 @@ func TestNewObjectsOnTargetAllocatorReconciliation(t *testing.T) { func TestContinueOnRecoverableTargetAllocatorFailure(t *testing.T) { // prepare taskCalled := false - reconciler := controllers.NewTargetAllocatorReconciler(controllers.TgAlParams{ + reconciler := controllers.NewTargetAllocatorReconciler(controllers.Params{ Log: logger, - Tasks: []controllers.TgAlTask{ + Tasks: []controllers.Task{ { Name: "should-fail", Do: func(context.Context, reconcile.Params) error { @@ -147,12 +147,12 @@ func TestBreakOnUnrecoverableTargetAllocatorError(t *testing.T) { taskCalled := false expectedErr := errors.New("should fail!") nsn := types.NamespacedName{Name: "my-instance", Namespace: "default"} - reconciler := controllers.NewTargetAllocatorReconciler(controllers.TgAlParams{ + reconciler := controllers.NewTargetAllocatorReconciler(controllers.Params{ Client: k8sClient, Log: logger, Scheme: scheme.Scheme, Config: cfg, - Tasks: []controllers.TgAlTask{ + Tasks: []controllers.Task{ { Name: "should-fail", Do: func(context.Context, reconcile.Params) error { @@ -197,12 +197,12 @@ func TestTargetAllocatorSkipWhenInstanceDoesNotExist(t *testing.T) { // prepare cfg := config.New() nsn := types.NamespacedName{Name: "non-existing-my-instance", Namespace: "default"} - reconciler := controllers.NewTargetAllocatorReconciler(controllers.TgAlParams{ + reconciler := controllers.NewTargetAllocatorReconciler(controllers.Params{ Client: k8sClient, Log: logger, Scheme: scheme.Scheme, Config: cfg, - Tasks: []controllers.TgAlTask{ + Tasks: []controllers.Task{ { Name: "should-not-be-called", Do: func(context.Context, reconcile.Params) error { diff --git a/main.go b/main.go index 424ed67bd6..9f75b648a8 100644 --- a/main.go +++ b/main.go @@ -155,11 +155,12 @@ func main() { os.Exit(1) } - if err = controllers.NewTargetAllocatorReconciler(controllers.TgAlParams{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("TargetAllocator"), - Scheme: mgr.GetScheme(), - Config: cfg, + if err = controllers.NewTargetAllocatorReconciler(controllers.Params{ + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("TargetAllocator"), + Scheme: mgr.GetScheme(), + Config: cfg, + Recorder: mgr.GetEventRecorderFor("opentelemetry-targetallocator"), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TargetAllocator") os.Exit(1) diff --git a/pkg/collector/container_test.go b/pkg/collector/container_test.go index 5ada9ec80e..82c70aab95 100644 --- a/pkg/collector/container_test.go +++ b/pkg/collector/container_test.go @@ -17,12 +17,11 @@ package collector_test import ( "testing" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" logf "sigs.k8s.io/controller-runtime/pkg/log" - "github.com/stretchr/testify/assert" - "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" "github.com/open-telemetry/opentelemetry-operator/internal/config" . "github.com/open-telemetry/opentelemetry-operator/pkg/collector" diff --git a/pkg/collector/deployment_test.go b/pkg/collector/deployment_test.go index dea1c308c4..1cfe13594e 100644 --- a/pkg/collector/deployment_test.go +++ b/pkg/collector/deployment_test.go @@ -18,7 +18,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/collector/reconcile/configmap.go b/pkg/collector/reconcile/configmap.go deleted file mode 100644 index 82d57b5393..0000000000 --- a/pkg/collector/reconcile/configmap.go +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reconcile - -import ( - "context" - "fmt" - "reflect" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - - "github.com/open-telemetry/opentelemetry-operator/pkg/collector" - "github.com/open-telemetry/opentelemetry-operator/pkg/naming" -) - -// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete - -// ConfigMaps reconciles the config map(s) required for the instance in the current context. -func ConfigMaps(ctx context.Context, params Params) error { - desired := []corev1.ConfigMap{ - desiredConfigMap(ctx, params), - } - - // first, handle the create/update parts - if err := expectedConfigMaps(ctx, params, desired, true); err != nil { - return fmt.Errorf("failed to reconcile the expected configmaps: %v", err) - } - - // then, delete the extra objects - if err := deleteConfigMaps(ctx, params, desired); err != nil { - return fmt.Errorf("failed to reconcile the configmaps to be deleted: %v", err) - } - - return nil -} - -func desiredConfigMap(_ context.Context, params Params) corev1.ConfigMap { - name := naming.ConfigMap(params.Instance) - labels := collector.Labels(params.Instance) - labels["app.kubernetes.io/name"] = name - - return corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: params.Instance.Namespace, - Labels: labels, - Annotations: params.Instance.Annotations, - }, - Data: map[string]string{ - "collector.yaml": params.Instance.Spec.Config, - }, - } -} - -func expectedConfigMaps(ctx context.Context, params Params, expected []corev1.ConfigMap, retry bool) error { - for _, obj := range expected { - desired := obj - - if err := controllerutil.SetControllerReference(¶ms.Instance, &desired, params.Scheme); err != nil { - return fmt.Errorf("failed to set controller reference: %w", err) - } - - existing := &corev1.ConfigMap{} - nns := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name} - err := params.Client.Get(ctx, nns, existing) - if err != nil && errors.IsNotFound(err) { - if err := params.Client.Create(ctx, &desired); err != nil { - if errors.IsAlreadyExists(err) && retry { - // let's try again? we probably had multiple updates at one, and now it exists already - if err := expectedConfigMaps(ctx, params, expected, false); err != nil { - // somethin else happened now... - return err - } - - // we succeeded in the retry, exit this attempt - return nil - } - return fmt.Errorf("failed to create: %w", err) - } - params.Log.V(2).Info("created", "configmap.name", desired.Name, "configmap.namespace", desired.Namespace) - continue - } else if err != nil { - return fmt.Errorf("failed to get: %w", err) - } - - // it exists already, merge the two if the end result isn't identical to the existing one - updated := existing.DeepCopy() - if updated.Annotations == nil { - updated.Annotations = map[string]string{} - } - if updated.Labels == nil { - updated.Labels = map[string]string{} - } - - updated.Data = desired.Data - updated.BinaryData = desired.BinaryData - updated.ObjectMeta.OwnerReferences = desired.ObjectMeta.OwnerReferences - - for k, v := range desired.ObjectMeta.Annotations { - updated.ObjectMeta.Annotations[k] = v - } - for k, v := range desired.ObjectMeta.Labels { - updated.ObjectMeta.Labels[k] = v - } - - patch := client.MergeFrom(existing) - - if err := params.Client.Patch(ctx, updated, patch); err != nil { - return fmt.Errorf("failed to apply changes: %w", err) - } - if configMapChanged(&desired, existing) { - params.Recorder.Event(updated, "Normal", "ConfigUpdate ", fmt.Sprintf("OpenTelemetry Config changed - %s/%s", desired.Namespace, desired.Name)) - } - - params.Log.V(2).Info("applied", "configmap.name", desired.Name, "configmap.namespace", desired.Namespace) - } - - return nil -} - -func deleteConfigMaps(ctx context.Context, params Params, expected []corev1.ConfigMap) error { - opts := []client.ListOption{ - client.InNamespace(params.Instance.Namespace), - client.MatchingLabels(map[string]string{ - "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Namespace, params.Instance.Name), - "app.kubernetes.io/managed-by": "opentelemetry-operator", - }), - } - list := &corev1.ConfigMapList{} - if err := params.Client.List(ctx, list, opts...); err != nil { - return fmt.Errorf("failed to list: %w", err) - } - - for i := range list.Items { - existing := list.Items[i] - del := true - for _, keep := range expected { - if keep.Name == existing.Name && keep.Namespace == existing.Namespace { - del = false - } - } - - if del { - if err := params.Client.Delete(ctx, &existing); err != nil { - return fmt.Errorf("failed to delete: %w", err) - } - params.Log.V(2).Info("deleted", "configmap.name", existing.Name, "configmap.namespace", existing.Namespace) - } - } - - return nil -} - -func configMapChanged(desired *corev1.ConfigMap, actual *corev1.ConfigMap) bool { - return !reflect.DeepEqual(desired.Data, actual.Data) - -} diff --git a/pkg/collector/reconcile/configmap_test.go b/pkg/collector/reconcile/configmap_test.go deleted file mode 100644 index 7eaee37fc5..0000000000 --- a/pkg/collector/reconcile/configmap_test.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reconcile - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/record" - - "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" - "github.com/open-telemetry/opentelemetry-operator/internal/config" -) - -func TestDesiredConfigMap(t *testing.T) { - t.Run("should return expected config map", func(t *testing.T) { - expectedLables := map[string]string{ - "app.kubernetes.io/managed-by": "opentelemetry-operator", - "app.kubernetes.io/instance": "default.test", - "app.kubernetes.io/part-of": "opentelemetry", - "app.kubernetes.io/component": "opentelemetry-collector", - "app.kubernetes.io/name": "test-collector", - } - - expectedData := map[string]string{ - "collector.yaml": ` - receivers: - jaeger: - protocols: - grpc: - processors: - - exporters: - logging: - - service: - pipelines: - traces: - receivers: [jaeger] - processors: [] - exporters: [logging] - -`, - } - - actual := desiredConfigMap(context.Background(), params()) - - assert.Equal(t, "test-collector", actual.Name) - assert.Equal(t, expectedLables, actual.Labels) - assert.Equal(t, expectedData, actual.Data) - - }) - -} - -func TestExpectedConfigMap(t *testing.T) { - t.Run("should create config map", func(t *testing.T) { - err := expectedConfigMaps(context.Background(), params(), []v1.ConfigMap{desiredConfigMap(context.Background(), params())}, true) - assert.NoError(t, err) - - exists, err := populateObjectIfExists(t, &v1.ConfigMap{}, types.NamespacedName{Namespace: "default", Name: "test-collector"}) - - assert.NoError(t, err) - assert.True(t, exists) - }) - - t.Run("should update config map", func(t *testing.T) { - - param := Params{ - Config: config.New(), - Client: k8sClient, - Instance: v1alpha1.OpenTelemetryCollector{ - TypeMeta: metav1.TypeMeta{ - Kind: "opentelemetry.io", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - UID: instanceUID, - }, - }, - Scheme: testScheme, - Log: logger, - Recorder: record.NewFakeRecorder(10), - } - cm := desiredConfigMap(context.Background(), param) - createObjectIfNotExists(t, "test-collector", &cm) - - err := expectedConfigMaps(context.Background(), params(), []v1.ConfigMap{desiredConfigMap(context.Background(), params())}, true) - assert.NoError(t, err) - - actual := v1.ConfigMap{} - exists, err := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collector"}) - - assert.NoError(t, err) - assert.True(t, exists) - assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID) - assert.Equal(t, params().Instance.Spec.Config, actual.Data["collector.yaml"]) - }) - - t.Run("should delete config map", func(t *testing.T) { - - deletecm := v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-delete-collector", - Namespace: "default", - Labels: map[string]string{ - "app.kubernetes.io/instance": "default.test", - "app.kubernetes.io/managed-by": "opentelemetry-operator", - }, - }, - } - createObjectIfNotExists(t, "test-delete-collector", &deletecm) - - exists, _ := populateObjectIfExists(t, &v1.ConfigMap{}, types.NamespacedName{Namespace: "default", Name: "test-delete-collector"}) - assert.True(t, exists) - - err := deleteConfigMaps(context.Background(), params(), []v1.ConfigMap{desiredConfigMap(context.Background(), params())}) - assert.NoError(t, err) - - exists, _ = populateObjectIfExists(t, &v1.ConfigMap{}, types.NamespacedName{Namespace: "default", Name: "test-delete-collector"}) - assert.False(t, exists) - }) -} diff --git a/pkg/collector/reconcile/deployment.go b/pkg/collector/reconcile/deployment.go deleted file mode 100644 index 41b0f0de65..0000000000 --- a/pkg/collector/reconcile/deployment.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reconcile - -import ( - "context" - "fmt" - - appsv1 "k8s.io/api/apps/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - - "github.com/open-telemetry/opentelemetry-operator/pkg/collector" -) - -// +kubebuilder:rbac:groups="apps",resources=deployments,verbs=get;list;watch;create;update;patch;delete - -// Deployments reconciles the deployment(s) required for the instance in the current context. -func Deployments(ctx context.Context, params Params) error { - desired := []appsv1.Deployment{} - if params.Instance.Spec.Mode == "deployment" { - desired = append(desired, collector.Deployment(params.Config, params.Log, params.Instance)) - } - - // first, handle the create/update parts - if err := expectedDeployments(ctx, params, desired); err != nil { - return fmt.Errorf("failed to reconcile the expected deployments: %v", err) - } - - // then, delete the extra objects - if err := deleteDeployments(ctx, params, desired); err != nil { - return fmt.Errorf("failed to reconcile the deployments to be deleted: %v", err) - } - - return nil -} - -func expectedDeployments(ctx context.Context, params Params, expected []appsv1.Deployment) error { - for _, obj := range expected { - desired := obj - - if err := controllerutil.SetControllerReference(¶ms.Instance, &desired, params.Scheme); err != nil { - return fmt.Errorf("failed to set controller reference: %w", err) - } - - existing := &appsv1.Deployment{} - nns := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name} - err := params.Client.Get(ctx, nns, existing) - if err != nil && k8serrors.IsNotFound(err) { - if err := params.Client.Create(ctx, &desired); err != nil { - return fmt.Errorf("failed to create: %w", err) - } - params.Log.V(2).Info("created", "deployment.name", desired.Name, "deployment.namespace", desired.Namespace) - continue - } else if err != nil { - return fmt.Errorf("failed to get: %w", err) - } - - // it exists already, merge the two if the end result isn't identical to the existing one - updated := existing.DeepCopy() - if updated.Annotations == nil { - updated.Annotations = map[string]string{} - } - if updated.Labels == nil { - updated.Labels = map[string]string{} - } - - updated.Spec = desired.Spec - updated.ObjectMeta.OwnerReferences = desired.ObjectMeta.OwnerReferences - - for k, v := range desired.ObjectMeta.Annotations { - updated.ObjectMeta.Annotations[k] = v - } - for k, v := range desired.ObjectMeta.Labels { - updated.ObjectMeta.Labels[k] = v - } - - patch := client.MergeFrom(existing) - - if err := params.Client.Patch(ctx, updated, patch); err != nil { - return fmt.Errorf("failed to apply changes: %w", err) - } - - params.Log.V(2).Info("applied", "deployment.name", desired.Name, "deployment.namespace", desired.Namespace) - } - - return nil -} - -func deleteDeployments(ctx context.Context, params Params, expected []appsv1.Deployment) error { - opts := []client.ListOption{ - client.InNamespace(params.Instance.Namespace), - client.MatchingLabels(map[string]string{ - "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Namespace, params.Instance.Name), - "app.kubernetes.io/managed-by": "opentelemetry-operator", - }), - } - list := &appsv1.DeploymentList{} - if err := params.Client.List(ctx, list, opts...); err != nil { - return fmt.Errorf("failed to list: %w", err) - } - - for i := range list.Items { - existing := list.Items[i] - del := true - for _, keep := range expected { - if keep.Name == existing.Name && keep.Namespace == existing.Namespace { - del = false - } - } - - if del { - if err := params.Client.Delete(ctx, &existing); err != nil { - return fmt.Errorf("failed to delete: %w", err) - } - params.Log.V(2).Info("deleted", "deployment.name", existing.Name, "deployment.namespace", existing.Namespace) - } - } - - return nil -} diff --git a/pkg/collector/reconcile/deployment_test.go b/pkg/collector/reconcile/deployment_test.go deleted file mode 100644 index 7295ef05fc..0000000000 --- a/pkg/collector/reconcile/deployment_test.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reconcile - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - - "github.com/open-telemetry/opentelemetry-operator/pkg/collector" -) - -func TestExpectedDeployments(t *testing.T) { - param := params() - expectedDeploy := collector.Deployment(param.Config, logger, param.Instance) - - t.Run("should create deployment", func(t *testing.T) { - err := expectedDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}) - assert.NoError(t, err) - - exists, err := populateObjectIfExists(t, &v1.Deployment{}, types.NamespacedName{Namespace: "default", Name: "test-collector"}) - - assert.NoError(t, err) - assert.True(t, exists) - - }) - t.Run("should update deployment", func(t *testing.T) { - createObjectIfNotExists(t, "test-collector", &expectedDeploy) - err := expectedDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}) - assert.NoError(t, err) - - actual := v1.Deployment{} - exists, err := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collector"}) - - assert.NoError(t, err) - assert.True(t, exists) - assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID) - assert.Equal(t, int32(2), *actual.Spec.Replicas) - }) - - t.Run("should delete deployment", func(t *testing.T) { - labels := map[string]string{ - "app.kubernetes.io/instance": "default.test", - "app.kubernetes.io/managed-by": "opentelemetry-operator", - } - deploy := v1.Deployment{} - deploy.Name = "dummy" - deploy.Namespace = "default" - deploy.Labels = labels - deploy.Spec = v1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: labels, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: labels, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: "dummy", - Image: "busybox", - }}, - }, - }, - } - createObjectIfNotExists(t, "dummy", &deploy) - - err := deleteDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}) - assert.NoError(t, err) - - actual := v1.Deployment{} - exists, _ := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "dummy"}) - - assert.False(t, exists) - - }) - - t.Run("should not delete deployment", func(t *testing.T) { - labels := map[string]string{ - "app.kubernetes.io/instance": "default.test", - "app.kubernetes.io/managed-by": "helm-opentelemetry-operator", - } - deploy := v1.Deployment{} - deploy.Name = "dummy" - deploy.Namespace = "default" - deploy.Spec = v1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: labels, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: labels, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: "dummy", - Image: "busybox", - }}, - }, - }, - } - createObjectIfNotExists(t, "dummy", &deploy) - - err := deleteDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}) - assert.NoError(t, err) - - actual := v1.Deployment{} - exists, _ := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "dummy"}) - - assert.True(t, exists) - - }) -} diff --git a/pkg/collector/reconcile/service_test.go b/pkg/collector/reconcile/service_test.go deleted file mode 100644 index 461f5f69ab..0000000000 --- a/pkg/collector/reconcile/service_test.go +++ /dev/null @@ -1,231 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reconcile - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" - - "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" - "github.com/open-telemetry/opentelemetry-operator/internal/config" - "github.com/open-telemetry/opentelemetry-operator/pkg/collector" -) - -func TestExtractPortNumbersAndNames(t *testing.T) { - t.Run("should return extracted port names and numbers", func(t *testing.T) { - ports := []v1.ServicePort{{Name: "web", Port: 8080}, {Name: "tcp", Port: 9200}} - expectedPortNames := map[string]bool{"web": true, "tcp": true} - expectedPortNumbers := map[int32]bool{8080: true, 9200: true} - - actualPortNumbers, actualPortNames := extractPortNumbersAndNames(ports) - assert.Equal(t, expectedPortNames, actualPortNames) - assert.Equal(t, expectedPortNumbers, actualPortNumbers) - - }) -} - -func TestFilterPort(t *testing.T) { - - tests := []struct { - name string - candidate v1.ServicePort - portNumbers map[int32]bool - portNames map[string]bool - expected v1.ServicePort - }{ - { - name: "should filter out duplicate port", - candidate: v1.ServicePort{Name: "web", Port: 8080}, - portNumbers: map[int32]bool{8080: true, 9200: true}, - portNames: map[string]bool{"test": true, "metrics": true}, - }, - - { - name: "should not filter unique port", - candidate: v1.ServicePort{Name: "web", Port: 8090}, - portNumbers: map[int32]bool{8080: true, 9200: true}, - portNames: map[string]bool{"test": true, "metrics": true}, - expected: v1.ServicePort{Name: "web", Port: 8090}, - }, - - { - name: "should change the duplicate portName", - candidate: v1.ServicePort{Name: "web", Port: 8090}, - portNumbers: map[int32]bool{8080: true, 9200: true}, - portNames: map[string]bool{"web": true, "metrics": true}, - expected: v1.ServicePort{Name: "port-8090", Port: 8090}, - }, - - { - name: "should return nil if fallback name clashes with existing portName", - candidate: v1.ServicePort{Name: "web", Port: 8090}, - portNumbers: map[int32]bool{8080: true, 9200: true}, - portNames: map[string]bool{"web": true, "port-8090": true}, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - actual := filterPort(logger, test.candidate, test.portNumbers, test.portNames) - if test.expected != (v1.ServicePort{}) { - assert.Equal(t, test.expected, *actual) - return - } - assert.Nil(t, actual) - - }) - - } -} - -func TestDesiredService(t *testing.T) { - t.Run("should return nil service for unknown receiver and protocol", func(t *testing.T) { - params := Params{ - Config: config.Config{}, - Client: k8sClient, - Log: logger, - Instance: v1alpha1.OpenTelemetryCollector{ - Spec: v1alpha1.OpenTelemetryCollectorSpec{Config: `receivers: - test: - protocols: - unknown:`}, - }, - } - - actual := desiredService(context.Background(), params) - assert.Nil(t, actual) - - }) - t.Run("should return service with port mentioned in Instance.Spec.Ports and inferred ports", func(t *testing.T) { - - jaegerPorts := v1.ServicePort{ - Name: "jaeger-grpc", - Protocol: "TCP", - Port: 14250, - } - ports := append(params().Instance.Spec.Ports, jaegerPorts) - expected := service("test-collector", ports) - actual := desiredService(context.Background(), params()) - - assert.Equal(t, expected, *actual) - - }) - -} - -func TestExpectedServices(t *testing.T) { - t.Run("should create the service", func(t *testing.T) { - err := expectedServices(context.Background(), params(), []v1.Service{service("test-collector", params().Instance.Spec.Ports)}) - assert.NoError(t, err) - - exists, err := populateObjectIfExists(t, &v1.Service{}, types.NamespacedName{Namespace: "default", Name: "test-collector"}) - - assert.NoError(t, err) - assert.True(t, exists) - - }) - t.Run("should update service", func(t *testing.T) { - serviceInstance := service("test-collector", params().Instance.Spec.Ports) - createObjectIfNotExists(t, "test-collector", &serviceInstance) - - extraPorts := v1.ServicePort{ - Name: "port-web", - Protocol: "TCP", - Port: 8080, - TargetPort: intstr.FromInt(8080), - } - - ports := append(params().Instance.Spec.Ports, extraPorts) - err := expectedServices(context.Background(), params(), []v1.Service{service("test-collector", ports)}) - assert.NoError(t, err) - - actual := v1.Service{} - exists, err := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collector"}) - - assert.NoError(t, err) - assert.True(t, exists) - assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID) - assert.Contains(t, actual.Spec.Ports, extraPorts) - - }) -} - -func TestDeleteServices(t *testing.T) { - t.Run("should delete excess services", func(t *testing.T) { - ports := []v1.ServicePort{{ - Port: 80, - Name: "web", - }} - deleteService := service("delete-service-collector", ports) - createObjectIfNotExists(t, "delete-service-collector", &deleteService) - - exists, err := populateObjectIfExists(t, &v1.Service{}, types.NamespacedName{Namespace: "default", Name: "delete-service-collector"}) - assert.NoError(t, err) - assert.True(t, exists) - - desired := desiredService(context.Background(), params()) - err = deleteServices(context.Background(), params(), []v1.Service{*desired}) - assert.NoError(t, err) - - exists, err = populateObjectIfExists(t, &v1.Service{}, types.NamespacedName{Namespace: "default", Name: "delete-service-collector"}) - assert.NoError(t, err) - assert.False(t, exists) - - }) -} - -func TestHeadlessService(t *testing.T) { - t.Run("should return headless service", func(t *testing.T) { - actual := headless(context.Background(), params()) - assert.Equal(t, actual.Spec.ClusterIP, "None") - }) -} - -func TestMonitoringService(t *testing.T) { - t.Run("returned service should expose monitoring port", func(t *testing.T) { - expected := []v1.ServicePort{{ - Name: "monitoring", - Port: 8888, - }} - actual := monitoringService(context.Background(), params()) - assert.Equal(t, expected, actual.Spec.Ports) - - }) -} - -func service(name string, ports []v1.ServicePort) v1.Service { - labels := collector.Labels(params().Instance) - labels["app.kubernetes.io/name"] = name - - selector := labels - return v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "default", - Labels: labels, - Annotations: params().Instance.Annotations, - }, - Spec: v1.ServiceSpec{ - Selector: selector, - ClusterIP: "", - Ports: ports, - }, - } -} diff --git a/pkg/collector/reconcile/suite_test.go b/pkg/collector/reconcile/suite_test.go deleted file mode 100644 index d2469ad44a..0000000000 --- a/pkg/collector/reconcile/suite_test.go +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reconcile - -import ( - "context" - "fmt" - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" - logf "sigs.k8s.io/controller-runtime/pkg/log" - - "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" - "github.com/open-telemetry/opentelemetry-operator/internal/config" -) - -var k8sClient client.Client -var testEnv *envtest.Environment -var testScheme *runtime.Scheme = scheme.Scheme -var logger = logf.Log.WithName("unit-tests") - -var instanceUID = uuid.NewUUID() - -func TestMain(m *testing.M) { - testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, - } - - cfg, err := testEnv.Start() - if err != nil { - fmt.Printf("failed to start testEnv: %v", err) - os.Exit(1) - } - - if err := v1alpha1.AddToScheme(testScheme); err != nil { - fmt.Printf("failed to register scheme: %v", err) - os.Exit(1) - } - // +kubebuilder:scaffold:scheme - - k8sClient, err = client.New(cfg, client.Options{Scheme: testScheme}) - if err != nil { - fmt.Printf("failed to setup a Kubernetes client: %v", err) - os.Exit(1) - } - - code := m.Run() - - err = testEnv.Stop() - if err != nil { - fmt.Printf("failed to stop testEnv: %v", err) - os.Exit(1) - } - - os.Exit(code) -} - -func params() Params { - replicas := int32(2) - return Params{ - Config: config.New(), - Client: k8sClient, - Instance: v1alpha1.OpenTelemetryCollector{ - TypeMeta: metav1.TypeMeta{ - Kind: "opentelemetry.io", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - UID: instanceUID, - }, - Spec: v1alpha1.OpenTelemetryCollectorSpec{ - Ports: []v1.ServicePort{{ - Name: "web", - Port: 80, - TargetPort: intstr.IntOrString{ - Type: intstr.Int, - IntVal: 80, - }, - NodePort: 0, - }}, - Replicas: &replicas, - Config: ` - receivers: - jaeger: - protocols: - grpc: - processors: - - exporters: - logging: - - service: - pipelines: - traces: - receivers: [jaeger] - processors: [] - exporters: [logging] - -`, - }, - }, - Scheme: testScheme, - Log: logger, - Recorder: record.NewFakeRecorder(10), - } -} - -func createObjectIfNotExists(tb testing.TB, name string, object client.Object) { - tb.Helper() - err := k8sClient.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: name}, object) - if errors.IsNotFound(err) { - err := k8sClient.Create(context.Background(), - object) - assert.NoError(tb, err) - } -} - -func populateObjectIfExists(t testing.TB, object client.Object, namespacedName types.NamespacedName) (bool, error) { - t.Helper() - err := k8sClient.Get(context.Background(), namespacedName, object) - if errors.IsNotFound(err) { - return false, nil - } - if err != nil { - return false, err - } - return true, nil - -} diff --git a/pkg/targetallocator/reconcile/configmap.go b/pkg/reconcile/configmap.go similarity index 70% rename from pkg/targetallocator/reconcile/configmap.go rename to pkg/reconcile/configmap.go index b768fdb6ec..85b193b930 100644 --- a/pkg/targetallocator/reconcile/configmap.go +++ b/pkg/reconcile/configmap.go @@ -17,6 +17,7 @@ package reconcile import ( "context" "fmt" + "reflect" "gopkg.in/yaml.v2" corev1 "k8s.io/api/core/v1" @@ -26,18 +27,45 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "github.com/open-telemetry/opentelemetry-operator/pkg/collector" "github.com/open-telemetry/opentelemetry-operator/pkg/naming" "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator" ) // +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete -// ConfigMaps reconciles the config map(s) required for the instance in the current context. -func ConfigMaps(ctx context.Context, params Params) error { +// CollectorConfigMaps reconciles the config map(s) required for the collector instance in the current context. +func CollectorConfigMaps(ctx context.Context, params Params) error { + desired := []corev1.ConfigMap{ + desiredCollectorConfigMap(ctx, params), + } + + // first, handle the create/update parts + if err := expectedConfigMaps(ctx, params, desired, true); err != nil { + return fmt.Errorf("failed to reconcile the expected configmaps: %v", err) + } + + // then, delete the extra objects + opts := []client.ListOption{ + client.InNamespace(params.Instance.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Namespace, params.Instance.Name), + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + if err := deleteConfigMaps(ctx, params, desired, opts); err != nil { + return fmt.Errorf("failed to reconcile the configmaps to be deleted: %v", err) + } + + return nil +} + +// TAConfigMaps reconciles the config map(s) required for the target allocator instance in the current context. +func TAConfigMaps(ctx context.Context, params Params) error { desired := []corev1.ConfigMap{} - if IsAllocatorEnabled(params) { - cm, err := desiredConfigMap(ctx, params) + if params.Instance.Spec.TargetAllocator.Enabled { + cm, err := desiredTAConfigMap(ctx, params) if err != nil { return fmt.Errorf("failed to parse config: %v", err) } @@ -50,14 +78,39 @@ func ConfigMaps(ctx context.Context, params Params) error { } // then, delete the extra objects - if err := deleteConfigMaps(ctx, params, desired); err != nil { + opts := []client.ListOption{ + client.InNamespace(params.Instance.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Name, "targetallocator"), + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + if err := deleteConfigMaps(ctx, params, desired, opts); err != nil { return fmt.Errorf("failed to reconcile the configmaps to be deleted: %v", err) } return nil } -func desiredConfigMap(_ context.Context, params Params) (corev1.ConfigMap, error) { +func desiredCollectorConfigMap(_ context.Context, params Params) corev1.ConfigMap { + name := naming.ConfigMap(params.Instance) + labels := collector.Labels(params.Instance) + labels["app.kubernetes.io/name"] = name + + return corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: params.Instance.Namespace, + Labels: labels, + Annotations: params.Instance.Annotations, + }, + Data: map[string]string{ + "collector.yaml": params.Instance.Spec.Config, + }, + } +} + +func desiredTAConfigMap(_ context.Context, params Params) (corev1.ConfigMap, error) { name := naming.TAConfigMap(params.Instance) labels := targetallocator.Labels(params.Instance) labels["app.kubernetes.io/name"] = name @@ -147,6 +200,9 @@ func expectedConfigMaps(ctx context.Context, params Params, expected []corev1.Co if err := params.Client.Patch(ctx, updated, patch); err != nil { return fmt.Errorf("failed to apply changes: %w", err) } + if configMapChanged(&desired, existing) { + params.Recorder.Event(updated, "Normal", "ConfigUpdate ", fmt.Sprintf("OpenTelemetry Config changed - %s/%s", desired.Namespace, desired.Name)) + } params.Log.V(2).Info("applied", "configmap.name", desired.Name, "configmap.namespace", desired.Namespace) } @@ -154,14 +210,7 @@ func expectedConfigMaps(ctx context.Context, params Params, expected []corev1.Co return nil } -func deleteConfigMaps(ctx context.Context, params Params, expected []corev1.ConfigMap) error { - opts := []client.ListOption{ - client.InNamespace(params.Instance.Namespace), - client.MatchingLabels(map[string]string{ - "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Name, "targetallocator"), - "app.kubernetes.io/managed-by": "opentelemetry-operator", - }), - } +func deleteConfigMaps(ctx context.Context, params Params, expected []corev1.ConfigMap, opts []client.ListOption) error { list := &corev1.ConfigMapList{} if err := params.Client.List(ctx, list, opts...); err != nil { return fmt.Errorf("failed to list: %w", err) @@ -186,3 +235,8 @@ func deleteConfigMaps(ctx context.Context, params Params, expected []corev1.Conf return nil } + +func configMapChanged(desired *corev1.ConfigMap, actual *corev1.ConfigMap) bool { + return !reflect.DeepEqual(desired.Data, actual.Data) + +} diff --git a/pkg/targetallocator/reconcile/configmap_test.go b/pkg/reconcile/configmap_test.go similarity index 53% rename from pkg/targetallocator/reconcile/configmap_test.go rename to pkg/reconcile/configmap_test.go index e15e8c6176..2a95086b3f 100644 --- a/pkg/targetallocator/reconcile/configmap_test.go +++ b/pkg/reconcile/configmap_test.go @@ -24,13 +24,57 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/internal/config" "github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters" ta "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator/adapters" ) -func TestDesiredConfigMap(t *testing.T) { +func TestDesiredCollectorConfigMap(t *testing.T) { + t.Run("should return expected config map", func(t *testing.T) { + expectedLables := map[string]string{ + "app.kubernetes.io/managed-by": "opentelemetry-operator", + "app.kubernetes.io/instance": "default.test", + "app.kubernetes.io/part-of": "opentelemetry", + "app.kubernetes.io/component": "opentelemetry-collector", + "app.kubernetes.io/name": "test-collector", + } + + expectedData := map[string]string{ + "collector.yaml": ` + receivers: + jaeger: + protocols: + grpc: + processors: + + exporters: + logging: + + service: + pipelines: + traces: + receivers: [jaeger] + processors: [] + exporters: [logging] + +`, + } + + actual := desiredCollectorConfigMap(context.Background(), paramsCollector()) + + assert.Equal(t, "test-collector", actual.Name) + assert.Equal(t, expectedLables, actual.Labels) + assert.Equal(t, expectedData, actual.Data) + + }) + +} + +func TestDesiredTAConfigMap(t *testing.T) { t.Run("should return expected config map", func(t *testing.T) { expectedLables := map[string]string{ "app.kubernetes.io/managed-by": "opentelemetry-operator", @@ -55,7 +99,7 @@ label_selector: `, } - actual, err := desiredConfigMap(context.Background(), params()) + actual, err := desiredTAConfigMap(context.Background(), paramsTA()) assert.NoError(t, err) assert.Equal(t, "test-targetallocator", actual.Name) @@ -66,10 +110,89 @@ label_selector: } -func TestExpectedConfigMap(t *testing.T) { - param := params() +func TestExpectedCollectorConfigMap(t *testing.T) { + param := paramsCollector() + t.Run("should create config map", func(t *testing.T) { + err := expectedConfigMaps(context.Background(), param, []v1.ConfigMap{desiredCollectorConfigMap(context.Background(), param)}, true) + assert.NoError(t, err) + + exists, err := populateObjectIfExists(t, &v1.ConfigMap{}, types.NamespacedName{Namespace: "default", Name: "test-collector"}) + + assert.NoError(t, err) + assert.True(t, exists) + }) + + t.Run("should update config map", func(t *testing.T) { + + param := Params{ + Config: config.New(), + Client: k8sClient, + Instance: v1alpha1.OpenTelemetryCollector{ + TypeMeta: metav1.TypeMeta{ + Kind: "opentelemetry.io", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + UID: instanceUID, + }, + }, + Scheme: testScheme, + Log: logger, + Recorder: record.NewFakeRecorder(10), + } + cm := desiredCollectorConfigMap(context.Background(), param) + createObjectIfNotExists(t, "test-collector", &cm) + + err := expectedConfigMaps(context.Background(), param, []v1.ConfigMap{desiredCollectorConfigMap(context.Background(), param)}, true) + assert.NoError(t, err) + + actual := v1.ConfigMap{} + exists, err := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collector"}) + + assert.NoError(t, err) + assert.True(t, exists) + assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID) + assert.Equal(t, param.Instance.Spec.Config, actual.Data["collector.yaml"]) + }) + + t.Run("should delete config map", func(t *testing.T) { + + deletecm := v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-delete-collector", + Namespace: "default", + Labels: map[string]string{ + "app.kubernetes.io/instance": "default.test", + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }, + }, + } + createObjectIfNotExists(t, "test-delete-collector", &deletecm) + + exists, _ := populateObjectIfExists(t, &v1.ConfigMap{}, types.NamespacedName{Namespace: "default", Name: "test-delete-collector"}) + assert.True(t, exists) + + opts := []client.ListOption{ + client.InNamespace("default"), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": "default.test", + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + err := deleteConfigMaps(context.Background(), param, []v1.ConfigMap{desiredCollectorConfigMap(context.Background(), param)}, opts) + assert.NoError(t, err) + + exists, _ = populateObjectIfExists(t, &v1.ConfigMap{}, types.NamespacedName{Namespace: "default", Name: "test-delete-collector"}) + assert.False(t, exists) + }) +} + +func TestExpectedTAConfigMap(t *testing.T) { + param := paramsTA() t.Run("should create config map", func(t *testing.T) { - configMap, err := desiredConfigMap(context.Background(), param) + configMap, err := desiredTAConfigMap(context.Background(), param) assert.NoError(t, err) err = expectedConfigMaps(context.Background(), param, []v1.ConfigMap{configMap}, true) assert.NoError(t, err) @@ -114,11 +237,11 @@ func TestExpectedConfigMap(t *testing.T) { Scheme: testScheme, Log: logger, } - cm, err := desiredConfigMap(context.Background(), newParam) + cm, err := desiredTAConfigMap(context.Background(), newParam) assert.EqualError(t, err, "no receivers available as part of the configuration") createObjectIfNotExists(t, "test-targetallocator", &cm) - configMap, err := desiredConfigMap(context.Background(), param) + configMap, err := desiredTAConfigMap(context.Background(), param) assert.NoError(t, err) err = expectedConfigMaps(context.Background(), param, []v1.ConfigMap{configMap}, true) assert.NoError(t, err) @@ -164,9 +287,16 @@ func TestExpectedConfigMap(t *testing.T) { exists, _ := populateObjectIfExists(t, &v1.ConfigMap{}, types.NamespacedName{Namespace: "default", Name: "test-delete-targetallocator"}) assert.True(t, exists) - configMap, err := desiredConfigMap(context.Background(), param) + configMap, err := desiredTAConfigMap(context.Background(), param) assert.NoError(t, err) - err = deleteConfigMaps(context.Background(), param, []v1.ConfigMap{configMap}) + opts := []client.ListOption{ + client.InNamespace("default"), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": "test.targetallocator", + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + err = deleteConfigMaps(context.Background(), param, []v1.ConfigMap{configMap}, opts) assert.NoError(t, err) exists, _ = populateObjectIfExists(t, &v1.ConfigMap{}, types.NamespacedName{Namespace: "default", Name: "test-delete-targetallocator"}) diff --git a/pkg/collector/reconcile/daemonset.go b/pkg/reconcile/daemonset.go similarity index 100% rename from pkg/collector/reconcile/daemonset.go rename to pkg/reconcile/daemonset.go diff --git a/pkg/collector/reconcile/daemonset_test.go b/pkg/reconcile/daemonset_test.go similarity index 99% rename from pkg/collector/reconcile/daemonset_test.go rename to pkg/reconcile/daemonset_test.go index 8a4751e8b6..24e8e99759 100644 --- a/pkg/collector/reconcile/daemonset_test.go +++ b/pkg/reconcile/daemonset_test.go @@ -28,7 +28,7 @@ import ( ) func TestExpectedDaemonsets(t *testing.T) { - param := params() + param := paramsCollector() expectedDs := collector.DaemonSet(param.Config, logger, param.Instance) t.Run("should create Daemonset", func(t *testing.T) { diff --git a/pkg/targetallocator/reconcile/deployment.go b/pkg/reconcile/deployment.go similarity index 69% rename from pkg/targetallocator/reconcile/deployment.go rename to pkg/reconcile/deployment.go index a0161645f9..751b58b459 100644 --- a/pkg/targetallocator/reconcile/deployment.go +++ b/pkg/reconcile/deployment.go @@ -17,7 +17,6 @@ package reconcile import ( "context" "fmt" - "reflect" appsv1 "k8s.io/api/apps/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -25,16 +24,44 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "github.com/open-telemetry/opentelemetry-operator/pkg/collector" "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator" ) // +kubebuilder:rbac:groups="apps",resources=deployments,verbs=get;list;watch;create;update;patch;delete -// Deployments reconciles the deployment(s) required for the instance in the current context. -func Deployments(ctx context.Context, params Params) error { +// CollectorDeployments reconciles the deployment(s) required for the instance in the current context. +func CollectorDeployments(ctx context.Context, params Params) error { + opts := []client.ListOption{ + client.InNamespace(params.Instance.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Namespace, params.Instance.Name), + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + desired := []appsv1.Deployment{} + if params.Instance.Spec.Mode == "deployment" { + desired = append(desired, collector.Deployment(params.Config, params.Log, params.Instance)) + } + + // first, handle the create/update parts + if err := expectedDeployments(ctx, params, desired, false); err != nil { + return fmt.Errorf("failed to reconcile the expected deployments: %v", err) + } + + // then, delete the extra objects + if err := deleteDeployments(ctx, params, desired, opts); err != nil { + return fmt.Errorf("failed to reconcile the deployments to be deleted: %v", err) + } + + return nil +} + +// TADeployments reconciles the deployment(s) required for the instance in the current context. +func TADeployments(ctx context.Context, params Params) error { desired := []appsv1.Deployment{} - if IsAllocatorEnabled(params) { + if params.Instance.Spec.TargetAllocator.Enabled { _, err := GetPromConfig(params) if err != nil { return fmt.Errorf("failed to parse Prometheus config: %v", err) @@ -43,19 +70,26 @@ func Deployments(ctx context.Context, params Params) error { } // first, handle the create/update parts - if err := expectedDeployments(ctx, params, desired); err != nil { + if err := expectedDeployments(ctx, params, desired, true); err != nil { return fmt.Errorf("failed to reconcile the expected deployments: %v", err) } // then, delete the extra objects - if err := deleteDeployments(ctx, params, desired); err != nil { + opts := []client.ListOption{ + client.InNamespace(params.Instance.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Name, "targetallocator"), + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + if err := deleteDeployments(ctx, params, desired, opts); err != nil { return fmt.Errorf("failed to reconcile the deployments to be deleted: %v", err) } return nil } -func expectedDeployments(ctx context.Context, params Params, expected []appsv1.Deployment) error { +func expectedDeployments(ctx context.Context, params Params, expected []appsv1.Deployment, isTargetAllocator bool) error { for _, obj := range expected { desired := obj @@ -74,8 +108,6 @@ func expectedDeployments(ctx context.Context, params Params, expected []appsv1.D continue } else if err != nil { return fmt.Errorf("failed to get: %w", err) - } else if !deploymentImageChanged(&desired, existing) { - continue } // it exists already, merge the two if the end result isn't identical to the existing one @@ -84,7 +116,11 @@ func expectedDeployments(ctx context.Context, params Params, expected []appsv1.D updated.Labels = map[string]string{} } - updated.Spec = desired.Spec + if isTargetAllocator { + updated.Spec.Template.Spec.Containers[0].Image = desired.Spec.Template.Spec.Containers[0].Image + } else { + updated.Spec = desired.Spec + } updated.ObjectMeta.OwnerReferences = desired.ObjectMeta.OwnerReferences for k, v := range desired.ObjectMeta.Labels { @@ -103,14 +139,7 @@ func expectedDeployments(ctx context.Context, params Params, expected []appsv1.D return nil } -func deleteDeployments(ctx context.Context, params Params, expected []appsv1.Deployment) error { - opts := []client.ListOption{ - client.InNamespace(params.Instance.Namespace), - client.MatchingLabels(map[string]string{ - "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Name, "targetallocator"), - "app.kubernetes.io/managed-by": "opentelemetry-operator", - }), - } +func deleteDeployments(ctx context.Context, params Params, expected []appsv1.Deployment, opts []client.ListOption) error { list := &appsv1.DeploymentList{} if err := params.Client.List(ctx, list, opts...); err != nil { return fmt.Errorf("failed to list: %w", err) @@ -135,7 +164,3 @@ func deleteDeployments(ctx context.Context, params Params, expected []appsv1.Dep return nil } - -func deploymentImageChanged(desired *appsv1.Deployment, actual *appsv1.Deployment) bool { - return !reflect.DeepEqual(desired.Spec.Template.Spec.Containers[0].Image, actual.Spec.Template.Spec.Containers[0].Image) -} diff --git a/pkg/targetallocator/reconcile/deployment_test.go b/pkg/reconcile/deployment_test.go similarity index 58% rename from pkg/targetallocator/reconcile/deployment_test.go rename to pkg/reconcile/deployment_test.go index 135a7759a3..f3b1eeea81 100644 --- a/pkg/targetallocator/reconcile/deployment_test.go +++ b/pkg/reconcile/deployment_test.go @@ -23,77 +23,142 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/pkg/collector" "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator" ) -func TestExpectedDeployments(t *testing.T) { - param := params() - expectedDeploy := targetallocator.Deployment(param.Config, logger, param.Instance) +func TestExpectedCollectorDeployments(t *testing.T) { + param := paramsCollector() + expectedDeploy := collector.Deployment(param.Config, logger, param.Instance) t.Run("should create deployment", func(t *testing.T) { - err := expectedDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}) + err := expectedDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}, false) assert.NoError(t, err) - exists, err := populateObjectIfExists(t, &v1.Deployment{}, types.NamespacedName{Namespace: "default", Name: "test-targetallocator"}) + exists, err := populateObjectIfExists(t, &v1.Deployment{}, types.NamespacedName{Namespace: "default", Name: "test-collector"}) assert.NoError(t, err) assert.True(t, exists) }) + t.Run("should update deployment", func(t *testing.T) { + createObjectIfNotExists(t, "test-collector", &expectedDeploy) + err := expectedDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}, false) + assert.NoError(t, err) - t.Run("should not create deployment when otel collector mode is not StatefulSet", func(t *testing.T) { - modes := []v1alpha1.Mode{v1alpha1.ModeDaemonSet, v1alpha1.ModeDeployment, v1alpha1.ModeSidecar} - - for _, mode := range modes { - newParam := Params{ - Client: k8sClient, - Instance: v1alpha1.OpenTelemetryCollector{ - TypeMeta: metav1.TypeMeta{ - Kind: "opentelemetry.io", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - UID: instanceUID, - }, - Spec: v1alpha1.OpenTelemetryCollectorSpec{ - Mode: mode, - TargetAllocator: v1alpha1.OpenTelemetryTargetAllocator{ - Enabled: true, - }, - Config: ` - receivers: - jaeger: - protocols: - grpc: - processors: - - exporters: - logging: - - service: - pipelines: - traces: - receivers: [jaeger] - processors: [] - exporters: [logging] - - `, - }, + actual := v1.Deployment{} + exists, err := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collector"}) + + assert.NoError(t, err) + assert.True(t, exists) + assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID) + assert.Equal(t, int32(2), *actual.Spec.Replicas) + }) + + t.Run("should delete deployment", func(t *testing.T) { + labels := map[string]string{ + "app.kubernetes.io/instance": "default.test", + "app.kubernetes.io/managed-by": "opentelemetry-operator", + } + deploy := v1.Deployment{} + deploy.Name = "dummy" + deploy.Namespace = "default" + deploy.Labels = labels + deploy.Spec = v1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "dummy", + Image: "busybox", + }}, + }, + }, + } + createObjectIfNotExists(t, "dummy", &deploy) + + opts := []client.ListOption{ + client.InNamespace("default"), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": "default.test", + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + err := deleteDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}, opts) + assert.NoError(t, err) + + actual := v1.Deployment{} + exists, _ := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "dummy"}) + + assert.False(t, exists) + + }) + + t.Run("should not delete deployment", func(t *testing.T) { + labels := map[string]string{ + "app.kubernetes.io/instance": "default.test", + "app.kubernetes.io/managed-by": "helm-opentelemetry-operator", + } + deploy := v1.Deployment{} + deploy.Name = "dummy" + deploy.Namespace = "default" + deploy.Spec = v1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, }, - Scheme: testScheme, - Log: logger, - } - expected := []v1.Deployment{} - if newParam.Instance.Spec.Mode == v1alpha1.ModeStatefulSet { - expected = append(expected, targetallocator.Deployment(newParam.Config, newParam.Log, newParam.Instance)) - } - - assert.Len(t, expected, 0) + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "dummy", + Image: "busybox", + }}, + }, + }, } + createObjectIfNotExists(t, "dummy", &deploy) + + opts := []client.ListOption{ + client.InNamespace("default"), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": "default.test", + "app.kubernetes.io/managed-by": "helm-opentelemetry-operator", + }), + } + err := deleteDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}, opts) + assert.NoError(t, err) + + actual := v1.Deployment{} + exists, _ := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "dummy"}) + + assert.True(t, exists) + + }) +} + +func TestExpectedTADeployments(t *testing.T) { + param := paramsTA() + expectedDeploy := targetallocator.Deployment(param.Config, logger, param.Instance) + + t.Run("should create deployment", func(t *testing.T) { + err := expectedDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}, true) + assert.NoError(t, err) + + exists, err := populateObjectIfExists(t, &v1.Deployment{}, types.NamespacedName{Namespace: "default", Name: "test-targetallocator"}) + + assert.NoError(t, err) + assert.True(t, exists) + }) t.Run("should not create deployment when targetallocator is not enabled", func(t *testing.T) { @@ -149,7 +214,7 @@ func TestExpectedDeployments(t *testing.T) { updatedDeploy := targetallocator.Deployment(newParams().Config, logger, param.Instance) - err := expectedDeployments(ctx, param, []v1.Deployment{updatedDeploy}) + err := expectedDeployments(ctx, param, []v1.Deployment{updatedDeploy}, true) assert.NoError(t, err) actual := v1.Deployment{} @@ -170,7 +235,7 @@ func TestExpectedDeployments(t *testing.T) { updatedParam := newParams("test/test-img") updatedDeploy := targetallocator.Deployment(updatedParam.Config, logger, updatedParam.Instance) - err := expectedDeployments(ctx, param, []v1.Deployment{updatedDeploy}) + err := expectedDeployments(ctx, param, []v1.Deployment{updatedDeploy}, true) assert.NoError(t, err) actual := v1.Deployment{} @@ -189,7 +254,7 @@ func TestExpectedDeployments(t *testing.T) { "app.kubernetes.io/managed-by": "opentelemetry-operator", } deploy := v1.Deployment{} - deploy.Name = "dummy" + deploy.Name = "dummy-ta" deploy.Namespace = "default" deploy.Labels = labels deploy.Spec = v1.DeploymentSpec{ @@ -202,19 +267,26 @@ func TestExpectedDeployments(t *testing.T) { }, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: "dummy", + Name: "dummy-ta", Image: "busybox", }}, }, }, } - createObjectIfNotExists(t, "dummy", &deploy) - - err := deleteDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}) + createObjectIfNotExists(t, "dummy-ta", &deploy) + + opts := []client.ListOption{ + client.InNamespace("default"), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": "test.targetallocator", + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + err := deleteDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}, opts) assert.NoError(t, err) actual := v1.Deployment{} - exists, _ := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "dummy"}) + exists, _ := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "dummy-ta"}) assert.False(t, exists) @@ -226,7 +298,7 @@ func TestExpectedDeployments(t *testing.T) { "app.kubernetes.io/managed-by": "helm-opentelemetry-operator", } deploy := v1.Deployment{} - deploy.Name = "dummy" + deploy.Name = "dummy-ta" deploy.Namespace = "default" deploy.Spec = v1.DeploymentSpec{ Selector: &metav1.LabelSelector{ @@ -238,19 +310,26 @@ func TestExpectedDeployments(t *testing.T) { }, Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: "dummy", + Name: "dummy-ta", Image: "busybox", }}, }, }, } - createObjectIfNotExists(t, "dummy", &deploy) - - err := deleteDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}) + createObjectIfNotExists(t, "dummy-ta", &deploy) + + opts := []client.ListOption{ + client.InNamespace("default"), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": "test.targetallocator", + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + err := deleteDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}, opts) assert.NoError(t, err) actual := v1.Deployment{} - exists, _ := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "dummy"}) + exists, _ := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "dummy-ta"}) assert.True(t, exists) diff --git a/pkg/targetallocator/reconcile/helper.go b/pkg/reconcile/helper.go similarity index 81% rename from pkg/targetallocator/reconcile/helper.go rename to pkg/reconcile/helper.go index 7094031056..06ea1db6a4 100644 --- a/pkg/targetallocator/reconcile/helper.go +++ b/pkg/reconcile/helper.go @@ -15,15 +15,10 @@ package reconcile import ( - "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" "github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters" ta "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator/adapters" ) -func IsAllocatorEnabled(params Params) bool { - return params.Instance.Spec.Mode == v1alpha1.ModeStatefulSet && params.Instance.Spec.TargetAllocator.Enabled -} - func GetPromConfig(params Params) (map[interface{}]interface{}, error) { config, err := adapters.ConfigFromString(params.Instance.Spec.Config) if err != nil { diff --git a/pkg/collector/reconcile/opentelemetry.go b/pkg/reconcile/opentelemetry.go similarity index 100% rename from pkg/collector/reconcile/opentelemetry.go rename to pkg/reconcile/opentelemetry.go diff --git a/pkg/collector/reconcile/opentelemetry_test.go b/pkg/reconcile/opentelemetry_test.go similarity index 92% rename from pkg/collector/reconcile/opentelemetry_test.go rename to pkg/reconcile/opentelemetry_test.go index 50c08d78e1..f608623c58 100644 --- a/pkg/collector/reconcile/opentelemetry_test.go +++ b/pkg/reconcile/opentelemetry_test.go @@ -26,9 +26,9 @@ import ( func TestSelf(t *testing.T) { t.Run("should add version to the status", func(t *testing.T) { - instance := params().Instance + instance := paramsCollector().Instance createObjectIfNotExists(t, "test", &instance) - err := Self(context.Background(), params()) + err := Self(context.Background(), paramsCollector()) assert.NoError(t, err) actual := v1alpha1.OpenTelemetryCollector{} diff --git a/pkg/collector/reconcile/params.go b/pkg/reconcile/params.go similarity index 100% rename from pkg/collector/reconcile/params.go rename to pkg/reconcile/params.go diff --git a/pkg/collector/reconcile/service.go b/pkg/reconcile/service.go similarity index 78% rename from pkg/collector/reconcile/service.go rename to pkg/reconcile/service.go index 4f5f09813f..8894e21d9c 100644 --- a/pkg/collector/reconcile/service.go +++ b/pkg/reconcile/service.go @@ -23,6 +23,7 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -30,16 +31,17 @@ import ( "github.com/open-telemetry/opentelemetry-operator/pkg/collector" "github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters" "github.com/open-telemetry/opentelemetry-operator/pkg/naming" + "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator" ) // +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete -// Services reconciles the service(s) required for the instance in the current context. -func Services(ctx context.Context, params Params) error { +// CollectorServices reconciles the service(s) required for the instance in the current context. +func CollectorServices(ctx context.Context, params Params) error { desired := []corev1.Service{} if params.Instance.Spec.Mode != v1alpha1.ModeSidecar { type builder func(context.Context, Params) *corev1.Service - for _, builder := range []builder{desiredService, headless, monitoringService} { + for _, builder := range []builder{desiredCollectorService, headless, monitoringService} { svc := builder(ctx, params) // add only the non-nil to the list if svc != nil { @@ -54,14 +56,53 @@ func Services(ctx context.Context, params Params) error { } // then, delete the extra objects - if err := deleteServices(ctx, params, desired); err != nil { + opts := []client.ListOption{ + client.InNamespace(params.Instance.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Namespace, params.Instance.Name), + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + if err := deleteServices(ctx, params, desired, opts); err != nil { + return fmt.Errorf("failed to reconcile the services to be deleted: %v", err) + } + + return nil +} + +// TAServices reconciles the service(s) required for the instance in the current context. +func TAServices(ctx context.Context, params Params) error { + desired := []corev1.Service{} + + if params.Instance.Spec.TargetAllocator.Enabled { + _, err := GetPromConfig(params) + if err != nil { + return fmt.Errorf("failed to parse Prometheus config: %v", err) + } + desired = append(desired, desiredTAService(params)) + } + + // first, handle the create/update parts + if err := expectedServices(ctx, params, desired); err != nil { + return fmt.Errorf("failed to reconcile the expected services: %v", err) + } + + // then, delete the extra objects + opts := []client.ListOption{ + client.InNamespace(params.Instance.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Name, "targetallocator"), + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + if err := deleteServices(ctx, params, desired, opts); err != nil { return fmt.Errorf("failed to reconcile the services to be deleted: %v", err) } return nil } -func desiredService(ctx context.Context, params Params) *corev1.Service { +func desiredCollectorService(ctx context.Context, params Params) *corev1.Service { labels := collector.Labels(params.Instance) labels["app.kubernetes.io/name"] = naming.Service(params.Instance) @@ -121,8 +162,32 @@ func desiredService(ctx context.Context, params Params) *corev1.Service { } } +func desiredTAService(params Params) corev1.Service { + labels := targetallocator.Labels(params.Instance) + labels["app.kubernetes.io/name"] = naming.TAService(params.Instance) + + selector := targetallocator.Labels(params.Instance) + selector["app.kubernetes.io/name"] = naming.TargetAllocator(params.Instance) + + return corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: naming.TAService(params.Instance), + Namespace: params.Instance.Namespace, + Labels: labels, + }, + Spec: corev1.ServiceSpec{ + Selector: selector, + Ports: []corev1.ServicePort{{ + Name: "targetallocation", + Port: 443, + TargetPort: intstr.FromInt(443), + }}, + }, + } +} + func headless(ctx context.Context, params Params) *corev1.Service { - h := desiredService(ctx, params) + h := desiredCollectorService(ctx, params) if h == nil { return nil } @@ -208,14 +273,7 @@ func expectedServices(ctx context.Context, params Params, expected []corev1.Serv return nil } -func deleteServices(ctx context.Context, params Params, expected []corev1.Service) error { - opts := []client.ListOption{ - client.InNamespace(params.Instance.Namespace), - client.MatchingLabels(map[string]string{ - "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Namespace, params.Instance.Name), - "app.kubernetes.io/managed-by": "opentelemetry-operator", - }), - } +func deleteServices(ctx context.Context, params Params, expected []corev1.Service, opts []client.ListOption) error { list := &corev1.ServiceList{} if err := params.Client.List(ctx, list, opts...); err != nil { return fmt.Errorf("failed to list: %w", err) diff --git a/pkg/reconcile/service_test.go b/pkg/reconcile/service_test.go new file mode 100644 index 0000000000..edeec9e527 --- /dev/null +++ b/pkg/reconcile/service_test.go @@ -0,0 +1,320 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reconcile + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/internal/config" + "github.com/open-telemetry/opentelemetry-operator/pkg/collector" + "github.com/open-telemetry/opentelemetry-operator/pkg/naming" + "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator" +) + +func TestExtractPortNumbersAndNames(t *testing.T) { + t.Run("should return extracted port names and numbers", func(t *testing.T) { + ports := []corev1.ServicePort{{Name: "web", Port: 8080}, {Name: "tcp", Port: 9200}} + expectedPortNames := map[string]bool{"web": true, "tcp": true} + expectedPortNumbers := map[int32]bool{8080: true, 9200: true} + + actualPortNumbers, actualPortNames := extractPortNumbersAndNames(ports) + assert.Equal(t, expectedPortNames, actualPortNames) + assert.Equal(t, expectedPortNumbers, actualPortNumbers) + + }) +} + +func TestFilterPort(t *testing.T) { + + tests := []struct { + name string + candidate corev1.ServicePort + portNumbers map[int32]bool + portNames map[string]bool + expected corev1.ServicePort + }{ + { + name: "should filter out duplicate port", + candidate: corev1.ServicePort{Name: "web", Port: 8080}, + portNumbers: map[int32]bool{8080: true, 9200: true}, + portNames: map[string]bool{"test": true, "metrics": true}, + }, + + { + name: "should not filter unique port", + candidate: corev1.ServicePort{Name: "web", Port: 8090}, + portNumbers: map[int32]bool{8080: true, 9200: true}, + portNames: map[string]bool{"test": true, "metrics": true}, + expected: corev1.ServicePort{Name: "web", Port: 8090}, + }, + + { + name: "should change the duplicate portName", + candidate: corev1.ServicePort{Name: "web", Port: 8090}, + portNumbers: map[int32]bool{8080: true, 9200: true}, + portNames: map[string]bool{"web": true, "metrics": true}, + expected: corev1.ServicePort{Name: "port-8090", Port: 8090}, + }, + + { + name: "should return nil if fallback name clashes with existing portName", + candidate: corev1.ServicePort{Name: "web", Port: 8090}, + portNumbers: map[int32]bool{8080: true, 9200: true}, + portNames: map[string]bool{"web": true, "port-8090": true}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual := filterPort(logger, test.candidate, test.portNumbers, test.portNames) + if test.expected != (corev1.ServicePort{}) { + assert.Equal(t, test.expected, *actual) + return + } + assert.Nil(t, actual) + + }) + + } +} + +func TestDesiredCollectorService(t *testing.T) { + t.Run("should return nil service for unknown receiver and protocol", func(t *testing.T) { + params := Params{ + Config: config.Config{}, + Client: k8sClient, + Log: logger, + Instance: v1alpha1.OpenTelemetryCollector{ + Spec: v1alpha1.OpenTelemetryCollectorSpec{Config: `receivers: + test: + protocols: + unknown:`}, + }, + } + + actual := desiredCollectorService(context.Background(), params) + assert.Nil(t, actual) + + }) + t.Run("should return service with port mentioned in Instance.Spec.Ports and inferred ports", func(t *testing.T) { + + jaegerPorts := corev1.ServicePort{ + Name: "jaeger-grpc", + Protocol: "TCP", + Port: 14250, + } + ports := append(paramsCollector().Instance.Spec.Ports, jaegerPorts) + expected := serviceCollector("test-collector", ports) + actual := desiredCollectorService(context.Background(), paramsCollector()) + + assert.Equal(t, expected, *actual) + + }) + +} + +func TestExpectedCollectorServices(t *testing.T) { + t.Run("should create the service", func(t *testing.T) { + err := expectedServices(context.Background(), paramsCollector(), []corev1.Service{serviceCollector("test-collector", paramsCollector().Instance.Spec.Ports)}) + assert.NoError(t, err) + + exists, err := populateObjectIfExists(t, &corev1.Service{}, types.NamespacedName{Namespace: "default", Name: "test-collector"}) + + assert.NoError(t, err) + assert.True(t, exists) + + }) + t.Run("should update service", func(t *testing.T) { + serviceInstance := serviceCollector("test-collector", paramsCollector().Instance.Spec.Ports) + createObjectIfNotExists(t, "test-collector", &serviceInstance) + + extraPorts := corev1.ServicePort{ + Name: "port-web", + Protocol: "TCP", + Port: 8080, + TargetPort: intstr.FromInt(8080), + } + + ports := append(paramsCollector().Instance.Spec.Ports, extraPorts) + err := expectedServices(context.Background(), paramsCollector(), []corev1.Service{serviceCollector("test-collector", ports)}) + assert.NoError(t, err) + + actual := corev1.Service{} + exists, err := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collector"}) + + assert.NoError(t, err) + assert.True(t, exists) + assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID) + assert.Contains(t, actual.Spec.Ports, extraPorts) + + }) +} + +func TestDeleteCollectorServices(t *testing.T) { + t.Run("should delete excess services", func(t *testing.T) { + ports := []corev1.ServicePort{{ + Port: 80, + Name: "web", + }} + deleteService := serviceCollector("delete-service-collector", ports) + createObjectIfNotExists(t, "delete-service-collector", &deleteService) + + exists, err := populateObjectIfExists(t, &corev1.Service{}, types.NamespacedName{Namespace: "default", Name: "delete-service-collector"}) + assert.NoError(t, err) + assert.True(t, exists) + + desired := desiredCollectorService(context.Background(), paramsCollector()) + opts := []client.ListOption{ + client.InNamespace(paramsTA().Instance.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", paramsCollector().Instance.Namespace, paramsCollector().Instance.Name), + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + err = deleteServices(context.Background(), paramsCollector(), []corev1.Service{*desired}, opts) + assert.NoError(t, err) + + exists, err = populateObjectIfExists(t, &corev1.Service{}, types.NamespacedName{Namespace: "default", Name: "delete-service-collector"}) + assert.NoError(t, err) + assert.False(t, exists) + + }) +} + +func TestHeadlessService(t *testing.T) { + t.Run("should return headless service", func(t *testing.T) { + actual := headless(context.Background(), paramsCollector()) + assert.Equal(t, actual.Spec.ClusterIP, "None") + }) +} + +func TestMonitoringService(t *testing.T) { + t.Run("returned service should expose monitoring port", func(t *testing.T) { + expected := []corev1.ServicePort{{ + Name: "monitoring", + Port: 8888, + }} + actual := monitoringService(context.Background(), paramsCollector()) + assert.Equal(t, expected, actual.Spec.Ports) + + }) +} + +func TestDesiredTAService(t *testing.T) { + t.Run("should return service with default port", func(t *testing.T) { + expected := serviceTA("test-targetallocator") + actual := desiredTAService(paramsTA()) + + assert.Equal(t, expected, actual) + }) + +} + +func TestExpectedTAServices(t *testing.T) { + t.Run("should create the service", func(t *testing.T) { + err := expectedServices(context.Background(), paramsTA(), []corev1.Service{serviceTA("targetallocator")}) + assert.NoError(t, err) + + exists, err := populateObjectIfExists(t, &corev1.Service{}, types.NamespacedName{Namespace: "default", Name: "targetallocator"}) + + assert.NoError(t, err) + assert.True(t, exists) + + }) +} + +func TestDeleteTAServices(t *testing.T) { + t.Run("should delete excess services", func(t *testing.T) { + deleteService := serviceTA("test-delete-targetallocator", 8888) + createObjectIfNotExists(t, "test-delete-targetallocator", &deleteService) + + exists, err := populateObjectIfExists(t, &corev1.Service{}, types.NamespacedName{Namespace: "default", Name: "test-delete-targetallocator"}) + assert.NoError(t, err) + assert.True(t, exists) + + opts := []client.ListOption{ + client.InNamespace(paramsTA().Instance.Namespace), + client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", paramsTA().Instance.Name, "targetallocator"), + "app.kubernetes.io/managed-by": "opentelemetry-operator", + }), + } + err = deleteServices(context.Background(), paramsTA(), []corev1.Service{desiredTAService(paramsTA())}, opts) + assert.NoError(t, err) + + exists, err = populateObjectIfExists(t, &corev1.Service{}, types.NamespacedName{Namespace: "default", Name: "test-delete-targetallocator"}) + assert.NoError(t, err) + assert.False(t, exists) + + }) +} + +func serviceCollector(name string, ports []corev1.ServicePort) corev1.Service { + labels := collector.Labels(paramsCollector().Instance) + labels["app.kubernetes.io/name"] = name + + selector := labels + return corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + Labels: labels, + Annotations: paramsCollector().Instance.Annotations, + }, + Spec: corev1.ServiceSpec{ + Selector: selector, + ClusterIP: "", + Ports: ports, + }, + } +} + +func serviceTA(name string, portOpt ...int32) corev1.Service { + port := int32(443) + if len(portOpt) > 0 { + port = portOpt[0] + } + params := paramsTA() + labels := targetallocator.Labels(params.Instance) + labels["app.kubernetes.io/name"] = naming.TAService(params.Instance) + + selector := targetallocator.Labels(params.Instance) + selector["app.kubernetes.io/name"] = naming.TargetAllocator(params.Instance) + + return corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: params.Instance.Namespace, + Labels: labels, + }, + Spec: corev1.ServiceSpec{ + Selector: selector, + Ports: []corev1.ServicePort{{ + Name: "targetallocation", + Port: port, + TargetPort: intstr.FromInt(443), + }}, + }, + } +} diff --git a/pkg/collector/reconcile/serviceaccount.go b/pkg/reconcile/serviceaccount.go similarity index 100% rename from pkg/collector/reconcile/serviceaccount.go rename to pkg/reconcile/serviceaccount.go diff --git a/pkg/collector/reconcile/serviceaccount_test.go b/pkg/reconcile/serviceaccount_test.go similarity index 85% rename from pkg/collector/reconcile/serviceaccount_test.go rename to pkg/reconcile/serviceaccount_test.go index 2886fae935..be995751a4 100644 --- a/pkg/collector/reconcile/serviceaccount_test.go +++ b/pkg/reconcile/serviceaccount_test.go @@ -28,8 +28,8 @@ import ( func TestExpectedServiceAccounts(t *testing.T) { t.Run("should create service account", func(t *testing.T) { - desired := collector.ServiceAccount(params().Instance) - err := expectedServiceAccounts(context.Background(), params(), []v1.ServiceAccount{desired}) + desired := collector.ServiceAccount(paramsCollector().Instance) + err := expectedServiceAccounts(context.Background(), paramsCollector(), []v1.ServiceAccount{desired}) assert.NoError(t, err) exists, err := populateObjectIfExists(t, &v1.ServiceAccount{}, types.NamespacedName{Namespace: "default", Name: "test-collector"}) @@ -50,7 +50,7 @@ func TestExpectedServiceAccounts(t *testing.T) { assert.NoError(t, err) assert.True(t, exists) - err = expectedServiceAccounts(context.Background(), params(), []v1.ServiceAccount{collector.ServiceAccount(params().Instance)}) + err = expectedServiceAccounts(context.Background(), paramsCollector(), []v1.ServiceAccount{collector.ServiceAccount(paramsCollector().Instance)}) assert.NoError(t, err) actual := v1.ServiceAccount{} @@ -77,7 +77,7 @@ func TestDeleteServiceAccounts(t *testing.T) { assert.NoError(t, err) assert.True(t, exists) - err = deleteServiceAccounts(context.Background(), params(), []v1.ServiceAccount{collector.ServiceAccount(params().Instance)}) + err = deleteServiceAccounts(context.Background(), paramsCollector(), []v1.ServiceAccount{collector.ServiceAccount(paramsCollector().Instance)}) assert.NoError(t, err) exists, err = populateObjectIfExists(t, &v1.ServiceAccount{}, types.NamespacedName{Namespace: "default", Name: "test-delete-collector"}) @@ -101,7 +101,7 @@ func TestDeleteServiceAccounts(t *testing.T) { assert.NoError(t, err) assert.True(t, exists) - err = deleteServiceAccounts(context.Background(), params(), []v1.ServiceAccount{collector.ServiceAccount(params().Instance)}) + err = deleteServiceAccounts(context.Background(), paramsCollector(), []v1.ServiceAccount{collector.ServiceAccount(paramsCollector().Instance)}) assert.NoError(t, err) exists, err = populateObjectIfExists(t, &v1.ServiceAccount{}, types.NamespacedName{Namespace: "default", Name: "test-delete-collector"}) diff --git a/pkg/collector/reconcile/statefulset.go b/pkg/reconcile/statefulset.go similarity index 100% rename from pkg/collector/reconcile/statefulset.go rename to pkg/reconcile/statefulset.go diff --git a/pkg/collector/reconcile/statefulset_test.go b/pkg/reconcile/statefulset_test.go similarity index 99% rename from pkg/collector/reconcile/statefulset_test.go rename to pkg/reconcile/statefulset_test.go index 4a0de27f27..5a77ea10f8 100644 --- a/pkg/collector/reconcile/statefulset_test.go +++ b/pkg/reconcile/statefulset_test.go @@ -28,7 +28,7 @@ import ( ) func TestExpectedStatefulsets(t *testing.T) { - param := params() + param := paramsCollector() expectedSs := collector.StatefulSet(param.Config, logger, param.Instance) t.Run("should create StatefulSet", func(t *testing.T) { diff --git a/pkg/targetallocator/reconcile/suite_test.go b/pkg/reconcile/suite_test.go similarity index 79% rename from pkg/targetallocator/reconcile/suite_test.go rename to pkg/reconcile/suite_test.go index cef4205192..560adabd13 100644 --- a/pkg/targetallocator/reconcile/suite_test.go +++ b/pkg/reconcile/suite_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -48,7 +49,7 @@ var instanceUID = uuid.NewUUID() func TestMain(m *testing.M) { testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, + CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, } cfg, err := testEnv.Start() @@ -80,7 +81,7 @@ func TestMain(m *testing.M) { os.Exit(code) } -func params() Params { +func paramsTA() Params { replicas := int32(1) configYAML, err := ioutil.ReadFile("suite_test.yaml") if err != nil { @@ -117,8 +118,61 @@ func params() Params { Config: string(configYAML), }, }, - Scheme: testScheme, - Log: logger, + Scheme: testScheme, + Log: logger, + Recorder: record.NewFakeRecorder(10), + } +} + +func paramsCollector() Params { + replicas := int32(2) + return Params{ + Config: config.New(), + Client: k8sClient, + Instance: v1alpha1.OpenTelemetryCollector{ + TypeMeta: metav1.TypeMeta{ + Kind: "opentelemetry.io", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + UID: instanceUID, + }, + Spec: v1alpha1.OpenTelemetryCollectorSpec{ + Ports: []v1.ServicePort{{ + Name: "web", + Port: 80, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + NodePort: 0, + }}, + Replicas: &replicas, + Config: ` + receivers: + jaeger: + protocols: + grpc: + processors: + + exporters: + logging: + + service: + pipelines: + traces: + receivers: [jaeger] + processors: [] + exporters: [logging] + +`, + }, + }, + Scheme: testScheme, + Log: logger, + Recorder: record.NewFakeRecorder(10), } } diff --git a/pkg/targetallocator/reconcile/suite_test.yaml b/pkg/reconcile/suite_test.yaml similarity index 100% rename from pkg/targetallocator/reconcile/suite_test.yaml rename to pkg/reconcile/suite_test.yaml diff --git a/pkg/targetallocator/container_test.go b/pkg/targetallocator/container_test.go index 98a8c3a0e4..61b130e988 100644 --- a/pkg/targetallocator/container_test.go +++ b/pkg/targetallocator/container_test.go @@ -17,9 +17,8 @@ package targetallocator import ( "testing" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "github.com/stretchr/testify/assert" + logf "sigs.k8s.io/controller-runtime/pkg/log" "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" "github.com/open-telemetry/opentelemetry-operator/internal/config" diff --git a/pkg/targetallocator/deployment_test.go b/pkg/targetallocator/deployment_test.go index ce8e4f0fcb..302c78ac74 100644 --- a/pkg/targetallocator/deployment_test.go +++ b/pkg/targetallocator/deployment_test.go @@ -18,7 +18,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" diff --git a/pkg/targetallocator/reconcile/params.go b/pkg/targetallocator/reconcile/params.go deleted file mode 100644 index e9e6fa3dbf..0000000000 --- a/pkg/targetallocator/reconcile/params.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reconcile - -import ( - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/open-telemetry/opentelemetry-operator/api/v1alpha1" - "github.com/open-telemetry/opentelemetry-operator/internal/config" -) - -// Params holds the reconciliation-specific parameters. -type Params struct { - Config config.Config - Client client.Client - Instance v1alpha1.OpenTelemetryCollector - Log logr.Logger - Scheme *runtime.Scheme -} diff --git a/pkg/targetallocator/reconcile/service.go b/pkg/targetallocator/reconcile/service.go deleted file mode 100644 index 3dc2993076..0000000000 --- a/pkg/targetallocator/reconcile/service.go +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reconcile - -import ( - "context" - "fmt" - - corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - - "github.com/open-telemetry/opentelemetry-operator/pkg/naming" - "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator" -) - -// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete - -// Services reconciles the service(s) required for the instance in the current context. -func Services(ctx context.Context, params Params) error { - desired := []corev1.Service{} - - if IsAllocatorEnabled(params) { - _, err := GetPromConfig(params) - if err != nil { - return fmt.Errorf("failed to parse Prometheus config: %v", err) - } - desired = append(desired, desiredService(params)) - } - - // first, handle the create/update parts - if err := expectedServices(ctx, params, desired); err != nil { - return fmt.Errorf("failed to reconcile the expected services: %v", err) - } - - // then, delete the extra objects - if err := deleteServices(ctx, params, desired); err != nil { - return fmt.Errorf("failed to reconcile the services to be deleted: %v", err) - } - - return nil -} - -func desiredService(params Params) corev1.Service { - labels := targetallocator.Labels(params.Instance) - labels["app.kubernetes.io/name"] = naming.TAService(params.Instance) - - selector := targetallocator.Labels(params.Instance) - selector["app.kubernetes.io/name"] = naming.TargetAllocator(params.Instance) - - return corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: naming.TAService(params.Instance), - Namespace: params.Instance.Namespace, - Labels: labels, - }, - Spec: corev1.ServiceSpec{ - Selector: selector, - Ports: []corev1.ServicePort{{ - Name: "targetallocation", - Port: 443, - TargetPort: intstr.FromInt(443), - }}, - }, - } -} - -func expectedServices(ctx context.Context, params Params, expected []corev1.Service) error { - for _, obj := range expected { - desired := obj - - if err := controllerutil.SetControllerReference(¶ms.Instance, &desired, params.Scheme); err != nil { - return fmt.Errorf("failed to set controller reference: %w", err) - } - - existing := &corev1.Service{} - nns := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name} - err := params.Client.Get(ctx, nns, existing) - if err != nil && k8serrors.IsNotFound(err) { - if err := params.Client.Create(ctx, &desired); err != nil { - return fmt.Errorf("failed to create: %w", err) - } - params.Log.V(2).Info("created", "service.name", desired.Name, "service.namespace", desired.Namespace) - continue - } else if err != nil { - return fmt.Errorf("failed to get: %w", err) - } - - // it exists already, merge the two if the end result isn't identical to the existing one - updated := existing.DeepCopy() - if updated.Labels == nil { - updated.Labels = map[string]string{} - } - updated.ObjectMeta.OwnerReferences = desired.ObjectMeta.OwnerReferences - - for k, v := range desired.ObjectMeta.Labels { - updated.ObjectMeta.Labels[k] = v - } - updated.Spec.Ports = desired.Spec.Ports - - patch := client.MergeFrom(existing) - - if err := params.Client.Patch(ctx, updated, patch); err != nil { - return fmt.Errorf("failed to apply changes: %w", err) - } - - params.Log.V(2).Info("applied", "service.name", desired.Name, "service.namespace", desired.Namespace) - } - - return nil -} - -func deleteServices(ctx context.Context, params Params, expected []corev1.Service) error { - opts := []client.ListOption{ - client.InNamespace(params.Instance.Namespace), - client.MatchingLabels(map[string]string{ - "app.kubernetes.io/instance": fmt.Sprintf("%s.%s", params.Instance.Name, "targetallocator"), - "app.kubernetes.io/managed-by": "opentelemetry-operator", - }), - } - list := &corev1.ServiceList{} - if err := params.Client.List(ctx, list, opts...); err != nil { - return fmt.Errorf("failed to list: %w", err) - } - - for i := range list.Items { - existing := list.Items[i] - del := true - for _, keep := range expected { - if keep.Name == existing.Name && keep.Namespace == existing.Namespace { - del = false - } - } - - if del { - if err := params.Client.Delete(ctx, &existing); err != nil { - return fmt.Errorf("failed to delete: %w", err) - } - params.Log.V(2).Info("deleted", "service.name", existing.Name, "service.namespace", existing.Namespace) - } - } - - return nil -} diff --git a/pkg/targetallocator/reconcile/service_test.go b/pkg/targetallocator/reconcile/service_test.go deleted file mode 100644 index aa1672a11f..0000000000 --- a/pkg/targetallocator/reconcile/service_test.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reconcile - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" - - "github.com/open-telemetry/opentelemetry-operator/pkg/naming" - "github.com/open-telemetry/opentelemetry-operator/pkg/targetallocator" -) - -func TestDesiredService(t *testing.T) { - t.Run("should return service with default port", func(t *testing.T) { - expected := service("test-targetallocator") - actual := desiredService(params()) - - assert.Equal(t, expected, actual) - }) - -} - -func TestExpectedServices(t *testing.T) { - t.Run("should create the service", func(t *testing.T) { - err := expectedServices(context.Background(), params(), []corev1.Service{service("targetallocator")}) - assert.NoError(t, err) - - exists, err := populateObjectIfExists(t, &corev1.Service{}, types.NamespacedName{Namespace: "default", Name: "targetallocator"}) - - assert.NoError(t, err) - assert.True(t, exists) - - }) -} - -func TestDeleteServices(t *testing.T) { - t.Run("should delete excess services", func(t *testing.T) { - deleteService := service("test-delete-targetallocator", 8888) - createObjectIfNotExists(t, "test-delete-targetallocator", &deleteService) - - exists, err := populateObjectIfExists(t, &corev1.Service{}, types.NamespacedName{Namespace: "default", Name: "test-delete-targetallocator"}) - assert.NoError(t, err) - assert.True(t, exists) - - err = deleteServices(context.Background(), params(), []corev1.Service{desiredService(params())}) - assert.NoError(t, err) - - exists, err = populateObjectIfExists(t, &corev1.Service{}, types.NamespacedName{Namespace: "default", Name: "test-delete-targetallocator"}) - assert.NoError(t, err) - assert.False(t, exists) - - }) -} - -func service(name string, portOpt ...int32) corev1.Service { - port := int32(443) - if len(portOpt) > 0 { - port = portOpt[0] - } - params := params() - labels := targetallocator.Labels(params.Instance) - labels["app.kubernetes.io/name"] = naming.TAService(params.Instance) - - selector := targetallocator.Labels(params.Instance) - selector["app.kubernetes.io/name"] = naming.TargetAllocator(params.Instance) - - return corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: params.Instance.Namespace, - Labels: labels, - }, - Spec: corev1.ServiceSpec{ - Selector: selector, - Ports: []corev1.ServicePort{{ - Name: "targetallocation", - Port: port, - TargetPort: intstr.FromInt(443), - }}, - }, - } -}