Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `NeverSchedule` returns a `PeriodicSchedule` that never runs. This can be used to effectively disable the reindexer or any other maintenance service. [PR #718](https://github.com/riverqueue/river/pull/718).
- Add `SkipUnknownJobCheck` client config option to skip job arg worker validation. [PR #731](https://github.com/riverqueue/river/pull/731).

### Changed

Expand Down
20 changes: 16 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,17 @@ type Config struct {
// Defaults to DefaultRetryPolicy.
RetryPolicy ClientRetryPolicy

// SkipUnknownJobCheck is a flag to control whether the client should skip
// checking to see if a registered worker exists in the client's worker bundle
// for a job arg prior to insertion.
//
// This can be set to true to allow a client to insert jobs which are
// intended to be worked by a different client which effectively makes
// the client's insertion behavior mimic that of an insert-only client.
//
// Defaults to false.
SkipUnknownJobCheck bool

// TestOnly can be set to true to disable certain features that are useful
// in production, but which may be harmful to tests, in ways like having the
// effect of making them slower. It should not be used outside of test
Expand Down Expand Up @@ -485,6 +496,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
ReindexerSchedule: config.ReindexerSchedule,
RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter),
RetryPolicy: retryPolicy,
SkipUnknownJobCheck: config.SkipUnknownJobCheck,
TestOnly: config.TestOnly,
Workers: config.Workers,
WorkerMiddleware: config.WorkerMiddleware,
Expand Down Expand Up @@ -1687,11 +1699,11 @@ func (c *Client[TTx]) notifyQueuePauseOrResume(ctx context.Context, tx riverdriv
}

// Validates job args prior to insertion. Currently, verifies that a worker to
// handle the kind is registered in the configured workers bundle. An
// insert-only client doesn't require a workers bundle be configured though, so
// no validation occurs if one wasn't.
// handle the kind is registered in the configured workers bundle.
// This validation is skipped if the client is configured as an insert-only (with no workers)
// or if the client is configured to skip unknown job kinds.
func (c *Client[TTx]) validateJobArgs(args JobArgs) error {
if c.config.Workers == nil {
if c.config.Workers == nil || c.config.SkipUnknownJobCheck {
return nil
}

Expand Down
79 changes: 79 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,17 @@ func Test_Client_Insert(t *testing.T) {
_, err := client.Insert(ctx, &unregisteredJobArgs{}, nil)
require.NoError(t, err)
})

t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

client.config.SkipUnknownJobCheck = true

_, err := client.Insert(ctx, &unregisteredJobArgs{}, nil)
require.NoError(t, err)
})
}

func Test_Client_InsertTx(t *testing.T) {
Expand Down Expand Up @@ -1751,6 +1762,17 @@ func Test_Client_InsertTx(t *testing.T) {
_, err := client.InsertTx(ctx, bundle.tx, &unregisteredJobArgs{}, nil)
require.NoError(t, err)
})

t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

client.config.SkipUnknownJobCheck = true

_, err := client.InsertTx(ctx, bundle.tx, &unregisteredJobArgs{}, nil)
require.NoError(t, err)
})
}

func Test_Client_InsertManyFast(t *testing.T) {
Expand Down Expand Up @@ -1959,6 +1981,19 @@ func Test_Client_InsertManyFast(t *testing.T) {
require.NoError(t, err)
})

t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

client.config.SkipUnknownJobCheck = true

_, err := client.InsertManyFast(ctx, []InsertManyParams{
{Args: unregisteredJobArgs{}},
})
require.NoError(t, err)
})

t.Run("ErrorsOnInsertOptsWithoutRequiredUniqueStates", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -2114,6 +2149,19 @@ func Test_Client_InsertManyFastTx(t *testing.T) {
require.NoError(t, err)
})

t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

client.config.SkipUnknownJobCheck = true

_, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{
{Args: unregisteredJobArgs{}},
})
require.NoError(t, err)
})

t.Run("ErrorsOnInsertOptsWithV1UniqueOpts", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -2380,6 +2428,20 @@ func Test_Client_InsertMany(t *testing.T) {
require.Len(t, results, 1)
})

t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

client.config.SkipUnknownJobCheck = true

results, err := client.InsertMany(ctx, []InsertManyParams{
{Args: unregisteredJobArgs{}},
})
require.NoError(t, err)
require.Len(t, results, 1)
})

t.Run("ErrorsOnInsertOptsWithV1UniqueOpts", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -2637,6 +2699,20 @@ func Test_Client_InsertManyTx(t *testing.T) {
require.Len(t, results, 1)
})

t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

client.config.SkipUnknownJobCheck = true

results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{
{Args: unregisteredJobArgs{}},
})
require.NoError(t, err)
require.Len(t, results, 1)
})

t.Run("ErrorsOnInsertOptsWithV1UniqueOpts", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -4960,6 +5036,7 @@ func Test_NewClient_Defaults(t *testing.T) {
require.NotZero(t, client.baseService.Logger)
require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts)
require.IsType(t, &DefaultClientRetryPolicy{}, client.config.RetryPolicy)
require.False(t, client.config.SkipUnknownJobCheck)
}

func Test_NewClient_Overrides(t *testing.T) {
Expand Down Expand Up @@ -4999,6 +5076,7 @@ func Test_NewClient_Overrides(t *testing.T) {
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
ReindexerSchedule: &periodicIntervalSchedule{interval: time.Hour},
RetryPolicy: retryPolicy,
SkipUnknownJobCheck: true,
TestOnly: true, // disables staggered start in maintenance services
Workers: workers,
WorkerMiddleware: []rivertype.WorkerMiddleware{&noOpWorkerMiddleware{}},
Expand Down Expand Up @@ -5029,6 +5107,7 @@ func Test_NewClient_Overrides(t *testing.T) {
require.Equal(t, logger, client.baseService.Logger)
require.Equal(t, 5, client.config.MaxAttempts)
require.Equal(t, retryPolicy, client.config.RetryPolicy)
require.True(t, client.config.SkipUnknownJobCheck)
require.Len(t, client.config.WorkerMiddleware, 1)
}

Expand Down
Loading