From 0067cd951506955eec8d072a284b62fff8df243d Mon Sep 17 00:00:00 2001 From: gene-redpanda <123959009+gene-redpanda@users.noreply.github.com> Date: Wed, 9 Oct 2024 15:18:37 -0500 Subject: [PATCH] feat: add support for mount management Adds mount/unmount support --- src/go/rpk/Makefile | 12 +- src/go/rpk/pkg/cli/cluster/storage/BUILD | 12 +- .../pkg/cli/cluster/storage/cancel-mount.go | 54 ++++++ .../rpk/pkg/cli/cluster/storage/list-mount.go | 165 ++++++++++++++++++ src/go/rpk/pkg/cli/cluster/storage/mount.go | 104 +++++++++++ .../pkg/cli/cluster/storage/status-mount.go | 76 ++++++++ src/go/rpk/pkg/cli/cluster/storage/storage.go | 5 + src/go/rpk/pkg/cli/cluster/storage/unmount.go | 69 ++++++++ 8 files changed, 494 insertions(+), 3 deletions(-) create mode 100644 src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go create mode 100644 src/go/rpk/pkg/cli/cluster/storage/list-mount.go create mode 100644 src/go/rpk/pkg/cli/cluster/storage/mount.go create mode 100644 src/go/rpk/pkg/cli/cluster/storage/status-mount.go create mode 100644 src/go/rpk/pkg/cli/cluster/storage/unmount.go diff --git a/src/go/rpk/Makefile b/src/go/rpk/Makefile index ae17a200237f6..312c499e717ad 100644 --- a/src/go/rpk/Makefile +++ b/src/go/rpk/Makefile @@ -10,7 +10,14 @@ GOCMD=go GOLANGCILINTCMD=golangci-lint GOFUMPTCMD=gofumpt -BAZELCMD=bazel + +ifeq ($(shell uname),Darwin) + BAZELCMD := bazelisk + BAZEL_GAZELLE_CMD := run --config system-clang //:gazelle +else + BAZELCMD := bazel + BAZEL_GAZELLE_CMD := run //:gazelle +endif GOOS ?= $(shell go env GOOS) GOARCH ?= $(shell go env GOARCH) @@ -29,6 +36,7 @@ all: help ready: build test fmt lint bazel check_diff ## Runs all. Ensures commit is ready. +.PHONY: build build: ## Build rpk. $(shell mkdir -p $(OUTDIR)) $(GOCMD) build -ldflags '$(LDFLAGS)' -o $(OUTDIR) ./... @@ -65,7 +73,7 @@ install_bazelisk: @$(GOCMD) install github.com/bazelbuild/bazelisk@latest bazel_generate_build: install_bazelisk - @$(BAZELCMD) run //:gazelle + @$(BAZELCMD) $(BAZEL_GAZELLE_CMD) bazel_tidy: install_bazelisk @$(BAZELCMD) mod tidy diff --git a/src/go/rpk/pkg/cli/cluster/storage/BUILD b/src/go/rpk/pkg/cli/cluster/storage/BUILD index a2e3b468954ab..3f31204d6eae5 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/BUILD +++ b/src/go/rpk/pkg/cli/cluster/storage/BUILD @@ -2,12 +2,22 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "storage", - srcs = ["storage.go"], + srcs = [ + "cancel-mount.go", + "list-mount.go", + "mount.go", + "status-mount.go", + "storage.go", + "unmount.go", + ], importpath = "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/storage", visibility = ["//visibility:public"], deps = [ + "//src/go/rpk/pkg/adminapi", "//src/go/rpk/pkg/cli/cluster/storage/recovery", "//src/go/rpk/pkg/config", + "//src/go/rpk/pkg/out", + "@com_github_redpanda_data_common_go_rpadmin//:rpadmin", "@com_github_spf13_afero//:afero", "@com_github_spf13_cobra//:cobra", ], diff --git a/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go b/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go new file mode 100644 index 0000000000000..342b92560df93 --- /dev/null +++ b/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go @@ -0,0 +1,54 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package storage + +import ( + "fmt" + "strconv" + + "github.com/redpanda-data/common-go/rpadmin" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newMountCancel(fs afero.Fs, p *config.Params) *cobra.Command { + cmd := &cobra.Command{ + Use: "cancel-mount [MIGRATION ID]", + Aliases: []string{"cancel-unmount"}, + Short: "Cancels a mount/unmount operation", + Long: `Cancels a mount/unmount operation on a topic. + +Use the migration ID that is emitted when the mount or unmount operation is executed. +You can also get the migration ID by listing the mount/unmount operations.`, + Example: ` +Cancel a mount/unmount operation + rpk cluster storage cancel-mount 123 +`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, from []string) { + pf, err := p.LoadVirtualProfile(fs) + out.MaybeDie(err, "rpk unable to load config: %v", err) + config.CheckExitCloudAdmin(pf) + adm, err := adminapi.NewClient(cmd.Context(), fs, pf) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + migrationID, err := strconv.Atoi(from[0]) + out.MaybeDie(err, "invalid migration ID: %v", err) + + err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel) + out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err) + fmt.Printf("Successfully canceled the operation with ID %v", migrationID) + }, + } + return cmd +} diff --git a/src/go/rpk/pkg/cli/cluster/storage/list-mount.go b/src/go/rpk/pkg/cli/cluster/storage/list-mount.go new file mode 100644 index 0000000000000..b26ad9d3eb739 --- /dev/null +++ b/src/go/rpk/pkg/cli/cluster/storage/list-mount.go @@ -0,0 +1,165 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package storage + +import ( + "fmt" + "io" + "os" + "strings" + + "github.com/redpanda-data/common-go/rpadmin" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newMountList(fs afero.Fs, p *config.Params) *cobra.Command { + var filter string + cmd := &cobra.Command{ + Use: "list-mount", + Short: "List mount/unmount operations", + Aliases: []string{"list-unmount"}, + Long: `List mount/unmount operations on a topic to the Redpanda cluster from Tiered Storage. + +You can also filter the list by state using the --filter flag. The possible states are: +- planned +- prepared +- executed +- finished + +If no filter is provided, all migrations will be listed.`, + Example: ` +Lists mount/unmount operations + rpk cluster storage list-mount + +Use filter to list only migrations in a specific state + rpk cluster storage list-mount --filter planned +`, + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, _ []string) { + f := p.Formatter + if h, ok := f.Help([]migrationState{}); ok { + out.Exit(h) + } + + pf, err := p.LoadVirtualProfile(fs) + out.MaybeDie(err, "rpk unable to load config: %v", err) + config.CheckExitCloudAdmin(pf) + adm, err := adminapi.NewClient(cmd.Context(), fs, pf) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + migrations, err := adm.ListMigrations(cmd.Context()) + out.MaybeDie(err, "unable to list migrations: %v", err) + printDetailedListMount(p.Formatter, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout) + }, + } + p.InstallFormatFlag(cmd) + cmd.Flags().StringVarP(&filter, "filter", "f", "", "Filter the list of migrations by state. Only valid for text") + return cmd +} + +type filterOpts int + +const ( + FilterOptsAll filterOpts = iota + FilterOptsPlanned + FilterOptsPrepared + FilterOptsExecuted + FilterOptsFinished +) + +func (f filterOpts) String() string { + switch f { + case FilterOptsAll: + return "all" + case FilterOptsPlanned: + return "planned" + case FilterOptsPrepared: + return "prepared" + case FilterOptsExecuted: + return "executed" + case FilterOptsFinished: + return "finished" + default: + return "" + } +} + +func filterOptFromString(s string) filterOpts { + switch s { + case "planned": + return FilterOptsPlanned + case "prepared": + return FilterOptsPrepared + case "executed": + return FilterOptsExecuted + case "finished": + return FilterOptsFinished + case "all": + return FilterOptsAll + default: + return -1 + } +} + +func printDetailedListMount(f config.OutFormatter, ft filterOpts, d []migrationState, w io.Writer) { + if isText, _, t, err := f.Format(d); !isText { + out.MaybeDie(err, "unable to print in the requested format %q: %v", f.Kind, err) + fmt.Fprintln(w, t) + return + } + tw := out.NewTableTo(w, "ID", "State", "Migration", "Topics") + defer tw.Flush() + for _, m := range d { + if ft != FilterOptsAll { + if m.State != ft.String() { + continue + } + } + tw.Print(m.ID, m.State, m.MigrationType, strings.Join(m.Topics, ", ")) + } +} + +func rpadminMigrationStateToMigrationState(in []rpadmin.MigrationState) (resp []migrationState) { + resp = make([]migrationState, 0, len(in)) + for _, entry := range in { + resp = append(resp, migrationState{ + ID: entry.ID, + State: entry.State, + MigrationType: entry.Migration.MigrationType, + Topics: rpadminTopicsToStringSlice(entry.Migration.Topics), + }) + } + return resp +} + +// rpadminTopicsToStringSlice converts a slice of rpadmin.NamespacedTopic to a slice of strings +// if the topic has a non nil namespace it will appear as `namespace:topic` +// otherwise it will appear as `topic`. +func rpadminTopicsToStringSlice(in []rpadmin.NamespacedTopic) (resp []string) { + for _, entry := range in { + if entry.Namespace != nil { + resp = append(resp, fmt.Sprintf("%s/%s", *entry.Namespace, entry.Topic)) + continue + } + resp = append(resp, entry.Topic) + } + return +} + +type migrationState struct { + ID int `json:"id" yaml:"id"` + State string `json:"state" yaml:"state"` + MigrationType string `json:"type" yaml:"type"` + Topics []string `json:"topics" yaml:"topics"` +} diff --git a/src/go/rpk/pkg/cli/cluster/storage/mount.go b/src/go/rpk/pkg/cli/cluster/storage/mount.go new file mode 100644 index 0000000000000..256ae01ad8e9b --- /dev/null +++ b/src/go/rpk/pkg/cli/cluster/storage/mount.go @@ -0,0 +1,104 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package storage + +import ( + "fmt" + "strings" + + "github.com/redpanda-data/common-go/rpadmin" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newMountCommand(fs afero.Fs, p *config.Params) *cobra.Command { + var to string + + cmd := &cobra.Command{ + Use: "mount [TOPIC]", + Short: "Mount a topic", + Long: `Mount a topic to the Redpanda cluster from Tiered Storage. + +This command mounts a topic in the Redpanda cluster using log segments stored +in Tiered Storage. The topic may be optionally renamed with the --to flag. + +Requirements: +- Tiered Storage must be enabled. +- Log segments for the topic must be available in Tiered Storage. +- A topic with the same name must not already exist in the cluster.`, + Example: ` +Mounts topic my-typic from Tiered Storage to the cluster in the my-namespace + rpk cluster storage mount my-topic + +Mount topic my-topic from Tiered Storage to the cluster in the my-namespace +with my-new-topic as the new topic name + rpk cluster storage mount my-namespace/my-topic --to my-namespace/my-new-topic +`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, from []string) { + pf, err := p.LoadVirtualProfile(fs) + out.MaybeDie(err, "rpk unable to load config: %v", err) + config.CheckExitCloudAdmin(pf) + adm, err := adminapi.NewClient(cmd.Context(), fs, pf) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + n, t := nsTopic(from[0]) + if t == "" { + out.Die("topic is required") + } + topic := rpadmin.InboundTopic{ + SourceTopic: rpadmin.NamespacedTopic{ + Namespace: string2pointer(n), + Topic: t, + }, + } + an, at := nsTopic(to) + alias := t + if at != "" { + alias = at + topic.Alias = &rpadmin.NamespacedTopic{ + Namespace: string2pointer(an), + Topic: alias, + } + } + + mg, err := adm.MountTopics(cmd.Context(), rpadmin.MountConfiguration{Topics: []rpadmin.InboundTopic{topic}}) + out.MaybeDie(err, "unable to mount topic: %v", err) + + fmt.Printf(` +Topic mount from Tiered Storage topic %v to your Redpanda Cluster topic %v +has started with Migration ID %v +To check the status run 'rpk cluster storage status-mount %d'\n`, t, alias, mg.ID, mg.ID) + }, + } + cmd.Flags().StringVar(&to, "to", "", "New namespace/topic name for the mounted topic (optional)") + return cmd +} + +// nsTopic splits a topic string consisting of / and +// returns each component, if the namespace is not specified, returns 'kafka'. +func nsTopic(nst string) (namespace string, topic string) { + nsTopic := strings.SplitN(nst, "/", 2) + if len(nsTopic) == 1 { + namespace = "kafka" + topic = nsTopic[0] + } else { + namespace = nsTopic[0] + topic = nsTopic[1] + } + return namespace, topic +} + +func string2pointer(s string) *string { + return &s +} diff --git a/src/go/rpk/pkg/cli/cluster/storage/status-mount.go b/src/go/rpk/pkg/cli/cluster/storage/status-mount.go new file mode 100644 index 0000000000000..79806c943efdb --- /dev/null +++ b/src/go/rpk/pkg/cli/cluster/storage/status-mount.go @@ -0,0 +1,76 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package storage + +import ( + "fmt" + "io" + "os" + "strconv" + "strings" + + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newMountStatus(fs afero.Fs, p *config.Params) *cobra.Command { + cmd := &cobra.Command{ + Use: "status-mount [MIGRATION ID]", + Short: "Status of mount/unmount operation", + Long: "Status of mount/unmount operation on topic to Redpanda cluster from Tiered Storage", + Aliases: []string{"status-unmount"}, + Example: ` +Status for a mount/unmount operation + rpk cluster storage status-mount 123 +`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, from []string) { + f := p.Formatter + if h, ok := f.Help(migrationState{}); ok { + out.Exit(h) + } + pf, err := p.LoadVirtualProfile(fs) + out.MaybeDie(err, "rpk unable to load config: %v", err) + + config.CheckExitCloudAdmin(pf) + adm, err := adminapi.NewClient(cmd.Context(), fs, pf) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + migrationID, err := strconv.Atoi(from[0]) + out.MaybeDie(err, "invalid migration ID: %v", err) + + status, err := adm.GetMigration(cmd.Context(), migrationID) + out.MaybeDie(err, "unable to get the status of the migration: %v", err) + outStatus := migrationState{ + ID: status.ID, + State: status.State, + MigrationType: status.Migration.MigrationType, + Topics: rpadminTopicsToStringSlice(status.Migration.Topics), + } + printDetailedStatusMount(p.Formatter, outStatus, os.Stdout) + }, + } + p.InstallFormatFlag(cmd) + return cmd +} + +func printDetailedStatusMount(f config.OutFormatter, d migrationState, w io.Writer) { + if isText, _, t, err := f.Format(d); !isText { + out.MaybeDie(err, "unable to print in the requested format %q: %v", f.Kind, err) + fmt.Fprintln(w, t) + return + } + tw := out.NewTableTo(w, "ID", "State", "Migration", "Topics") + defer tw.Flush() + tw.Print(d.ID, d.State, d.MigrationType, strings.Join(d.Topics, ", ")) +} diff --git a/src/go/rpk/pkg/cli/cluster/storage/storage.go b/src/go/rpk/pkg/cli/cluster/storage/storage.go index 73c991bf657ec..d779f0b1eea15 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/storage.go +++ b/src/go/rpk/pkg/cli/cluster/storage/storage.go @@ -23,6 +23,11 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command { } cmd.AddCommand( recovery.NewCommand(fs, p), + newMountCommand(fs, p), + newUnmountCommand(fs, p), + newMountList(fs, p), + newMountStatus(fs, p), + newMountCancel(fs, p), ) return cmd } diff --git a/src/go/rpk/pkg/cli/cluster/storage/unmount.go b/src/go/rpk/pkg/cli/cluster/storage/unmount.go new file mode 100644 index 0000000000000..e30b2416f7606 --- /dev/null +++ b/src/go/rpk/pkg/cli/cluster/storage/unmount.go @@ -0,0 +1,69 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package storage + +import ( + "fmt" + + "github.com/redpanda-data/common-go/rpadmin" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newUnmountCommand(fs afero.Fs, p *config.Params) *cobra.Command { + cmd := &cobra.Command{ + Use: "unmount [TOPIC]", + Short: "Unmount a topic from the Redpanda cluster", + Long: `Unmount a topic from the Redpanda cluster and secure it in Tiered +Storage. + +This command performs an operation that: +1. Rejects all writes to the topic +2. Flushes data to Tiered Storage +3. Removes the topic from the cluster + +Key Points: +- During unmounting, any attempted writes or reads will receive an + UNKNOWN_TOPIC_OR_PARTITION error. +- The unmount operation works independently of other topic configurations like + remote.delete=false. +- After unmounting, the topic can be remounted to this cluster or a different + cluster if the log segments are moved to that cluster's Tiered Storage. +`, + Example: ` +Unmount topic 'my-topic' from the cluster in the 'my-namespace' + rpk cluster storage unmount my-namespace/my-topic +`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, topics []string) { + pf, err := p.LoadVirtualProfile(fs) + out.MaybeDie(err, "rpk unable to load config: %v", err) + config.CheckExitCloudAdmin(pf) + adm, err := adminapi.NewClient(cmd.Context(), fs, pf) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + n, t := nsTopic(topics[0]) + if t == "" { + out.Die("topic is required") + } + + mg, err := adm.UnmountTopics(cmd.Context(), rpadmin.UnmountConfiguration{Topics: []rpadmin.NamespacedTopic{{Namespace: string2pointer(n), Topic: t}}}) + out.MaybeDie(err, "unable to unmount topic: %v", err) + fmt.Printf(` +Topic unmounting from your Redpanda Cluster topic %v +has started with Migration ID %v +To check the status run 'rpk cluster storage status-mount %d'\n`, t, mg.ID, mg.ID) + }, + } + return cmd +}