Skip to content

Commit

Permalink
Refactor dynamic client initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
lmouhib committed Aug 22, 2024
1 parent 72d0c7c commit 04dd2c8
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 26 deletions.
10 changes: 7 additions & 3 deletions controllers/auto_register_spark_ui_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
logger "k8s.io/klog/v2"
)
Expand All @@ -21,6 +22,7 @@ import (
func createOrUpdateSparkUIIngressObject(
ctx context.Context,
clientset kubernetes.Interface,
dynamicClient dynamic.Interface,
service *v1.Service,
ingressPath networkingv1.HTTPIngressPath,
ingressName string,
Expand All @@ -38,7 +40,7 @@ func createOrUpdateSparkUIIngressObject(

if ingressType == "traefik" {
// Create the Traefik middleware
err := ManageTraefikMiddleware(service.Namespace, "create", &authenticationSecret)
err := ManageTraefikMiddleware(dynamicClient, service.Namespace, "create", &authenticationSecret)
if err != nil {
logger.Error(err)
return
Expand Down Expand Up @@ -124,6 +126,7 @@ func createOrUpdateSparkUIIngressObject(
func Add(
ctx context.Context,
clientset kubernetes.Interface,
dynamicClient dynamic.Interface,
service *v1.Service,
namespacedIngressPath bool,
ingressName string,
Expand Down Expand Up @@ -156,7 +159,7 @@ func Add(
}

//Call the function responsible for creating or patching the Ingress object
createOrUpdateSparkUIIngressObject(ctx, clientset, service, ingressPath, ingressName, ingressType, *authenticationSecret)
createOrUpdateSparkUIIngressObject(ctx, clientset, dynamicClient, service, ingressPath, ingressName, ingressType, *authenticationSecret)

}

Expand All @@ -166,6 +169,7 @@ func Add(
func Delete(
ctx context.Context,
clientset *kubernetes.Clientset,
dynamicClient dynamic.Interface,
service *v1.Service,
namespacedIngressPath bool,
ingressName string,
Expand Down Expand Up @@ -212,7 +216,7 @@ func Delete(

// Delete the Traefik middleware
if ingressType == "traefik" {
ManageTraefikMiddleware(namespace, "delete", authenticationSecret)
ManageTraefikMiddleware(dynamicClient, namespace, "delete", authenticationSecret)
log.Printf("Deleted middleware for authentication and url strip as ingress object is deleted")
}
return
Expand Down
8 changes: 6 additions & 2 deletions controllers/auto_register_spark_ui_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
Expand All @@ -31,6 +34,7 @@ func TestAdd(t *testing.T) {

// Create a fake Kubernetes clientset
var clientset kubernetes.Interface = fake.NewSimpleClientset()
var dynamicClient dynamic.Interface = dynamicfake.NewSimpleDynamicClient(runtime.NewScheme())

// Define other parameters
ctx := context.TODO()
Expand All @@ -55,7 +59,7 @@ func TestAdd(t *testing.T) {
mockLogger.On("Infof", "Create ingress rule for Spark Application : %s \n", defaultService.GetName()).Return()

// Call the Add function
Add(ctx, clientset, defaultService, namespacedIngressPath, ingressName, ingressType, authenticationSecret)
Add(ctx, clientset, dynamicClient, defaultService, namespacedIngressPath, ingressName, ingressType, authenticationSecret)

// Verify that the correct ingress was created
ingresses, err := clientset.NetworkingV1().Ingresses("").List(ctx, metav1.ListOptions{})
Expand All @@ -81,7 +85,7 @@ func TestAdd(t *testing.T) {
}

// Call the Add function
Add(ctx, clientset, defaultService, namespacedIngressPath, ingressName, ingressType, authenticationSecret)
Add(ctx, clientset, dynamicClient, defaultService, namespacedIngressPath, ingressName, ingressType, authenticationSecret)

// Verify that the correct ingress was created
ingresses, err = clientset.NetworkingV1().Ingresses("").List(ctx, metav1.ListOptions{})
Expand Down
25 changes: 5 additions & 20 deletions controllers/traefik_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,16 @@ package controllers

import (
"context"
"errors"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
logger "k8s.io/klog/v2"
)

func ManageTraefikMiddleware(namespace, action string, authenticationSecret *string) error {

// Extract the config from the clientset
config, err := rest.InClusterConfig()
if err != nil {
logger.Errorf("error creating in-cluster config: %v", err)
return err
}

// Create the dynamic client to create generic resources
// This is needed since Treafik Middleware is a custom resource
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
logger.Errorf("error creating dynamic client: %v", err)
return err
}
func ManageTraefikMiddleware(dynamicClient dynamic.Interface, namespace, action string, authenticationSecret *string) error {

// Define the GVR (GroupVersionResource)
gvr := schema.GroupVersionResource{
Expand Down Expand Up @@ -65,7 +50,7 @@ func ManageTraefikMiddleware(namespace, action string, authenticationSecret *str

case "delete":
// Delete the Middleware object
err = dynamicClient.Resource(gvr).Namespace(namespace).Delete(context.TODO(), "spark-ui-url-strip", metav1.DeleteOptions{})
err := dynamicClient.Resource(gvr).Namespace(namespace).Delete(context.TODO(), "spark-ui-url-strip", metav1.DeleteOptions{})

if authenticationSecret != nil {
err = dynamicClient.Resource(gvr).Namespace(namespace).Delete(context.TODO(), "spark-ui-url-auth", metav1.DeleteOptions{})
Expand All @@ -79,14 +64,14 @@ func ManageTraefikMiddleware(namespace, action string, authenticationSecret *str

default:
logger.Errorf("invalid action: %v", action)
return err
return errors.New("invalid action")
}

return nil
}

func createMiddlewareObject(
dynamicClient *dynamic.DynamicClient,
dynamicClient dynamic.Interface,
gvr schema.GroupVersionResource,
middlewareName string,
namespace string,
Expand Down
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -34,6 +35,8 @@ func main() {
}
logger.Infof("Connected to kubernetes cluster")

dynamicClient, err := dynamic.NewForConfig(config)

// Check for the environment variable for spark service selector
labelKey := os.Getenv("SPARK_LABEL_SERVICE_SELECTOR")
if labelKey != "" {
Expand Down Expand Up @@ -127,7 +130,7 @@ func main() {
service := obj.(*v1.Service)
if hasLabel(service, labelKey) {
logger.Infof("Service %v created with label %v\n", service.GetName(), labelKey)
controllers.Add(ctx, clientset, service, namespacedIngressPath, ingressName, ingressType, authenticationSecret)
controllers.Add(ctx, clientset, dynamicClient, service, namespacedIngressPath, ingressName, ingressType, authenticationSecret)
}
},
DeleteFunc: func(obj interface{}) {
Expand Down

0 comments on commit 04dd2c8

Please sign in to comment.