Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions charts/fluid/fluid/templates/csi/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ spec:
- name: fluid-src-dir
mountPath: {{ .Values.runtime.mountRoot | quote }}
mountPropagation: "HostToContainer"
{{- if .Values.csi.useNodeAuthorization }}
{{- if or (.Values.csi.useNodeAuthorization) (semverCompare "<1.30.0-0" .Capabilities.KubeVersion.Version) }}
- name: kubelet-kube-config
mountPath: /etc/kubernetes/kubelet.conf
mountPropagation: "HostToContainer"
Expand All @@ -134,7 +134,7 @@ spec:
hostPath:
path: {{ .Values.csi.kubelet.rootDir | quote }}
type: Directory
{{- if .Values.csi.useNodeAuthorization }}
{{- if or (.Values.csi.useNodeAuthorization) (semverCompare "<1.30.0-0" .Capabilities.KubeVersion.Version) }}
{{- $kubeletRootDir := ternary ( .Values.csi.kubelet.rootDir ) ( print .Values.csi.kubelet.rootDir "/" ) ( hasSuffix "/" .Values.csi.kubelet.rootDir ) }}
{{- if not ( hasPrefix $kubeletRootDir .Values.csi.kubelet.certDir ) }}
- name: kubelet-cert-dir
Expand Down
32 changes: 32 additions & 0 deletions charts/fluid/fluid/templates/csi/validatingadmissionpolicy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{{- if and (not .Values.csi.useNodeAuthorization) (semverCompare ">=1.30.0-0" .Capabilities.KubeVersion.Version) -}}
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingAdmissionPolicy
metadata:
name: "fluid-csi-node-policy"
spec:
failurePolicy: Fail
matchConstraints:
resourceRules:
- apiGroups: [""]
apiVersions: ["v1"]
# supported values: "*", "CONNECT", "CREATE", "DELETE", "UPDATE"
operations: ["UPDATE"]
resources: ["nodes"]
matchConditions:
# only fluid-csi request will be checked.
- name: isRestrictedUser
expression: request.userInfo.username == "system:serviceaccount:fluid-system:fluid-csi"
variables:
- name: userNodeName
expression: >-
request.userInfo.extra[?'authentication.kubernetes.io/node-name'][0].orValue('')
- name: objectNodeName
expression: >-
object.?metadata.name.orValue('')
validations:
- expression: "variables.userNodeName != ''"
message: "userNodeName is empty, user token does not contain node name."
- expression: "variables.objectNodeName == variables.userNodeName"
messageExpression: >-
"objectNodeName '" + variables.objectNodeName + "' is not equal to userNodeName '" + variables.userNodeName + "'"
{{- end }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{- if and (not .Values.csi.useNodeAuthorization) (semverCompare ">=1.30.0-0" .Capabilities.KubeVersion.Version) -}}
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingAdmissionPolicyBinding
metadata:
name: "fluid-csi-node-policy-binding"
spec:
policyName: "fluid-csi-node-policy"
validationActions: [Deny]
{{- end }}
2 changes: 1 addition & 1 deletion charts/fluid/fluid/templates/role/csi/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch"]
{{- if not .Values.csi.useNodeAuthorization }}
{{- if and (not .Values.csi.useNodeAuthorization) (semverCompare ">=1.30.0-0" .Capabilities.KubeVersion.Version) }}
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "patch"]
Expand Down
2 changes: 2 additions & 0 deletions charts/fluid/fluid/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ csi:
# Whether or not to borrow kubelet's config file to use node authorization to restrict CSI Plugin's permission
# See why Fluid's CSI Plugins need node-specific authorization at https://github.com/fluid-cloudnative/fluid/security/advisories/GHSA-93xx-cvmc-9w3v
# See node authorization at https://kubernetes.io/docs/reference/access-authn-authz/node/
# If false, use NodeBinding Token with ValidatingAdmissionPolicy instead of kubelet config for Node-Specific Restrictions.
# can only be set false when k8s.version >= 1.30 and the below kubelet.kubeConfigFile is useless.
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 65 has grammatical issues and unclear meaning. 'can only be set false' should be 'Can only be set to false'. Also, 'useless' is informal; consider 'not required' or 'ignored'. Suggested revision: 'Can only be set to false when k8s.version >= 1.30, and the kubelet.kubeConfigFile setting below will be ignored.'

