From 8d2c6330844344fb54df650cf5fb520aa231ee0e Mon Sep 17 00:00:00 2001 From: r-vasquez Date: Thu, 14 Nov 2024 14:36:49 -0800 Subject: [PATCH 1/4] rpk: bump dataplane to v1alpha2 The new dependency version soft-deprecates the v1alpha2 services. --- src/go/rpk/go.mod | 10 +++++----- src/go/rpk/go.sum | 20 ++++++++++---------- src/go/rpk/pkg/cli/transform/delete.go | 4 ++-- src/go/rpk/pkg/cli/transform/deploy.go | 11 +++++------ src/go/rpk/pkg/cli/transform/list.go | 6 +++--- src/go/rpk/pkg/publicapi/transform.go | 16 ++++++++-------- 6 files changed, 33 insertions(+), 34 deletions(-) diff --git a/src/go/rpk/go.mod b/src/go/rpk/go.mod index 6eac1aa8d03cd..62668de279bde 100644 --- a/src/go/rpk/go.mod +++ b/src/go/rpk/go.mod @@ -9,8 +9,8 @@ require ( buf.build/gen/go/redpandadata/cloud/connectrpc/go v1.17.0-20241024195046-353ea4645e3d.1 buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.35.1-20241024195046-353ea4645e3d.1 buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.35.1-20240917150400-3f349e63f44a.1 - buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20240823133854-b83c57715214.1 - buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20240823133854-b83c57715214.1 + buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20241112225414-3759fedba3f3.1 + buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20241112225414-3759fedba3f3.1 cloud.google.com/go/compute/metadata v0.5.2 connectrpc.com/connect v1.17.0 github.com/AlecAivazis/survey/v2 v2.3.7 @@ -140,9 +140,9 @@ require ( golang.org/x/text v0.19.0 // indirect golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.26.0 // indirect - google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect + google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gotest.tools/v3 v3.0.3 // indirect diff --git a/src/go/rpk/go.sum b/src/go/rpk/go.sum index cbb33f0bba413..d651f54244400 100644 --- a/src/go/rpk/go.sum +++ b/src/go/rpk/go.sum @@ -8,10 +8,10 @@ buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.35.1-20241024195046-35 buf.build/gen/go/redpandadata/cloud/protocolbuffers/go v1.35.1-20241024195046-353ea4645e3d.1/go.mod h1:KYw4KQVGbgMkHR4br5uQjNFwT3b5TML5Ll3SLAsh4Ho= buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.35.1-20240917150400-3f349e63f44a.1 h1:H/JebbbR+Kd0vXXY4cyqUZOmmXw0YUvQjjmvHBnKSpw= buf.build/gen/go/redpandadata/common/protocolbuffers/go v1.35.1-20240917150400-3f349e63f44a.1/go.mod h1:AD5cSkm/Wy/YTKR9VKtnKAoYxbLZSh/pYC8g9VCeMJA= -buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20240823133854-b83c57715214.1 h1:sLbN1qppoFCmfp8e/h8z1y7TyFqQYWJlyJpaybQGsmw= -buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20240823133854-b83c57715214.1/go.mod h1:J9MSgmioQQG//z0cS0pEkkZn3Q0DxRT8UvLk1dAYNhM= -buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20240823133854-b83c57715214.1 h1:Ytc3jVPUHAA6Lep8ptfesx3zsWeroHVHbloNP1q6HXc= -buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20240823133854-b83c57715214.1/go.mod h1:5WJc8OWoe83gSQv52+xclVntrgm2tixef9S61wyQAQA= +buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20241112225414-3759fedba3f3.1 h1:4cD7CRcJLTjA45y5xoL5qPyqiV0pTyNKgJ9jMg8c2so= +buf.build/gen/go/redpandadata/dataplane/connectrpc/go v1.17.0-20241112225414-3759fedba3f3.1/go.mod h1:lAVv5Nv6SZUV8+UFtUfFF2mMS4WlDp1CsOSPtNgrjPE= +buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20241112225414-3759fedba3f3.1 h1:FoxR0Huu43isy8t/JcQkeORWN6KYb0SDoCKLrpU529E= +buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go v1.35.1-20241112225414-3759fedba3f3.1/go.mod h1:+/pdQipFpdMztKw+xaZFHGUrwMfHLu1qyKOGpTsWFeA= cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo= cloud.google.com/go/compute/metadata v0.5.2/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k= connectrpc.com/connect v1.17.0 h1:W0ZqMhtVzn9Zhn2yATuUokDLO5N+gIuBWMOnsQrfmZk= @@ -368,12 +368,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38 h1:Q3nlH8iSQSRUwOskjbcSMcF2jiYMNiQYZ0c2KEJLKKU= -google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38/go.mod h1:xBI+tzfqGGN2JBeSebfKXFSdBpWVQ7sLW40PTupVRm4= -google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38 h1:2oV8dfuIkM1Ti7DwXc0BJfnwr9csz4TDXI9EmiI+Rbw= -google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38/go.mod h1:vuAjtvlwkDKF6L1GQ0SokiRLCGFfeBUXWr/aFFkHACc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28 h1:KJjNNclfpIkVqrZlTWcgOOaVQ00LdBnoEaRfkUx760s= +google.golang.org/genproto v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:mt9/MofW7AWQ+Gy179ChOnvmJatV8YHUmrcedo9CIFI= +google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g= +google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= diff --git a/src/go/rpk/pkg/cli/transform/delete.go b/src/go/rpk/pkg/cli/transform/delete.go index 504582d011615..6c98be9c7e3aa 100644 --- a/src/go/rpk/pkg/cli/transform/delete.go +++ b/src/go/rpk/pkg/cli/transform/delete.go @@ -12,7 +12,7 @@ package transform import ( "fmt" - dataplanev1alpha1 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha1" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" "connectrpc.com/connect" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" @@ -53,7 +53,7 @@ func newDeleteCommand(fs afero.Fs, p *config.Params) *cobra.Command { _, err = cl.Transform.DeleteTransform( cmd.Context(), - connect.NewRequest(&dataplanev1alpha1.DeleteTransformRequest{ + connect.NewRequest(&dataplanev1alpha2.DeleteTransformRequest{ Name: functionName, })) out.MaybeDie(err, "unable to delete transform %q: %v", functionName, err) diff --git a/src/go/rpk/pkg/cli/transform/deploy.go b/src/go/rpk/pkg/cli/transform/deploy.go index 84d0128304304..f433df37269c1 100644 --- a/src/go/rpk/pkg/cli/transform/deploy.go +++ b/src/go/rpk/pkg/cli/transform/deploy.go @@ -22,9 +22,8 @@ import ( "strings" "time" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" "github.com/redpanda-data/common-go/rpadmin" - - dataplanev1alpha1 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha1" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/transform/project" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" @@ -360,15 +359,15 @@ func parseOffset(formattedOffset string) (*rpadmin.Offset, error) { return &rpadmin.Offset{Format: format, Value: val}, nil } -func adminAPIToDataplaneMetadata(m rpadmin.TransformMetadata) *dataplanev1alpha1.DeployTransformRequest { - var envs []*dataplanev1alpha1.TransformMetadata_EnvironmentVariable +func adminAPIToDataplaneMetadata(m rpadmin.TransformMetadata) *dataplanev1alpha2.DeployTransformRequest { + var envs []*dataplanev1alpha2.TransformMetadata_EnvironmentVariable for _, e := range m.Environment { - envs = append(envs, &dataplanev1alpha1.TransformMetadata_EnvironmentVariable{ + envs = append(envs, &dataplanev1alpha2.TransformMetadata_EnvironmentVariable{ Key: e.Key, Value: e.Value, }) } - return &dataplanev1alpha1.DeployTransformRequest{ + return &dataplanev1alpha2.DeployTransformRequest{ Name: m.Name, InputTopicName: m.InputTopic, OutputTopicNames: m.OutputTopics, diff --git a/src/go/rpk/pkg/cli/transform/list.go b/src/go/rpk/pkg/cli/transform/list.go index 17d3f1b68061a..294d32a0ed5eb 100644 --- a/src/go/rpk/pkg/cli/transform/list.go +++ b/src/go/rpk/pkg/cli/transform/list.go @@ -19,7 +19,7 @@ import ( "github.com/redpanda-data/common-go/rpadmin" - dataplanev1alpha1 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha1" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" "connectrpc.com/connect" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" @@ -91,7 +91,7 @@ The --detailed flag (-d) opts in to printing extra per-processor information. cl, err := publicapi.NewDataPlaneClientSet(url, p.CurrentAuth().AuthToken) out.MaybeDie(err, "unable to initialize cloud client: %v", err) - res, err := cl.Transform.ListTransforms(cmd.Context(), connect.NewRequest(&dataplanev1alpha1.ListTransformsRequest{})) + res, err := cl.Transform.ListTransforms(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListTransformsRequest{})) out.MaybeDie(err, "unable to list transforms from Cloud: %v", err) l = dataplaneToAdminTransformMetadata(res.Msg.Transforms) } else { @@ -198,7 +198,7 @@ func printDetailed(f config.OutFormatter, d []detailedTransformMetadata, w io.Wr } } -func dataplaneToAdminTransformMetadata(transforms []*dataplanev1alpha1.TransformMetadata) []rpadmin.TransformMetadata { +func dataplaneToAdminTransformMetadata(transforms []*dataplanev1alpha2.TransformMetadata) []rpadmin.TransformMetadata { var transformMetadata []rpadmin.TransformMetadata for _, t := range transforms { var ( diff --git a/src/go/rpk/pkg/publicapi/transform.go b/src/go/rpk/pkg/publicapi/transform.go index 606b9c0929a35..10bfe14d5f367 100644 --- a/src/go/rpk/pkg/publicapi/transform.go +++ b/src/go/rpk/pkg/publicapi/transform.go @@ -19,8 +19,8 @@ import ( "net/http" commonv1alpha1 "buf.build/gen/go/redpandadata/common/protocolbuffers/go/redpanda/api/common/v1alpha1" - "buf.build/gen/go/redpandadata/dataplane/connectrpc/go/redpanda/api/dataplane/v1alpha1/dataplanev1alpha1connect" - v1alpha1 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha1" + "buf.build/gen/go/redpandadata/dataplane/connectrpc/go/redpanda/api/dataplane/v1alpha2/dataplanev1alpha2connect" + v1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" "connectrpc.com/connect" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/httpapi" ) @@ -31,12 +31,12 @@ const transformPath = "/v1alpha1/transforms" // dataplanev1alpha1connect.TransformServiceClient to support the // DeployTransform request. type transformServiceClient struct { - tCl dataplanev1alpha1connect.TransformServiceClient + tCl dataplanev1alpha2connect.TransformServiceClient httpCl *httpapi.Client } type DeployTransformRequest struct { - Metadata *v1alpha1.DeployTransformRequest + Metadata *v1alpha2.DeployTransformRequest WasmBinary io.Reader } @@ -50,20 +50,20 @@ func newTransformServiceClient(httpClient *http.Client, host, authToken string, }), } return transformServiceClient{ - tCl: dataplanev1alpha1connect.NewTransformServiceClient(httpClient, host, opts...), + tCl: dataplanev1alpha2connect.NewTransformServiceClient(httpClient, host, opts...), httpCl: httpapi.NewClient(httpOpts...), } } -func (tsc *transformServiceClient) ListTransforms(ctx context.Context, r *connect.Request[v1alpha1.ListTransformsRequest]) (*connect.Response[v1alpha1.ListTransformsResponse], error) { +func (tsc *transformServiceClient) ListTransforms(ctx context.Context, r *connect.Request[v1alpha2.ListTransformsRequest]) (*connect.Response[v1alpha2.ListTransformsResponse], error) { return tsc.tCl.ListTransforms(ctx, r) } -func (tsc *transformServiceClient) GetTransform(ctx context.Context, r *connect.Request[v1alpha1.GetTransformRequest]) (*connect.Response[v1alpha1.GetTransformResponse], error) { +func (tsc *transformServiceClient) GetTransform(ctx context.Context, r *connect.Request[v1alpha2.GetTransformRequest]) (*connect.Response[v1alpha2.GetTransformResponse], error) { return tsc.tCl.GetTransform(ctx, r) } -func (tsc *transformServiceClient) DeleteTransform(ctx context.Context, r *connect.Request[v1alpha1.DeleteTransformRequest]) (*connect.Response[v1alpha1.DeleteTransformResponse], error) { +func (tsc *transformServiceClient) DeleteTransform(ctx context.Context, r *connect.Request[v1alpha2.DeleteTransformRequest]) (*connect.Response[v1alpha2.DeleteTransformResponse], error) { return tsc.tCl.DeleteTransform(ctx, r) } From 0e655a61b13709ddee92b327c9bbea88fd6760fb Mon Sep 17 00:00:00 2001 From: r-vasquez Date: Thu, 14 Nov 2024 14:37:58 -0800 Subject: [PATCH 2/4] rpk/makefile: download golangcilint with recommended method Downloading using go get is not recommended per golangci-lint docs --- src/go/rpk/Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/go/rpk/Makefile b/src/go/rpk/Makefile index cf5657890ec6f..d9d6575418e77 100644 --- a/src/go/rpk/Makefile +++ b/src/go/rpk/Makefile @@ -23,6 +23,7 @@ endif GOOS ?= $(shell go env GOOS) GOARCH ?= $(shell go env GOARCH) +GOPATH ?= $(shell go env GOPATH) OUTDIR := $(GOOS)-$(GOARCH) REV := $(shell git rev-parse --short HEAD) @@ -64,8 +65,7 @@ run_gofumpt: $(GOFUMPT_OS_CMD) install_golangci_lint: - @echo "installing golangci-lint" - @$(GOCMD) install github.com/golangci/golangci-lint/cmd/golangci-lint@latest + @which $(GOLANGCILINTCMD) || (if [ $$? -eq 1 ]; then echo "golangci-lint not found, installing..."; curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.62.0; fi) run_linter: $(GOLANGCILINTCMD) run From 26e5aa31775c906a90ba90b629b0b11e53dbecca Mon Sep 17 00:00:00 2001 From: r-vasquez Date: Thu, 14 Nov 2024 14:56:29 -0800 Subject: [PATCH 3/4] rpk: add public API support to mount/unmount Now we will be able to run mount/unmount commands against Redpanda Cloud clusters using the dataplane API. --- src/go/rpk/pkg/cli/cluster/storage/BUILD | 3 + .../pkg/cli/cluster/storage/cancel-mount.go | 30 ++++++-- .../rpk/pkg/cli/cluster/storage/list-mount.go | 76 +++++++++++++++++-- .../pkg/cli/cluster/storage/list-mountable.go | 44 +++++++++-- src/go/rpk/pkg/cli/cluster/storage/mount.go | 73 +++++++++++++----- .../pkg/cli/cluster/storage/status-mount.go | 62 ++++++++++++--- src/go/rpk/pkg/cli/cluster/storage/storage.go | 15 ++++ src/go/rpk/pkg/cli/cluster/storage/unmount.go | 43 +++++++++-- src/go/rpk/pkg/cli/transform/BUILD | 2 +- src/go/rpk/pkg/publicapi/BUILD | 4 +- src/go/rpk/pkg/publicapi/dataplane.go | 7 +- 11 files changed, 293 insertions(+), 66 deletions(-) diff --git a/src/go/rpk/pkg/cli/cluster/storage/BUILD b/src/go/rpk/pkg/cli/cluster/storage/BUILD index 7dbb37ec5c0a8..2558cc6521250 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/BUILD +++ b/src/go/rpk/pkg/cli/cluster/storage/BUILD @@ -18,6 +18,9 @@ go_library( "//src/go/rpk/pkg/cli/cluster/storage/recovery", "//src/go/rpk/pkg/config", "//src/go/rpk/pkg/out", + "//src/go/rpk/pkg/publicapi", + "@build_buf_gen_go_redpandadata_dataplane_protocolbuffers_go//redpanda/api/dataplane/v1alpha2", + "@com_connectrpc_connect//:connect", "@com_github_redpanda_data_common_go_rpadmin//:rpadmin", "@com_github_spf13_afero//:afero", "@com_github_spf13_cobra//:cobra", diff --git a/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go b/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go index 342b92560df93..be8d57f65bea6 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go @@ -13,6 +13,8 @@ import ( "fmt" "strconv" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" + "connectrpc.com/connect" "github.com/redpanda-data/common-go/rpadmin" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" @@ -36,17 +38,33 @@ Cancel a mount/unmount operation `, Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, from []string) { - pf, err := p.LoadVirtualProfile(fs) + p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) - config.CheckExitCloudAdmin(pf) - adm, err := adminapi.NewClient(cmd.Context(), fs, pf) - out.MaybeDie(err, "unable to initialize admin client: %v", err) + config.CheckExitServerlessAdmin(p) migrationID, err := strconv.Atoi(from[0]) out.MaybeDie(err, "invalid migration ID: %v", err) - err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel) - out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err) + if p.FromCloud { + cl, err := createDataplaneClient(p) + out.MaybeDieErr(err) + + req := connect.NewRequest( + &dataplanev1alpha2.UpdateMountTaskRequest{ + Id: int32(migrationID), + Action: dataplanev1alpha2.UpdateMountTaskRequest_ACTION_CANCEL, + }, + ) + _, err = cl.CloudStorage.UpdateMountTask(cmd.Context(), req) + out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err) + } else { + adm, err := adminapi.NewClient(cmd.Context(), fs, p) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel) + out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err) + } + fmt.Printf("Successfully canceled the operation with ID %v", migrationID) }, } diff --git a/src/go/rpk/pkg/cli/cluster/storage/list-mount.go b/src/go/rpk/pkg/cli/cluster/storage/list-mount.go index f4c83f46fa281..ea7c14b390f1b 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/list-mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/list-mount.go @@ -15,6 +15,8 @@ import ( "os" "strings" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" + "connectrpc.com/connect" "github.com/redpanda-data/common-go/rpadmin" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" @@ -52,15 +54,29 @@ Use filter to list only migrations in a specific state out.Exit(h) } - pf, err := p.LoadVirtualProfile(fs) + p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) - config.CheckExitCloudAdmin(pf) - adm, err := adminapi.NewClient(cmd.Context(), fs, pf) - out.MaybeDie(err, "unable to initialize admin client: %v", err) + config.CheckExitServerlessAdmin(p) - migrations, err := adm.ListMigrations(cmd.Context()) - out.MaybeDie(err, "unable to list migrations: %v", err) - printDetailedListMount(p.Formatter, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout) + var migrations []rpadmin.MigrationState + if p.FromCloud { + cl, err := createDataplaneClient(p) + out.MaybeDieErr(err) + + resp, err := cl.CloudStorage.ListMountTasks(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListMountTasksRequest{})) + out.MaybeDie(err, "unable to list mount/unmount operations: %v", err) + + if resp != nil { + migrations = listMountTaskToAdminMigrationState(resp.Msg) + } + } else { + adm, err := adminapi.NewClient(cmd.Context(), fs, p) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + migrations, err = adm.ListMigrations(cmd.Context()) + out.MaybeDie(err, "unable to list migrations: %v", err) + } + printDetailedListMount(f, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout) }, } p.InstallFormatFlag(cmd) @@ -96,7 +112,7 @@ func (f filterOpts) String() string { } func filterOptFromString(s string) filterOpts { - switch s { + switch strings.ToLower(s) { case "planned": return FilterOptsPlanned case "prepared": @@ -163,3 +179,47 @@ type migrationState struct { MigrationType string `json:"type" yaml:"type"` Topics []string `json:"topics" yaml:"topics"` } + +func listMountTaskToAdminMigrationState(resp *dataplanev1alpha2.ListMountTasksResponse) []rpadmin.MigrationState { + var migrations []rpadmin.MigrationState + if resp != nil { + for _, task := range resp.Tasks { + if task != nil { + migrations = append(migrations, rpadmin.MigrationState{ + ID: int(task.Id), + State: strings.TrimPrefix(task.State.String(), "STATE_"), + Migration: rpadmin.Migration{ + MigrationType: task.Type.String(), + Topics: mountTaskTopicsToNamespacedOrInboundTopics(task.Topics, task.Type), + }, + }) + } + } + } + return migrations +} + +// mountTaskTopicsToNamespacedOrInboundTopics converts the dataplane's +// mountTaskTopics to the rpadmin's equivalent. +func mountTaskTopicsToNamespacedOrInboundTopics(taskTopics []*dataplanev1alpha2.MountTask_Topic, taskType dataplanev1alpha2.MountTask_Type) []rpadmin.NamespacedOrInboundTopic { + var topics []rpadmin.NamespacedOrInboundTopic + for _, topic := range taskTopics { + // Inbound == Mount. + if taskType == dataplanev1alpha2.MountTask_TYPE_MOUNT { + topics = append(topics, rpadmin.NamespacedOrInboundTopic{ + InboundTopic: rpadmin.InboundTopic{ + SourceTopicReference: rpadmin.NamespacedTopic{ + Topic: topic.SourceTopicReference, // The topic of the bucket you are mounting. + }, + }, + }) + } else { + topics = append(topics, rpadmin.NamespacedOrInboundTopic{ + NamespacedTopic: rpadmin.NamespacedTopic{ + Topic: topic.TopicReference, // The topic in the cluster you are un-mounting. + }, + }) + } + } + return topics +} diff --git a/src/go/rpk/pkg/cli/cluster/storage/list-mountable.go b/src/go/rpk/pkg/cli/cluster/storage/list-mountable.go index a24febdc6ec3e..8508a34ac276e 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/list-mountable.go +++ b/src/go/rpk/pkg/cli/cluster/storage/list-mountable.go @@ -5,6 +5,8 @@ import ( "io" "os" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" + "connectrpc.com/connect" "github.com/redpanda-data/common-go/rpadmin" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" @@ -33,15 +35,30 @@ List all mountable topics: out.Exit(h) } - pf, err := p.LoadVirtualProfile(fs) + p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) - config.CheckExitCloudAdmin(pf) - adm, err := adminapi.NewClient(cmd.Context(), fs, pf) - out.MaybeDie(err, "unable to initialize admin client: %v", err) + config.CheckExitServerlessAdmin(p) - response, err := adm.ListMountableTopics(cmd.Context()) - out.MaybeDie(err, "unable to list mountable topics: %v", err) - printDetailedListMountable(p.Formatter, rpadminMountableTopicsToMountableTopicState(response.Topics), os.Stdout) + var mountableTopics []rpadmin.MountableTopic + if p.FromCloud { + cl, err := createDataplaneClient(p) + out.MaybeDieErr(err) + + resp, err := cl.CloudStorage.ListMountableTopics(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListMountableTopicsRequest{})) + out.MaybeDie(err, "unable to list mountable topics: %v", err) + if resp != nil { + mountableTopics = dataplaneToAdminMountableTopics(resp.Msg) + } + } else { + adm, err := adminapi.NewClient(cmd.Context(), fs, p) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + response, err := adm.ListMountableTopics(cmd.Context()) + out.MaybeDie(err, "unable to list mountable topics: %v", err) + mountableTopics = response.Topics + } + + printDetailedListMountable(f, rpadminMountableTopicsToMountableTopicState(mountableTopics), os.Stdout) }, } p.InstallFormatFlag(cmd) @@ -83,3 +100,16 @@ func rpadminMountableTopicsToMountableTopicState(in []rpadmin.MountableTopic) [] } return resp } + +func dataplaneToAdminMountableTopics(resp *dataplanev1alpha2.ListMountableTopicsResponse) []rpadmin.MountableTopic { + var topics []rpadmin.MountableTopic + if resp != nil { + for _, topic := range resp.Topics { + topics = append(topics, rpadmin.MountableTopic{ + TopicLocation: topic.TopicLocation, + Topic: topic.Name, + }) + } + } + return topics +} diff --git a/src/go/rpk/pkg/cli/cluster/storage/mount.go b/src/go/rpk/pkg/cli/cluster/storage/mount.go index aa69213b0a9d5..c08a83eb7dc5d 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/mount.go @@ -13,6 +13,8 @@ import ( "fmt" "strings" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" + "connectrpc.com/connect" "github.com/redpanda-data/common-go/rpadmin" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" @@ -46,39 +48,70 @@ with my-new-topic as the new topic name `, Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, from []string) { - pf, err := p.LoadVirtualProfile(fs) + p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) - config.CheckExitCloudAdmin(pf) - adm, err := adminapi.NewClient(cmd.Context(), fs, pf) - out.MaybeDie(err, "unable to initialize admin client: %v", err) + config.CheckExitServerlessAdmin(p) - n, t := nsTopic(from[0]) + ns, t := nsTopic(from[0]) if t == "" { out.Die("topic is required") } - topic := rpadmin.InboundTopic{ - SourceTopicReference: rpadmin.NamespacedTopic{ - Namespace: string2pointer(n), - Topic: t, - }, - } an, at := nsTopic(to) alias := t - if at != "" { - alias = at - topic.Alias = &rpadmin.NamespacedTopic{ - Namespace: string2pointer(an), - Topic: alias, + var id int + if p.FromCloud { + if ns != "" && strings.ToLower(ns) != "kafka" { + out.Die("Namespace %q not allowed. Only kafka topics can be mounted in Redpanda Cloud clusters", ns) } - } + if an != "" && strings.ToLower(an) != "kafka" { + out.Die("Failed to parse '--to' flag: namespace %q not allowed. Only kafka topics can be mounted in Redpanda Cloud clusters", an) + } + cl, err := createDataplaneClient(p) + out.MaybeDieErr(err) - mg, err := adm.MountTopics(cmd.Context(), rpadmin.MountConfiguration{Topics: []rpadmin.InboundTopic{topic}}) - out.MaybeDie(err, "unable to mount topic: %v", err) + topicMount := &dataplanev1alpha2.MountTopicsRequest_TopicMount{ + SourceTopicReference: t, + } + if at != "" { + alias = at + topicMount.Alias = alias + } + resp, err := cl.CloudStorage.MountTopics( + cmd.Context(), + connect.NewRequest( + &dataplanev1alpha2.MountTopicsRequest{ + Topics: []*dataplanev1alpha2.MountTopicsRequest_TopicMount{topicMount}, + }), + ) + out.MaybeDie(err, "unable to mount topic: %v", err) + if resp.Msg != nil { + id = int(resp.Msg.MountTaskId) + } + } else { + adm, err := adminapi.NewClient(cmd.Context(), fs, p) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + topic := rpadmin.InboundTopic{ + SourceTopicReference: rpadmin.NamespacedTopic{ + Namespace: string2pointer(ns), + Topic: t, + }, + } + if at != "" { + alias = at + topic.Alias = &rpadmin.NamespacedTopic{ + Namespace: string2pointer(an), + Topic: alias, + } + } + mg, err := adm.MountTopics(cmd.Context(), rpadmin.MountConfiguration{Topics: []rpadmin.InboundTopic{topic}}) + out.MaybeDie(err, "unable to mount topic: %v", err) + id = mg.ID + } fmt.Printf(` Topic mount from Tiered Storage topic %v to your Redpanda Cluster topic %v has started with Migration ID %v -To check the status run 'rpk cluster storage status-mount %d`+"\n", t, alias, mg.ID, mg.ID) +To check the status run 'rpk cluster storage status-mount %d`+"\n", t, alias, id, id) }, } cmd.Flags().StringVar(&to, "to", "", "New namespace/topic name for the mounted topic (optional)") diff --git a/src/go/rpk/pkg/cli/cluster/storage/status-mount.go b/src/go/rpk/pkg/cli/cluster/storage/status-mount.go index 79806c943efdb..e5e2fb364e061 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/status-mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/status-mount.go @@ -16,6 +16,9 @@ import ( "strconv" "strings" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" + "connectrpc.com/connect" + "github.com/redpanda-data/common-go/rpadmin" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" @@ -39,25 +42,44 @@ Status for a mount/unmount operation if h, ok := f.Help(migrationState{}); ok { out.Exit(h) } - pf, err := p.LoadVirtualProfile(fs) + p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) - - config.CheckExitCloudAdmin(pf) - adm, err := adminapi.NewClient(cmd.Context(), fs, pf) - out.MaybeDie(err, "unable to initialize admin client: %v", err) + config.CheckExitServerlessAdmin(p) migrationID, err := strconv.Atoi(from[0]) out.MaybeDie(err, "invalid migration ID: %v", err) - status, err := adm.GetMigration(cmd.Context(), migrationID) - out.MaybeDie(err, "unable to get the status of the migration: %v", err) + var mState rpadmin.MigrationState + if p.FromCloud { + cl, err := createDataplaneClient(p) + out.MaybeDieErr(err) + + resp, err := cl.CloudStorage.GetMountTask( + cmd.Context(), + connect.NewRequest( + &dataplanev1alpha2.GetMountTaskRequest{ + Id: int32(migrationID), + }, + ), + ) + out.MaybeDie(err, "unable to get the status of mount/unmount operation: %v", err) + if resp != nil { + mState = mountTaskToAdminMigrationState(resp.Msg) + } + } else { + adm, err := adminapi.NewClient(cmd.Context(), fs, p) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + mState, err = adm.GetMigration(cmd.Context(), migrationID) + out.MaybeDie(err, "unable to get the status of the migration: %v", err) + } outStatus := migrationState{ - ID: status.ID, - State: status.State, - MigrationType: status.Migration.MigrationType, - Topics: rpadminTopicsToStringSlice(status.Migration.Topics), + ID: mState.ID, + State: mState.State, + MigrationType: mState.Migration.MigrationType, + Topics: rpadminTopicsToStringSlice(mState.Migration.Topics), } - printDetailedStatusMount(p.Formatter, outStatus, os.Stdout) + printDetailedStatusMount(f, outStatus, os.Stdout) }, } p.InstallFormatFlag(cmd) @@ -74,3 +96,19 @@ func printDetailedStatusMount(f config.OutFormatter, d migrationState, w io.Writ defer tw.Flush() tw.Print(d.ID, d.State, d.MigrationType, strings.Join(d.Topics, ", ")) } + +func mountTaskToAdminMigrationState(resp *dataplanev1alpha2.GetMountTaskResponse) rpadmin.MigrationState { + var state rpadmin.MigrationState + if resp != nil { + task := resp.Task + state = rpadmin.MigrationState{ + ID: int(task.Id), + State: strings.TrimPrefix(task.State.String(), "STATE_"), + Migration: rpadmin.Migration{ + MigrationType: task.Type.String(), + Topics: mountTaskTopicsToNamespacedOrInboundTopics(task.Topics, task.Type), + }, + } + } + return state +} diff --git a/src/go/rpk/pkg/cli/cluster/storage/storage.go b/src/go/rpk/pkg/cli/cluster/storage/storage.go index a9436a93b8325..97731886d8c32 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/storage.go +++ b/src/go/rpk/pkg/cli/cluster/storage/storage.go @@ -10,8 +10,11 @@ package storage import ( + "fmt" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/storage/recovery" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/publicapi" "github.com/spf13/afero" "github.com/spf13/cobra" ) @@ -32,3 +35,15 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command { ) return cmd } + +func createDataplaneClient(p *config.RpkProfile) (*publicapi.DataPlaneClientSet, error) { + url, err := p.CloudCluster.CheckClusterURL() + if err != nil { + return nil, fmt.Errorf("unable to get cluster information from your profile: %v", err) + } + cl, err := publicapi.NewDataPlaneClientSet(url, p.CurrentAuth().AuthToken) + if err != nil { + return nil, fmt.Errorf("unable to initialize cloud client: %v", err) + } + return cl, nil +} diff --git a/src/go/rpk/pkg/cli/cluster/storage/unmount.go b/src/go/rpk/pkg/cli/cluster/storage/unmount.go index dadb3b65be1da..fea915cc4bb57 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/unmount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/unmount.go @@ -11,7 +11,10 @@ package storage import ( "fmt" + "strings" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" + "connectrpc.com/connect" "github.com/redpanda-data/common-go/rpadmin" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" @@ -46,23 +49,47 @@ Unmount topic 'my-topic' from the cluster in the 'my-namespace' `, Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, topics []string) { - pf, err := p.LoadVirtualProfile(fs) + p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) - config.CheckExitCloudAdmin(pf) - adm, err := adminapi.NewClient(cmd.Context(), fs, pf) - out.MaybeDie(err, "unable to initialize admin client: %v", err) + config.CheckExitServerlessAdmin(p) - n, t := nsTopic(topics[0]) + ns, t := nsTopic(topics[0]) if t == "" { out.Die("topic is required") } - mg, err := adm.UnmountTopics(cmd.Context(), rpadmin.UnmountConfiguration{Topics: []rpadmin.NamespacedTopic{{Namespace: string2pointer(n), Topic: t}}}) - out.MaybeDie(err, "unable to unmount topic: %v", err) + var id int + if p.FromCloud { + if ns != "" && strings.ToLower(ns) != "kafka" { + out.Die("Namespace %q not allowed. Only kafka topics can be unmounted in Redpanda Cloud clusters", ns) + } + cl, err := createDataplaneClient(p) + out.MaybeDieErr(err) + + resp, err := cl.CloudStorage.UnmountTopics( + cmd.Context(), + connect.NewRequest( + &dataplanev1alpha2.UnmountTopicsRequest{ + Topics: []string{t}, + }), + ) + out.MaybeDie(err, "unable to unmount topic: %v", err) + if resp != nil { + id = int(resp.Msg.MountTaskId) + } + } else { + adm, err := adminapi.NewClient(cmd.Context(), fs, p) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + mg, err := adm.UnmountTopics(cmd.Context(), rpadmin.UnmountConfiguration{Topics: []rpadmin.NamespacedTopic{{Namespace: string2pointer(ns), Topic: t}}}) + out.MaybeDie(err, "unable to unmount topic: %v", err) + id = mg.ID + } + fmt.Printf(` Topic unmounting from your Redpanda Cluster topic %v has started with Migration ID %v -To check the status run 'rpk cluster storage status-mount %d`+"\n", t, mg.ID, mg.ID) +To check the status run 'rpk cluster storage status-mount %d`+"\n", t, id, id) }, } return cmd diff --git a/src/go/rpk/pkg/cli/transform/BUILD b/src/go/rpk/pkg/cli/transform/BUILD index c43ebf94eacd9..b520434281ced 100644 --- a/src/go/rpk/pkg/cli/transform/BUILD +++ b/src/go/rpk/pkg/cli/transform/BUILD @@ -24,7 +24,7 @@ go_library( "//src/go/rpk/pkg/kafka", "//src/go/rpk/pkg/out", "//src/go/rpk/pkg/publicapi", - "@build_buf_gen_go_redpandadata_dataplane_protocolbuffers_go//redpanda/api/dataplane/v1alpha1", + "@build_buf_gen_go_redpandadata_dataplane_protocolbuffers_go//redpanda/api/dataplane/v1alpha2", "@com_connectrpc_connect//:connect", "@com_github_redpanda_data_common_go_rpadmin//:rpadmin", "@com_github_spf13_afero//:afero", diff --git a/src/go/rpk/pkg/publicapi/BUILD b/src/go/rpk/pkg/publicapi/BUILD index 507eeefc036e2..bb85cb5344cc1 100644 --- a/src/go/rpk/pkg/publicapi/BUILD +++ b/src/go/rpk/pkg/publicapi/BUILD @@ -15,8 +15,8 @@ go_library( "@build_buf_gen_go_redpandadata_cloud_connectrpc_go//redpanda/api/controlplane/v1beta2/controlplanev1beta2connect", "@build_buf_gen_go_redpandadata_cloud_protocolbuffers_go//redpanda/api/controlplane/v1beta2", "@build_buf_gen_go_redpandadata_common_protocolbuffers_go//redpanda/api/common/v1alpha1", - "@build_buf_gen_go_redpandadata_dataplane_connectrpc_go//redpanda/api/dataplane/v1alpha1/dataplanev1alpha1connect", - "@build_buf_gen_go_redpandadata_dataplane_protocolbuffers_go//redpanda/api/dataplane/v1alpha1", + "@build_buf_gen_go_redpandadata_dataplane_connectrpc_go//redpanda/api/dataplane/v1alpha2/dataplanev1alpha2connect", + "@build_buf_gen_go_redpandadata_dataplane_protocolbuffers_go//redpanda/api/dataplane/v1alpha2", "@com_connectrpc_connect//:connect", "@org_uber_go_zap//:zap", ], diff --git a/src/go/rpk/pkg/publicapi/dataplane.go b/src/go/rpk/pkg/publicapi/dataplane.go index d445ed0ca52a3..612a5609f709e 100644 --- a/src/go/rpk/pkg/publicapi/dataplane.go +++ b/src/go/rpk/pkg/publicapi/dataplane.go @@ -13,13 +13,15 @@ import ( "fmt" "net/http" + "buf.build/gen/go/redpandadata/dataplane/connectrpc/go/redpanda/api/dataplane/v1alpha2/dataplanev1alpha2connect" "connectrpc.com/connect" ) // DataPlaneClientSet holds the respective service clients to interact with // the data plane endpoints of the Public API. type DataPlaneClientSet struct { - Transform transformServiceClient + Transform transformServiceClient + CloudStorage dataplanev1alpha2connect.CloudStorageServiceClient } // NewDataPlaneClientSet creates a Public API client set with the service @@ -36,6 +38,7 @@ func NewDataPlaneClientSet(host, authToken string, opts ...connect.ClientOption) }, opts...) return &DataPlaneClientSet{ - Transform: newTransformServiceClient(http.DefaultClient, host, authToken, opts...), + Transform: newTransformServiceClient(http.DefaultClient, host, authToken, opts...), + CloudStorage: dataplanev1alpha2connect.NewCloudStorageServiceClient(http.DefaultClient, host, opts...), }, nil } From 8650b26b1eb49175d6d003ee5fc4d529920b0e6a Mon Sep 17 00:00:00 2001 From: r-vasquez Date: Fri, 15 Nov 2024 11:24:07 -0800 Subject: [PATCH 4/4] rpk: bugfix - filter in list-mount We should default to filter `all`, otherwise we won't print any operation unless a filter is provided. --- src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go | 2 +- src/go/rpk/pkg/cli/cluster/storage/list-mount.go | 8 +++++--- src/go/rpk/pkg/cli/cluster/storage/mount.go | 9 +++++---- src/go/rpk/pkg/cli/cluster/storage/unmount.go | 9 +++++---- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go b/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go index be8d57f65bea6..25753540d694b 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go @@ -65,7 +65,7 @@ Cancel a mount/unmount operation out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err) } - fmt.Printf("Successfully canceled the operation with ID %v", migrationID) + fmt.Printf("Successfully canceled the operation with ID %v\n", migrationID) }, } return cmd diff --git a/src/go/rpk/pkg/cli/cluster/storage/list-mount.go b/src/go/rpk/pkg/cli/cluster/storage/list-mount.go index ea7c14b390f1b..a29986c8c8f39 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/list-mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/list-mount.go @@ -80,7 +80,7 @@ Use filter to list only migrations in a specific state }, } p.InstallFormatFlag(cmd) - cmd.Flags().StringVarP(&filter, "filter", "f", "", "Filter the list of migrations by state. Only valid for text") + cmd.Flags().StringVarP(&filter, "filter", "f", "all", "Filter the list of migrations by state. Only valid for text") return cmd } @@ -166,9 +166,11 @@ func rpadminTopicsToStringSlice(in []rpadmin.NamespacedOrInboundTopic) (resp []s for _, entry := range in { if entry.Namespace != nil { resp = append(resp, fmt.Sprintf("%s/%s", *entry.Namespace, entry.Topic)) - continue + } else if entry.SourceTopicReference.Topic != "" { + resp = append(resp, entry.SourceTopicReference.Topic) + } else { + resp = append(resp, entry.Topic) } - resp = append(resp, entry.Topic) } return } diff --git a/src/go/rpk/pkg/cli/cluster/storage/mount.go b/src/go/rpk/pkg/cli/cluster/storage/mount.go index c08a83eb7dc5d..fa3868f92872e 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/mount.go @@ -108,10 +108,11 @@ with my-new-topic as the new topic name id = mg.ID } - fmt.Printf(` -Topic mount from Tiered Storage topic %v to your Redpanda Cluster topic %v -has started with Migration ID %v -To check the status run 'rpk cluster storage status-mount %d`+"\n", t, alias, id, id) + fmt.Printf(`Topic mount from Tiered Storage topic %q to your Redpanda Cluster topic %q +has started with Migration ID %v. + +To check the status run 'rpk cluster storage status-mount %d. +`, t, alias, id, id) }, } cmd.Flags().StringVar(&to, "to", "", "New namespace/topic name for the mounted topic (optional)") diff --git a/src/go/rpk/pkg/cli/cluster/storage/unmount.go b/src/go/rpk/pkg/cli/cluster/storage/unmount.go index fea915cc4bb57..0a81436f6e172 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/unmount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/unmount.go @@ -86,10 +86,11 @@ Unmount topic 'my-topic' from the cluster in the 'my-namespace' id = mg.ID } - fmt.Printf(` -Topic unmounting from your Redpanda Cluster topic %v -has started with Migration ID %v -To check the status run 'rpk cluster storage status-mount %d`+"\n", t, id, id) + fmt.Printf(`Topic unmounting from your Redpanda Cluster topic %q has started with +Migration ID %v. + +To check the status run 'rpk cluster storage status-mount %d. +`, t, id, id) }, } return cmd