Skip to content

Commit

Permalink
Add river bench benchmarking tool producing succinct output + summary
Browse files Browse the repository at this point in the history
Here, add a new benchmarking tool to the main River CLI. We had an
existing one, but it hasn't been used or updated in ages, and was
written quite quickly, without too much concern for UX.

This `river bench` is user-runnable, and designed to produce output
that's succinct and easily comprehensible. Every two seconds it produces
a new line of output with the number of jobs worked during that period,
number of jobs inserted during that period, and the rough jobs per
second being complete. When the program is interrupted via `SIGINT`, it
produces one final log line indicating similar information, but
calculated across the entire run period.

    $ go run main.go bench --database-url $DATABASE_URL
    bench: jobs worked [          0 ], inserted [      50000 ], job/sec [        0.0 ] [0s]
    bench: jobs worked [      22445 ], inserted [      22000 ], job/sec [    11222.5 ] [2s]
    bench: jobs worked [      26504 ], inserted [      28000 ], job/sec [    13252.0 ] [2s]
    bench: jobs worked [      25919 ], inserted [      24000 ], job/sec [    12959.5 ] [2s]
    bench: jobs worked [      27432 ], inserted [      28000 ], job/sec [    13716.0 ] [2s]
    bench: jobs worked [      26068 ], inserted [      26000 ], job/sec [    13034.0 ] [2s]
    bench: jobs worked [      27068 ], inserted [      28000 ], job/sec [    13534.0 ] [2s]
    bench: jobs worked [      27876 ], inserted [      28000 ], job/sec [    13938.0 ] [2s]
    bench: jobs worked [      25058 ], inserted [      24000 ], job/sec [    12529.0 ] [2s]
    ^Cbench: total jobs worked [     214356 ], total jobs inserted [     264000 ], overall job/sec [    13026.7 ], running 16.455185125s

It can also run with a total duration, which will be useful if we're
trying to compare runs across branches without having to try and time it
artificially:

    $ go run main.go bench --database-url $DATABASE_URL --duration 30s
    bench: jobs worked [          0 ], inserted [      50000 ], job/sec [        0.0 ] [0s]
    bench: jobs worked [      23875 ], inserted [      24000 ], job/sec [    11937.5 ] [2s]
    bench: jobs worked [      27964 ], inserted [      28000 ], job/sec [    13982.0 ] [2s]
    bench: jobs worked [      25694 ], inserted [      26000 ], job/sec [    12847.0 ] [2s]
    bench: jobs worked [      26649 ], inserted [      26000 ], job/sec [    13324.5 ] [2s]
    bench: jobs worked [      26872 ], inserted [      28000 ], job/sec [    13436.0 ] [2s]
    bench: jobs worked [      26519 ], inserted [      26000 ], job/sec [    13259.5 ] [2s]
    bench: jobs worked [      25077 ], inserted [      24000 ], job/sec [    12538.5 ] [2s]
    bench: jobs worked [      24126 ], inserted [      26000 ], job/sec [    12063.0 ] [2s]
    bench: jobs worked [      23936 ], inserted [      22000 ], job/sec [    11968.0 ] [2s]
    bench: jobs worked [      26044 ], inserted [      28000 ], job/sec [    13022.0 ] [2s]
    bench: jobs worked [      26289 ], inserted [      26000 ], job/sec [    13144.5 ] [2s]
    bench: jobs worked [      23058 ], inserted [      22000 ], job/sec [    11529.0 ] [2s]
    bench: jobs worked [      23474 ], inserted [      24000 ], job/sec [    11737.0 ] [2s]
    bench: jobs worked [      25380 ], inserted [      26000 ], job/sec [    12690.0 ] [2s]
    bench: total jobs worked [     375743 ], total jobs inserted [     426000 ], overall job/sec [    12524.8 ], running 30.000017167s

Unlike the old benchmarking tool, we switch this one over to do job
accounting using a client subscribe channel instead of measuring it in
the worker. Measuring in the worker doesn't account for the time needed
to block in the job executor waiting for a goroutine to become available
in the completer to complete a job, making it less accurate and possibly
prone to memory overruns as a large backlog of jobs have been accounted
as completed but are actually waiting for a completer slot.

I'm not going to call this feature complete, but I think it's a step in
the right direction, and the hope is that it'll give us a reasonable way
to gutcheck new changes and see whether they cause an obvious regression
or improvement in total performance.
  • Loading branch information
