forked from argoproj/argo-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(cli): add DB CLI for local development
This adds a small CLI tool for developers to use when working with the DB locally. It provides two functions:

1. Explicitly migrate the DB (which is normally only done when the workflow controller starts) to help test migrations:
   ```
   $ go run ./hack/db migrate
   INFO[0000] Migrating database schema clusterName=default dbType=postgres
   ```
2. Insert randomly generated archived workflows into the DB, which is intended to help test query performance (e.g. argoproj#13601):
   ```
   $ time go run ./hack/db/main.go fake-archived-workflows --rows 100000
   Using seed 2600415997737113470
   Clusters: [cx5j6rrzm5vnkfqqr4d]
   Namespaces: [bs98pxsfhs9z v95848 hq76xbj4kq7vrdp49vm ghzj6vcrth 262wb8w2b8v2wd2p2p9]
   Inserted 100000 rows

   real 17m56.554s
   user 1m53.833s
   sys 0m43.581s
   ```

This is obviously not as efficient as it could be, but it's only intended to be run on an ad hoc basis when manually testing query performance. To make it fast, we'd probably have to switch to direct SQL inserts, which would couple this script to the logic in `persist/sqldb/workflow_archive.go`.

Signed-off-by: Mason Malone <651224+MasonM@users.noreply.github.com>

# Please enter the commit message for your changes. Lines starting
# with '#' will be ignored, and an empty message aborts the commit.
#
# Author: Mason Malone <mmalone@adobe.com>
# Date: Thu Oct 3 18:48:48 2024 -0700
#
# On branch feat-cli-db-generator2
# Changes to be committed:
# modified: docs/running-locally.md
# new file: hack/db/main.go
#
# Untracked files:
# notes.md
# test.json
# tmp
#
- Loading branch information
Showing
2 changed files
with
160 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"os" | ||
|
||
"github.com/spf13/cobra" | ||
"github.com/upper/db/v4" | ||
mysqladp "github.com/upper/db/v4/adapter/mysql" | ||
postgresqladp "github.com/upper/db/v4/adapter/postgresql" | ||
"k8s.io/apimachinery/pkg/util/rand" | ||
"k8s.io/apimachinery/pkg/util/uuid" | ||
|
||
"github.com/argoproj/argo-workflows/v3/persist/sqldb" | ||
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" | ||
"github.com/argoproj/argo-workflows/v3/util/instanceid" | ||
) | ||
|
||
// session is the database session shared by all subcommands. It is assigned
// by the root command's PersistentPreRunE hook in main and read by the
// "migrate" and "fake-archived-workflows" commands.
var session db.Session | ||
|
||
func main() { | ||
var dbtype, dsn string | ||
rootCmd := &cobra.Command{ | ||
Use: "db", | ||
Short: "CLI for developers to use when working on the DB locally", | ||
} | ||
rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) (err error) { | ||
session, err = createDBSession(dbtype, dsn) | ||
return | ||
} | ||
rootCmd.PersistentFlags().StringVarP(&dbtype, "driver", "d", "postgresql", "Database type (mysql or postgresql)") | ||
rootCmd.PersistentFlags().StringVarP(&dsn, "dsn", "c", "postgres://postgres@localhost:5432/postgres", "DSN connection string") | ||
rootCmd.AddCommand(NewMigrateCommand()) | ||
rootCmd.AddCommand(NewFakeDataCommand()) | ||
|
||
if err := rootCmd.Execute(); err != nil { | ||
os.Exit(1) | ||
} | ||
} | ||
|
||
func NewMigrateCommand() *cobra.Command { | ||
var cluster, table string | ||
migrationCmd := &cobra.Command{ | ||
Use: "migrate", | ||
Short: "Force DB migration for given cluster/table", | ||
RunE: func(cmd *cobra.Command, args []string) error { | ||
return sqldb.NewMigrate(session, cluster, table).Exec(context.Background()) | ||
}, | ||
} | ||
migrationCmd.Flags().StringVar(&cluster, "cluster", "default", "Cluster name") | ||
migrationCmd.Flags().StringVar(&table, "table", "argo_workflows", "Table name") | ||
return migrationCmd | ||
} | ||
|
||
func NewFakeDataCommand() *cobra.Command { | ||
var template string | ||
var seed, rows, numClusters, numNamespaces int | ||
fakeDataCmd := &cobra.Command{ | ||
Use: "fake-archived-workflows", | ||
Short: "Insert randomly-generated workflows into argo_archived_workflows, for testing purposes", | ||
RunE: func(cmd *cobra.Command, args []string) error { | ||
rand.Seed(int64(seed)) | ||
clusters := randomStringArray(numClusters) | ||
namespaces := randomStringArray(numNamespaces) | ||
fmt.Printf("Using seed %d\nClusters: %v\nNamespaces: %v\n", seed, clusters, namespaces) | ||
|
||
instanceIDService := instanceid.NewService("") | ||
wfTmpl := wfv1.MustUnmarshalWorkflow(template) | ||
|
||
for i := 0; i < rows; i++ { | ||
wf := randomizeWorkflow(wfTmpl, namespaces) | ||
cluster := clusters[rand.Intn(len(clusters))] | ||
wfArchive := sqldb.NewWorkflowArchive(session, cluster, "", instanceIDService) | ||
if err := wfArchive.ArchiveWorkflow(wf); err != nil { | ||
return err | ||
} | ||
} | ||
fmt.Printf("Inserted %d rows\n", rows) | ||
return nil | ||
}, | ||
} | ||
fakeDataCmd.Flags().StringVar(&template, "template", "@workflow/controller/testdata/dag-exhausted-retries-xfail.yaml", "Workflow definition to use as a template") | ||
fakeDataCmd.Flags().IntVar(&seed, "seed", rand.Int(), "Random number seed") | ||
fakeDataCmd.Flags().IntVar(&rows, "rows", 10, "Number of rows to insert") | ||
fakeDataCmd.Flags().IntVar(&numClusters, "clusters", 1, "Number of cluster names to autogenerate") | ||
fakeDataCmd.Flags().IntVar(&numNamespaces, "namespaces", 5, "Number of namespaces to autogenerate") | ||
return fakeDataCmd | ||
} | ||
|
||
func createDBSession(dbtype, dsn string) (db.Session, error) { | ||
if dbtype == "postgresql" { | ||
url, err := postgresqladp.ParseURL(dsn) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return postgresqladp.Open(url) | ||
} else { | ||
url, err := mysqladp.ParseURL(dsn) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return mysqladp.Open(url) | ||
} | ||
} | ||
|
||
func randomStringArray(length int) []string { | ||
var result []string | ||
for i := 0; i < length; i++ { | ||
result = append(result, rand.String(rand.IntnRange(5, 20))) | ||
} | ||
return result | ||
} | ||
|
||
func randomPhase() wfv1.WorkflowPhase { | ||
phases := []wfv1.WorkflowPhase{ | ||
wfv1.WorkflowSucceeded, | ||
wfv1.WorkflowFailed, | ||
wfv1.WorkflowError, | ||
} | ||
return phases[rand.Intn(len(phases))] | ||
} | ||
|
||
func randomizeWorkflow(wfTmpl *wfv1.Workflow, namespaces []string) *wfv1.Workflow { | ||
wf := wfTmpl.DeepCopy() | ||
wf.Name = rand.String(rand.IntnRange(10, 30)) | ||
wf.Namespace = namespaces[rand.Intn(len(namespaces))] | ||
wf.UID = uuid.NewUUID() | ||
wf.Status.Phase = randomPhase() | ||
if wf.Labels == nil { | ||
wf.Labels = map[string]string{} | ||
} | ||
wf.Labels["workflows.argoproj.io/phase"] = string(wf.Status.Phase) | ||
return wf | ||
} |