Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpk: add Redpanda Cloud support to rpk cluster storage mount/unmount commands #24134

Merged
merged 4 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rpk: add public API support to mount/unmount
Now we will be able to run mount/unmount commands
against Redpanda Cloud clusters using the
dataplane API.
  • Loading branch information
r-vasquez committed Nov 15, 2024
commit 26e5aa31775c906a90ba90b629b0b11e53dbecca
3 changes: 3 additions & 0 deletions src/go/rpk/pkg/cli/cluster/storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ go_library(
"//src/go/rpk/pkg/cli/cluster/storage/recovery",
"//src/go/rpk/pkg/config",
"//src/go/rpk/pkg/out",
"//src/go/rpk/pkg/publicapi",
"@build_buf_gen_go_redpandadata_dataplane_protocolbuffers_go//redpanda/api/dataplane/v1alpha2",
"@com_connectrpc_connect//:connect",
"@com_github_redpanda_data_common_go_rpadmin//:rpadmin",
"@com_github_spf13_afero//:afero",
"@com_github_spf13_cobra//:cobra",
Expand Down
30 changes: 24 additions & 6 deletions src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"fmt"
"strconv"

dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2"
"connectrpc.com/connect"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand All @@ -36,17 +38,33 @@ Cancel a mount/unmount operation
`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, from []string) {
pf, err := p.LoadVirtualProfile(fs)
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
config.CheckExitServerlessAdmin(p)

migrationID, err := strconv.Atoi(from[0])
out.MaybeDie(err, "invalid migration ID: %v", err)

err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
if p.FromCloud {
cl, err := createDataplaneClient(p)
out.MaybeDieErr(err)

req := connect.NewRequest(
&dataplanev1alpha2.UpdateMountTaskRequest{
Id: int32(migrationID),
Action: dataplanev1alpha2.UpdateMountTaskRequest_ACTION_CANCEL,
},
)
_, err = cl.CloudStorage.UpdateMountTask(cmd.Context(), req)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
} else {
adm, err := adminapi.NewClient(cmd.Context(), fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
}

fmt.Printf("Successfully canceled the operation with ID %v", migrationID)
},
}
Expand Down
76 changes: 68 additions & 8 deletions src/go/rpk/pkg/cli/cluster/storage/list-mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"os"
"strings"

dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2"
"connectrpc.com/connect"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand Down Expand Up @@ -52,15 +54,29 @@ Use filter to list only migrations in a specific state
out.Exit(h)
}

pf, err := p.LoadVirtualProfile(fs)
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
config.CheckExitServerlessAdmin(p)

migrations, err := adm.ListMigrations(cmd.Context())
out.MaybeDie(err, "unable to list migrations: %v", err)
printDetailedListMount(p.Formatter, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout)
var migrations []rpadmin.MigrationState
if p.FromCloud {
cl, err := createDataplaneClient(p)
out.MaybeDieErr(err)

resp, err := cl.CloudStorage.ListMountTasks(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListMountTasksRequest{}))
out.MaybeDie(err, "unable to list mount/unmount operations: %v", err)

if resp != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this conditional needed? MaybeDie exits on error. Can this return nil, nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, I was overly paranoid here to avoid a panic. resp is a connect wrapper of the response and I'm not sure if an empty list will produce an empty resp.

And I prefer being a little bit paranoid as we don't control the result at all. Happy to remove it if we are 100% sure this doesn't return nil, nil

migrations = listMountTaskToAdminMigrationState(resp.Msg)
}
} else {
adm, err := adminapi.NewClient(cmd.Context(), fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

migrations, err = adm.ListMigrations(cmd.Context())
out.MaybeDie(err, "unable to list migrations: %v", err)
}
printDetailedListMount(f, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout)
},
}
p.InstallFormatFlag(cmd)
Expand Down Expand Up @@ -96,7 +112,7 @@ func (f filterOpts) String() string {
}

func filterOptFromString(s string) filterOpts {
switch s {
switch strings.ToLower(s) {
case "planned":
return FilterOptsPlanned
case "prepared":
Expand Down Expand Up @@ -163,3 +179,47 @@ type migrationState struct {
MigrationType string `json:"type" yaml:"type"`
Topics []string `json:"topics" yaml:"topics"`
}

func listMountTaskToAdminMigrationState(resp *dataplanev1alpha2.ListMountTasksResponse) []rpadmin.MigrationState {
var migrations []rpadmin.MigrationState
if resp != nil {
for _, task := range resp.Tasks {
if task != nil {
migrations = append(migrations, rpadmin.MigrationState{
ID: int(task.Id),
State: strings.TrimPrefix(task.State.String(), "STATE_"),
Migration: rpadmin.Migration{
MigrationType: task.Type.String(),
Topics: mountTaskTopicsToNamespacedOrInboundTopics(task.Topics, task.Type),
},
})
}
}
}
return migrations
}

// mountTaskTopicsToNamespacedOrInboundTopics converts the dataplane's
// mountTaskTopics to the rpadmin's equivalent.
func mountTaskTopicsToNamespacedOrInboundTopics(taskTopics []*dataplanev1alpha2.MountTask_Topic, taskType dataplanev1alpha2.MountTask_Type) []rpadmin.NamespacedOrInboundTopic {
var topics []rpadmin.NamespacedOrInboundTopic
for _, topic := range taskTopics {
// Inbound == Mount.
if taskType == dataplanev1alpha2.MountTask_TYPE_MOUNT {
topics = append(topics, rpadmin.NamespacedOrInboundTopic{
InboundTopic: rpadmin.InboundTopic{
SourceTopicReference: rpadmin.NamespacedTopic{
Topic: topic.SourceTopicReference, // The topic of the bucket you are mounting.
},
},
})
} else {
topics = append(topics, rpadmin.NamespacedOrInboundTopic{
NamespacedTopic: rpadmin.NamespacedTopic{
Topic: topic.TopicReference, // The topic in the cluster you are un-mounting.
},
})
}
}
return topics
}
44 changes: 37 additions & 7 deletions src/go/rpk/pkg/cli/cluster/storage/list-mountable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io"
"os"

dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2"
"connectrpc.com/connect"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand Down Expand Up @@ -33,15 +35,30 @@ List all mountable topics:
out.Exit(h)
}

pf, err := p.LoadVirtualProfile(fs)
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
config.CheckExitServerlessAdmin(p)

response, err := adm.ListMountableTopics(cmd.Context())
out.MaybeDie(err, "unable to list mountable topics: %v", err)
printDetailedListMountable(p.Formatter, rpadminMountableTopicsToMountableTopicState(response.Topics), os.Stdout)
var mountableTopics []rpadmin.MountableTopic
if p.FromCloud {
cl, err := createDataplaneClient(p)
out.MaybeDieErr(err)

resp, err := cl.CloudStorage.ListMountableTopics(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListMountableTopicsRequest{}))
out.MaybeDie(err, "unable to list mountable topics: %v", err)
if resp != nil {
mountableTopics = dataplaneToAdminMountableTopics(resp.Msg)
}
} else {
adm, err := adminapi.NewClient(cmd.Context(), fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

response, err := adm.ListMountableTopics(cmd.Context())
out.MaybeDie(err, "unable to list mountable topics: %v", err)
mountableTopics = response.Topics
}

printDetailedListMountable(f, rpadminMountableTopicsToMountableTopicState(mountableTopics), os.Stdout)
},
}
p.InstallFormatFlag(cmd)
Expand Down Expand Up @@ -83,3 +100,16 @@ func rpadminMountableTopicsToMountableTopicState(in []rpadmin.MountableTopic) []
}
return resp
}

func dataplaneToAdminMountableTopics(resp *dataplanev1alpha2.ListMountableTopicsResponse) []rpadmin.MountableTopic {
var topics []rpadmin.MountableTopic
if resp != nil {
for _, topic := range resp.Topics {
topics = append(topics, rpadmin.MountableTopic{
TopicLocation: topic.TopicLocation,
Topic: topic.Name,
})
}
}
return topics
}
73 changes: 53 additions & 20 deletions src/go/rpk/pkg/cli/cluster/storage/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"fmt"
"strings"

dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2"
"connectrpc.com/connect"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand Down Expand Up @@ -46,39 +48,70 @@ with my-new-topic as the new topic name
`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, from []string) {
pf, err := p.LoadVirtualProfile(fs)
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
config.CheckExitServerlessAdmin(p)

n, t := nsTopic(from[0])
ns, t := nsTopic(from[0])
if t == "" {
out.Die("topic is required")
}
topic := rpadmin.InboundTopic{
SourceTopicReference: rpadmin.NamespacedTopic{
Namespace: string2pointer(n),
Topic: t,
},
}
an, at := nsTopic(to)
alias := t
if at != "" {
alias = at
topic.Alias = &rpadmin.NamespacedTopic{
Namespace: string2pointer(an),
Topic: alias,
var id int
if p.FromCloud {
if ns != "" && strings.ToLower(ns) != "kafka" {
out.Die("Namespace %q not allowed. Only kafka topics can be mounted in Redpanda Cloud clusters", ns)
}
}
if an != "" && strings.ToLower(an) != "kafka" {
out.Die("Failed to parse '--to' flag: namespace %q not allowed. Only kafka topics can be mounted in Redpanda Cloud clusters", an)
}
cl, err := createDataplaneClient(p)
out.MaybeDieErr(err)

mg, err := adm.MountTopics(cmd.Context(), rpadmin.MountConfiguration{Topics: []rpadmin.InboundTopic{topic}})
out.MaybeDie(err, "unable to mount topic: %v", err)
topicMount := &dataplanev1alpha2.MountTopicsRequest_TopicMount{
SourceTopicReference: t,
}
if at != "" {
alias = at
topicMount.Alias = alias
}
resp, err := cl.CloudStorage.MountTopics(
cmd.Context(),
connect.NewRequest(
&dataplanev1alpha2.MountTopicsRequest{
Topics: []*dataplanev1alpha2.MountTopicsRequest_TopicMount{topicMount},
}),
)
out.MaybeDie(err, "unable to mount topic: %v", err)
if resp.Msg != nil {
id = int(resp.Msg.MountTaskId)
}
} else {
adm, err := adminapi.NewClient(cmd.Context(), fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
topic := rpadmin.InboundTopic{
SourceTopicReference: rpadmin.NamespacedTopic{
Namespace: string2pointer(ns),
Topic: t,
},
}
if at != "" {
alias = at
topic.Alias = &rpadmin.NamespacedTopic{
Namespace: string2pointer(an),
Topic: alias,
}
}
mg, err := adm.MountTopics(cmd.Context(), rpadmin.MountConfiguration{Topics: []rpadmin.InboundTopic{topic}})
out.MaybeDie(err, "unable to mount topic: %v", err)
id = mg.ID
}

fmt.Printf(`
Topic mount from Tiered Storage topic %v to your Redpanda Cluster topic %v
has started with Migration ID %v
To check the status run 'rpk cluster storage status-mount %d`+"\n", t, alias, mg.ID, mg.ID)
To check the status run 'rpk cluster storage status-mount %d`+"\n", t, alias, id, id)
},
}
cmd.Flags().StringVar(&to, "to", "", "New namespace/topic name for the mounted topic (optional)")
Expand Down
Loading