From bc35780a4d9e27551bddde7d3499fdae7aac35a5 Mon Sep 17 00:00:00 2001 From: Mason Malone Date: Thu, 3 Oct 2024 18:48:48 -0700 Subject: [PATCH] feat(cli): add DB CLI for local development This adds a small CLI tool for developers to use when working with the DB locally. It provides two functions: 1. Explicitly migrate the DB (which is normally only done when the workflow controller starts) to help test migrations: ``` $ go run ./hack/db migrate INFO[0000] Migrating database schema clusterName=default dbType=postgres ``` 2. Insert randomly-generated archived workflows into the DB, which is intended to help test query performance (e.g. https://github.com/argoproj/argo-workflows/issues/13601) ``` $ time go run ./hack/db/main.go fake-archived-workflows --rows 100000 Using seed 2600415997737113470 Clusters: [cx5j6rrzm5vnkfqqr4d] Namespaces: [bs98pxsfhs9z v95848 hq76xbj4kq7vrdp49vm ghzj6vcrth 262wb8w2b8v2wd2p2p9] Inserted 100000 rows real 17m56.554s user 1m53.833s sys 0m43.581s ``` This is obviously not as efficient as it could be, but it's only intended to be run on an ad hoc basis when manually testing query performance. To make it fast, we'd probably have to switch to direct SQL inserts, which would couple this script to the logic in `persist/sqldb/workflow_archive.go`. Signed-off-by: Mason Malone <651224+MasonM@users.noreply.github.com>
--- docs/running-locally.md | 25 ++++ hack/db/main.go | 135 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 hack/db/main.go diff --git a/docs/running-locally.md b/docs/running-locally.md index 8e5c53ced90d..5825c81f88f6 100644 --- a/docs/running-locally.md +++ b/docs/running-locally.md @@ -206,6 +206,31 @@ Tests often fail: that's good. To diagnose failure: If tests run slowly or time out, factory reset your Kubernetes cluster. +### Database Tooling + +The `go run ./hack/db` CLI provides a few useful commands for working with the DB locally: + +```bash +$ go run ./hack/db +CLI for developers to use when working on the DB locally + +Usage: + db [command] + +Available Commands: + completion Generate the autocompletion script for the specified shell + fake-archived-workflows Insert randomly-generated workflows into argo_archived_workflows, for testing purposes + help Help about any command + migrate Force DB migration for given cluster/table + +Flags: + -d, --driver string Database type (mysql or postgresql) (default "postgresql") + -c, --dsn string DSN connection string (default "postgres://postgres@localhost:5432/postgres") + -h, --help help for db + +Use "db [command] --help" for more information about a command. 
+``` + ## Committing Before you commit code and raise a PR, always run: diff --git a/hack/db/main.go b/hack/db/main.go new file mode 100644 index 000000000000..82fafa253e4a --- /dev/null +++ b/hack/db/main.go @@ -0,0 +1,135 @@ +package main + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + "github.com/upper/db/v4" + mysqladp "github.com/upper/db/v4/adapter/mysql" + postgresqladp "github.com/upper/db/v4/adapter/postgresql" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/uuid" + + "github.com/argoproj/argo-workflows/v3/persist/sqldb" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/util/instanceid" +) + +var session db.Session + +func main() { + var dbtype, dsn string + rootCmd := &cobra.Command{ + Use: "db", + Short: "CLI for developers to use when working on the DB locally", + } + rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) (err error) { + session, err = createDBSession(dbtype, dsn) + return + } + rootCmd.PersistentFlags().StringVarP(&dbtype, "driver", "d", "postgresql", "Database type (mysql or postgresql)") + rootCmd.PersistentFlags().StringVarP(&dsn, "dsn", "c", "postgres://postgres@localhost:5432/postgres", "DSN connection string") + rootCmd.AddCommand(NewMigrateCommand()) + rootCmd.AddCommand(NewFakeDataCommand()) + + if err := rootCmd.Execute(); err != nil { + os.Exit(1) + } +} + +func NewMigrateCommand() *cobra.Command { + var cluster, table string + migrationCmd := &cobra.Command{ + Use: "migrate", + Short: "Force DB migration for given cluster/table", + RunE: func(cmd *cobra.Command, args []string) error { + return sqldb.NewMigrate(session, cluster, table).Exec(context.Background()) + }, + } + migrationCmd.Flags().StringVar(&cluster, "cluster", "default", "Cluster name") + migrationCmd.Flags().StringVar(&table, "table", "argo_workflows", "Table name") + return migrationCmd +} + +func NewFakeDataCommand() *cobra.Command { + var 
template string + var seed, rows, numClusters, numNamespaces int + fakeDataCmd := &cobra.Command{ + Use: "fake-archived-workflows", + Short: "Insert randomly-generated workflows into argo_archived_workflows, for testing purposes", + RunE: func(cmd *cobra.Command, args []string) error { + rand.Seed(int64(seed)) + clusters := randomStringArray(numClusters) + namespaces := randomStringArray(numNamespaces) + fmt.Printf("Using seed %d\nClusters: %v\nNamespaces: %v\n", seed, clusters, namespaces) + + instanceIDService := instanceid.NewService("") + wfTmpl := wfv1.MustUnmarshalWorkflow(template) + + for i := 0; i < rows; i++ { + wf := randomizeWorkflow(wfTmpl, namespaces) + cluster := clusters[rand.Intn(len(clusters))] + wfArchive := sqldb.NewWorkflowArchive(session, cluster, "", instanceIDService) + if err := wfArchive.ArchiveWorkflow(wf); err != nil { + return err + } + } + fmt.Printf("Inserted %d rows\n", rows) + return nil + }, + } + fakeDataCmd.Flags().StringVar(&template, "template", "@workflow/controller/testdata/dag-exhausted-retries-xfail.yaml", "Workflow definition to use as a template") + fakeDataCmd.Flags().IntVar(&seed, "seed", rand.Int(), "Random number seed") + fakeDataCmd.Flags().IntVar(&rows, "rows", 10, "Number of rows to insert") + fakeDataCmd.Flags().IntVar(&numClusters, "clusters", 1, "Number of cluster names to autogenerate") + fakeDataCmd.Flags().IntVar(&numNamespaces, "namespaces", 5, "Number of namespaces to autogenerate") + return fakeDataCmd +} + +func createDBSession(dbtype, dsn string) (db.Session, error) { + if dbtype == "postgresql" { + url, err := postgresqladp.ParseURL(dsn) + if err != nil { + return nil, err + } + return postgresqladp.Open(url) + } else { + url, err := mysqladp.ParseURL(dsn) + if err != nil { + return nil, err + } + return mysqladp.Open(url) + } +} + +func randomStringArray(length int) []string { + var result []string + for i := 0; i < length; i++ { + result = append(result, rand.String(rand.IntnRange(5, 20))) + } + 
return result +} + +func randomPhase() wfv1.WorkflowPhase { + phases := []wfv1.WorkflowPhase{ + wfv1.WorkflowSucceeded, + wfv1.WorkflowFailed, + wfv1.WorkflowError, + } + return phases[rand.Intn(len(phases))] +} + +func randomizeWorkflow(wfTmpl *wfv1.Workflow, namespaces []string) *wfv1.Workflow { + wf := wfTmpl.DeepCopy() + wf.Name = rand.String(rand.IntnRange(10, 30)) + wf.Namespace = namespaces[rand.Intn(len(namespaces))] + wf.UID = uuid.NewUUID() + wf.Status.Phase = randomPhase() + if wf.Labels == nil { + wf.Labels = map[string]string{} + } + wf.Labels["workflows.argoproj.io/phase"] = string(wf.Status.Phase) + return wf +}