Bulk unique insertion, uniqueness with subset of args #590

Merged
merged 6 commits into from
Sep 22, 2024
52 changes: 52 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,37 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ Version 0.12.0 contains a new database migration, version 6. See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version:

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```

If not using River's internal migration system, the raw SQL can alternatively be dumped with:

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-get --version 6 --up > river6.up.sql
river migrate-get --version 6 --down > river6.down.sql
```

The migration **includes a new index**. Users with a very large job table may want to consider building the index separately using `CONCURRENTLY` (which must be run outside of a transaction), then running `river migrate-up` to finalize the process (it will tolerate an index that already exists):

```sql
ALTER TABLE river_job ADD COLUMN unique_states BIT(8);

CREATE UNIQUE INDEX CONCURRENTLY river_job_unique_idx ON river_job (unique_key)
WHERE unique_key IS NOT NULL
AND unique_states IS NOT NULL
AND river_job_state_in_bitmask(unique_states, state);
```

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```
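
For intuition about the `unique_states BIT(8)` column and the `river_job_state_in_bitmask` predicate in the index above, here is a minimal Go sketch of an eight-bit state mask. The bit ordering (alphabetical by state name) and the helper names `stateBitmask` and `stateInBitmask` are assumptions made for illustration; River's actual bit assignment may differ.

```go
package main

import "fmt"

// jobStates lists River's eight job states. The alphabetical ordering used
// for bit positions is an assumption of this sketch, not River's encoding.
var jobStates = []string{
	"available", "cancelled", "completed", "discarded",
	"pending", "retryable", "running", "scheduled",
}

// stateBitmask sets one bit for each known state present in states.
func stateBitmask(states []string) uint8 {
	var mask uint8
	for _, s := range states {
		for i, known := range jobStates {
			if s == known {
				mask |= 1 << uint(i)
			}
		}
	}
	return mask
}

// stateInBitmask reports whether the bit for state is set in mask.
func stateInBitmask(mask uint8, state string) bool {
	for i, known := range jobStates {
		if state == known {
			return mask&(1<<uint(i)) != 0
		}
	}
	return false
}

func main() {
	mask := stateBitmask([]string{"available", "pending", "running", "scheduled"})
	fmt.Printf("%08b\n", mask)
	fmt.Println(stateInBitmask(mask, "running"))
	fmt.Println(stateInBitmask(mask, "completed"))
}
```

Under this scheme a partial unique index only applies to rows whose current state has its bit set, which is how a job can leave the unique set simply by changing state.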

## Added

- `rivertest.WorkContext`, a test function that can be used to initialize a context to test a `JobArgs.Work` implementation that will have a client set to context for use with `river.ClientFromContext`. [PR #526](https://github.com/riverqueue/river/pull/526).
@@ -35,6 +66,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
}
```

- Unique jobs have been improved to allow bulk insertion of unique jobs via `InsertMany` / `InsertManyTx`, and to allow customizing the `ByState` list to add or remove certain states. This enables users to expand the set of unique states to also include `cancelled` and `discarded` jobs, or to remove `retryable` from uniqueness consideration. This updated implementation maintains the speed advantage of the newer index-backed uniqueness system, while allowing some flexibility in which job states are considered.

Unique jobs utilizing `ByArgs` can now also opt to have a subset of the job's arguments considered for uniqueness. For example, you could choose to consider only the `customer_id` field while ignoring the `trace_id` field:

```go
type MyJobArgs struct {
CustomerID string `json:"customer_id" river:"unique"`
TraceID string `json:"trace_id"`
}
```

Any fields considered in uniqueness are also sorted alphabetically in order to guarantee a consistent result, even if the encoded JSON isn't sorted consistently. For example `encoding/json` encodes struct fields in their defined order, so merely reordering struct fields would previously have been enough to cause a new job to not be considered identical to a pre-existing one with different JSON order.

The `UniqueOpts` type also gains an `ExcludeKind` option for cases where uniqueness needs to be guaranteed across multiple job types.

In-flight unique jobs using the previous designs will continue to be executed successfully with these changes, so there should be no need for downtime as part of the migration. However, the v6 migration adds a new unique job index while also removing the old one, so users with in-flight unique jobs may wish to avoid removing the old index until the new River release has been deployed, in order to guarantee that jobs aren't duplicated by old River code once that index is removed.

**Deprecated**: The original unique jobs implementation, which relied on advisory locks, has been deprecated but not yet removed. The only way to trigger this old code path is with a single insert (`Insert`/`InsertTx`) using `UniqueOpts.ByState` with a custom list of states that omits some of the now-required states for unique jobs. Specifically, `pending`, `scheduled`, `available`, and `running` cannot be removed from the `ByState` list with the new implementation. These are included in the default list, so only the places which customize this attribute need to be updated to opt into the new (much faster) unique jobs. The advisory lock unique implementation will be removed in an upcoming release.

[PR #590](https://github.com/riverqueue/river/pull/590).

- **Deprecated**: The `MigrateTx` method of `rivermigrate` has been deprecated. It turns out there are certain combinations of schema changes which cannot be run within a single transaction, and the migrator now prefers to run each migration in its own transaction, one at a time. `MigrateTx` will be removed in a future version.

- The migrator now produces a better error in case of a non-existent migration line including suggestions for known migration lines that are similar in name to the invalid one. [PR #558](https://github.com/riverqueue/river/pull/558).
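
To illustrate the subset-of-args uniqueness described in the PR #590 entry, the following is a minimal sketch of hashing only the fields tagged `river:"unique"`, visiting them in sorted order so that struct field order does not affect the result. The helpers `uniqueFieldNames` and `uniqueArgsHash`, the `name=value` layout, and the choice of SHA-256 are all assumptions for illustration, not River's actual key derivation.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"reflect"
	"sort"
	"strings"
)

type MyJobArgs struct {
	CustomerID string `json:"customer_id" river:"unique"`
	TraceID    string `json:"trace_id"`
}

// uniqueFieldNames returns the JSON names of fields tagged river:"unique",
// sorted alphabetically. It expects a struct value, not a pointer.
func uniqueFieldNames(args any) []string {
	t := reflect.TypeOf(args)
	var names []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.Tag.Get("river") != "unique" {
			continue
		}
		name := strings.Split(f.Tag.Get("json"), ",")[0]
		if name == "" {
			name = f.Name
		}
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// uniqueArgsHash hashes only the tagged fields, in sorted key order.
func uniqueArgsHash(args any) (string, error) {
	encoded, err := json.Marshal(args)
	if err != nil {
		return "", err
	}
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(encoded, &fields); err != nil {
		return "", err
	}
	h := sha256.New()
	for _, name := range uniqueFieldNames(args) {
		h.Write([]byte(name))
		h.Write([]byte{'='})
		h.Write(fields[name])
		h.Write([]byte{'\n'})
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	a, _ := uniqueArgsHash(MyJobArgs{CustomerID: "cus_123", TraceID: "trace_a"})
	b, _ := uniqueArgsHash(MyJobArgs{CustomerID: "cus_123", TraceID: "trace_b"})
	fmt.Println(a == b) // same customer, different trace IDs
}
```

Because `trace_id` carries no `river:"unique"` tag, the two inputs in `main` hash identically in this sketch, while a different `customer_id` would produce a different hash.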
90 changes: 52 additions & 38 deletions client.go
@@ -66,9 +66,9 @@ type Config struct {
// only 32 bits of number space for advisory lock hashes, so it makes
// internally conflicting River-generated keys more likely.
//
// Advisory locks are currently only used for the fallback/slow path of
// unique job insertion where finalized states are included in a ByState
// configuration.
// Advisory locks are currently only used for the deprecated fallback/slow
// path of unique job insertion when pending, scheduled, available, or running
// are omitted from a customized ByState configuration.
AdvisoryLockPrefix int32
Contributor Author
Once we complete the deprecation and eventual removal of the advisory lock based unique jobs, would you want to just nuke this config altogether? It's possible we could want to use advisory locks again at some point but it'd be easy enough to re-add.


// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
@@ -346,7 +346,7 @@ type Client[TTx any] struct {
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter
uniqueInserter *dbunique.UniqueInserter // deprecated fallback path for unique job insertion

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
@@ -1162,7 +1162,7 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts, bulk bool) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
@@ -1225,6 +1225,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
}

insertParams := &riverdriver.JobInsertFastParams{
Args: args,
CreatedAt: createdAt,
EncodedArgs: json.RawMessage(encodedArgs),
Kind: args.Kind(),
@@ -1235,6 +1236,22 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
State: rivertype.JobStateAvailable,
Tags: tags,
}
var returnUniqueOpts *dbunique.UniqueOpts
if !uniqueOpts.isEmpty() {
if uniqueOpts.isV1() {
if bulk {
return nil, nil, errors.New("bulk inserts do not support advisory lock uniqueness and cannot remove required states")
}
returnUniqueOpts = (*dbunique.UniqueOpts)(&uniqueOpts)
} else {
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
insertParams.UniqueKey, err = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams)
if err != nil {
return nil, nil, err
}
insertParams.UniqueStates = internalUniqueOpts.StateBitmask()
}
}

switch {
case !insertOpts.ScheduledAt.IsZero():
@@ -1253,7 +1270,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
insertParams.State = rivertype.JobStatePending
}

return insertParams, (*dbunique.UniqueOpts)(&uniqueOpts), nil
return insertParams, returnUniqueOpts, nil
}
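
The branching added to `insertParamsFromConfigArgsAndOptions` can be summarized with a small standalone sketch. `choosePath`, `hasRequiredStates`, and the `insertPath` type are hypothetical names, and treating an empty `ByState` as "use the default list" is an assumption; the real check lives in `uniqueOpts.isV1()`, which is not shown in this diff.

```go
package main

import (
	"errors"
	"fmt"
)

// requiredStates are the states the changelog says cannot be dropped from
// ByState when using the index-backed implementation.
var requiredStates = []string{"available", "pending", "running", "scheduled"}

type insertPath string

const (
	pathPlain        insertPath = "plain"         // no uniqueness requested
	pathIndexBacked  insertPath = "index-backed"  // unique key plus state bitmask
	pathAdvisoryLock insertPath = "advisory-lock" // deprecated fallback
)

// hasRequiredStates reports whether every required state appears in byState.
func hasRequiredStates(byState []string) bool {
	present := make(map[string]bool, len(byState))
	for _, s := range byState {
		present[s] = true
	}
	for _, r := range requiredStates {
		if !present[r] {
			return false
		}
	}
	return true
}

// choosePath is a hypothetical helper. An empty byState is assumed to mean
// "use the default list", which includes all required states.
func choosePath(unique bool, byState []string, bulk bool) (insertPath, error) {
	switch {
	case !unique:
		return pathPlain, nil
	case len(byState) == 0 || hasRequiredStates(byState):
		return pathIndexBacked, nil
	case bulk:
		return "", errors.New("bulk inserts do not support advisory lock uniqueness")
	default:
		return pathAdvisoryLock, nil
	}
}

func main() {
	path, err := choosePath(true, []string{"available", "completed"}, false)
	fmt.Println(path, err)

	_, err = choosePath(true, []string{"available", "completed"}, true)
	fmt.Println(err)
}
```

The point of the sketch is that only a single insert with a customized `ByState` missing a required state reaches the deprecated path; the same options in a bulk insert produce an error instead.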

var errNoDriverDBPool = errors.New("driver must have non-nil database pool to use non-transactional methods like Insert and InsertMany (try InsertTx or InsertManyTx instead)")
@@ -1273,7 +1290,7 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
return nil, errNoDriverDBPool
}

return c.insert(ctx, c.driver.GetExecutor(), args, opts)
return c.insert(ctx, c.driver.GetExecutor(), args, opts, false)
}

// InsertTx inserts a new job with the provided args on the given transaction.
@@ -1294,15 +1311,15 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
// transactions, the job will not be worked until the transaction has committed,
// and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts)
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts, false)
}

func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts, bulk bool) (*rivertype.JobInsertResult, error) {
if err := c.validateJobArgs(args); err != nil {
return nil, err
}

params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts)
params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts, bulk)
if err != nil {
return nil, err
}
@@ -1313,9 +1330,23 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
}
defer tx.Rollback(ctx)

jobInsertRes, err := c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
if err != nil {
return nil, err
// TODO: consolidate insertion paths for single + multi, remove deprecated uniqueness design
var jobInsertRes *riverdriver.JobInsertFastResult
if uniqueOpts == nil {
jobInsertRes, err = tx.JobInsertFast(ctx, params)
if err != nil {
return nil, err
}
} else {
if bulk {
return nil, errors.New("bulk inserts do not support advisory lock uniqueness")
}
// Old deprecated advisory lock route
c.baseService.Logger.WarnContext(ctx, "Using deprecated advisory lock uniqueness for job insert")
jobInsertRes, err = c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
if err != nil {
return nil, err
}
}

if err := c.maybeNotifyInsert(ctx, tx, params.State, params.Queue); err != nil {
@@ -1325,7 +1356,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

return jobInsertRes, nil
return (*rivertype.JobInsertResult)(jobInsertRes), nil
}

// InsertManyParams encapsulates a single job combined with insert options for
@@ -1431,8 +1462,8 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx,
}

return sliceutil.Map(jobRows,
func(jobRow *rivertype.JobRow) *rivertype.JobInsertResult {
return &rivertype.JobInsertResult{Job: jobRow}
func(result *riverdriver.JobInsertFastResult) *rivertype.JobInsertResult {
return (*rivertype.JobInsertResult)(result)
},
), nil
}
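
The `(*rivertype.JobInsertResult)(result)` conversion relies on Go permitting a direct conversion between pointer types whose base types have identical underlying struct types. A minimal sketch follows, using stand-in types; `driverResult`, `publicResult`, and their fields are illustrative assumptions, not the real `riverdriver` or `rivertype` definitions.

```go
package main

import "fmt"

type JobRow struct{ ID int64 }

// driverResult and publicResult are stand-ins with identical field names,
// types, and order, which is what makes the pointer conversion legal.
type driverResult struct {
	Job                      *JobRow
	UniqueSkippedAsDuplicate bool
}

type publicResult struct {
	Job                      *JobRow
	UniqueSkippedAsDuplicate bool
}

// toPublic reinterprets the pointer without copying the struct.
func toPublic(r *driverResult) *publicResult {
	return (*publicResult)(r)
}

func main() {
	internal := &driverResult{Job: &JobRow{ID: 1}, UniqueSkippedAsDuplicate: true}
	public := toPublic(internal)
	fmt.Println(public.Job.ID, public.UniqueSkippedAsDuplicate)
}
```

Because no copy is made, both pointers refer to the same memory. If the two struct definitions ever diverge, the conversion stops compiling, which surfaces the mismatch at build time.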
@@ -1450,20 +1481,12 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
return nil, err
}

if param.InsertOpts != nil {
// UniqueOpts aren't supported for batch inserts because they use PG
// advisory locks to work, and taking many locks simultaneously could
// easily lead to contention and deadlocks.
if !param.InsertOpts.UniqueOpts.isEmpty() {
return nil, errors.New("UniqueOpts are not supported for batch inserts")
}
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
insertParamsItem, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, true)
if err != nil {
return nil, err
}

insertParams[i] = insertParamsItem
}

return insertParams, nil
@@ -1579,20 +1602,11 @@ func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverd
return nil, err
}

if param.InsertOpts != nil {
// UniqueOpts aren't support for batch inserts because they use PG
// advisory locks to work, and taking many locks simultaneously
// could easily lead to contention and deadlocks.
if !param.InsertOpts.UniqueOpts.isEmpty() {
return nil, errors.New("UniqueOpts are not supported for batch inserts")
}
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
insertParamsItem, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, true)
if err != nil {
return nil, err
}
insertParams[i] = insertParamsItem
}

return insertParams, nil