diff --git a/csi/node_server.go b/csi/node_server.go index 175bf73728..cc0eaf759c 100644 --- a/csi/node_server.go +++ b/csi/node_server.go @@ -56,6 +56,7 @@ type NodeServer struct { apiClient *longhornclient.RancherClient nodeID string caps []*csi.NodeServiceCapability + log *logrus.Entry } func NewNodeServer(apiClient *longhornclient.RancherClient, nodeID string) *NodeServer { @@ -68,17 +69,13 @@ func NewNodeServer(apiClient *longhornclient.RancherClient, nodeID string) *Node csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, csi.NodeServiceCapability_RPC_EXPAND_VOLUME, }), + log: logrus.StandardLogger().WithField("component", "csi-node-server"), } } -func getLoggerForCSINodeServer() *logrus.Entry { - return logrus.StandardLogger().WithField("component", "csi-node-server") -} - // NodePublishVolume will mount the volume /dev/longhorn/ to target_path func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodePublishVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodePublishVolume"}) targetPath := req.GetTargetPath() if targetPath == "" { @@ -140,7 +137,17 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } if volumeCapability.GetBlock() != nil { - devicePath := volume.Controllers[0].Endpoint + devicePath := getStageBlockVolumePath(stagingTargetPath, volumeID) + _, err := os.Stat(devicePath) + if err != nil { + if !os.IsNotExist(err) { + return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to stat device %s", devicePath).Error()) + } + // Fall back to the controller endpoint if the device path under the stagingTargetPath doesn't exist + log.Infof("Device path %s doesn't exist, falling back to controller endpoint %s", devicePath, volume.Controllers[0].Endpoint) + devicePath = volume.Controllers[0].Endpoint + } + if err := ns.nodePublishBlockVolume(volumeID, devicePath, targetPath, mounter); err != nil { log.WithError(err).Errorf("Failed to publish BlockVolume %s", volumeID) return nil, err @@ -152,7 +159,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // we validate the staging path to make sure the global mount is still valid if isMnt, err := ensureMountPoint(stagingTargetPath, mounter); err != nil || !isMnt { - msg := fmt.Sprintf("Staging path %v is no longer valid for volume %v", stagingTargetPath, volumeID) + msg := fmt.Sprintf("Staging target path %v is no longer valid for volume %v", stagingTargetPath, volumeID) log.WithError(err).Error(msg) // HACK: normally when we return FailedPrecondition below kubelet should call NodeStageVolume again @@ -249,12 +256,11 @@ func (ns *NodeServer) nodeStageSharedVolume(volumeID, shareEndpoint, targetPath } func (ns *NodeServer) nodeStageMountVolume(volumeID, devicePath, stagingTargetPath, fsType string, mountFlags []string, mounter *mount.SafeFormatAndMount) error { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodePublishVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodePublishVolume"}) isMnt, err := ensureMountPoint(stagingTargetPath, mounter) if err != nil { - return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to prepare mount point for volume %v", volumeID).Error()) + return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to prepare mount point %v for volume %v", stagingTargetPath, volumeID).Error()) } if isMnt { return nil @@ -267,30 +273,38 @@ func (ns *NodeServer) nodeStageMountVolume(volumeID, devicePath, stagingTargetPa return nil } +// nodeStageBlockVolume utilizes the stagingTargetPath to create a volumeID file to bind mount the devicePath +// this is valid since the csi plugin is in control of the staging path +func (ns *NodeServer) nodeStageBlockVolume(volumeID, devicePath, stagingTargetPath string, mounter mount.Interface) error { + path := getStageBlockVolumePath(stagingTargetPath, volumeID) + return ns.nodePublishBlockVolume(volumeID, devicePath, path, mounter) +} + func (ns *NodeServer) nodePublishBlockVolume(volumeID, devicePath, targetPath string, mounter mount.Interface) error { + log := ns.log.WithFields(logrus.Fields{"function": "nodePublishBlockVolume"}) + // we ensure the parent directory exists and is valid - if _, err := ensureMountPoint(filepath.Dir(stagingTargetPath), mounter); err != nil { + if _, err := ensureMountPoint(filepath.Dir(targetPath), mounter); err != nil { return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to prepare mount point for block device %v", devicePath).Error()) } // create file where we can bind mount the device to - if err := makeFile(stagingTargetPath); err != nil { - return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to create file %v", stagingTargetPath).Error()) + if err := makeFile(targetPath); err != nil { + return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to create file %v", targetPath).Error()) } - log.Infof("Bind mounting device %v at %v", devicePath, stagingTargetPath) - if err := mounter.Mount(devicePath, stagingTargetPath, "", []string{"bind"}); err != nil { - if removeErr := os.Remove(stagingTargetPath); removeErr != nil { - return status.Errorf(codes.Internal, errors.Wrapf(removeErr, "failed to remove mount target %q", stagingTargetPath).Error()) + log.Infof("Bind mounting device %v at %v", devicePath, targetPath) + if err := mounter.Mount(devicePath, targetPath, "", []string{"bind"}); err != nil { + if removeErr := os.Remove(targetPath); removeErr != nil { + return status.Errorf(codes.Internal, errors.Wrapf(removeErr, "failed to remove mount target %q", targetPath).Error()) } - return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to bind mount %q at %q", devicePath, stagingTargetPath).Error()) + return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to bind mount %q at %q", devicePath, targetPath).Error()) } return nil } func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodeUnpublishVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodeUnpublishVolume"}) targetPath := req.GetTargetPath() if targetPath == "" { @@ -311,8 +325,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu } func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodeStageVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodeStageVolume"}) stagingTargetPath := req.GetStagingTargetPath() if stagingTargetPath == "" { @@ -359,7 +372,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if volume.State != string(longhorn.VolumeStateAttached) || volume.Controllers[0].Endpoint == "" { log.Infof("Volume %v hasn't been attached yet, unmounting potential mount point %v", volumeID, stagingTargetPath) if err := unmount(stagingTargetPath, mounter); err != nil { - log.WithError(err).Warn("Failed to unmount stagingTargetPath") + log.WithError(err).Warnf("Failed to unmount stagingTargetPath %v", stagingTargetPath) } return nil, status.Errorf(codes.InvalidArgument, "volume %s hasn't been attached yet", volumeID) } @@ -368,13 +381,6 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Errorf(codes.Aborted, "volume %s is not ready for workloads", volumeID) } - devicePath := volume.Controllers[0].Endpoint - - // do nothing for block devices, since they are handled by publish - if volumeCapability.GetBlock() != nil { - return &csi.NodeStageVolumeResponse{}, nil - } - if requiresSharedAccess(volume, volumeCapability) && !volume.Migratable { if volume.AccessMode != string(longhorn.AccessModeReadWriteMany) { return nil, status.Errorf(codes.FailedPrecondition, "volume %s requires shared access but is not marked for shared use", volumeID) @@ -399,18 +405,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return &csi.NodeStageVolumeResponse{}, nil } - options := volumeCapability.GetMount().GetMountFlags() - fsType := volumeCapability.GetMount().GetFsType() - if fsType == "" { - fsType = defaultFsType - } - - formatMounter, ok := mounter.(*mount.SafeFormatAndMount) - if !ok { - return nil, status.Errorf(codes.Internal, "volume %v cannot get format mounter that support filesystem %v creation", volumeID, fsType) - } - - diskFormat, err := formatMounter.GetDiskFormat(devicePath) + devicePath := volume.Controllers[0].Endpoint + diskFormat, err := getDiskFormat(devicePath) if err != nil { return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to evaluate device filesystem %v format", devicePath).Error()) } @@ -453,7 +449,27 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol devicePath = cryptoDevice } - if err := ns.nodeStageMountVolume(volumeID, devicePath, targetPath, fsType, options, formatMounter); err != nil { + if volumeCapability.GetBlock() != nil { + if err := ns.nodeStageBlockVolume(volumeID, devicePath, stagingTargetPath, mounter); err != nil { + return nil, err + } + + logrus.Infof("Volume %v device %v available for usage as block device", volumeID, devicePath) + return &csi.NodeStageVolumeResponse{}, nil + } + + options := volumeCapability.GetMount().GetMountFlags() + fsType := volumeCapability.GetMount().GetFsType() + if fsType == "" { + fsType = defaultFsType + } + + formatMounter, ok := mounter.(*mount.SafeFormatAndMount) + if !ok { + return nil, status.Errorf(codes.Internal, "volume %v cannot get format mounter that support filesystem %v creation", volumeID, fsType) + } + + if err := ns.nodeStageMountVolume(volumeID, devicePath, stagingTargetPath, fsType, options, formatMounter); err != nil { return nil, err } @@ -484,8 +500,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodeUnstageVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodeUnstageVolume"}) stagingTargetPath := req.GetStagingTargetPath() if stagingTargetPath == "" { @@ -497,16 +512,29 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, status.Error(codes.InvalidArgument, "volume id missing in request") } + mounter := mount.New("") + // CO owns the staging_path so we only unmount but not remove the path - if err := unmount(targetPath, mount.New("")); err != nil { - return nil, status.Error(codes.Internal, fmt.Sprintf("failed to unmount volume %s mount point %v error %v", volumeID, targetPath, err)) + if err := unmount(stagingTargetPath, mounter); err != nil { + return nil, status.Error(codes.Internal, errors.Wrapf(err, "failed to unmount volume %s mount point %v", volumeID, stagingTargetPath).Error()) + } + + // For block mode we use the staging path as parent, so we have to do additional cleanup of the subfolder/files + // we should transition the regular fs mounts to also use the same sub folder, this allows us to store additional + // metadata as well as do more forcefully removals since we no longer share the control of the staging_path with kubernetes + // + // The unmount of the parent is a no op for block mode, this is also important for backwards compatibility of the existing block devices. + deviceFilePath := getStageBlockVolumePath(stagingTargetPath, volumeID) + if err := cleanupMountPoint(deviceFilePath, mounter); err != nil { + return nil, status.Error(codes.Internal, errors.Wrapf(err, "failed to clean up volume %s device mount point %v", volumeID, deviceFilePath).Error()) } // optionally try to retrieve the volume and check if it's an RWX volume // if it is we let the share-manager clean up the crypto device volume, _ := ns.apiClient.Volume.ById(volumeID) - cleanupCryptoDevice := !requiresSharedAccess(volume, nil) + // Currently, only "RWO volumes" and "block device with volume.Migratable is true" supports encryption. + cleanupCryptoDevice := !requiresSharedAccess(volume, nil) || (volume != nil && volume.Migratable) if cleanupCryptoDevice { cryptoDevice := crypto.VolumeMapper(volumeID) if isOpen, err := crypto.IsDeviceOpen(cryptoDevice); err != nil { @@ -555,7 +583,7 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo if isBlockVolume { volCapacity, err := strconv.ParseInt(existVol.Size, 10, 64) if err != nil { - return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to convert volume size %v", existVol.Size).Error()) + return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to convert volume size %v for volume %v", existVol.Size, volumeID).Error()) } return &csi.NodeGetVolumeStatsResponse{ Usage: []*csi.VolumeUsage{ @@ -574,7 +602,7 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo if errors.Is(err, unix.ENOENT) { return nil, status.Errorf(codes.NotFound, "volume %v is not mounted on path %v", volumeID, volumePath) } - return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to retrieve capacity statistics for volume path %v", volumePath).Error()) + return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to retrieve capacity statistics for volume path %v for volume %v", volumePath, volumeID).Error()) } return &csi.NodeGetVolumeStatsResponse{ @@ -597,8 +625,7 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo // NodeExpandVolume is designed to expand the file system for ONLINE expansion, func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodeExpandVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodeExpandVolume"}) if req.CapacityRange == nil { return nil, status.Error(codes.InvalidArgument, "capacity range missing in request") diff --git a/csi/util.go b/csi/util.go index 91ec0efce2..4f6a35c49a 100644 --- a/csi/util.go +++ b/csi/util.go @@ -7,6 +7,7 @@ import ( "io" "os" "path" + "path/filepath" "strconv" "strings" "time" @@ -364,6 +365,11 @@ func isBlockDevice(volumePath string) (bool, error) { return false, nil } +func getDiskFormat(devicePath string) (string, error) { + m := mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()} + return m.GetDiskFormat(devicePath) +} + func getFilesystemStatistics(volumePath string) (*volumeFilesystemStatistics, error) { var statfs unix.Statfs_t // See http://man7.org/linux/man-pages/man2/statfs.2.html for details. @@ -420,3 +426,7 @@ func requiresSharedAccess(vol *longhornclient.Volume, cap *csi.VolumeCapability) mode == csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER || mode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER } + +func getStageBlockVolumePath(stagingTargetPath, volumeID string) string { + return filepath.Join(stagingTargetPath, volumeID) +}