-
Notifications
You must be signed in to change notification settings - Fork 560
/
Copy pathcephfs_helper.go
140 lines (124 loc) · 4.25 KB
/
cephfs_helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package e2e
import (
"context"
"encoding/json"
"fmt"
"strings"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
)
const (
adminUser = "admin"
)
// validateSubvolumegroup validates whether subvolumegroup is present.
func validateSubvolumegroup(f *framework.Framework, subvolgrp string) error {
cmd := fmt.Sprintf("ceph fs subvolumegroup getpath myfs %s", subvolgrp)
stdOut, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace)
if err != nil {
return err
}
if stdErr != "" {
return fmt.Errorf("failed to getpath for subvolumegroup %s with error %v", subvolgrp, stdErr)
}
expectedGrpPath := "/volumes/" + subvolgrp
stdOut = strings.TrimSpace(stdOut)
if stdOut != expectedGrpPath {
return fmt.Errorf("error unexpected group path. Found: %s", stdOut)
}
return nil
}
func createCephfsStorageClass(c kubernetes.Interface, f *framework.Framework, enablePool bool, params map[string]string) error {
scPath := fmt.Sprintf("%s/%s", cephfsExamplePath, "storageclass.yaml")
sc, err := getStorageClass(scPath)
if err != nil {
return err
}
sc.Parameters["fsName"] = "myfs"
sc.Parameters["csi.storage.k8s.io/provisioner-secret-namespace"] = rookNamespace
sc.Parameters["csi.storage.k8s.io/provisioner-secret-name"] = cephfsProvisionerSecretName
sc.Parameters["csi.storage.k8s.io/controller-expand-secret-namespace"] = rookNamespace
sc.Parameters["csi.storage.k8s.io/controller-expand-secret-name"] = cephfsProvisionerSecretName
sc.Parameters["csi.storage.k8s.io/node-stage-secret-namespace"] = rookNamespace
sc.Parameters["csi.storage.k8s.io/node-stage-secret-name"] = cephfsNodePluginSecretName
if enablePool {
sc.Parameters["pool"] = "myfs-data0"
}
// overload any parameters that were passed
if params == nil {
// create an empty params, so that params["clusterID"] below
// does not panic
params = map[string]string{}
}
for param, value := range params {
sc.Parameters[param] = value
}
// fetch and set fsID from the cluster if not set in params
if _, found := params["clusterID"]; !found {
fsID, stdErr, failErr := execCommandInToolBoxPod(f, "ceph fsid", rookNamespace)
if failErr != nil {
return failErr
}
if stdErr != "" {
return fmt.Errorf("error getting fsid %v", stdErr)
}
sc.Parameters["clusterID"] = strings.Trim(fsID, "\n")
}
sc.Namespace = cephCSINamespace
_, err = c.StorageV1().StorageClasses().Create(context.TODO(), &sc, metav1.CreateOptions{})
return err
}
func createCephfsSecret(c kubernetes.Interface, f *framework.Framework) error {
scPath := fmt.Sprintf("%s/%s", cephfsExamplePath, "secret.yaml")
sc, err := getSecret(scPath)
if err != nil {
return err
}
adminKey, stdErr, err := execCommandInToolBoxPod(f, "ceph auth get-key client.admin", rookNamespace)
if err != nil {
return err
}
if stdErr != "" {
return fmt.Errorf("error getting admin key %v", stdErr)
}
sc.StringData["adminID"] = adminUser
sc.StringData["adminKey"] = adminKey
delete(sc.StringData, "userID")
delete(sc.StringData, "userKey")
sc.Namespace = cephCSINamespace
_, err = c.CoreV1().Secrets(cephCSINamespace).Create(context.TODO(), &sc, metav1.CreateOptions{})
return err
}
func deleteBackingCephFSVolume(f *framework.Framework, pvc *v1.PersistentVolumeClaim) error {
imageData, err := getImageInfoFromPVC(pvc.Namespace, pvc.Name, f)
if err != nil {
return err
}
_, stdErr, err := execCommandInToolBoxPod(f, "ceph fs subvolume rm myfs "+imageData.imageName+" "+subvolumegroup, rookNamespace)
if err != nil {
return err
}
if stdErr != "" {
return fmt.Errorf("error deleting backing volume %s %v", imageData.imageName, stdErr)
}
return nil
}
type cephfsSubVolume struct {
Name string `json:"name"`
}
func listCephFSSubVolumes(f *framework.Framework, filesystem, groupname string) ([]cephfsSubVolume, error) {
var subVols []cephfsSubVolume
stdout, stdErr, err := execCommandInToolBoxPod(f, fmt.Sprintf("ceph fs subvolume ls %s --group_name=%s --format=json", filesystem, groupname), rookNamespace)
if err != nil {
return subVols, err
}
if stdErr != "" {
return subVols, fmt.Errorf("error listing subolumes %v", stdErr)
}
err = json.Unmarshal([]byte(stdout), &subVols)
if err != nil {
return subVols, err
}
return subVols, nil
}