Add river bench benchmarking tool producing succinct output + summary #254

Merged
merged 1 commit into from
Mar 10, 2024
Add river bench benchmarking tool producing succinct output + summary
Here, add a new benchmarking tool to the main River CLI. We had an
existing one, but it hasn't been used or updated in ages, and was
written quite quickly, without too much concern for UX.

This `river bench` is user-runnable, and designed to produce output
that's succinct and easily comprehensible. Every two seconds it produces
a new line of output with the number of jobs worked during that period,
the number of jobs inserted during that period, and the rough number of
jobs per second being completed. When the program is interrupted via
`SIGINT`, it produces one final log line indicating similar information,
but calculated across the entire run period.

    $ go run main.go bench --database-url $DATABASE_URL
    bench: jobs worked [          0 ], inserted [      50000 ], job/sec [        0.0 ] [0s]
    bench: jobs worked [      22445 ], inserted [      22000 ], job/sec [    11222.5 ] [2s]
    bench: jobs worked [      26504 ], inserted [      28000 ], job/sec [    13252.0 ] [2s]
    bench: jobs worked [      25919 ], inserted [      24000 ], job/sec [    12959.5 ] [2s]
    bench: jobs worked [      27432 ], inserted [      28000 ], job/sec [    13716.0 ] [2s]
    bench: jobs worked [      26068 ], inserted [      26000 ], job/sec [    13034.0 ] [2s]
    bench: jobs worked [      27068 ], inserted [      28000 ], job/sec [    13534.0 ] [2s]
    bench: jobs worked [      27876 ], inserted [      28000 ], job/sec [    13938.0 ] [2s]
    bench: jobs worked [      25058 ], inserted [      24000 ], job/sec [    12529.0 ] [2s]
    ^Cbench: total jobs worked [     214356 ], total jobs inserted [     264000 ], overall job/sec [    13026.7 ], running 16.455185125s

It can also run with a total duration, which is useful when comparing
runs across branches without having to time each run by hand:

    $ go run main.go bench --database-url $DATABASE_URL --duration 30s
    bench: jobs worked [          0 ], inserted [      50000 ], job/sec [        0.0 ] [0s]
    bench: jobs worked [      23875 ], inserted [      24000 ], job/sec [    11937.5 ] [2s]
    bench: jobs worked [      27964 ], inserted [      28000 ], job/sec [    13982.0 ] [2s]
    bench: jobs worked [      25694 ], inserted [      26000 ], job/sec [    12847.0 ] [2s]
    bench: jobs worked [      26649 ], inserted [      26000 ], job/sec [    13324.5 ] [2s]
    bench: jobs worked [      26872 ], inserted [      28000 ], job/sec [    13436.0 ] [2s]
    bench: jobs worked [      26519 ], inserted [      26000 ], job/sec [    13259.5 ] [2s]
    bench: jobs worked [      25077 ], inserted [      24000 ], job/sec [    12538.5 ] [2s]
    bench: jobs worked [      24126 ], inserted [      26000 ], job/sec [    12063.0 ] [2s]
    bench: jobs worked [      23936 ], inserted [      22000 ], job/sec [    11968.0 ] [2s]
    bench: jobs worked [      26044 ], inserted [      28000 ], job/sec [    13022.0 ] [2s]
    bench: jobs worked [      26289 ], inserted [      26000 ], job/sec [    13144.5 ] [2s]
    bench: jobs worked [      23058 ], inserted [      22000 ], job/sec [    11529.0 ] [2s]
    bench: jobs worked [      23474 ], inserted [      24000 ], job/sec [    11737.0 ] [2s]
    bench: jobs worked [      25380 ], inserted [      26000 ], job/sec [    12690.0 ] [2s]
    bench: total jobs worked [     375743 ], total jobs inserted [     426000 ], overall job/sec [    12524.8 ], running 30.000017167s

Unlike the old benchmarking tool, this one does job accounting using a
client subscribe channel instead of measuring it in the worker.
Measuring in the worker doesn't account for the time spent blocked in
the job executor waiting for a goroutine to become available in the
completer to complete a job. That makes it less accurate and possibly
prone to memory overruns, as a large backlog of jobs has been accounted
as completed but is actually still waiting for a completer slot.
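
The idea of counting from a completion-event channel rather than inside the worker can be sketched with plain channels. Everything here is a stand-in: `completedEvent` and `countCompleted` are hypothetical and only model the shape of a subscription, not River's actual types.

```go
package main

import "fmt"

// completedEvent is a stand-in for the event a client would emit once a job
// has been fully completed. Hypothetical type for illustration only.
type completedEvent struct{ jobID int64 }

// countCompleted drains events until the channel is closed and returns how
// many jobs were reported as completed.
func countCompleted(events <-chan completedEvent) int {
	n := 0
	for range events {
		n++
	}
	return n
}

func main() {
	events := make(chan completedEvent)

	// Simulates the completer emitting one event per finished job. A job is
	// only counted once its event arrives, not when the worker returns.
	go func() {
		defer close(events)
		for id := int64(1); id <= 5; id++ {
			events <- completedEvent{jobID: id}
		}
	}()

	fmt.Println("jobs completed:", countCompleted(events))
}
```

Because the count only advances when an event is received, jobs that are still queued for completion are not reported as done.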

I'm not going to call this feature complete, but I think it's a step in
the right direction, and the hope is that it'll give us a reasonable way
to gut-check new changes and see whether they cause an obvious regression
or improvement in total performance.
brandur committed Mar 10, 2024
commit ebced7064e70d5005feb674fee7503fa35e98547
13 changes: 13 additions & 0 deletions .github/workflows/ci.yaml
@@ -132,6 +132,19 @@ jobs:
        run: ./river validate --database-url $DATABASE_URL
        working-directory: ./cmd/river

      - name: river bench
        run: |
          ( sleep 10 && killall -SIGTERM river ) &
          ./river bench --database-url $DATABASE_URL
        working-directory: ./cmd/river

      # Bench again in fixed number of jobs mode.
      - name: river bench
        run: |
          ( sleep 10 && killall -SIGTERM river ) &
          ./river bench --database-url $DATABASE_URL --num-total-jobs 1_234
        working-directory: ./cmd/river

      - name: river migrate-down
        run: ./river migrate-down --database-url $DATABASE_URL --max-steps 100
        working-directory: ./cmd/river
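
The second bench step passes `1_234` with an underscore separator. The sketch below shows why such a value can be accepted, under the assumption that the CLI's flag library parses integer flags with base inference the way `strconv.ParseInt` does with base 0; that is an assumption about the library, not something shown in this diff. `parseNumJobs` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseNumJobs parses a job count using base inference (base 0), which
// permits underscore separators as in Go integer literals.
// Hypothetical helper for illustration only.
func parseNumJobs(s string) (int, error) {
	n, err := strconv.ParseInt(s, 0, 64)
	if err != nil {
		return 0, err
	}
	return int(n), nil
}

func main() {
	n, err := parseNumJobs("1_234")
	fmt.Println(n, err)

	// With an explicit base 10, the same input is rejected.
	_, err = strconv.ParseInt("1_234", 10, 64)
	fmt.Println(err != nil)

	// Go-style duration strings, as accepted by a duration flag.
	d, err := time.ParseDuration("5m30s")
	fmt.Println(d, err)
}
```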
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- The River CLI now supports `river bench` to benchmark River's job throughput against a database. [PR #254](https://github.com/riverqueue/river/pull/254).

### Changed

- Changed default client IDs to be a combination of hostname and the time which the client started. This can still be changed by specifying `Config.ID`. [PR #255](https://github.com/riverqueue/river/pull/255).
4 changes: 3 additions & 1 deletion cmd/river/go.mod
@@ -13,6 +13,7 @@ go 1.21.4
require (
github.com/jackc/pgx/v5 v5.5.2
github.com/riverqueue/river v0.0.17
github.com/riverqueue/river/riverdriver v0.0.17
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17
github.com/spf13/cobra v1.8.0
)
@@ -22,9 +23,10 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/riverqueue/river/riverdriver v0.0.17 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
Contributor: Seems like this is an outdated diff?

Contributor Author: Ugh, I thought so too, but that's what `go mod tidy` produces. The ULID dep should be dropped the next time we cut a release and can target the River CLI Go submodule against it.

github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
5 changes: 5 additions & 0 deletions cmd/river/go.sum
@@ -16,6 +16,9 @@ github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river v0.0.17 h1:7beHZxo1WMzhN48y1Jt7CKkkmsw+TjuLd6qCEaznm7s=
@@ -26,6 +29,8 @@ github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.17 h1:xPmTpQNBicTZ
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.17/go.mod h1:zlZKXZ6XHcbwYniSKWX2+GwFlXHTnG9pJtE/BkxK0Xc=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17 h1:iuruCNT7nkC7Z4Qzb79jcvAVniGyK+Kstsy7fKJagUU=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17/go.mod h1:kL59NW3LoPbQxPz9DQoUtDYq3Zkcpdt5CIowgeBZwFw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
86 changes: 81 additions & 5 deletions cmd/river/main.go
@@ -4,13 +4,15 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"strconv"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/spf13/cobra"

"github.com/riverqueue/river/cmd/river/riverbench"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
@@ -39,14 +41,46 @@ Provides command line facilities for the River job queue.
}
}

-mustMarkFlagRequired := func(cmd *cobra.Command, name string) {
+mustMarkFlagRequired := func(cmd *cobra.Command, name string) { //nolint:unparam
// We just panic here because this will never happen outside of an error
// in development.
if err := cmd.MarkFlagRequired(name); err != nil {
panic(err)
}
}

// bench
{
var opts benchOpts

cmd := &cobra.Command{
Use: "bench",
Short: "Run River benchmark",
Long: `
Run a River benchmark which inserts and works jobs continually, giving a rough
idea of jobs per second and time to work a single job.

By default, the benchmark will continuously insert and work jobs in perpetuity
until interrupted by SIGINT (Ctrl^C). It can alternatively take a maximum run
duration with --duration, which takes a Go-style duration string like 1m.
Lastly, it can take --num-total-jobs, which inserts the given number of jobs
before starting the client, and works until all jobs are finished.

The database in --database-url will have its jobs table truncated, so make sure
to use a development database only.
`,
Run: func(cmd *cobra.Command, args []string) {
execHandlingError(func() (bool, error) { return bench(ctx, &opts) })
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to benchmark (should look like `postgres://...`")
cmd.Flags().DurationVar(&opts.Duration, "duration", 0, "duration after which to stop benchmark, accepting Go-style durations like 1m, 5m30s")
cmd.Flags().IntVarP(&opts.NumTotalJobs, "num-total-jobs", "n", 0, "number of jobs to insert before starting and which are worked down until finish")
cmd.Flags().BoolVarP(&opts.Verbose, "verbose", "v", false, "output additional logging verbosity")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}

// migrate-down
{
var opts migrateDownOpts
@@ -65,8 +99,8 @@ Defaults to running a single down migration. This behavior can be changed with
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
-cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 1, "Maximum number of steps to migrate")
-cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "Target version to migrate to (final state includes this version, but none after it)")
+cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 1, "maximum number of steps to migrate")
+cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "target version to migrate to (final state includes this version, but none after it)")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}
@@ -89,8 +123,8 @@ restricted with --max-steps or --target-version.
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`")
-cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 0, "Maximum number of steps to migrate")
-cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "Target version to migrate to (final state includes this version)")
+cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 0, "maximum number of steps to migrate")
+cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "target version to migrate to (final state includes this version)")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}
@@ -151,6 +185,48 @@ func setParamIfUnset(runtimeParams map[string]string, name, val string) {
runtimeParams[name] = val
}

type benchOpts struct {
DatabaseURL string
Duration time.Duration
NumTotalJobs int
Verbose bool
}

func (o *benchOpts) validate() error {
if o.DatabaseURL == "" {
return errors.New("database URL cannot be empty")
}

return nil
}

func bench(ctx context.Context, opts *benchOpts) (bool, error) {
if err := opts.validate(); err != nil {
return false, err
}

dbPool, err := openDBPool(ctx, opts.DatabaseURL)
if err != nil {
return false, err
}
defer dbPool.Close()

var logger *slog.Logger
if opts.Verbose {
logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
} else {
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn}))
}

benchmarker := riverbench.NewBenchmarker(riverpgxv5.New(dbPool), logger, opts.Duration, opts.NumTotalJobs)

if err := benchmarker.Run(ctx); err != nil {
return false, err
}

return true, nil
}

type migrateDownOpts struct {
DatabaseURL string
MaxSteps int