Skip to content

Commit

Permalink
Merge pull request #1810 from rzetelskik/manager-client-switch-1.11
Browse files Browse the repository at this point in the history
[v1.11] Switch from in-tree mermaid client to out-of-tree manager client
  • Loading branch information
scylla-operator-bot[bot] authored Mar 12, 2024
2 parents 63f5625 + fa70e26 commit 46ad382
Show file tree
Hide file tree
Showing 274 changed files with 35,404 additions and 2,850 deletions.
16 changes: 12 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0
github.com/evanphx/json-patch v5.9.0+incompatible
github.com/go-git/go-git/v5 v5.11.0
github.com/go-openapi/errors v0.22.0
github.com/go-openapi/runtime v0.28.0
github.com/go-openapi/strfmt v0.23.0
github.com/go-openapi/swag v0.23.0
github.com/go-openapi/validate v0.24.0
github.com/gocql/gocql v1.6.0
github.com/godbus/dbus/v5 v5.1.0
github.com/google/go-cmp v0.6.0
Expand All @@ -31,6 +28,7 @@ require (
github.com/prometheus/client_golang v1.19.0
github.com/scylladb/go-set v1.0.2
github.com/scylladb/gocqlx/v2 v2.8.0
github.com/scylladb/scylla-manager/v3 v3.2.6
github.com/scylladb/scylladb-swagger-go-client v0.2.0
github.com/shurcooL/githubv4 v0.0.0-20240120211514-18a1ae0e79dc
github.com/spf13/cobra v1.8.0
Expand Down Expand Up @@ -93,12 +91,16 @@ require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/analysis v0.23.0 // indirect
github.com/go-openapi/errors v0.22.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/loads v0.22.0 // indirect
github.com/go-openapi/spec v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/go-openapi/validate v0.24.0 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/gobuffalo/flect v1.0.2 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
Expand All @@ -112,16 +114,20 @@ require (
github.com/gorilla/websocket v1.5.1 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hbollon/go-edlib v1.6.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/lnquy/cron v1.1.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/moby/spdystream v0.2.0 // indirect
github.com/moby/term v0.5.0 // indirect
Expand All @@ -138,8 +144,10 @@ require (
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.50.0 // indirect
github.com/prometheus/procfs v0.13.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/scylladb/go-reflectx v1.0.1 // indirect
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/shurcooL/graphql v0.0.0-20230722043721-ed46e5a46466 // indirect
github.com/skeema/knownhosts v1.2.1 // indirect
Expand All @@ -161,7 +169,7 @@ require (
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240308144416-29370a3891b7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/evanphx/json-patch.v5 v5.9.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
71 changes: 69 additions & 2 deletions go.sum

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions pkg/cmd/operator/manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"sync"
"time"

"github.com/scylladb/scylla-manager/v3/pkg/managerclient"
scyllaversionedclient "github.com/scylladb/scylla-operator/pkg/client/scylla/clientset/versioned"
scyllainformers "github.com/scylladb/scylla-operator/pkg/client/scylla/informers/externalversions"
"github.com/scylladb/scylla-operator/pkg/controller/manager"
"github.com/scylladb/scylla-operator/pkg/genericclioptions"
"github.com/scylladb/scylla-operator/pkg/leaderelection"
"github.com/scylladb/scylla-operator/pkg/mermaidclient"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/signals"
"github.com/scylladb/scylla-operator/pkg/version"
Expand All @@ -31,7 +31,7 @@ type ManagerControllerOptions struct {

kubeClient kubernetes.Interface
scyllaClient scyllaversionedclient.Interface
managerClient *mermaidclient.Client
managerClient *managerclient.Client

ConcurrentSyncs int
}
Expand Down Expand Up @@ -123,7 +123,13 @@ func (o *ManagerControllerOptions) Complete() error {

// TODO: Use https and wire certs.
url := fmt.Sprintf("http://%s/api/v1", naming.ScyllaManagerServiceName)
managerClient, err := mermaidclient.NewClient(url, &http.Transport{})
managerClient, err := managerclient.NewClient(url, func(httpClient *http.Client) {
httpClient.Transport = http.DefaultTransport
// Limit manager calls by default to a higher bound.
// Individual calls can still be further limited using context.
// Manager is prone to extremely long calls because it (unfortunately) retries errors internally.
httpClient.Timeout = 15 * time.Second
})
if err != nil {
return fmt.Errorf("can't build manager client: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/manager/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"sync"
"time"

"github.com/scylladb/scylla-manager/v3/pkg/managerclient"
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
scyllav1client "github.com/scylladb/scylla-operator/pkg/client/scylla/clientset/versioned/typed/scylla/v1"
scyllav1informers "github.com/scylladb/scylla-operator/pkg/client/scylla/informers/externalversions/scylla/v1"
scyllav1listers "github.com/scylladb/scylla-operator/pkg/client/scylla/listers/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
"github.com/scylladb/scylla-operator/pkg/kubeinterfaces"
"github.com/scylladb/scylla-operator/pkg/mermaidclient"
"github.com/scylladb/scylla-operator/pkg/scheme"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -49,7 +49,7 @@ type Controller struct {
secretLister corev1listers.SecretLister
scyllaLister scyllav1listers.ScyllaClusterLister

managerClient *mermaidclient.Client
managerClient *managerclient.Client

cachesToSync []cache.InformerSynced

Expand All @@ -64,7 +64,7 @@ func NewController(
scyllaClient scyllav1client.ScyllaV1Interface,
secretInformer corev1informers.SecretInformer,
scyllaClusterInformer scyllav1informers.ScyllaClusterInformer,
managerClient *mermaidclient.Client,
managerClient *managerclient.Client,
) (*Controller, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/manager/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
helpers "github.com/scylladb/scylla-operator/pkg/helpers"
"github.com/scylladb/scylla-operator/pkg/helpers"
"github.com/scylladb/scylla-operator/pkg/naming"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -45,27 +45,27 @@ func (c *Controller) getManagerState(ctx context.Context, clusterID string) (*st
}

if clusterFound {
managerRepairTasks, err := c.managerClient.ListTasks(ctx, clusterID, "repair", true, "")
managerRepairTasks, err := c.managerClient.ListTasks(ctx, clusterID, "repair", true, "", "")
if err != nil {
return nil, err
}

repairTasks = make([]*RepairTask, 0, len(managerRepairTasks.ExtendedTaskSlice))
for _, managerRepairTask := range managerRepairTasks.ExtendedTaskSlice {
repairTasks = make([]*RepairTask, 0, len(managerRepairTasks.TaskListItemSlice))
for _, managerRepairTask := range managerRepairTasks.TaskListItemSlice {
rt := &RepairTask{}
if err := rt.FromManager(managerRepairTask); err != nil {
return nil, err
}
repairTasks = append(repairTasks, rt)
}

managerBackupTasks, err := c.managerClient.ListTasks(ctx, clusterID, "backup", true, "")
managerBackupTasks, err := c.managerClient.ListTasks(ctx, clusterID, "backup", true, "", "")
if err != nil {
return nil, err
}

backupTasks = make([]*BackupTask, 0, len(managerBackupTasks.ExtendedTaskSlice))
for _, managerBackupTask := range managerBackupTasks.ExtendedTaskSlice {
backupTasks = make([]*BackupTask, 0, len(managerBackupTasks.TaskListItemSlice))
for _, managerBackupTask := range managerBackupTasks.TaskListItemSlice {
bt := &BackupTask{}
if err := bt.FromManager(managerBackupTask); err != nil {
return nil, err
Expand Down
57 changes: 35 additions & 22 deletions pkg/controller/manager/sync_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/scylla-manager/v3/pkg/managerclient"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
"github.com/scylladb/scylla-manager/v3/swagger/gen/scylla-manager/models"
"github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/mermaidclient"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/util/uuid"
)

type state struct {
Clusters []*mermaidclient.Cluster
Clusters []*managerclient.Cluster
RepairTasks []*RepairTask
BackupTasks []*BackupTask
}
Expand All @@ -39,7 +40,7 @@ func runSync(ctx context.Context, cluster *v1.ScyllaCluster, authToken string, s
// * username/password are not part of Cluster CRD
if c.AuthToken != authToken {
actions = append(actions, &updateClusterAction{
cluster: &mermaidclient.Cluster{
cluster: &managerclient.Cluster{
ID: c.ID,
Name: naming.ManagerClusterName(cluster),
Host: naming.CrossNamespaceServiceNameForCluster(cluster),
Expand All @@ -56,7 +57,7 @@ func runSync(ctx context.Context, cluster *v1.ScyllaCluster, authToken string, s
}
if !found {
actions = append(actions, &addClusterAction{
cluster: &mermaidclient.Cluster{
cluster: &managerclient.Cluster{
Host: naming.CrossNamespaceServiceNameForCluster(cluster),
Name: naming.ManagerClusterName(cluster),
AuthToken: authToken,
Expand Down Expand Up @@ -274,15 +275,15 @@ func (s stateCache) taskID(taskName string) string {
}

type action interface {
Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error
Execute(ctx context.Context, client *managerclient.Client, status *v1.ScyllaClusterStatus) error
}

type addClusterAction struct {
cluster *mermaidclient.Cluster
cluster *managerclient.Cluster
clusterID string
}

func (a *addClusterAction) Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error {
func (a *addClusterAction) Execute(ctx context.Context, client *managerclient.Client, status *v1.ScyllaClusterStatus) error {
id, err := client.CreateCluster(ctx, a.cluster)
if err != nil {
return err
Expand All @@ -298,10 +299,10 @@ func (a addClusterAction) String() string {
}

type updateClusterAction struct {
cluster *mermaidclient.Cluster
cluster *managerclient.Cluster
}

func (a *updateClusterAction) Execute(ctx context.Context, client *mermaidclient.Client, _ *v1.ScyllaClusterStatus) error {
func (a *updateClusterAction) Execute(ctx context.Context, client *managerclient.Client, _ *v1.ScyllaClusterStatus) error {
return client.UpdateCluster(ctx, a.cluster)
}

Expand All @@ -313,7 +314,7 @@ type deleteClusterAction struct {
clusterID string
}

func (a *deleteClusterAction) Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error {
func (a *deleteClusterAction) Execute(ctx context.Context, client *managerclient.Client, status *v1.ScyllaClusterStatus) error {
return client.DeleteCluster(ctx, a.clusterID)
}

Expand All @@ -327,14 +328,14 @@ type deleteTaskAction struct {
taskID string
}

func (a *deleteTaskAction) Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error {
func (a *deleteTaskAction) Execute(ctx context.Context, client *managerclient.Client, status *v1.ScyllaClusterStatus) error {
err := client.DeleteTask(ctx, a.clusterID, a.taskType, uuid.MustParse(a.taskID))

if a.taskType == "repair" {
filteredStatuses := status.Repairs[:0]
for i, repairTaskStatus := range status.Repairs {
if err != nil && repairTaskStatus.ID == a.taskID {
status.Repairs[i].Error = mermaidclient.MessageOf(err)
status.Repairs[i].Error = messageOf(err)
}
if err != nil || repairTaskStatus.ID != a.taskID {
filteredStatuses = append(filteredStatuses, repairTaskStatus)
Expand All @@ -346,7 +347,7 @@ func (a *deleteTaskAction) Execute(ctx context.Context, client *mermaidclient.Cl
filteredStatuses := status.Backups[:0]
for i, backupTaskStatus := range status.Backups {
if err != nil && backupTaskStatus.ID == a.taskID {
status.Backups[i].Error = mermaidclient.MessageOf(err)
status.Backups[i].Error = messageOf(err)
}
if err != nil || backupTaskStatus.ID != a.taskID {
filteredStatuses = append(filteredStatuses, backupTaskStatus)
Expand All @@ -364,15 +365,15 @@ func (a deleteTaskAction) String() string {

type addTaskAction struct {
clusterID string
task *mermaidclient.Task
task *managerclient.Task
taskSpec interface{}
}

func (a addTaskAction) String() string {
return fmt.Sprintf("add task %+v", a.task)
}

func (a *addTaskAction) Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error {
func (a *addTaskAction) Execute(ctx context.Context, client *managerclient.Client, status *v1.ScyllaClusterStatus) error {
id, err := client.CreateTask(ctx, a.clusterID, a.task)

if a.task.Type == "repair" {
Expand All @@ -381,7 +382,7 @@ func (a *addTaskAction) Execute(ctx context.Context, client *mermaidclient.Clien
ID: id.String(),
}
if err != nil {
rt.Error = mermaidclient.MessageOf(err)
rt.Error = messageOf(err)
}

found := false
Expand All @@ -402,7 +403,7 @@ func (a *addTaskAction) Execute(ctx context.Context, client *mermaidclient.Clien
ID: id.String(),
}
if err != nil {
bt.Error = mermaidclient.MessageOf(err)
bt.Error = messageOf(err)
}

found := false
Expand All @@ -423,23 +424,23 @@ func (a *addTaskAction) Execute(ctx context.Context, client *mermaidclient.Clien

type updateTaskAction struct {
clusterID string
task *mermaidclient.Task
task *managerclient.Task
taskSpec interface{}
}

func (a updateTaskAction) String() string {
return fmt.Sprintf("update task %+v", a.task)
}

func (a *updateTaskAction) Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error {
func (a *updateTaskAction) Execute(ctx context.Context, client *managerclient.Client, status *v1.ScyllaClusterStatus) error {
err := client.UpdateTask(ctx, a.clusterID, a.task)

if a.task.Type == "repair" {
for i, repairStatus := range status.Repairs {
if a.task.ID == repairStatus.ID {
status.Repairs[i].RepairTaskSpec = a.taskSpec.(v1.RepairTaskSpec)
if err != nil {
status.Repairs[i].Error = mermaidclient.MessageOf(err)
status.Repairs[i].Error = messageOf(err)
}
}
break
Expand All @@ -450,7 +451,7 @@ func (a *updateTaskAction) Execute(ctx context.Context, client *mermaidclient.Cl
if a.task.ID == backupStatus.ID {
status.Backups[i].BackupTaskSpec = a.taskSpec.(v1.BackupTaskSpec)
if err != nil {
status.Backups[i].Error = mermaidclient.MessageOf(err)
status.Backups[i].Error = messageOf(err)
}
}
break
Expand All @@ -459,3 +460,15 @@ func (a *updateTaskAction) Execute(ctx context.Context, client *mermaidclient.Cl

return err
}

// messageOf returns error message embedded in returned error.
func messageOf(err error) string {
err = errors.Cause(err)
switch v := err.(type) {
case interface {
GetPayload() *models.ErrorResponse
}:
return v.GetPayload().Message
}
return err.Error()
}
Loading

0 comments on commit 46ad382

Please sign in to comment.