brandur committed Mar 8, 2024
1 parent 35ee45e commit e453cbd
Show file tree
Hide file tree
Showing 6 changed files with 409 additions and 204 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ jobs:
run: ./river validate --database-url $DATABASE_URL
working-directory: ./cmd/river

- name: river bench
run: |
( sleep 10 && killall -SIGTERM river ) &
./river bench --database-url $DATABASE_URL
working-directory: ./cmd/river

- name: river migrate-down
run: ./river migrate-down --database-url $DATABASE_URL --max-steps 100
working-directory: ./cmd/river
Expand Down
4 changes: 3 additions & 1 deletion cmd/river/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go 1.21.4
require (
github.com/jackc/pgx/v5 v5.5.2
github.com/riverqueue/river v0.0.17
github.com/riverqueue/river/riverdriver v0.0.17
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17
github.com/spf13/cobra v1.8.0
)
Expand All @@ -22,9 +23,10 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/riverqueue/river/riverdriver v0.0.17 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
5 changes: 5 additions & 0 deletions cmd/river/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river v0.0.17 h1:7beHZxo1WMzhN48y1Jt7CKkkmsw+TjuLd6qCEaznm7s=
Expand All @@ -26,6 +29,8 @@ github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.17 h1:xPmTpQNBicTZ
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.17/go.mod h1:zlZKXZ6XHcbwYniSKWX2+GwFlXHTnG9pJtE/BkxK0Xc=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17 h1:iuruCNT7nkC7Z4Qzb79jcvAVniGyK+Kstsy7fKJagUU=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17/go.mod h1:kL59NW3LoPbQxPz9DQoUtDYq3Zkcpdt5CIowgeBZwFw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
Expand Down
79 changes: 78 additions & 1 deletion cmd/river/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"strconv"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/spf13/cobra"

"github.com/riverqueue/river/cmd/river/riverbench"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
Expand Down Expand Up @@ -39,14 +41,39 @@ Provides command line facilities for the River job queue.
}
}

mustMarkFlagRequired := func(cmd *cobra.Command, name string) {
mustMarkFlagRequired := func(cmd *cobra.Command, name string) { //nolint:unparam
// We just panic here because this will never happen outside of an error
// in development.
if err := cmd.MarkFlagRequired(name); err != nil {
panic(err)
}
}

// bench
{
var opts benchOpts

cmd := &cobra.Command{
Use: "bench",
Short: "Run River benchmark",
Long: `
Run a River benchmark which inserts and works jobs continually, giving a rough
idea of jobs per second and time to work a single job.
The database in --database-url will have its jobs table truncated, so make sure
to use a development database only.
`,
Run: func(cmd *cobra.Command, args []string) {
execHandlingError(func() (bool, error) { return bench(ctx, &opts) })
},
}
cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to benchmark (should look like `postgres://...`")
cmd.Flags().StringVar(&opts.Duration, "duration", "", "Duration after which to stop benchmark, accepting Go-style durations like 1m, 5m30s")
cmd.Flags().BoolVarP(&opts.Verbose, "verbose", "v", false, "Output additional logging verbosity")
mustMarkFlagRequired(cmd, "database-url")
rootCmd.AddCommand(cmd)
}

// migrate-down
{
var opts migrateDownOpts
Expand Down Expand Up @@ -151,6 +178,56 @@ func setParamIfUnset(runtimeParams map[string]string, name, val string) {
runtimeParams[name] = val
}

type benchOpts struct {
DatabaseURL string
Duration string
Verbose bool
}

func (o *benchOpts) validate() error {
if o.DatabaseURL == "" {
return errors.New("database URL cannot be empty")
}

return nil
}

func bench(ctx context.Context, opts *benchOpts) (bool, error) {
if err := opts.validate(); err != nil {
return false, err
}

dbPool, err := openDBPool(ctx, opts.DatabaseURL)
if err != nil {
return false, err
}
defer dbPool.Close()

var logger *slog.Logger
if opts.Verbose {
logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
} else {
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn}))
}

var duration time.Duration
if opts.Duration != "" {
var err error
duration, err = time.ParseDuration(opts.Duration)
if err != nil {
return false, fmt.Errorf("error parsing duration: %w", err)
}
}

benchmarker := riverbench.NewBenchmarker(riverpgxv5.New(dbPool), logger, duration)

if err := benchmarker.Run(ctx); err != nil {
return false, err
}

return true, nil
}

type migrateDownOpts struct {
DatabaseURL string
MaxSteps int
Expand Down
Loading

0 comments on commit e453cbd

Please sign in to comment.