Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 111 additions & 25 deletions clusterloader2/pkg/dependency/dra/dra.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"context"
"embed"
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
Expand All @@ -31,16 +33,16 @@ import (
)

const (
draDependencyName = "DRATestDriver"
//TODO: this needs to be converted into a parameter. Will will not need this until parititionable devices test
draDependencyName = "DRATestDriver"
draNamespace = "dra-example-driver"
draManifests = "dra-example-driver"
defaultWorkerNodeCount = "100"
draDaemonsetName = "dra-example-driver-kubeletplugin"
checkDRAReadyInterval = 30 * time.Second
defaultDRATimeout = 10 * time.Minute
)

//go:embed manifests/*.yaml
//go:embed manifests/**/*.yaml
var manifestsFS embed.FS

func init() {
Expand All @@ -57,13 +59,24 @@ type draDependency struct{}

func (d *draDependency) Setup(config *dependency.Config) error {
klog.V(2).Infof("%s: Installing DRA example driver", d)
if err := client.CreateNamespace(config.ClusterFramework.GetClientSets().GetClient(), draNamespace); err != nil {
return fmt.Errorf("namespace %s creation error: %v", draNamespace, err)

namespace, err := getNamespace(config)
if err != nil {
return err
}

namespace, ok := config.Params["Namespace"]
if !ok {
namespace = draNamespace
if err := client.CreateNamespace(config.ClusterFramework.GetClientSets().GetClient(), namespace); err != nil {
return fmt.Errorf("namespace %s creation error: %v", namespace, err)
}

manifests, err := getManifests(config)
if err != nil {
return err
}

daemonsetName, err := getDaemonset(config)
if err != nil {
return err
}

mapping := map[string]interface{}{
Expand All @@ -72,7 +85,7 @@ func (d *draDependency) Setup(config *dependency.Config) error {
}
if err := config.ClusterFramework.ApplyTemplatedManifests(
manifestsFS,
"manifests/*.yaml",
manifests,
mapping,
client.Retry(client.IsRetryableAPIError),
); err != nil {
Expand All @@ -82,8 +95,8 @@ func (d *draDependency) Setup(config *dependency.Config) error {
if err != nil {
return err
}
klog.V(2).Infof("%s: checking if DRA driver %s is healthy", d, draDaemonsetName)
if err := d.waitForDRADriverToBeHealthy(config, timeout); err != nil {
klog.V(2).Infof("%s: checking if DRA driver %s is healthy", d, daemonsetName)
if err := d.waitForDRADriverToBeHealthy(config, timeout, daemonsetName, namespace); err != nil {
return err
}

Expand All @@ -94,60 +107,76 @@ func (d *draDependency) Setup(config *dependency.Config) error {
func (d *draDependency) Teardown(config *dependency.Config) error {
klog.V(2).Infof("%s: Tearing down DRA example driver", d)

namespace, err := getNamespace(config)
if err != nil {
return err
}

// Delete namespace (this will delete all resources in it)
if err := client.DeleteNamespace(config.ClusterFramework.GetClientSets().GetClient(), draNamespace); err != nil {
return fmt.Errorf("deleting %s namespace error: %v", draNamespace, err)
if err := client.DeleteNamespace(config.ClusterFramework.GetClientSets().GetClient(), namespace); err != nil {
return fmt.Errorf("deleting %s namespace error: %v", namespace, err)
}

if err := client.WaitForDeleteNamespace(config.ClusterFramework.GetClientSets().GetClient(), draNamespace, client.DefaultNamespaceDeletionTimeout); err != nil {
if err := client.WaitForDeleteNamespace(config.ClusterFramework.GetClientSets().GetClient(), namespace, client.DefaultNamespaceDeletionTimeout); err != nil {
return err
}

klog.V(2).Infof("%s: DRA example driver uninstalled successfully", d)
return nil
}

func (d *draDependency) waitForDRADriverToBeHealthy(config *dependency.Config, timeout time.Duration) error {
func (d *draDependency) waitForDRADriverToBeHealthy(config *dependency.Config, timeout time.Duration, daemonsetName string, namespace string) error {
if err := wait.PollImmediate(
checkDRAReadyInterval,
timeout,
func() (done bool, err error) {
return d.isDRADriverReady(config)
return d.isDRADriverReady(config, daemonsetName, namespace)
}); err != nil {
return err
}
if err := wait.PollImmediate(
checkDRAReadyInterval,
timeout,
func() (done bool, err error) {
return isResourceSlicesPublished(config)
return isResourceSlicesPublished(config, namespace)
}); err != nil {
return err
}
return nil
}

func (d *draDependency) isDRADriverReady(config *dependency.Config) (done bool, err error) {
func (d *draDependency) isDRADriverReady(config *dependency.Config, daemonsetName string, namespace string) (done bool, err error) {
ds, err := config.ClusterFramework.GetClientSets().
GetClient().
AppsV1().
DaemonSets(draNamespace).
Get(context.Background(), draDaemonsetName, metav1.GetOptions{})
DaemonSets(namespace).
Get(context.Background(), daemonsetName, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("failed to get %s: %v", draDaemonsetName, err)
return false, fmt.Errorf("failed to get %s: %v", daemonsetName, err)
}
ready := ds.Status.NumberReady == ds.Status.DesiredNumberScheduled
if !ready {
klog.V(2).Infof("%s is not ready, "+
"DesiredNumberScheduled: %d, NumberReady: %d", draDaemonsetName, ds.Status.DesiredNumberScheduled, ds.Status.NumberReady)
"DesiredNumberScheduled: %d, NumberReady: %d", daemonsetName, ds.Status.DesiredNumberScheduled, ds.Status.NumberReady)
}
return ready, nil
}

func isResourceSlicesPublished(config *dependency.Config) (bool, error) {
workerCount := int(getWorkerCount(config).(float64))
func isResourceSlicesPublished(config *dependency.Config, namespace string) (bool, error) {
// Get a list of all nodes
// nodes, err := getReadyNodesCount(config)
// if err != nil {
// return false, fmt.Errorf("failed to list nodes: %v", err)
// }

resourceSlices, err := config.ClusterFramework.GetClientSets().GetClient().ResourceV1beta1().ResourceSlices().List(context.Background(), metav1.ListOptions{})
driverPluginPods, err := getDriverPluginPods(config, namespace, draDaemonsetName)
if err != nil {
return false, fmt.Errorf("failed to list driverPluginPods: %v", err)
}

workerCount := driverPluginPods

resourceSlices, err := config.ClusterFramework.GetClientSets().GetClient().ResourceV1().ResourceSlices().List(context.Background(), metav1.ListOptions{})
if err != nil {
return false, fmt.Errorf("failed to list resourceslices: %v", err)
}
Expand All @@ -159,6 +188,26 @@ func isResourceSlicesPublished(config *dependency.Config) (bool, error) {
return true, nil
}

func getDriverPluginPods(config *dependency.Config, namespace string, namePrefix string) (int, error) {
pods, err := config.ClusterFramework.GetClientSets().GetClient().CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{})
if err != nil {
return 0, fmt.Errorf("failed to list pods in namespace %s: %w", namespace, err)
}

runningPods := 0
for _, pod := range pods.Items {
if !strings.HasPrefix(pod.Name, namePrefix) {
continue
}

if pod.Status.Phase == corev1.PodRunning {
runningPods++
}
}

return runningPods, nil
}

func getWorkerCount(config *dependency.Config) interface{} {
workerCount, ok := config.Params["WorkerNodeCount"]
if !ok {
Expand All @@ -167,6 +216,43 @@ func getWorkerCount(config *dependency.Config) interface{} {
return workerCount
}

func getNamespace(config *dependency.Config) (string, error) {
namespace, ok := config.Params["Namespace"]
if !ok {
namespace = draNamespace
}
namespaceString, ok := namespace.(string)

if !ok {
return "", fmt.Errorf("namespace parameter is not a string: %v", namespace)
}
return namespaceString, nil
}

func getManifests(config *dependency.Config) (string, error) {
manifests, ok := config.Params["Manifests"]
if !ok {
manifests = draManifests
}
manifestsString, ok := manifests.(string)
if !ok {
return "", fmt.Errorf("manifests parameter is not a string: %v", manifests)
}
return "manifests/" + manifestsString + "/*.yaml", nil
}

func getDaemonset(config *dependency.Config) (string, error) {
daemonsetName, ok := config.Params["DaemonsetName"]
if !ok {
daemonsetName = draDaemonsetName
}
daemonsetNameString, ok := daemonsetName.(string)
if !ok {
return "", fmt.Errorf("DaemonsetName parameter is not a string: %v", daemonsetName)
}
return daemonsetNameString, nil
}

// String returns string representation of this dependency.
func (d *draDependency) String() string {
return draDependencyName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ rules:
verbs: ["get"]
- apiGroups: ["resource.k8s.io"]
resources: ["resourceslices"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ subjects:
roleRef:
kind: ClusterRole
name: dra-example-driver-role
apiGroup: rbac.authorization.k8s.io
apiGroup: rbac.authorization.k8s.io
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
---
# Source: dra-example-driver/templates/deviceclass.yaml
apiVersion: resource.k8s.io/v1beta1
apiVersion: resource.k8s.io/v1
kind: DeviceClass
metadata:
name: gpu.example.com
spec:
selectors:
- cel:
expression: "device.driver == 'gpu.example.com'"
expression: "device.driver == 'gpu.example.com'"
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ metadata:
name: dra-example-driver-kubeletplugin
namespace: {{.Namespace}}
labels:
helm.sh/chart: dra-example-driver-0.1.3
helm.sh/chart: dra-example-driver-0.0.0-dev
app.kubernetes.io/name: dra-example-driver
app.kubernetes.io/instance: dra-example-driver
app.kubernetes.io/version: "v0.1.0"
app.kubernetes.io/version: "v0.2.0"
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/component: kubeletplugin
spec:
selector:
Expand All @@ -26,22 +27,33 @@ spec:
app.kubernetes.io/instance: dra-example-driver
app.kubernetes.io/component: kubeletplugin
spec:
priorityClassName: system-node-critical
serviceAccountName: dra-example-driver-service-account
securityContext:
{}
containers:
- name: plugin
securityContext:
privileged: true
image: registry.k8s.io/dra-example-driver/dra-example-driver:v0.1.0
imagePullPolicy: IfNotPresent
# image: /:v0.2.0
image: registry.k8s.io/dra-example-driver/dra-example-driver:v0.2.0
imagePullPolicy: Always
command: ["dra-example-kubeletplugin"]
resources:
{}

livenessProbe:
grpc:
port: 51515
service: liveness
failureThreshold: 3
periodSeconds: 10
env:
- name: CDI_ROOT
value: /var/run/cdi
- name: KUBELET_REGISTRAR_DIRECTORY_PATH
value: "/var/lib/kubelet/plugins_registry"
- name: KUBELET_PLUGINS_DIRECTORY_PATH
value: "/var/lib/kubelet/plugins"
- name: NODE_NAME
valueFrom:
fieldRef:
Expand All @@ -53,20 +65,26 @@ spec:
# Simulated number of devices the example driver will pretend to have.
- name: NUM_DEVICES
value: "8"
- name: HEALTHCHECK_PORT
value: "51515"
volumeMounts:
- name: plugins-registry
mountPath: /var/lib/kubelet/plugins_registry
mountPath: "/var/lib/kubelet/plugins_registry"
- name: plugins
mountPath: /var/lib/kubelet/plugins
mountPath: "/var/lib/kubelet/plugins"
- name: cdi
mountPath: /var/run/cdi
volumes:
- name: plugins-registry
hostPath:
path: /var/lib/kubelet/plugins_registry
path: "/var/lib/kubelet/plugins_registry"
- name: plugins
hostPath:
path: /var/lib/kubelet/plugins
path: "/var/lib/kubelet/plugins"
- name: cdi
hostPath:
path: /var/run/cdi
tolerations:
- effect: NoSchedule
key: google.com/tpu
operator: Exists
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ metadata:
name: dra-example-driver-service-account
namespace: {{.Namespace}}
labels:
helm.sh/chart: dra-example-driver-0.1.3
helm.sh/chart: dra-example-driver-0.0.0-dev
app.kubernetes.io/name: dra-example-driver
app.kubernetes.io/instance: dra-example-driver
app.kubernetes.io/version: "v0.1.0"
app.kubernetes.io/version: "v0.2.0"
app.kubernetes.io/managed-by: Helm
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
matchConstraints:
resourceRules:
- apiGroups: ["resource.k8s.io"]
apiVersions: ["v1beta1"]
apiVersions: ["v1"]
operations: ["CREATE", "UPDATE", "DELETE"]
resources: ["resourceslices"]
matchConditions:
Expand All @@ -30,4 +30,4 @@ spec:
- expression: variables.userNodeName == variables.objectNodeName
messageExpression: >-
"this user running on node '"+variables.userNodeName+"' may not modify " +
(variables.objectNodeName == "" ?"cluster resourceslices" : "resourceslices on node '"+variables.objectNodeName+"'")
(variables.objectNodeName == "" ?"cluster resourceslices" : "resourceslices on node '"+variables.objectNodeName+"'")
Loading