Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpk: add Redpanda Cloud support to rpk cluster storage mount/unmount commands #24134

Merged
merged 4 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/go/rpk/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ endif

GOOS ?= $(shell go env GOOS)
GOARCH ?= $(shell go env GOARCH)
GOPATH ?= $(shell go env GOPATH)
OUTDIR := $(GOOS)-$(GOARCH)

REV := $(shell git rev-parse --short HEAD)
Expand Down Expand Up @@ -64,8 +65,7 @@ run_gofumpt:
$(GOFUMPT_OS_CMD)

install_golangci_lint:
@echo "installing golangci-lint"
@$(GOCMD) install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
@which $(GOLANGCILINTCMD) || (if [ $$? -eq 1 ]; then echo "golangci-lint not found, installing..."; curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.62.0; fi)

run_linter:
$(GOLANGCILINTCMD) run
Expand Down
10 changes: 5 additions & 5 deletions src/go/rpk/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
buf.build/gen/go/redpandadata/cloud/connectrpc/go v1.17.0-20241024195046-353ea4645e3d.1
buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.35.1-20241024195046-353ea4645e3d.1
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.35.1-20240917150400-3f349e63f44a.1
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20240823133854-b83c57715214.1
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20240823133854-b83c57715214.1
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20241112225414-3759fedba3f3.1
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20241112225414-3759fedba3f3.1
cloud.google.com/go/compute/metadata v0.5.2
connectrpc.com/connect v1.17.0
github.com/AlecAivazis/survey/v2 v2.3.7
Expand Down Expand Up @@ -140,9 +140,9 @@ require (
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.7.0 // indirect
golang.org/x/tools v0.26.0 // indirect
google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gotest.tools/v3 v3.0.3 // indirect
Expand Down
20 changes: 10 additions & 10 deletions src/go/rpk/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.35.1-20241024195046-35
buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.35.1-20241024195046-353ea4645e3d.1/go.mod h1:KYw4KQVGbgMkHR4br5uQjNFwT3b5TML5Ll3SLAsh4Ho=
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.35.1-20240917150400-3f349e63f44a.1 h1:H/JebbbR+Kd0vXXY4cyqUZOmmXw0YUvQjjmvHBnKSpw=
buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.35.1-20240917150400-3f349e63f44a.1/go.mod h1:AD5cSkm/Wy/YTKR9VKtnKAoYxbLZSh/pYC8g9VCeMJA=
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20240823133854-b83c57715214.1 h1:sLbN1qppoFCmfp8e/h8z1y7TyFqQYWJlyJpaybQGsmw=
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20240823133854-b83c57715214.1/go.mod h1:J9MSgmioQQG//z0cS0pEkkZn3Q0DxRT8UvLk1dAYNhM=
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20240823133854-b83c57715214.1 h1:Ytc3jVPUHAA6Lep8ptfesx3zsWeroHVHbloNP1q6HXc=
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20240823133854-b83c57715214.1/go.mod h1:5WJc8OWoe83gSQv52+xclVntrgm2tixef9S61wyQAQA=
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20241112225414-3759fedba3f3.1 h1:4cD7CRcJLTjA45y5xoL5qPyqiV0pTyNKgJ9jMg8c2so=
buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20241112225414-3759fedba3f3.1/go.mod h1:lAVv5Nv6SZUV8+UFtUfFF2mMS4WlDp1CsOSPtNgrjPE=
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20241112225414-3759fedba3f3.1 h1:FoxR0Huu43isy8t/JcQkeORWN6KYb0SDoCKLrpU529E=
buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20241112225414-3759fedba3f3.1/go.mod h1:+/pdQipFpdMztKw+xaZFHGUrwMfHLu1qyKOGpTsWFeA=
cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo=
cloud.google.com/go/compute/metadata v0.5.2/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k=
connectrpc.com/connect v1.17.0 h1:W0ZqMhtVzn9Zhn2yATuUokDLO5N+gIuBWMOnsQrfmZk=
Expand Down Expand Up @@ -368,12 +368,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38 h1:Q3nlH8iSQSRUwOskjbcSMcF2jiYMNiQYZ0c2KEJLKKU=
google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38/go.mod h1:xBI+tzfqGGN2JBeSebfKXFSdBpWVQ7sLW40PTupVRm4=
google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38 h1:2oV8dfuIkM1Ti7DwXc0BJfnwr9csz4TDXI9EmiI+Rbw=
google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38/go.mod h1:vuAjtvlwkDKF6L1GQ0SokiRLCGFfeBUXWr/aFFkHACc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 h1:KJjNNclfpIkVqrZlTWcgOOaVQ00LdBnoEaRfkUx760s=
google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:mt9/MofW7AWQ+Gy179ChOnvmJatV8YHUmrcedo9CIFI=
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g=
google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
Expand Down
3 changes: 3 additions & 0 deletions src/go/rpk/pkg/cli/cluster/storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ go_library(
"//src/go/rpk/pkg/cli/cluster/storage/recovery",
"//src/go/rpk/pkg/config",
"//src/go/rpk/pkg/out",
"//src/go/rpk/pkg/publicapi",
"@build_buf_gen_go_redpandadata_dataplane_protocolbuffers_go//redpanda/api/dataplane/v1alpha2",
"@com_connectrpc_connect//:connect",
"@com_github_redpanda_data_common_go_rpadmin//:rpadmin",
"@com_github_spf13_afero//:afero",
"@com_github_spf13_cobra//:cobra",
Expand Down
32 changes: 25 additions & 7 deletions src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"fmt"
"strconv"

dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2"
"connectrpc.com/connect"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand All @@ -36,18 +38,34 @@ Cancel a mount/unmount operation
`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, from []string) {
pf, err := p.LoadVirtualProfile(fs)
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
config.CheckExitServerlessAdmin(p)

migrationID, err := strconv.Atoi(from[0])
out.MaybeDie(err, "invalid migration ID: %v", err)

err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
fmt.Printf("Successfully canceled the operation with ID %v", migrationID)
if p.FromCloud {
cl, err := createDataplaneClient(p)
out.MaybeDieErr(err)

req := connect.NewRequest(
&dataplanev1alpha2.UpdateMountTaskRequest{
Id: int32(migrationID),
Action: dataplanev1alpha2.UpdateMountTaskRequest_ACTION_CANCEL,
},
)
_, err = cl.CloudStorage.UpdateMountTask(cmd.Context(), req)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
} else {
adm, err := adminapi.NewClient(cmd.Context(), fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
}

fmt.Printf("Successfully canceled the operation with ID %v\n", migrationID)
},
}
return cmd
Expand Down
84 changes: 73 additions & 11 deletions src/go/rpk/pkg/cli/cluster/storage/list-mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"os"
"strings"

dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2"
"connectrpc.com/connect"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand Down Expand Up @@ -52,19 +54,33 @@ Use filter to list only migrations in a specific state
out.Exit(h)
}

pf, err := p.LoadVirtualProfile(fs)
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
config.CheckExitServerlessAdmin(p)

migrations, err := adm.ListMigrations(cmd.Context())
out.MaybeDie(err, "unable to list migrations: %v", err)
printDetailedListMount(p.Formatter, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout)
var migrations []rpadmin.MigrationState
if p.FromCloud {
cl, err := createDataplaneClient(p)
out.MaybeDieErr(err)

resp, err := cl.CloudStorage.ListMountTasks(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListMountTasksRequest{}))
out.MaybeDie(err, "unable to list mount/unmount operations: %v", err)

if resp != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this conditional needed? MaybeDie exits on error. Can this return nil, nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, I was overly paranoid here to avoid a panic. resp is a connect wrapper of the response and I'm not sure if an empty list will produce an empty resp.

And I prefer being a little bit paranoid as we don't control the result at all. Happy to remove it if we are 100% sure this doesn't return nil, nil

migrations = listMountTaskToAdminMigrationState(resp.Msg)
}
} else {
adm, err := adminapi.NewClient(cmd.Context(), fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

migrations, err = adm.ListMigrations(cmd.Context())
out.MaybeDie(err, "unable to list migrations: %v", err)
}
printDetailedListMount(f, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout)
},
}
p.InstallFormatFlag(cmd)
cmd.Flags().StringVarP(&filter, "filter", "f", "", "Filter the list of migrations by state. Only valid for text")
cmd.Flags().StringVarP(&filter, "filter", "f", "all", "Filter the list of migrations by state. Only valid for text")
return cmd
}

Expand Down Expand Up @@ -96,7 +112,7 @@ func (f filterOpts) String() string {
}

func filterOptFromString(s string) filterOpts {
switch s {
switch strings.ToLower(s) {
case "planned":
return FilterOptsPlanned
case "prepared":
Expand Down Expand Up @@ -150,9 +166,11 @@ func rpadminTopicsToStringSlice(in []rpadmin.NamespacedOrInboundTopic) (resp []s
for _, entry := range in {
if entry.Namespace != nil {
resp = append(resp, fmt.Sprintf("%s/%s", *entry.Namespace, entry.Topic))
continue
} else if entry.SourceTopicReference.Topic != "" {
resp = append(resp, entry.SourceTopicReference.Topic)
} else {
resp = append(resp, entry.Topic)
}
resp = append(resp, entry.Topic)
}
return
}
Expand All @@ -163,3 +181,47 @@ type migrationState struct {
MigrationType string `json:"type" yaml:"type"`
Topics []string `json:"topics" yaml:"topics"`
}

func listMountTaskToAdminMigrationState(resp *dataplanev1alpha2.ListMountTasksResponse) []rpadmin.MigrationState {
var migrations []rpadmin.MigrationState
if resp != nil {
for _, task := range resp.Tasks {
if task != nil {
migrations = append(migrations, rpadmin.MigrationState{
ID: int(task.Id),
State: strings.TrimPrefix(task.State.String(), "STATE_"),
Migration: rpadmin.Migration{
MigrationType: task.Type.String(),
Topics: mountTaskTopicsToNamespacedOrInboundTopics(task.Topics, task.Type),
},
})
}
}
}
return migrations
}

// mountTaskTopicsToNamespacedOrInboundTopics converts the dataplane's
// mountTaskTopics to the rpadmin's equivalent.
func mountTaskTopicsToNamespacedOrInboundTopics(taskTopics []*dataplanev1alpha2.MountTask_Topic, taskType dataplanev1alpha2.MountTask_Type) []rpadmin.NamespacedOrInboundTopic {
var topics []rpadmin.NamespacedOrInboundTopic
for _, topic := range taskTopics {
// Inbound == Mount.
if taskType == dataplanev1alpha2.MountTask_TYPE_MOUNT {
topics = append(topics, rpadmin.NamespacedOrInboundTopic{
InboundTopic: rpadmin.InboundTopic{
SourceTopicReference: rpadmin.NamespacedTopic{
Topic: topic.SourceTopicReference, // The topic of the bucket you are mounting.
},
},
})
} else {
topics = append(topics, rpadmin.NamespacedOrInboundTopic{
NamespacedTopic: rpadmin.NamespacedTopic{
Topic: topic.TopicReference, // The topic in the cluster you are un-mounting.
},
})
}
}
return topics
}
44 changes: 37 additions & 7 deletions src/go/rpk/pkg/cli/cluster/storage/list-mountable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io"
"os"

dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2"
"connectrpc.com/connect"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand Down Expand Up @@ -33,15 +35,30 @@ List all mountable topics:
out.Exit(h)
}

pf, err := p.LoadVirtualProfile(fs)
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)
config.CheckExitServerlessAdmin(p)

response, err := adm.ListMountableTopics(cmd.Context())
out.MaybeDie(err, "unable to list mountable topics: %v", err)
printDetailedListMountable(p.Formatter, rpadminMountableTopicsToMountableTopicState(response.Topics), os.Stdout)
var mountableTopics []rpadmin.MountableTopic
if p.FromCloud {
cl, err := createDataplaneClient(p)
out.MaybeDieErr(err)

resp, err := cl.CloudStorage.ListMountableTopics(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListMountableTopicsRequest{}))
out.MaybeDie(err, "unable to list mountable topics: %v", err)
if resp != nil {
mountableTopics = dataplaneToAdminMountableTopics(resp.Msg)
}
} else {
adm, err := adminapi.NewClient(cmd.Context(), fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

response, err := adm.ListMountableTopics(cmd.Context())
out.MaybeDie(err, "unable to list mountable topics: %v", err)
mountableTopics = response.Topics
}

printDetailedListMountable(f, rpadminMountableTopicsToMountableTopicState(mountableTopics), os.Stdout)
},
}
p.InstallFormatFlag(cmd)
Expand Down Expand Up @@ -83,3 +100,16 @@ func rpadminMountableTopicsToMountableTopicState(in []rpadmin.MountableTopic) []
}
return resp
}

func dataplaneToAdminMountableTopics(resp *dataplanev1alpha2.ListMountableTopicsResponse) []rpadmin.MountableTopic {
var topics []rpadmin.MountableTopic
if resp != nil {
for _, topic := range resp.Topics {
topics = append(topics, rpadmin.MountableTopic{
TopicLocation: topic.TopicLocation,
Topic: topic.Name,
})
}
}
return topics
}
Loading
Loading