Suggested change
# can only be set false when k8s.version >= 1.30 and the below kubelet.kubeConfigFile is useless.
# Can only be set to false when k8s.version >= 1.30, and the kubelet.kubeConfigFile setting below will be ignored.

Copilot uses AI. Check for mistakes.
useNodeAuthorization: true
kubelet:
kubeConfigFile: /etc/kubernetes/kubelet.conf
Expand Down
5 changes: 2 additions & 3 deletions pkg/csi/plugins/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"path/filepath"
"strings"

"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"

Expand All @@ -41,7 +40,7 @@ const (
type driver struct {
client client.Client
apiReader client.Reader
nodeAuthorizedClient *kubernetes.Clientset
nodeAuthorizedClient NodeAuthorizedClient
csiDriver *csicommon.CSIDriver
nodeId, endpoint string

Expand All @@ -50,7 +49,7 @@ type driver struct {

var _ manager.Runnable = &driver{}

func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient *kubernetes.Clientset, locks *utils.VolumeLocks) *driver {
func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient NodeAuthorizedClient, locks *utils.VolumeLocks) *driver {
glog.Infof("Driver: %v version: %v", driverName, version)

proto, addr := utils.SplitSchemaAddr(endpoint)
Expand Down
64 changes: 64 additions & 0 deletions pkg/csi/plugins/node_resource_operator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Copyright 2025 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugins

import (
"context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type NodeAuthorizedClient interface {
Get(nodeName string) (*corev1.Node, error)
Patch(node *corev1.Node, patchType types.PatchType, data []byte) error
}

// restrictedNodeClient uses node binding token with validating policy to avoid security problems.
type restrictedNodeClient struct {
Client client.Client
}

// kubeletNodeClient uses mounted kubelet config to avoid security problems.
type kubeletNodeClient struct {
Clientset *kubernetes.Clientset
}

func (p *restrictedNodeClient) Get(nodeName string) (*corev1.Node, error) {
node := &corev1.Node{}
key := types.NamespacedName{Name: nodeName}
if err := p.Client.Get(context.TODO(), key, node); err != nil {
return nil, err
}
return node, nil
}

func (p *restrictedNodeClient) Patch(node *corev1.Node, patchType types.PatchType, data []byte) error {
err := p.Client.Patch(context.TODO(), node, client.RawPatch(patchType, data))
return err
}
Comment on lines +53 to +55
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The intermediate variable err is unnecessary. Return the error directly: return p.Client.Patch(context.TODO(), node, client.RawPatch(patchType, data))

Suggested change
err := p.Client.Patch(context.TODO(), node, client.RawPatch(patchType, data))
return err
}
return p.Client.Patch(context.TODO(), node, client.RawPatch(patchType, data))
}
}

Copilot uses AI. Check for mistakes.

func (p *kubeletNodeClient) Get(nodeName string) (*corev1.Node, error) {
return p.Clientset.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
}

func (p *kubeletNodeClient) Patch(node *corev1.Node, patchType types.PatchType, data []byte) error {
_, err := p.Clientset.CoreV1().Nodes().Patch(context.TODO(), node.Name, patchType, data, metav1.PatchOptions{})
return err
}
36 changes: 7 additions & 29 deletions pkg/csi/plugins/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,10 @@ import (
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/cmdguard"
"github.com/fluid-cloudnative/fluid/pkg/utils/dataset/volume"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/mount"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -59,7 +56,7 @@ type nodeServer struct {
*csicommon.DefaultNodeServer
client client.Client
apiReader client.Reader
nodeAuthorizedClient *kubernetes.Clientset
nodeAuthorizedClient NodeAuthorizedClient
locks *utils.VolumeLocks
node *corev1.Node
}
Expand Down Expand Up @@ -474,15 +471,8 @@ func (ns *nodeServer) getNode() (node *corev1.Node, err error) {
}
}

