Skip to content

Commit

Permalink
Add controller for TargetAllocator CR
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm committed Sep 9, 2024
1 parent e203cbc commit 61ae85d
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 33 deletions.
3 changes: 2 additions & 1 deletion apis/v1alpha1/targetallocator_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
)

func init() {
v1beta1.SchemeBuilder.Register(&TargetAllocator{}, &TargetAllocatorList{})
SchemeBuilder.Register(&TargetAllocator{}, &TargetAllocatorList{})
}

//+kubebuilder:object:root=true
//+kubebuilder:storageversion
//+kubebuilder:subresource:status

// TargetAllocator is the Schema for the targetallocators API.
Expand Down
2 changes: 1 addition & 1 deletion controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func BuildCollector(params manifests.Params) ([]client.Object, error) {
Recorder: params.Recorder,
Log: params.Log,
Config: params.Config,
Collector: params.OtelCol,
Collector: &params.OtelCol,
TargetAllocator: *params.TargetAllocator,
}
taResources, err := BuildTargetAllocator(taParams)
Expand Down
4 changes: 4 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ func TestMain(m *testing.M) {
fmt.Printf("failed to SetupWebhookWithManager: %v", err)
os.Exit(1)
}
if err = v1alpha1.SetupTargetAllocatorWebhook(mgr, config.New(), reviewer); err != nil {
fmt.Printf("failed to SetupWebhookWithManager: %v", err)
os.Exit(1)
}

