Skip to content

Commit 190493f

Browse files
Implement volume health monitoring feature to detect abnormal volumes
1 parent 74c3e9f commit 190493f

File tree

23 files changed

+1025
-74
lines changed

23 files changed

+1025
-74
lines changed

cmd/directpv/node-server.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"errors"
2222
"os"
23+
"time"
2324

2425
"github.com/container-storage-interface/spec/lib/go/csi"
2526
"github.com/minio/directpv/pkg/consts"
@@ -33,7 +34,11 @@ import (
3334
"k8s.io/klog/v2"
3435
)
3536

36-
var metricsPort = consts.MetricsPort
37+
var (
38+
metricsPort = consts.MetricsPort
39+
volumeHealthMonitorInterval = 10 * time.Minute
40+
enableVolumeHealthMonitor bool
41+
)
3742

3843
var nodeServerCmd = &cobra.Command{
3944
Use: consts.NodeServerName,
@@ -56,6 +61,8 @@ var nodeServerCmd = &cobra.Command{
5661

5762
func init() {
5863
nodeServerCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", metricsPort, "Metrics port at "+consts.AppPrettyName+" exports metrics data")
64+
nodeServerCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitor", enableVolumeHealthMonitor, "Enable volume health monitoring")
65+
nodeServerCmd.PersistentFlags().DurationVar(&volumeHealthMonitorInterval, "volume-health-monitor-interval", volumeHealthMonitorInterval, "Interval for volume health monitoring in duration. Example: '20m','1h'")
5966
}
6067

6168
func startNodeServer(ctx context.Context) error {
@@ -114,6 +121,15 @@ func startNodeServer(ctx context.Context) error {
114121
}
115122
}()
116123

124+
if enableVolumeHealthMonitor {
125+
go func() {
126+
if err := volume.RunHealthMonitor(ctx, nodeID, volumeHealthMonitorInterval); err != nil {
127+
klog.ErrorS(err, "unable to run volume health monitor")
128+
errCh <- err
129+
}
130+
}()
131+
}
132+
117133
return <-errCh
118134
}
119135

cmd/kubectl-directpv/install.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,22 @@ import (
4040
)
4141

4242
var (
43-
image = consts.AppName + ":" + Version
44-
registry = "quay.io"
45-
org = "minio"
46-
nodeSelectorArgs = []string{}
47-
tolerationArgs = []string{}
48-
seccompProfile = ""
49-
apparmorProfile = ""
50-
imagePullSecrets = []string{}
51-
nodeSelector map[string]string
52-
tolerations []corev1.Toleration
53-
k8sVersion = "1.27.0"
54-
kubeVersion *version.Version
55-
legacyFlag bool
56-
declarativeFlag bool
57-
openshiftFlag bool
43+
image = consts.AppName + ":" + Version
44+
registry = "quay.io"
45+
org = "minio"
46+
nodeSelectorArgs = []string{}
47+
tolerationArgs = []string{}
48+
seccompProfile = ""
49+
apparmorProfile = ""
50+
imagePullSecrets = []string{}
51+
nodeSelector map[string]string
52+
tolerations []corev1.Toleration
53+
k8sVersion = "1.27.0"
54+
kubeVersion *version.Version
55+
legacyFlag bool
56+
declarativeFlag bool
57+
openshiftFlag bool
58+
enableVolumeHealthMonitor bool
5859
)
5960

6061
var installCmd = &cobra.Command{
@@ -82,7 +83,10 @@ var installCmd = &cobra.Command{
8283
$ kubectl {PLUGIN_NAME} install --apparmor-profile directpv
8384
8485
7. Install DirectPV with seccomp profile
85-
$ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json`,
86+
$ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json
87+
88+
8. Install DirectPV with volume health monitoring enabled
89+
$ kubectl {PLUGIN_NAME} install --enable-volume-health-monitoring`,
8690
`{PLUGIN_NAME}`,
8791
consts.AppName,
8892
),
@@ -123,6 +127,7 @@ func init() {
123127
installCmd.PersistentFlags().BoolVar(&declarativeFlag, "declarative", declarativeFlag, "Output YAML for declarative installation")
124128
installCmd.PersistentFlags().MarkHidden("declarative")
125129
installCmd.PersistentFlags().BoolVar(&openshiftFlag, "openshift", openshiftFlag, "Use OpenShift specific installation")
130+
installCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitoring", enableVolumeHealthMonitor, "Enable volume health monitoring")
126131
}
127132

128133
func validateNodeSelectorArgs() error {
@@ -305,8 +310,9 @@ func installMain(ctx context.Context) {
305310
}
306311
}
307312
}
308-
args.Declarative = declarativeFlag
309313
args.Openshift = openshiftFlag
314+
args.Declarative = declarativeFlag
315+
args.EnableVolumeHealthMonitor = enableVolumeHealthMonitor
310316

311317
var failed bool
312318
var installedComponents []installer.Component

cmd/kubectl-directpv/list_volumes.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ func listVolumesMain(ctx context.Context) {
241241
status = "Released"
242242
case volume.IsDriveLost():
243243
status = "Lost"
244+
case volume.HasError():
245+
status = "Error"
244246
case volume.IsPublished():
245247
status = "Bounded"
246248
}

docs/command-reference.md

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,20 @@ USAGE:
5353
directpv install [flags]
5454
5555
FLAGS:
56-
--node-selector strings Select the storage nodes using labels (KEY=VALUE,..)
57-
--tolerations strings Set toleration labels on the storage nodes (KEY[=VALUE]:EFFECT,..)
58-
--registry string Name of container registry (default "quay.io")
59-
--org string Organization name in the registry (default "minio")
60-
--image string Name of the DirectPV image (default "directpv:v4.0.6")
61-
--image-pull-secrets strings Image pull secrets for DirectPV images (SECRET1,..)
62-
--apparmor-profile string Set path to Apparmor profile
63-
--seccomp-profile string Set path to Seccomp profile
64-
-o, --output string Generate installation manifest. One of: yaml|json
65-
--kube-version string Select the kubernetes version for manifest generation (default "1.27.0")
66-
--legacy Enable legacy mode (Used with '-o')
67-
--openshift Use OpenShift specific installation
68-
-h, --help help for install
56+
--node-selector strings Select the storage nodes using labels (KEY=VALUE,..)
57+
--tolerations strings Set toleration labels on the storage nodes (KEY[=VALUE]:EFFECT,..)
58+
--registry string Name of container registry (default "quay.io")
59+
--org string Organization name in the registry (default "minio")
60+
--image string Name of the DirectPV image (default "directpv:0.0.0-dev")
61+
--image-pull-secrets strings Image pull secrets for DirectPV images (SECRET1,..)
62+
--apparmor-profile string Set path to Apparmor profile
63+
--seccomp-profile string Set path to Seccomp profile
64+
-o, --output string Generate installation manifest. One of: yaml|json
65+
--kube-version string Select the kubernetes version for manifest generation (default "1.27.0")
66+
--legacy Enable legacy mode (Used with '-o')
67+
--openshift Use OpenShift specific installation
68+
--enable-volume-health-monitoring Enable volume health monitoring
69+
-h, --help help for install
6970
7071
GLOBAL FLAGS:
7172
--kubeconfig string Path to the kubeconfig file to use for CLI requests
@@ -92,6 +93,9 @@ EXAMPLES:
9293
9394
7. Install DirectPV with seccomp profile
9495
$ kubectl directpv install --seccomp-profile profiles/seccomp.json
96+
97+
8. Install DirectPV with volume health monitoring enabled
98+
$ kubectl directpv install --enable-volume-health-monitoring
9599
```
96100

97101
## `discover` command

docs/monitoring.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,15 @@ scrape_configs:
7979
action: replace
8080
target_label: kubernetes_name
8181
```
82+
83+
# Monitoring volume health
84+
85+
DirectPV monitors volume health and reports abnormalities as events to PVCs and Pods. Currently, only volume mounts are monitored; more checks will be added later. This feature uses the [Volume health monitoring CSI feature](https://kubernetes.io/docs/concepts/storage/volume-health-monitoring/).
86+
87+
To enable volume health monitoring, pass the `--enable-volume-health-monitoring` flag to the install command. The `CSIVolumeHealth` feature gate must also be enabled.
88+
89+
For private registries, please note that the following image is required to enable volume health monitoring:
90+
91+
```
92+
quay.io/minio/csi-external-health-monitor-controller:v0.10.0
93+
```

pkg/apis/directpv.min.io/types/types.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,23 +120,28 @@ type VolumeConditionType string
120120

121121
// Enum value of VolumeConditionType type.
122122
const (
123-
VolumeConditionTypeLost VolumeConditionType = "Lost"
123+
VolumeConditionTypeLost VolumeConditionType = "Lost"
124+
VolumeConditionTypeError VolumeConditionType = "Error"
124125
)
125126

126127
// VolumeConditionReason denotes volume reason. Allows maximum upto 1024 chars.
127128
type VolumeConditionReason string
128129

129130
// Enum values of VolumeConditionReason type.
130131
const (
131-
VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
132+
VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
133+
VolumeConditionReasonNotMounted VolumeConditionReason = "NotMounted"
134+
VolumeConditionReasonNoError VolumeConditionReason = "NoError"
132135
)
133136

134137
// VolumeConditionMessage denotes volume condition message. Allows a maximum of up to 32768 chars.
135138
type VolumeConditionMessage string
136139

137140
// Enum values of VolumeConditionMessage type.
138141
const (
139-
VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
142+
VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
143+
VolumeConditionMessageStagingPathNotMounted VolumeConditionMessage = "Staging path is umounted from the host. Please restart the workload"
144+
VolumeConditionMessageTargetPathNotMounted VolumeConditionMessage = "Target path is umounted from the host. Please restart the workload"
140145
)
141146

142147
// DriveConditionType denotes drive condition. Allows maximum upto 316 chars.

pkg/apis/directpv.min.io/v1beta1/volume.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ package v1beta1
1919
import (
2020
"strconv"
2121

22+
"github.com/container-storage-interface/spec/lib/go/csi"
2223
"github.com/minio/directpv/pkg/apis/directpv.min.io/types"
2324
"github.com/minio/directpv/pkg/consts"
25+
"github.com/minio/directpv/pkg/k8s"
2426
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2527
)
2628

@@ -123,6 +125,12 @@ func (volume DirectPVVolume) IsDriveLost() bool {
123125
return false
124126
}
125127

128+
// HasError returns if the volume is in error state.
129+
func (volume DirectPVVolume) HasError() bool {
130+
condition := k8s.GetConditionByType(volume.Status.Conditions, string(types.VolumeConditionTypeError))
131+
return condition != nil && condition.Status == metav1.ConditionTrue
132+
}
133+
126134
// SetDriveLost sets associated drive is lost.
127135
func (volume *DirectPVVolume) SetDriveLost() {
128136
c := metav1.Condition{
@@ -316,6 +324,39 @@ func (volume *DirectPVVolume) Resume() bool {
316324
return volume.RemoveLabel(types.SuspendLabelKey)
317325
}
318326

327+
// ResetStageMountErrorCondition resets the stage volume mount error condition.
328+
func (volume *DirectPVVolume) ResetStageMountErrorCondition() {
329+
k8s.ResetConditionIfMatches(volume.Status.Conditions,
330+
string(types.VolumeConditionTypeError),
331+
string(types.VolumeConditionReasonNotMounted),
332+
string(types.VolumeConditionMessageStagingPathNotMounted),
333+
string(types.VolumeConditionReasonNoError))
334+
}
335+
336+
// ResetTargetMountErrorCondition resets the target volume mount error condition.
337+
func (volume *DirectPVVolume) ResetTargetMountErrorCondition() {
338+
k8s.ResetConditionIfMatches(volume.Status.Conditions,
339+
string(types.VolumeConditionTypeError),
340+
string(types.VolumeConditionReasonNotMounted),
341+
string(types.VolumeConditionMessageTargetPathNotMounted),
342+
string(types.VolumeConditionReasonNoError))
343+
}
344+
345+
// GetCSIVolumeCondition returns the CSI volume condition.
346+
func (volume *DirectPVVolume) GetCSIVolumeCondition() *csi.VolumeCondition {
347+
var isAbnormal bool
348+
var message string
349+
errorCondition := k8s.GetConditionByType(volume.Status.Conditions, string(types.VolumeConditionTypeError))
350+
if errorCondition != nil && errorCondition.Status == metav1.ConditionTrue {
351+
isAbnormal = true
352+
message = errorCondition.Message
353+
}
354+
return &csi.VolumeCondition{
355+
Abnormal: isAbnormal,
356+
Message: message,
357+
}
358+
}
359+
319360
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
320361

321362
// DirectPVVolumeList denotes list of volumes.

pkg/consts/consts.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,7 @@ const (
9797

9898
// TmpFS mount
9999
TmpMountDir = AppRootDir + "/tmp"
100+
101+
// Volume Health Monitor
102+
VolumeHealthMonitorIntervalInDuration = "10m"
100103
)

pkg/consts/consts.go.in

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,7 @@ const (
9595

9696
// TmpFS mount
9797
TmpMountDir = AppRootDir + "/tmp"
98+
99+
// Volume Health Monitor
100+
VolumeHealthMonitorIntervalInDuration = "10m"
98101
)

pkg/csi/controller/server.go

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,21 @@ func (c *Server) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerG
9797
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME},
9898
},
9999
},
100+
{
101+
Type: &csi.ControllerServiceCapability_Rpc{
102+
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES},
103+
},
104+
},
105+
{
106+
Type: &csi.ControllerServiceCapability_Rpc{
107+
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_GET_VOLUME},
108+
},
109+
},
110+
{
111+
Type: &csi.ControllerServiceCapability_Rpc{
112+
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_VOLUME_CONDITION},
113+
},
114+
},
100115
},
101116
}, nil
102117
}
@@ -359,8 +374,52 @@ func (c *Server) ControllerExpandVolume(ctx context.Context, req *csi.Controller
359374

360375
// ListVolumes implements ListVolumes controller RPC
361376
// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#listvolumes
362-
func (c *Server) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
363-
return nil, status.Error(codes.Unimplemented, "unimplemented")
377+
func (c *Server) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
378+
result, err := client.VolumeClient().List(ctx, metav1.ListOptions{
379+
Limit: int64(req.GetMaxEntries()),
380+
Continue: req.GetStartingToken(),
381+
})
382+
if err != nil {
383+
if req.GetStartingToken() != "" {
384+
return nil, status.Errorf(codes.Aborted, "unable to list volumes: %v", err)
385+
}
386+
return nil, status.Errorf(codes.Internal, "unable to list volumes: %v", err)
387+
}
388+
var volumeEntries []*csi.ListVolumesResponse_Entry
389+
for _, volume := range result.Items {
390+
csiVolume, err := getCSIVolume(ctx, &volume)
391+
if err != nil {
392+
return nil, status.Error(codes.Internal, err.Error())
393+
}
394+
volumeEntries = append(volumeEntries, &csi.ListVolumesResponse_Entry{
395+
Volume: csiVolume,
396+
Status: &csi.ListVolumesResponse_VolumeStatus{
397+
VolumeCondition: volume.GetCSIVolumeCondition(),
398+
},
399+
})
400+
}
401+
return &csi.ListVolumesResponse{
402+
Entries: volumeEntries,
403+
NextToken: result.Continue,
404+
}, nil
405+
}
406+
407+
func getCSIVolume(ctx context.Context, volume *types.Volume) (*csi.Volume, error) {
408+
drive, err := client.DriveClient().Get(ctx, string(volume.GetDriveID()), metav1.GetOptions{
409+
TypeMeta: types.NewDriveTypeMeta(),
410+
})
411+
if err != nil {
412+
return nil, err
413+
}
414+
return &csi.Volume{
415+
CapacityBytes: volume.Status.TotalCapacity,
416+
VolumeId: volume.Name,
417+
AccessibleTopology: []*csi.Topology{
418+
{
419+
Segments: drive.Status.Topology,
420+
},
421+
},
422+
}, nil
364423
}
365424

366425
// ControllerPublishVolume - controller RPC to publish volumes
@@ -377,8 +436,23 @@ func (c *Server) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerU
377436

378437
// ControllerGetVolume - controller RPC for get volume
379438
// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#controllergetvolume
380-
func (c *Server) ControllerGetVolume(_ context.Context, _ *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
381-
return nil, status.Error(codes.Unimplemented, "unimplemented")
439+
func (c *Server) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
440+
volume, err := client.VolumeClient().Get(
441+
ctx, req.GetVolumeId(), metav1.GetOptions{TypeMeta: types.NewVolumeTypeMeta()},
442+
)
443+
if err != nil {
444+
return nil, status.Error(codes.NotFound, err.Error())
445+
}
446+
csiVolume, err := getCSIVolume(ctx, volume)
447+
if err != nil {
448+
return nil, status.Error(codes.Internal, err.Error())
449+
}
450+
return &csi.ControllerGetVolumeResponse{
451+
Volume: csiVolume,
452+
Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
453+
VolumeCondition: volume.GetCSIVolumeCondition(),
454+
},
455+
}, nil
382456
}
383457

384458
// ListSnapshots - controller RPC for listing snapshots

0 commit comments

Comments
 (0)