useNodeAuthorization := ns.nodeAuthorizedClient != nil
if useNodeAuthorization {
if node, err = ns.nodeAuthorizedClient.CoreV1().Nodes().Get(context.TODO(), ns.nodeId, metav1.GetOptions{}); err != nil {
return nil, err
}
} else {
if node, err = kubeclient.GetNode(ns.apiReader, ns.nodeId); err != nil {
return nil, err
}
if node, err = ns.nodeAuthorizedClient.Get(ns.nodeId); err != nil {
return nil, err
}

glog.V(1).Infof("Got node %s from api server", node.Name)
Expand Down Expand Up @@ -520,22 +510,10 @@ func (ns *nodeServer) patchNodeWithLabel(node *corev1.Node, labelsToModify commo
if err != nil {
return err
}
useNodeAuthorization := ns.nodeAuthorizedClient != nil
if useNodeAuthorization {
_, err = ns.nodeAuthorizedClient.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, patchByteData, metav1.PatchOptions{})
if err != nil {
return err
}
} else {
nodeToPatch := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: node.Name,
},
}
err = ns.client.Patch(context.TODO(), nodeToPatch, client.RawPatch(types.StrategicMergePatchType, patchByteData))
if err != nil {
return err
}

err = ns.nodeAuthorizedClient.Patch(node, types.StrategicMergePatchType, patchByteData)
if err != nil {
return err
}

return nil
Expand Down
31 changes: 21 additions & 10 deletions pkg/csi/plugins/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,35 @@ import (
"os"

"github.com/fluid-cloudnative/fluid/pkg/csi/config"
"github.com/fluid-cloudnative/fluid/pkg/utils/compatibility"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubelet"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

// getNodeAuthorizedClientFromKubeletConfig retrieves a node-authorized Kubernetes client from the Kubelet configuration file.
// This function checks if the specified Kubelet configuration file exists. If the file does not exist, it returns an empty client without an error .
// isUseKubeletConfig checks if the specified Kubelet configuration file exists. If the file does not exist, it returns an empty client without an error .
// If the file exists, it attempts to initialize and return a node-authorized Kubernetes client.
Comment on lines +29 to 30
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function documentation is inaccurate and contains issues: (1) There's an extra space before the period in 'without an error .', (2) The comment describes returning 'an empty client' but the function returns a bool, not a client, (3) The comment mentions 'attempts to initialize and return' but the function only checks file existence. Update the comment to accurately describe what the function does: 'isUseKubeletConfig checks if the specified Kubelet configuration file exists and returns true if it does, false otherwise.'

Suggested change
// isUseKubeletConfig checks if the specified Kubelet configuration file exists. If the file does not exist, it returns an empty client without an error .
// If the file exists, it attempts to initialize and return a node-authorized Kubernetes client.
// isUseKubeletConfig checks if the specified Kubelet configuration file exists and returns true if it does, false otherwise.

Copilot uses AI. Check for mistakes.
func getNodeAuthorizedClientFromKubeletConfig(kubeletConfigPath string) (*kubernetes.Clientset, error) {
func isUseKubeletConfig(kubeletConfigPath string) bool {
_, err := os.Stat(kubeletConfigPath)
if err != nil {
if os.IsNotExist(err) {
glog.Warningf("kubelet config file %s not exists, continue without node authorization...", kubeletConfigPath)
return nil, nil
return false
}
return nil, errors.Wrapf(err, "fail to stat kubelet config file %s", kubeletConfigPath)
glog.Warningf("fail to stat kubelet config file %s", kubeletConfigPath)
}

Comment on lines +38 to 40
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When os.Stat fails with an error other than IsNotExist, the function logs a warning but continues execution and returns true on line 41. This means if there's a permission error or any other filesystem issue, the code will incorrectly assume the kubelet config should be used, leading to a failure when trying to initialize the client later. The function should either return false or include the error in the warning message with proper handling context.

Suggested change
glog.Warningf("fail to stat kubelet config file %s", kubeletConfigPath)
}
glog.Warningf("fail to stat kubelet config file %s: %v, continue without node authorization...", kubeletConfigPath, err)
return false
}

Copilot uses AI. Check for mistakes.
return kubelet.InitNodeAuthorizedClient(kubeletConfigPath)
return true
}

