diff --git a/cmd/hostpathplugin/main.go b/cmd/hostpathplugin/main.go index ed78ee6b3..55debe371 100644 --- a/cmd/hostpathplugin/main.go +++ b/cmd/hostpathplugin/main.go @@ -36,6 +36,11 @@ var ( ephemeral = flag.Bool("ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)") maxVolumesPerNode = flag.Int64("maxvolumespernode", 0, "limit of volumes per node") showVersion = flag.Bool("version", false, "Show version.") + capacity = func() hostpath.Capacity { + c := hostpath.Capacity{} + flag.Var(c, "capacity", "Simulate storage capacity. The parameter is <kind>=<quantity> where <kind> is the value of a 'kind' storage class parameter and <quantity> is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.") + return c + }() // Set by the build process version = "" ) @@ -53,12 +58,7 @@ func main() { fmt.Fprintln(os.Stderr, "Deprecation warning: The ephemeral flag is deprecated and should only be used when deploying on Kubernetes 1.15. It will be removed in the future.") } - handle() - os.Exit(0) -} - -func handle() { - driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *endpoint, *ephemeral, *maxVolumesPerNode, version) + driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *endpoint, *ephemeral, *maxVolumesPerNode, version, capacity) if err != nil { fmt.Printf("Failed to initialize driver: %s", err.Error()) os.Exit(1)
diff --git a/deploy/kubernetes-distributed/README.md b/deploy/kubernetes-distributed/README.md new file mode 100644 index 000000000..fc0e37524 --- /dev/null +++ b/deploy/kubernetes-distributed/README.md @@ -0,0 +1,8 @@ +This deployment is meant for Kubernetes clusters with +CSIStorageCapacity enabled. It deploys the hostpath driver on each +node, using distributed provisioning, and configures it so that it has +100Gi of "fast" storage and 10Gi of "slow" storage. + +The "kind" storage class parameter can be used to select between the +two. If not set, an arbitrary kind with enough capacity is picked. +
diff --git a/deploy/kubernetes-distributed/app-generic-ephemeral.yaml b/deploy/kubernetes-distributed/app-generic-ephemeral.yaml new file mode 100644 index 000000000..77a00f217 --- /dev/null +++ b/deploy/kubernetes-distributed/app-generic-ephemeral.yaml @@ -0,0 +1,25 @@ +# This example Pod definition demonstrates +# how to use generic ephemeral inline volumes +# with a hostpath storage class. +kind: Pod +apiVersion: v1 +metadata: + name: my-csi-app-inline-volume +spec: + containers: + - name: my-frontend + image: k8s.gcr.io/pause + volumeMounts: + - mountPath: "/data" + name: my-csi-volume + volumes: + - name: my-csi-volume + ephemeral: + volumeClaimTemplate: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 4Gi + storageClassName: csi-hostpath-fast
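The -capacity flag registered in main.go above works because flag.Var accepts any flag.Value, and a map-backed implementation accumulates one entry per flag occurrence. The following standalone sketch shows that pattern in isolation (illustration only: the name capacityFlag is invented here, and it stores plain byte counts instead of the resource.Quantity values used by the real Capacity type in pkg/hostpath/capacity.go further down):

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"strconv"
	"strings"
)

// capacityFlag is a repeatable "-capacity kind=bytes" flag.
type capacityFlag map[string]int64

func (c capacityFlag) String() string {
	return fmt.Sprintf("%v", map[string]int64(c))
}

// Set is called once per occurrence of the flag.
func (c capacityFlag) Set(arg string) error {
	parts := strings.SplitN(arg, "=", 2)
	if len(parts) != 2 {
		return errors.New("must be of format <kind>=<bytes>")
	}
	bytes, err := strconv.ParseInt(parts[1], 10, 64)
	if err != nil {
		return err
	}
	// A repeated kind overwrites the earlier value, like Capacity.Set.
	c[parts[0]] = bytes
	return nil
}

func main() {
	capacity := capacityFlag{}
	flag.Var(capacity, "capacity", "may be used multiple times")
	flag.Parse()
	fmt.Println(capacity)
}
```

Running this with "-capacity fast=100 -capacity slow=10" prints map[fast:100 slow:10]; the real driver additionally parses quantities such as "10Gi" via resource.ParseQuantity.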
diff --git a/deploy/kubernetes-distributed/deploy.sh b/deploy/kubernetes-distributed/deploy.sh new file mode 100755 index 000000000..3a46111eb --- /dev/null +++ b/deploy/kubernetes-distributed/deploy.sh @@ -0,0 +1,221 @@ +#!/usr/bin/env bash + +# This script captures the steps required to successfully +# deploy the hostpath plugin driver. This should be considered +# authoritative and all updates for this process should be +# done here and referenced elsewhere. + +# The script assumes that kubectl is available on the OS path +# where it is executed. + +set -e +set -o pipefail + +BASE_DIR=$(dirname "$0") + +# If set, the following env variables override image registry and/or tag for each of the images. +# They are named after the image name, with hyphen replaced by underscore and in upper case. +# +# - CSI_ATTACHER_REGISTRY +# - CSI_ATTACHER_TAG +# - CSI_NODE_DRIVER_REGISTRAR_REGISTRY +# - CSI_NODE_DRIVER_REGISTRAR_TAG +# - CSI_PROVISIONER_REGISTRY +# - CSI_PROVISIONER_TAG +# - CSI_SNAPSHOTTER_REGISTRY +# - CSI_SNAPSHOTTER_TAG +# - HOSTPATHPLUGIN_REGISTRY +# - HOSTPATHPLUGIN_TAG +# +# Alternatively, it is possible to override all registries or tags with: +# - IMAGE_REGISTRY +# - IMAGE_TAG +# These are used as fallback when the more specific variables are unset or empty. +# +# IMAGE_TAG=canary is ignored for images that are blacklisted in the +# deployment's optional canary-blacklist.txt file. This is meant for +# images which have known API breakages and thus cannot work in those +# deployments anymore. That text file must have the name of the blacklisted +# image on a line by itself, other lines are ignored. Example: +# +# # The following canary images are known to be incompatible with this +# # deployment: +# csi-snapshotter +# +# Beware that the .yaml files do not have "imagePullPolicy: Always". That means that +# even the "canary" images will only be pulled once. This is good for testing +# (starting a pod multiple times will always run with the same canary image), but +# implies that refreshing that image has to be done manually. +# +# As a special case, 'none' as registry removes the registry name. + +# The default is to use the RBAC rules that match the image that is +# being used, even when the image gets overridden. This +# way if there are breaking changes in the RBAC rules, the deployment +# will continue to work. +# +# However, such breaking changes should be rare and only occur when updating +# to a new major version of a sidecar. Nonetheless, to allow testing the scenario +# where the image gets overridden but not the RBAC rules, updating the RBAC +# rules can be disabled. +: ${UPDATE_RBAC_RULES:=true} +function rbac_version () { + yaml="$1" + image="$2" + update_rbac="$3" + + # get version from `image: quay.io/k8scsi/csi-attacher:v1.0.1`, ignoring comments + version="$(sed -e 's/ *#.*$//' "$yaml" | grep "image:.*$image" | sed -e 's/ *#.*//' -e 's/.*://')" + + if $update_rbac; then + # apply overrides + varname=$(echo $image | tr - _ | tr a-z A-Z) + eval version=\${${varname}_TAG:-\${IMAGE_TAG:-\$version}} + fi + + # When using canary images, we have to assume that the + # canary images were built from the corresponding branch. + case "$version" in canary) version=master;; + *-canary) version="$(echo "$version" | sed -e 's/\(.*\)-canary/release-\1/')";; + esac + + echo "$version" +} + +# version_gt returns true if arg1 is greater than arg2. +# +# This function expects versions to be one of the following formats: +# X.Y.Z, release-X.Y.Z, vX.Y.Z +# +# where X, Y, and Z are any number. +# +# Partial versions (1.2, release-1.2) work as well. +# The following substrings are stripped before version comparison: +# - "v" +# - "release-" +# - "kubernetes-" +# +# Usage: +# version_gt release-1.3 v1.2.0 (returns true) +# version_gt v1.1.1 v1.2.0 (returns false) +# version_gt 1.1.1 v1.2.0 (returns false) +# version_gt 1.3.1 v1.2.0 (returns true) +# version_gt 1.1.1 release-1.2.0 (returns false) +# version_gt 1.2.0 1.2.2 (returns false) +function version_gt() { + versions=$(for ver in "$@"; do ver=${ver#release-}; ver=${ver#kubernetes-}; echo ${ver#v}; done) + greaterVersion=${1#"release-"}; + greaterVersion=${greaterVersion#"kubernetes-"}; + greaterVersion=${greaterVersion#"v"}; + test "$(printf '%s' "$versions" | sort -V | head -n 1)" != "$greaterVersion" +}
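For readers who prefer the comparison semantics spelled out, here is a rough Go transcription of version_gt (an illustration, not part of the deployment; it handles plain numeric components only, whereas the bash version delegates the full ordering to sort -V):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// normalize strips the same prefixes that version_gt removes.
func normalize(v string) string {
	v = strings.TrimPrefix(v, "release-")
	v = strings.TrimPrefix(v, "kubernetes-")
	return strings.TrimPrefix(v, "v")
}

// versionGT reports whether a > b, comparing dot-separated numeric
// components; missing components count as 0, so 1.2 equals 1.2.0.
func versionGT(a, b string) bool {
	as := strings.Split(normalize(a), ".")
	bs := strings.Split(normalize(b), ".")
	for i := 0; i < len(as) || i < len(bs); i++ {
		var ai, bi int
		if i < len(as) {
			ai, _ = strconv.Atoi(as[i])
		}
		if i < len(bs) {
			bi, _ = strconv.Atoi(bs[i])
		}
		if ai != bi {
			return ai > bi
		}
	}
	return false // equal versions are not "greater"
}

func main() {
	fmt.Println(versionGT("release-1.3", "v1.2.0")) // true
	fmt.Println(versionGT("1.2.0", "1.2.2"))        // false
}
```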
 + +CSI_PROVISIONER_RBAC_YAML="https://raw.githubusercontent.com/kubernetes-csi/external-provisioner/$(rbac_version "${BASE_DIR}/hostpath/csi-hostpath-plugin.yaml" csi-provisioner false)/deploy/kubernetes/rbac.yaml" +: ${CSI_PROVISIONER_RBAC:=https://raw.githubusercontent.com/kubernetes-csi/external-provisioner/$(rbac_version "${BASE_DIR}/hostpath/csi-hostpath-plugin.yaml" csi-provisioner "${UPDATE_RBAC_RULES}")/deploy/kubernetes/rbac.yaml} + +# Some images are not affected by *_REGISTRY/*_TAG and IMAGE_* variables. +# The default is to update unless explicitly excluded. +update_image () { + case "$1" in socat) return 1;; esac +} + +run () { + echo "$@" >&2 + "$@" +} + +# rbac rules +echo "applying RBAC rules" +for component in CSI_PROVISIONER; do + eval current="\${${component}_RBAC}" + eval original="\${${component}_RBAC_YAML}" + if [ "$current" != "$original" ]; then + echo "Using non-default RBAC rules for $component. Changes from $original to $current are:" + diff -c <(wget --quiet -O - "$original") <(if [[ "$current" =~ ^http ]]; then wget --quiet -O - "$current"; else cat "$current"; fi) || true + fi + run kubectl apply -f "${current}" +done + +if kubectl get csistoragecapacities 2>&1 | grep "the server doesn't have a resource type"; then + have_csistoragecapacity=false +else + have_csistoragecapacity=true +fi + +# deploy hostpath plugin and registrar sidecar +echo "deploying hostpath components" +for i in $(ls ${BASE_DIR}/hostpath/*.yaml | sort); do + echo " $i" + modified="$(cat "$i" | while IFS= read -r line; do + nocomments="$(echo "$line" | sed -e 's/ *#.*$//')" + if echo "$nocomments" | grep -q '^[[:space:]]*image:[[:space:]]*'; then + # Split 'image: quay.io/k8scsi/csi-attacher:v1.0.1' + # into image (quay.io/k8scsi/csi-attacher:v1.0.1), + # registry (quay.io/k8scsi), + # name (csi-attacher), + # tag (v1.0.1). + image=$(echo "$nocomments" | sed -e 's;.*image:[[:space:]]*;;') + registry=$(echo "$image" | sed -e 's;\(.*\)/.*;\1;') + name=$(echo "$image" | sed -e 's;.*/\([^:]*\).*;\1;') + tag=$(echo "$image" | sed -e 's;.*:;;') + + # Variables are with underscores and upper case. + varname=$(echo $name | tr - _ | tr a-z A-Z) + + # Now replace registry and/or tag, if set as env variables. + # If not set, the replacement is the same as the original value. + # Only do this for the images which are meant to be configurable. + if update_image "$name"; then + prefix=$(eval echo \${${varname}_REGISTRY:-${IMAGE_REGISTRY:-${registry}}}/ | sed -e 's;none/;;') + if [ "$IMAGE_TAG" = "canary" ] && + [ -f ${BASE_DIR}/canary-blacklist.txt ] && + grep -q "^$name\$" ${BASE_DIR}/canary-blacklist.txt; then + # Ignore IMAGE_TAG=canary for this particular image because its + # canary image is blacklisted in the deployment blacklist. + suffix=$(eval echo :\${${varname}_TAG:-${tag}}) + else + suffix=$(eval echo :\${${varname}_TAG:-${IMAGE_TAG:-${tag}}}) + fi + line="$(echo "$nocomments" | sed -e "s;$image;${prefix}${name}${suffix};")" + fi + echo " using $line" >&2 + fi + if ! $have_csistoragecapacity; then + line="$(echo "$line" | grep -v -e 'storageCapacity: true' -e '--enable-capacity')" + fi + echo "$line" + done)" + if ! echo "$modified" | kubectl apply -f -; then + echo "modified version of $i:" + echo "$modified" + exit 1 + fi +done + +wait_for_daemonset () { + retries=10 + while [ $retries -ge 0 ]; do + ready=$(kubectl get -n $1 daemonset $2 -o jsonpath="{.status.numberReady}") + required=$(kubectl get -n $1 daemonset $2 -o jsonpath="{.status.desiredNumberScheduled}") + if [ $ready -gt 0 ] && [ $ready -eq $required ]; then + return 0 + fi + retries=$((retries - 1)) + sleep 3 + done + return 1 +} + + +# Wait until the DaemonSet is running on all nodes. +if ! wait_for_daemonset default csi-hostpathplugin; then + echo "driver not ready" + kubectl describe daemonsets/csi-hostpathplugin + exit 1 +fi + +# Create a test driver configuration in the place where the prow job +# expects it. +if [ "${CSI_PROW_TEST_DRIVER}" ]; then + cp "${BASE_DIR}/test-driver.yaml" "${CSI_PROW_TEST_DRIVER}" +fi
diff --git a/deploy/kubernetes-distributed/hostpath/csi-hostpath-driverinfo.yaml b/deploy/kubernetes-distributed/hostpath/csi-hostpath-driverinfo.yaml new file mode 100644 index 000000000..33e4a81a7 --- /dev/null +++ b/deploy/kubernetes-distributed/hostpath/csi-hostpath-driverinfo.yaml @@ -0,0 +1,16 @@ +apiVersion: storage.k8s.io/v1 +kind: CSIDriver +metadata: + name: hostpath.csi.k8s.io +spec: + # Supports persistent and ephemeral inline volumes. + volumeLifecycleModes: + - Persistent + - Ephemeral + # To determine at runtime which mode a volume uses, pod info and its + # "csi.storage.k8s.io/ephemeral" entry are needed. + podInfoOnMount: true + # No attacher needed. + attachRequired: false + # alpha: opt into capacity-aware scheduling + storageCapacity: true
diff --git a/deploy/kubernetes-distributed/hostpath/csi-hostpath-plugin.yaml b/deploy/kubernetes-distributed/hostpath/csi-hostpath-plugin.yaml new file mode 100644 index 000000000..6312ec08d --- /dev/null +++ b/deploy/kubernetes-distributed/hostpath/csi-hostpath-plugin.yaml @@ -0,0 +1,155 @@ +kind: DaemonSet +apiVersion: apps/v1 +metadata: + name: csi-hostpathplugin +spec: + selector: + matchLabels: + app: csi-hostpathplugin + template: + metadata: + labels: + app: csi-hostpathplugin + spec: + serviceAccount: csi-provisioner + containers: + - name: csi-provisioner + image: k8s.gcr.io/sig-storage/csi-provisioner:v2.1.0 + args: + - -v=5 + - --csi-address=/csi/csi.sock + - --feature-gates=Topology=true + - --enable-capacity + - --capacity-ownerref-level=0 # pod is owner + - --node-deployment=true + - --strict-topology=true + - --immediate-topology=false + - --worker-threads=5 + env: + - name: NODE_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: spec.nodeName + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + securityContext: + # This is necessary only for systems with SELinux, where + # non-privileged sidecar containers cannot access unix domain socket + # created by privileged CSI driver container.
+ privileged: true + volumeMounts: + - mountPath: /csi + name: socket-dir + + - name: node-driver-registrar + image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.0.1 + args: + - --v=5 + - --csi-address=/csi/csi.sock + - --kubelet-registration-path=/var/lib/kubelet/plugins/csi-hostpath/csi.sock + securityContext: + # This is necessary only for systems with SELinux, where + # non-privileged sidecar containers cannot access unix domain socket + # created by privileged CSI driver container. + privileged: true + env: + - name: KUBE_NODE_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: spec.nodeName + volumeMounts: + - mountPath: /csi + name: socket-dir + - mountPath: /registration + name: registration-dir + - mountPath: /csi-data-dir + name: csi-data-dir + + - name: hostpath + image: k8s.gcr.io/sig-storage/hostpathplugin:v1.6.0 + args: + - --drivername=hostpath.csi.k8s.io + - --v=5 + - --endpoint=$(CSI_ENDPOINT) + - --nodeid=$(KUBE_NODE_NAME) + - --capacity=slow=10Gi + - --capacity=fast=100Gi + env: + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + - name: KUBE_NODE_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: spec.nodeName + securityContext: + privileged: true + ports: + - containerPort: 9898 + name: healthz + protocol: TCP + livenessProbe: + failureThreshold: 5 + httpGet: + path: /healthz + port: healthz + initialDelaySeconds: 10 + timeoutSeconds: 3 + periodSeconds: 2 + volumeMounts: + - mountPath: /csi + name: socket-dir + - mountPath: /var/lib/kubelet/pods + mountPropagation: Bidirectional + name: mountpoint-dir + - mountPath: /var/lib/kubelet/plugins + mountPropagation: Bidirectional + name: plugins-dir + - mountPath: /csi-data-dir + name: csi-data-dir + - mountPath: /dev + name: dev-dir + - name: liveness-probe + volumeMounts: + - mountPath: /csi + name: socket-dir + image: k8s.gcr.io/sig-storage/livenessprobe:v2.2.0 + args: + - --csi-address=/csi/csi.sock + - --health-port=9898 + + volumes: + - hostPath: + path: /var/lib/kubelet/plugins/csi-hostpath + type: DirectoryOrCreate + name: socket-dir + - hostPath: + path: /var/lib/kubelet/pods + type: DirectoryOrCreate + name: mountpoint-dir + - hostPath: + path: /var/lib/kubelet/plugins_registry + type: Directory + name: registration-dir + - hostPath: + path: /var/lib/kubelet/plugins + type: Directory + name: plugins-dir + - hostPath: + # 'path' is where PV data is persisted on host. 
+ # Using /tmp is also possible, but then the PVs will not be available after plugin container recreation or a host reboot. + path: /var/lib/csi-hostpath-data/ + type: DirectoryOrCreate + name: csi-data-dir + - hostPath: + path: /dev + type: Directory + name: dev-dir
diff --git a/deploy/kubernetes-distributed/hostpath/csi-hostpath-storageclass-fast.yaml b/deploy/kubernetes-distributed/hostpath/csi-hostpath-storageclass-fast.yaml new file mode 100644 index 000000000..8371b9a04 --- /dev/null +++ b/deploy/kubernetes-distributed/hostpath/csi-hostpath-storageclass-fast.yaml @@ -0,0 +1,8 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: csi-hostpath-fast +provisioner: hostpath.csi.k8s.io +volumeBindingMode: WaitForFirstConsumer +parameters: + kind: fast
diff --git a/deploy/kubernetes-distributed/hostpath/csi-hostpath-storageclass-slow.yaml b/deploy/kubernetes-distributed/hostpath/csi-hostpath-storageclass-slow.yaml new file mode 100644 index 000000000..ca43687c1 --- /dev/null +++ b/deploy/kubernetes-distributed/hostpath/csi-hostpath-storageclass-slow.yaml @@ -0,0 +1,8 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: csi-hostpath-slow +provisioner: hostpath.csi.k8s.io +volumeBindingMode: WaitForFirstConsumer +parameters: + kind: slow
diff --git a/deploy/kubernetes-distributed/hostpath/csi-hostpath-testing.yaml b/deploy/kubernetes-distributed/hostpath/csi-hostpath-testing.yaml new file mode 100644 index 000000000..a79743bc3 --- /dev/null +++ b/deploy/kubernetes-distributed/hostpath/csi-hostpath-testing.yaml @@ -0,0 +1,73 @@ +# WARNING: this is only for testing purposes. Do not install in a production +# cluster. +# +# This exposes the hostpath's Unix domain csi.sock as a TCP port to the +# outside world. The mapping from Unix domain socket to TCP is done +# by socat. +# +# This is useful for testing with csi-sanity or csc. + +apiVersion: v1 +kind: Service +metadata: + name: hostpath-service +spec: + type: NodePort + selector: + app: csi-hostpath-socat + ports: + - port: 10000 # fixed port inside the pod, dynamically allocated port outside +--- +kind: StatefulSet +apiVersion: apps/v1 +metadata: + name: csi-hostpath-socat +spec: + serviceName: "csi-hostpath-socat" + replicas: 1 + selector: + matchLabels: + app: csi-hostpath-socat + template: + metadata: + labels: + app: csi-hostpath-socat + spec: + affinity: + podAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: app + operator: In + values: + - csi-hostpathplugin + topologyKey: kubernetes.io/hostname + containers: + - name: socat + image: alpine/socat:1.0.3 + args: + - tcp-listen:10000,fork,reuseaddr + - unix-connect:/csi/csi.sock + securityContext: + # This is necessary only for systems with SELinux, where + # non-privileged sidecar containers cannot access unix domain socket + # created by privileged CSI driver container. + privileged: true + volumeMounts: + - mountPath: /csi + name: socket-dir + # This is used for sanity testing because the pod name of the + # driver itself is non-deterministic when using a daemonset. + - mountPath: /var/lib/kubelet/pods + mountPropagation: Bidirectional + name: mountpoint-dir + volumes: + - hostPath: + path: /var/lib/kubelet/plugins/csi-hostpath + type: DirectoryOrCreate + name: socket-dir + - hostPath: + path: /var/lib/kubelet/pods + type: DirectoryOrCreate + name: mountpoint-dir
diff --git a/deploy/kubernetes-distributed/test-driver.yaml b/deploy/kubernetes-distributed/test-driver.yaml new file mode 100644 index 000000000..85af032c7 --- /dev/null +++ b/deploy/kubernetes-distributed/test-driver.yaml @@ -0,0 +1,18 @@ +# This file describes how to test this deployment of the CSI hostpath driver +# using the Kubernetes 1.17 E2E test suite. For details see: +# https://github.com/kubernetes/kubernetes/tree/v1.17.0/test/e2e/storage/external + +StorageClass: + FromExistingClassName: csi-hostpath-fast +DriverInfo: + Name: hostpath.csi.k8s.io + SupportedSizeRange: + Min: 1Mi + Capabilities: + block: true + exec: true + multipods: true + nodeExpansion: true + persistence: true + singleNodeVolume: true + topology: true
diff --git a/go.mod b/go.mod index 21f5f389b..54d73d448 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d // indirect google.golang.org/genproto v0.0.0-20201209185603-f92720507ed4 // indirect google.golang.org/grpc v1.34.0 - k8s.io/apimachinery v0.21.0-alpha.0 // indirect + k8s.io/apimachinery v0.21.0-alpha.0 k8s.io/kubernetes v1.20.0 k8s.io/mount-utils v0.20.0 // indirect k8s.io/utils v0.0.0-20201110183641-67b214c5f920
diff --git a/pkg/hostpath/capacity.go b/pkg/hostpath/capacity.go new file mode 100644 index 000000000..ab9b1823c --- /dev/null +++ b/pkg/hostpath/capacity.go @@ -0,0 +1,123 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostpath + +import ( + "errors" + "flag" + "fmt" + "strings" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/apimachinery/pkg/api/resource" +) + +// Capacity simulates linear storage of certain types ("fast", +// "slow"). When volumes of those types get created, they must +// allocate storage (which can fail!) and that storage must +// be freed again when volumes get destroyed. +// +// Available capacity is configurable with a command line flag +// -capacity <kind>=<quantity> where <kind> is a string and +// <quantity> is a quantity (1T, 1Gi). More than one of those +// flags can be used. +type Capacity map[string]resource.Quantity + +// Set is an implementation of flag.Value.Set. +func (c Capacity) Set(arg string) error { + parts := strings.SplitN(arg, "=", 2) + if len(parts) != 2 { + return errors.New("must be of format <kind>=<quantity>") + } + quantity, err := resource.ParseQuantity(parts[1]) + if err != nil { + return err + } + + // We overwrite any previous value. + c[parts[0]] = quantity + return nil +} + +func (c Capacity) String() string { + return fmt.Sprintf("%v", map[string]resource.Quantity(c)) +} + +var _ flag.Value = &Capacity{} + +// Alloc reserves a certain amount of bytes. Errors are +// usable as results of gRPC calls.
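For example, with "-capacity fast=100Gi -capacity slow=10Gi", Alloc("fast", n) +// succeeds and reserves n bytes only while at least n bytes of "fast" capacity +// remain.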
Empty kind means +// that any large enough one is fine. +func (c Capacity) Alloc(kind string, size int64) (actualKind string, err error) { + requested := *resource.NewQuantity(size, resource.BinarySI) + + if kind == "" { + for k, quantity := range c { + if quantity.Value() >= size { + kind = k + break + } + } + // Still nothing? + if kind == "" { + available := c.Check("") + return "", status.Error(codes.ResourceExhausted, + fmt.Sprintf("not enough capacity: have %s, need %s", available.String(), requested.String())) + } + } + + available, ok := c[kind] + if !ok { + return "", status.Error(codes.InvalidArgument, fmt.Sprintf("unknown capacity kind: %q", kind)) + } + if available.Cmp(requested) < 0 { + return "", status.Error(codes.ResourceExhausted, + fmt.Sprintf("not enough capacity of kind %q: have %s, need %s", kind, available.String(), requested.String())) + } + available.Sub(requested) + c[kind] = available + return kind, nil +} + +// Free returns capacity reserved earlier with Alloc. +func (c Capacity) Free(kind string, size int64) { + available := c[kind] + available.Add(*resource.NewQuantity(size, resource.BinarySI)) + c[kind] = available +} + +// Check reports available capacity for a certain kind. +// If empty, it reports the maximum capacity. +func (c Capacity) Check(kind string) resource.Quantity { + if kind != "" { + quantity := c[kind] + return quantity + } + available := resource.Quantity{} + for _, q := range c { + if q.Cmp(available) >= 0 { + available = q + } + } + return available +} + +// Enabled returns true if capacities are configured. +func (c Capacity) Enabled() bool { + return len(c) > 0 +} diff --git a/pkg/hostpath/controllerserver.go b/pkg/hostpath/controllerserver.go index cc407ab83..a4c32a8e3 100644 --- a/pkg/hostpath/controllerserver.go +++ b/pkg/hostpath/controllerserver.go @@ -98,12 +98,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque hp.mutex.Lock() defer hp.mutex.Unlock() - // Check for maximum available capacity capacity := int64(req.GetCapacityRange().GetRequiredBytes()) - if capacity >= maxStorageCapacity { - return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", capacity, maxStorageCapacity) - } - topologies := []*csi.Topology{&csi.Topology{ Segments: map[string]string{TopologyKeyNode: hp.nodeID}, }} @@ -145,8 +140,8 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque } volumeID := uuid.NewUUID().String() - - vol, err := createHostpathVolume(volumeID, req.GetName(), capacity, requestedAccessType, false /* ephemeral */) + kind := req.GetParameters()[storageKind] + vol, err := hp.createVolume(volumeID, req.GetName(), capacity, requestedAccessType, false /* ephemeral */, kind) if err != nil { return nil, status.Errorf(codes.Internal, "failed to create volume %v: %v", volumeID, err) } @@ -158,12 +153,12 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque switch volumeSource.Type.(type) { case *csi.VolumeContentSource_Snapshot: if snapshot := volumeSource.GetSnapshot(); snapshot != nil { - err = loadFromSnapshot(capacity, snapshot.GetSnapshotId(), path, requestedAccessType) + err = hp.loadFromSnapshot(capacity, snapshot.GetSnapshotId(), path, requestedAccessType) vol.ParentSnapID = snapshot.GetSnapshotId() } case *csi.VolumeContentSource_Volume: if srcVolume := volumeSource.GetVolume(); srcVolume != nil { - err = loadFromVolume(capacity, srcVolume.GetVolumeId(), path, requestedAccessType) + err = 
hp.loadFromVolume(capacity, srcVolume.GetVolumeId(), path, requestedAccessType) vol.ParentVolID = srcVolume.GetVolumeId() } default: @@ -210,7 +205,6 @@ func (hp *hostPath) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque if err := hp.deleteVolume(volId); err != nil { return nil, status.Errorf(codes.Internal, "failed to delete volume %v: %v", volId, err) } - glog.V(4).Infof("volume %v successfully deleted", volId) return &csi.DeleteVolumeResponse{}, nil @@ -268,7 +262,24 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont } func (hp *hostPath) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) { - return nil, status.Error(codes.Unimplemented, "") + // Lock before acting on global state. A production-quality + // driver might use more fine-grained locking. + hp.mutex.Lock() + defer hp.mutex.Unlock() + + // Topology and capabilities are irrelevant. We only + // distinguish based on the "kind" parameter, if at all. + // Without configured capacity, we just have the maximum size. + available := maxStorageCapacity + if hp.capacity.Enabled() { + kind := req.GetParameters()[storageKind] + quantity := hp.capacity.Check(kind) + available = quantity.Value() + } + + return &csi.GetCapacityResponse{ + AvailableCapacity: available, + }, nil } func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) { @@ -286,7 +297,7 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest hp.mutex.Lock() defer hp.mutex.Unlock() - volumeIds := getSortedVolumeIDs() + volumeIds := hp.getSortedVolumeIDs() if req.StartingToken == "" { req.StartingToken = "1" } @@ -305,7 +316,7 @@ func (hp *hostPath) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest for index := startIdx - 1; index < volumesLength && index < maxLength; index++ { hpVolume = hp.volumes[volumeIds[index]] - healthy, msg := doHealthCheckInControllerSide(volumeIds[index]) + healthy, msg := hp.doHealthCheckInControllerSide(volumeIds[index]) glog.V(3).Infof("Healthy state: %s Volume: %t", hpVolume.VolName, healthy) volumeRes.Entries = append(volumeRes.Entries, &csi.ListVolumesResponse_Entry{ Volume: &csi.Volume{ @@ -337,7 +348,7 @@ func (hp *hostPath) ControllerGetVolume(ctx context.Context, req *csi.Controller return nil, status.Error(codes.NotFound, "The volume not found") } - healthy, msg := doHealthCheckInControllerSide(req.GetVolumeId()) + healthy, msg := hp.doHealthCheckInControllerSide(req.GetVolumeId()) glog.V(3).Infof("Healthy state: %s Volume: %t", volume.VolName, healthy) return &csi.ControllerGetVolumeResponse{ Volume: &csi.Volume{ @@ -435,7 +446,7 @@ func (hp *hostPath) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotR snapshot.SizeBytes = hostPathVolume.VolSize snapshot.ReadyToUse = true - hostPathVolumeSnapshots[snapshotID] = snapshot + hp.snapshots[snapshotID] = snapshot return &csi.CreateSnapshotResponse{ Snapshot: &csi.Snapshot{ @@ -468,7 +479,7 @@ func (hp *hostPath) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotR glog.V(4).Infof("deleting snapshot %s", snapshotID) path := getSnapshotPath(snapshotID) os.RemoveAll(path) - delete(hostPathVolumeSnapshots, snapshotID) + delete(hp.snapshots, snapshotID) return &csi.DeleteSnapshotResponse{}, nil } @@ -486,14 +497,14 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq // case 1: SnapshotId is not empty, return snapshots that match the snapshot id. 
if len(req.GetSnapshotId()) != 0 { snapshotID := req.SnapshotId - if snapshot, ok := hostPathVolumeSnapshots[snapshotID]; ok { + if snapshot, ok := hp.snapshots[snapshotID]; ok { return convertSnapshot(snapshot), nil } } // case 2: SourceVolumeId is not empty, return snapshots that match the source volume id. if len(req.GetSourceVolumeId()) != 0 { - for _, snapshot := range hostPathVolumeSnapshots { + for _, snapshot := range hp.snapshots { if snapshot.VolID == req.SourceVolumeId { return convertSnapshot(snapshot), nil } @@ -503,13 +514,13 @@ func (hp *hostPath) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReq var snapshots []csi.Snapshot // case 3: no parameter is set, so we return all the snapshots. sortedKeys := make([]string, 0) - for k := range hostPathVolumeSnapshots { + for k := range hp.snapshots { sortedKeys = append(sortedKeys, k) } sort.Strings(sortedKeys) for _, key := range sortedKeys { - snap := hostPathVolumeSnapshots[key] + snap := hp.snapshots[key] snapshot := csi.Snapshot{ SnapshotId: snap.Id, SourceVolumeId: snap.VolID, @@ -659,6 +670,7 @@ func (hp *hostPath) getControllerServiceCapabilities() []*csi.ControllerServiceC cl = []csi.ControllerServiceCapability_RPC_Type{ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, csi.ControllerServiceCapability_RPC_GET_VOLUME, + csi.ControllerServiceCapability_RPC_GET_CAPACITY, csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS, csi.ControllerServiceCapability_RPC_LIST_VOLUMES, diff --git a/pkg/hostpath/healthcheck.go b/pkg/hostpath/healthcheck.go index ab7c81ef4..10a54621d 100644 --- a/pkg/hostpath/healthcheck.go +++ b/pkg/hostpath/healthcheck.go @@ -128,14 +128,14 @@ func checkMountPointExist(sourcePath string) (bool, error) { return false, nil } -func checkPVCapacityValid(volumeHandle string) (bool, error) { +func (hp *hostPath) checkPVCapacityValid(volumeHandle string) (bool, error) { sourcePath := getSourcePath(volumeHandle) _, fscapacity, _, _, _, _, err := fs.FsInfo(sourcePath) if err != nil { return false, fmt.Errorf("failed to get capacity info: %+v", err) } - volumeCapacity := hostPathVolumes[volumeHandle].VolSize + volumeCapacity := hp.volumes[volumeHandle].VolSize glog.V(3).Infof("volume capacity: %+v fs capacity:%+v", volumeCapacity, fscapacity) return fscapacity >= volumeCapacity, nil } @@ -157,7 +157,7 @@ func checkPVUsage(volumeHandle string) (bool, error) { return fsavailable > 0, nil } -func doHealthCheckInControllerSide(volumeHandle string) (bool, string) { +func (hp *hostPath) doHealthCheckInControllerSide(volumeHandle string) (bool, string) { spExist, err := checkSourcePathExist(volumeHandle) if err != nil { return false, err.Error() @@ -167,7 +167,7 @@ func doHealthCheckInControllerSide(volumeHandle string) (bool, string) { return false, "The source path of the volume doesn't exist" } - capValid, err := checkPVCapacityValid(volumeHandle) + capValid, err := hp.checkPVCapacityValid(volumeHandle) if err != nil { return false, err.Error() } diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index 4942ad5b4..607b8ed7f 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -45,6 +45,10 @@ const ( gib100 int64 = gib * 100 tib int64 = gib * 1024 tib100 int64 = tib * 100 + + // storageKind is the special parameter which requests + // storage of a certain kind (only affects capacity checks). 
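+ // For example, the storage classes in this deployment set "kind: fast" or "kind: slow".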
+ storageKind = "kind" ) type hostPath struct { @@ -61,6 +65,7 @@ type hostPath struct { mutex sync.Mutex volumes map[string]hostPathVolume snapshots map[string]hostPathSnapshot + capacity Capacity } type hostPathVolume struct { @@ -73,6 +78,7 @@ type hostPathVolume struct { ParentSnapID string `json:"parentSnapID,omitempty"` Ephemeral bool `json:"ephemeral"` NodeID string `json:"nodeID"` + Kind string `json:"kind"` } type hostPathSnapshot struct { @@ -87,9 +93,6 @@ type hostPathSnapshot struct { var ( vendorVersion = "dev" - - hostPathVolumes map[string]hostPathVolume - hostPathVolumeSnapshots map[string]hostPathSnapshot ) const ( @@ -102,7 +105,7 @@ const ( snapshotExt = ".snap" ) -func NewHostPathDriver(driverName, nodeID, endpoint string, ephemeral bool, maxVolumesPerNode int64, version string) (*hostPath, error) { +func NewHostPathDriver(driverName, nodeID, endpoint string, ephemeral bool, maxVolumesPerNode int64, version string, capacity Capacity) (*hostPath, error) { if driverName == "" { return nil, errors.New("no driver name provided") } @@ -132,6 +135,7 @@ endpoint: endpoint, ephemeral: ephemeral, maxVolumesPerNode: maxVolumesPerNode, + capacity: capacity, volumes: map[string]hostPathVolume{}, snapshots: map[string]hostPathSnapshot{}, @@ -199,6 +203,7 @@ func (hp *hostPath) discoveryExistingVolumes() error { // getting existing volumes based on the mount point infos. // It's a temporary solution to recall volumes. + // TODO: discover what kind of storage was used and the nominal size. for _, pv := range mountInfosOfPod.ContainerFileSystem { if !strings.Contains(pv.Target, csiSignOfVolumeTargetPath) { continue @@ -209,6 +214,11 @@ return err } + if hpv.Kind != "" && hp.capacity.Enabled() { + if _, err := hp.capacity.Alloc(hpv.Kind, hpv.VolSize); err != nil { + return fmt.Errorf("existing volume(s) do not match new capacity configuration: %v", err) + } + } hp.volumes[hpv.VolID] = *hpv } @@ -255,9 +265,29 @@ getVolumePath(volID string) string { return filepath.Join(dataRoot, volID) } -// createVolume create the directory for the hostpath volume. -// It returns the volume path or err if one occurs. -func createHostpathVolume(volID, name string, cap int64, volAccessType accessType, ephemeral bool) (*hostPathVolume, error) { +// createVolume allocates capacity, creates the directory for the hostpath volume, and +// adds the volume to the list. +// +// It returns the new volume or an error if one occurs. That error is suitable as the result of a gRPC call. +func (hp *hostPath) createVolume(volID, name string, cap int64, volAccessType accessType, ephemeral bool, kind string) (hpv *hostPathVolume, finalErr error) { + // Check for maximum available capacity + if cap >= maxStorageCapacity { + return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", cap, maxStorageCapacity) + } + if hp.capacity.Enabled() { + actualKind, err := hp.capacity.Alloc(kind, cap) + if err != nil { + return nil, err + } + // Free the capacity again in case of any error: either a volume gets created or it doesn't.
+ defer func() { + if finalErr != nil { + hp.capacity.Free(actualKind, cap) + } + }() + kind = actualKind + } + path := getVolumePath(volID) switch volAccessType { @@ -303,8 +333,10 @@ func createHostpathVolume(volID, name string, cap int64, volAccessType accessTyp VolPath: path, VolAccessType: volAccessType, Ephemeral: ephemeral, + Kind: kind, } - hostPathVolumes[volID] = hostpathVol + glog.V(4).Infof("adding hostpath volume: %s = %+v", volID, hostpathVol) + hp.volumes[volID] = hostpathVol return &hostpathVol, nil } @@ -322,7 +354,7 @@ func (hp *hostPath) updateVolume(volID string, volume hostPathVolume) error { // deleteVolume deletes the directory for the hostpath volume. func (hp *hostPath) deleteVolume(volID string) error { - glog.V(4).Infof("deleting hostpath volume: %s", volID) + glog.V(4).Infof("starting to delete hostpath volume: %s", volID) vol, err := hp.getVolumeByID(volID) if err != nil { @@ -343,6 +375,10 @@ func (hp *hostPath) deleteVolume(volID string) error { if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) { return err } + glog.V(4).Infof("deleting hostpath volume: %s = %+v", volID, vol) + if hp.capacity.Enabled() { + hp.capacity.Free(vol.Kind, vol.VolSize) + } delete(hp.volumes, volID) return nil } @@ -364,8 +400,8 @@ func hostPathIsEmpty(p string) (bool, error) { } // loadFromSnapshot populates the given destPath with data from the snapshotID -func loadFromSnapshot(size int64, snapshotId, destPath string, mode accessType) error { - snapshot, ok := hostPathVolumeSnapshots[snapshotId] +func (hp *hostPath) loadFromSnapshot(size int64, snapshotId, destPath string, mode accessType) error { + snapshot, ok := hp.snapshots[snapshotId] if !ok { return status.Errorf(codes.NotFound, "cannot find snapshot %v", snapshotId) } @@ -398,8 +434,8 @@ func loadFromSnapshot(size int64, snapshotId, destPath string, mode accessType) } // loadFromVolume populates the given destPath with data from the srcVolumeID -func loadFromVolume(size int64, srcVolumeId, destPath string, mode accessType) error { - hostPathVolume, ok := hostPathVolumes[srcVolumeId] +func (hp *hostPath) loadFromVolume(size int64, srcVolumeId, destPath string, mode accessType) error { + hostPathVolume, ok := hp.volumes[srcVolumeId] if !ok { return status.Error(codes.NotFound, "source volumeId does not exist, are source/destination in the same storage class?") } @@ -450,10 +486,10 @@ func loadFromBlockVolume(hostPathVolume hostPathVolume, destPath string) error { return nil } -func getSortedVolumeIDs() []string { - ids := make([]string, len(hostPathVolumes)) +func (hp *hostPath) getSortedVolumeIDs() []string { + ids := make([]string, len(hp.volumes)) index := 0 - for volId := range hostPathVolumes { + for volId := range hp.volumes { ids[index] = volId index += 1 } diff --git a/pkg/hostpath/nodeserver.go b/pkg/hostpath/nodeserver.go index d6783cff8..044a07d45 100644 --- a/pkg/hostpath/nodeserver.go +++ b/pkg/hostpath/nodeserver.go @@ -63,10 +63,11 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV if ephemeralVolume { volID := req.GetVolumeId() volName := fmt.Sprintf("ephemeral-%s", volID) - vol, err := createHostpathVolume(req.GetVolumeId(), volName, maxStorageCapacity, mountAccess, ephemeralVolume) + kind := req.GetVolumeContext()[storageKind] + vol, err := hp.createVolume(req.GetVolumeId(), volName, maxStorageCapacity, mountAccess, ephemeralVolume, kind) if err != nil && !os.IsExist(err) { glog.Error("ephemeral mode failed to create volume: ", err) - return nil, 
status.Error(codes.Internal, err.Error()) + return nil, err } glog.V(4).Infof("ephemeral mode: created volume: %s", vol.VolPath) } @@ -175,9 +176,9 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV } } - volume := hostPathVolumes[req.GetVolumeId()] + volume := hp.volumes[req.GetVolumeId()] volume.NodeID = hp.nodeID - hostPathVolumes[req.GetVolumeId()] = volume + hp.volumes[req.GetVolumeId()] = volume return &csi.NodePublishVolumeResponse{}, nil } @@ -309,7 +310,7 @@ func (hp *hostPath) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolum hp.mutex.Lock() defer hp.mutex.Unlock() - volume, ok := hostPathVolumes[in.GetVolumeId()] + volume, ok := hp.volumes[in.GetVolumeId()] if !ok { return nil, status.Error(codes.NotFound, "The volume not found") }
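To make the capacity accounting introduced by this patch concrete, here is a test-style sketch that exercises Set, Alloc, Free, and Check together. It is not part of the patch; it assumes it is compiled into the pkg/hostpath package shown above:

```go
package hostpath

import (
	"testing"

	"k8s.io/apimachinery/pkg/api/resource"
)

func TestCapacitySketch(t *testing.T) {
	c := Capacity{}
	// Equivalent to "-capacity fast=100Gi -capacity slow=10Gi".
	if err := c.Set("fast=100Gi"); err != nil {
		t.Fatal(err)
	}
	if err := c.Set("slow=10Gi"); err != nil {
		t.Fatal(err)
	}

	twentyGi := resource.MustParse("20Gi").Value()
	fiftyGi := resource.MustParse("50Gi").Value()

	// An explicit kind must have enough room left.
	if _, err := c.Alloc("slow", twentyGi); err == nil {
		t.Fatal("expected error for oversized slow volume")
	}

	// An empty kind picks any kind with enough capacity;
	// here only "fast" can hold 50Gi.
	kind, err := c.Alloc("", fiftyGi)
	if err != nil {
		t.Fatal(err)
	}
	if kind != "fast" {
		t.Fatalf("expected fast, got %q", kind)
	}

	// Free returns the reserved bytes.
	c.Free(kind, fiftyGi)
	want := resource.MustParse("100Gi")
	if got := c.Check("fast"); got.Cmp(want) != 0 {
		t.Fatalf("got %s, want %s", got.String(), want.String())
	}
}
```

This mirrors the driver's own usage: CreateVolume reserves capacity via createVolume, DeleteVolume returns it via Free, and GetCapacity reports the remaining amount per kind to the external-provisioner.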