if err = v1alpha1.SetupOpAMPBridgeWebhook(mgr, config.New()); err != nil {
fmt.Printf("failed to SetupWebhookWithManager: %v", err)
Expand Down
152 changes: 152 additions & 0 deletions controllers/targetallocator_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package controllers contains the main controller, where the reconciliation starts.
package controllers

import (
"context"

"github.com/go-logr/logr"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyV1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
"github.com/open-telemetry/opentelemetry-operator/internal/config"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/targetallocator"
taStatus "github.com/open-telemetry/opentelemetry-operator/internal/status/targetallocator"
"github.com/open-telemetry/opentelemetry-operator/pkg/featuregate"
)

// TargetAllocatorReconciler reconciles a TargetAllocator object.
type TargetAllocatorReconciler struct {
client.Client
recorder record.EventRecorder
scheme *runtime.Scheme
log logr.Logger
config config.Config
}

// TargetAllocatorReconcilerParams is the set of options to build a new TargetAllocatorReconciler.
type TargetAllocatorReconcilerParams struct {
client.Client
Recorder record.EventRecorder
Scheme *runtime.Scheme
Log logr.Logger
Config config.Config
}

func (r *TargetAllocatorReconciler) getParams(instance v1alpha1.TargetAllocator) targetallocator.Params {
p := targetallocator.Params{
Config: r.config,
Client: r.Client,
Log: r.log,
Scheme: r.scheme,
Recorder: r.recorder,
TargetAllocator: instance,
}

return p
}

// NewTargetAllocatorReconciler creates a new reconciler for TargetAllocator objects.
func NewTargetAllocatorReconciler(
client client.Client,
scheme *runtime.Scheme,
recorder record.EventRecorder,
config config.Config,
logger logr.Logger,
) *TargetAllocatorReconciler {
return &TargetAllocatorReconciler{
Client: client,
log: logger,
scheme: scheme,
config: config,
recorder: recorder,
}
}

// TODO: Uncomment the lines below after enabling the TA controller in main.go
// // +kubebuilder:rbac:groups="",resources=pods;configmaps;services;serviceaccounts;persistentvolumeclaims;persistentvolumes,verbs=get;list;watch;create;update;patch;delete
// // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// // +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete
// // +kubebuilder:rbac:groups=monitoring.coreos.com,resources=servicemonitors;podmonitors,verbs=get;list;watch;create;update;patch;delete
// // +kubebuilder:rbac:groups=opentelemetry.io,resources=opentelemetrycollectors,verbs=get;list;watch;update;patch
// // +kubebuilder:rbac:groups=opentelemetry.io,resources=targetallocators,verbs=get;list;watch;update;patch
// // +kubebuilder:rbac:groups=opentelemetry.io,resources=targetallocators/status,verbs=get;update;patch

// Reconcile the current state of a TargetAllocator resource with the desired state.
func (r *TargetAllocatorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.log.WithValues("targetallocator", req.NamespacedName)

var instance v1alpha1.TargetAllocator
if err := r.Client.Get(ctx, req.NamespacedName, &instance); err != nil {
if !apierrors.IsNotFound(err) {
log.Error(err, "unable to fetch TargetAllocator")
}

// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// We have a deletion, short circuit and let the deletion happen
if deletionTimestamp := instance.GetDeletionTimestamp(); deletionTimestamp != nil {
return ctrl.Result{}, nil
}

if instance.Spec.ManagementState == v1beta1.ManagementStateUnmanaged {
log.Info("Skipping reconciliation for unmanaged TargetAllocator resource", "name", req.String())
// Stop requeueing for unmanaged TargetAllocator custom resources
return ctrl.Result{}, nil
}

params := r.getParams(instance)
desiredObjects, buildErr := BuildTargetAllocator(params)
if buildErr != nil {
return ctrl.Result{}, buildErr
}

err := reconcileDesiredObjects(ctx, r.Client, log, &params.TargetAllocator, params.Scheme, desiredObjects, nil)
return taStatus.HandleReconcileStatus(ctx, log, params, err)
}

// SetupWithManager tells the manager what our controller is interested in.
func (r *TargetAllocatorReconciler) SetupWithManager(mgr ctrl.Manager) error {
builder := ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.TargetAllocator{}).
Owns(&corev1.ConfigMap{}).
Owns(&corev1.ServiceAccount{}).
Owns(&corev1.Service{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.PersistentVolume{}).
Owns(&corev1.PersistentVolumeClaim{}).
Owns(&policyV1.PodDisruptionBudget{})

if featuregate.PrometheusOperatorIsAvailable.IsEnabled() {
builder.Owns(&monitoringv1.ServiceMonitor{})
builder.Owns(&monitoringv1.PodMonitor{})
}

return builder.Complete(r)
}
132 changes: 132 additions & 0 deletions controllers/targetallocator_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllers_test

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
k8sreconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
"github.com/open-telemetry/opentelemetry-operator/controllers"
"github.com/open-telemetry/opentelemetry-operator/internal/config"
)

var testLogger = logf.Log.WithName("opamp-bridge-controller-unit-tests")

func TestNewObjectsOnReconciliation_TargetAllocator(t *testing.T) {
// prepare
cfg := config.New(
config.WithTargetAllocatorImage("default-ta"),
)
nsn := types.NamespacedName{Name: "my-instance", Namespace: "default"}
reconciler := controllers.NewTargetAllocatorReconciler(
k8sClient,
testScheme,
record.NewFakeRecorder(10),
cfg,
testLogger,
)
created := &v1alpha1.TargetAllocator{
ObjectMeta: metav1.ObjectMeta{
Name: nsn.Name,
Namespace: nsn.Namespace,
},
Spec: v1alpha1.TargetAllocatorSpec{},
}
err := k8sClient.Create(context.Background(), created)
require.NoError(t, err)

// test
req := k8sreconcile.Request{
NamespacedName: nsn,
}
_, err = reconciler.Reconcile(context.Background(), req)

// verify
require.NoError(t, err)

// the base query for the underlying objects
opts := []client.ListOption{
client.InNamespace(nsn.Namespace),
client.MatchingLabels(map[string]string{
"app.kubernetes.io/instance": fmt.Sprintf("%s.%s", nsn.Namespace, nsn.Name),
"app.kubernetes.io/managed-by": "opentelemetry-operator",
"app.kubernetes.io/component": "opentelemetry-targetallocator",
}),
}

// verify that we have at least one object for each of the types we create
// whether we have the right ones is up to the specific tests for each type
{
list := &corev1.ConfigMapList{}
err = k8sClient.List(context.Background(), list, opts...)
assert.NoError(t, err)
assert.NotEmpty(t, list.Items)
}
{
list := &corev1.ServiceAccountList{}
err = k8sClient.List(context.Background(), list, opts...)
assert.NoError(t, err)
assert.NotEmpty(t, list.Items)
}
{
list := &corev1.ServiceList{}
err = k8sClient.List(context.Background(), list, opts...)
assert.NoError(t, err)
assert.NotEmpty(t, list.Items)
}
{
list := &appsv1.DeploymentList{}
err = k8sClient.List(context.Background(), list, opts...)
assert.NoError(t, err)
assert.NotEmpty(t, list.Items)
}
// cleanup
require.NoError(t, k8sClient.Delete(context.Background(), created))
}

func TestSkipWhenInstanceDoesNotExist_TargetAllocator(t *testing.T) {
// prepare
cfg := config.New()
nsn := types.NamespacedName{Name: "non-existing-my-instance", Namespace: "default"}
reconciler := controllers.NewTargetAllocatorReconciler(
k8sClient,
testScheme,
record.NewFakeRecorder(10),
cfg,
testLogger,
)

// test
req := k8sreconcile.Request{
NamespacedName: nsn,
}
_, err := reconciler.Reconcile(context.Background(), req)

// verify
assert.NoError(t, err)
}
39 changes: 27 additions & 12 deletions internal/manifests/targetallocator/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,38 @@ func ConfigMap(params Params) (*corev1.ConfigMap, error) {
taSpec := instance.Spec

taConfig := make(map[interface{}]interface{})

taConfig["collector_selector"] = metav1.LabelSelector{
MatchLabels: manifestutils.SelectorLabels(params.Collector.ObjectMeta, collector.ComponentOpenTelemetryCollector),
}

// Set config if global or scrape configs set
config := map[string]interface{}{}
globalConfig, err := getGlobalConfig(taSpec.GlobalConfig, params.Collector.Spec.Config)
if err != nil {
return nil, err
var (
globalConfig map[string]any
scrapeConfigs []v1beta1.AnyConfig
collectorSelector *metav1.LabelSelector
err error
)
if params.Collector != nil {
collectorSelector = &metav1.LabelSelector{
MatchLabels: manifestutils.SelectorLabels(params.Collector.ObjectMeta, collector.ComponentOpenTelemetryCollector),
}

globalConfig, err = getGlobalConfig(taSpec.GlobalConfig, params.Collector.Spec.Config)
if err != nil {
return nil, err
}

scrapeConfigs, err = getScrapeConfigs(taSpec.ScrapeConfigs, params.Collector.Spec.Config)
if err != nil {
return nil, err
}
} else { // if there's no collector, just use what's in the TargetAllocator CR
collectorSelector = nil
globalConfig = taSpec.GlobalConfig.Object
scrapeConfigs = taSpec.ScrapeConfigs
}

if len(globalConfig) > 0 {
config["global"] = globalConfig
}

scrapeConfigs, err := getScrapeConfigs(taSpec.ScrapeConfigs, params.Collector.Spec.Config)
if err != nil {
return nil, err
}
if len(scrapeConfigs) > 0 {
config["scrape_configs"] = scrapeConfigs
}
Expand All @@ -65,6 +78,8 @@ func ConfigMap(params Params) (*corev1.ConfigMap, error) {
taConfig["config"] = config
}

taConfig["collector_selector"] = collectorSelector

if len(taSpec.AllocationStrategy) > 0 {
taConfig["allocation_strategy"] = taSpec.AllocationStrategy
} else {
Expand Down
Loading

0 comments on commit 61ae85d

Please sign in to comment.