// Register initializes the csi driver and registers it to the controller manager.
func Register(mgr manager.Manager, ctx config.RunningContext) error {
client, err := getNodeAuthorizedClientFromKubeletConfig(ctx.KubeletConfigPath)
nodeAuthClient, err := getNodeAuthClient(mgr, ctx)
if err != nil {
return err
}

csiDriver := NewDriver(ctx.NodeId, ctx.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), client, ctx.VolumeLocks)
csiDriver := NewDriver(ctx.NodeId, ctx.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), nodeAuthClient, ctx.VolumeLocks)

if err := mgr.Add(csiDriver); err != nil {
return err
Expand All @@ -59,6 +57,19 @@ func Register(mgr manager.Manager, ctx config.RunningContext) error {
return nil
}

func getNodeAuthClient(mgr manager.Manager, ctx config.RunningContext) (NodeAuthorizedClient, error) {
// use and support node binding token
if !isUseKubeletConfig(ctx.KubeletConfigPath) && compatibility.IsNodeBindingTokenSupported() {
return &restrictedNodeClient{mgr.GetClient()}, nil
}
// otherwise, use kubelet config
nodeAuthClient, err := kubelet.InitNodeAuthorizedClient(ctx.KubeletConfigPath)
if err != nil {
return nil, err
}
return &kubeletNodeClient{nodeAuthClient}, nil
}

// Enabled checks if the csi driver should be enabled.
func Enabled() bool {
return true
Expand Down
68 changes: 68 additions & 0 deletions pkg/utils/compatibility/node_restrict.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2025 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package compatibility

import (
"github.com/blang/semver/v4"

nativeLog "log"
"sync"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/discovery"
ctrl "sigs.k8s.io/controller-runtime"
)

var (
nodeBindingTokenSupported = false
nodeBindingTokenOnce sync.Once
)

// Beta release, default enabled. see https://github.com/kubernetes/enhancements/issues/4193
const nodeBindingTokenSupportedVersion = "v1.30.0"

// Checks the ServiceAccountTokenPodNodeInfo feature gate, whether the apiserver embeds the node name for the associated node when issuing service account tokens bound to Pod objects.
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is missing proper formatting. Go convention requires comments for exported functions to start with the function name. This comment should start with 'discoverNodeBindingTokenCompatibility checks...' However, since this function is unexported, consider making this comment more concise or moving it above the exported IsNodeBindingTokenSupported function where it would be more appropriate.

Copilot uses AI. Check for mistakes.
func discoverNodeBindingTokenCompatibility() {
nativeLog.Printf("Discovering k8s version to check NodeBindingToken compatibility...")
restConfig := ctrl.GetConfigOrDie()
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restConfig)

serverVersion, err := discoveryClient.ServerVersion()
if err != nil && !errors.IsNotFound(err) {
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling logic is incorrect. If ServerVersion() returns an error (whether IsNotFound or any other error), serverVersion will be nil, causing a nil pointer dereference on line 49 when accessing serverVersion.GitVersion. The code should fatal on any error, not just non-NotFound errors. Change the condition to if err != nil to properly handle all error cases.

Suggested change
if err != nil && !errors.IsNotFound(err) {
if err != nil {

Copilot uses AI. Check for mistakes.
nativeLog.Fatalf("failed to discover Kubernetes server version: %v", err)
}
// transform to semver.Version and compare
currentVersion, err := semver.ParseTolerant(serverVersion.GitVersion)
if err != nil {
nativeLog.Fatalf("Failed to parse current version: %v", err)
}
targetVersion, err := semver.ParseTolerant(nodeBindingTokenSupportedVersion)
if err != nil {
nativeLog.Fatalf("Failed to parse target version: %v", err)
}

if currentVersion.GTE(targetVersion) {
nodeBindingTokenSupported = true
}
}

func IsNodeBindingTokenSupported() bool {
nodeBindingTokenOnce.Do(func() {
discoverNodeBindingTokenCompatibility()
})
return nodeBindingTokenSupported
}
Loading