Skip to content

Commit fdbc2dd

Browse files
authored
feat(config): add SkipUnknownJobCheck client config option
feat(config): add SkipUnknownJobCheck client config option to skip job arg worker validation (#731)
1 parent 90850d3 commit fdbc2dd

File tree

3 files changed

+96
-4
lines changed

3 files changed

+96
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Added
1111

1212
- `NeverSchedule` returns a `PeriodicSchedule` that never runs. This can be used to effectively disable the reindexer or any other maintenance service. [PR #718](https://github.com/riverqueue/river/pull/718).
13+
- Add `SkipUnknownJobCheck` client config option to skip job arg worker validation. [PR #731](https://github.com/riverqueue/river/pull/731).
1314

1415
### Changed
1516

client.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,17 @@ type Config struct {
220220
// Defaults to DefaultRetryPolicy.
221221
RetryPolicy ClientRetryPolicy
222222

223+
// SkipUnknownJobCheck is a flag to control whether the client should skip
224+
// checking to see if a registered worker exists in the client's worker bundle
225+
// for a job arg prior to insertion.
226+
//
227+
// This can be set to true to allow a client to insert jobs which are
228+
// intended to be worked by a different client which effectively makes
229+
// the client's insertion behavior mimic that of an insert-only client.
230+
//
231+
// Defaults to false.
232+
SkipUnknownJobCheck bool
233+
223234
// TestOnly can be set to true to disable certain features that are useful
224235
// in production, but which may be harmful to tests, in ways like having the
225236
// effect of making them slower. It should not be used outside of test
@@ -485,6 +496,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
485496
ReindexerSchedule: config.ReindexerSchedule,
486497
RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter),
487498
RetryPolicy: retryPolicy,
499+
SkipUnknownJobCheck: config.SkipUnknownJobCheck,
488500
TestOnly: config.TestOnly,
489501
Workers: config.Workers,
490502
WorkerMiddleware: config.WorkerMiddleware,
@@ -1687,11 +1699,11 @@ func (c *Client[TTx]) notifyQueuePauseOrResume(ctx context.Context, tx riverdriv
16871699
}
16881700

16891701
// Validates job args prior to insertion. Currently, verifies that a worker to
1690-
// handle the kind is registered in the configured workers bundle. An
1691-
// insert-only client doesn't require a workers bundle be configured though, so
1692-
// no validation occurs if one wasn't.
1702+
// handle the kind is registered in the configured workers bundle.
1703+
// This validation is skipped if the client is configured as an insert-only (with no workers)
1704+
// or if the client is configured to skip unknown job kinds.
16931705
func (c *Client[TTx]) validateJobArgs(args JobArgs) error {
1694-
if c.config.Workers == nil {
1706+
if c.config.Workers == nil || c.config.SkipUnknownJobCheck {
16951707
return nil
16961708
}
16971709

client_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1627,6 +1627,17 @@ func Test_Client_Insert(t *testing.T) {
16271627
_, err := client.Insert(ctx, &unregisteredJobArgs{}, nil)
16281628
require.NoError(t, err)
16291629
})
1630+
1631+
t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
1632+
t.Parallel()
1633+
1634+
client, _ := setup(t)
1635+
1636+
client.config.SkipUnknownJobCheck = true
1637+
1638+
_, err := client.Insert(ctx, &unregisteredJobArgs{}, nil)
1639+
require.NoError(t, err)
1640+
})
16301641
}
16311642

16321643
func Test_Client_InsertTx(t *testing.T) {
@@ -1752,6 +1763,17 @@ func Test_Client_InsertTx(t *testing.T) {
17521763
_, err := client.InsertTx(ctx, bundle.tx, &unregisteredJobArgs{}, nil)
17531764
require.NoError(t, err)
17541765
})
1766+
1767+
t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
1768+
t.Parallel()
1769+
1770+
client, bundle := setup(t)
1771+
1772+
client.config.SkipUnknownJobCheck = true
1773+
1774+
_, err := client.InsertTx(ctx, bundle.tx, &unregisteredJobArgs{}, nil)
1775+
require.NoError(t, err)
1776+
})
17551777
}
17561778

17571779
func Test_Client_InsertManyFast(t *testing.T) {
@@ -1960,6 +1982,19 @@ func Test_Client_InsertManyFast(t *testing.T) {
19601982
require.NoError(t, err)
19611983
})
19621984

1985+
t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
1986+
t.Parallel()
1987+
1988+
client, _ := setup(t)
1989+
1990+
client.config.SkipUnknownJobCheck = true
1991+
1992+
_, err := client.InsertManyFast(ctx, []InsertManyParams{
1993+
{Args: unregisteredJobArgs{}},
1994+
})
1995+
require.NoError(t, err)
1996+
})
1997+
19631998
t.Run("ErrorsOnInsertOptsWithoutRequiredUniqueStates", func(t *testing.T) {
19641999
t.Parallel()
19652000

@@ -2115,6 +2150,19 @@ func Test_Client_InsertManyFastTx(t *testing.T) {
21152150
require.NoError(t, err)
21162151
})
21172152

2153+
t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
2154+
t.Parallel()
2155+
2156+
client, bundle := setup(t)
2157+
2158+
client.config.SkipUnknownJobCheck = true
2159+
2160+
_, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{
2161+
{Args: unregisteredJobArgs{}},
2162+
})
2163+
require.NoError(t, err)
2164+
})
2165+
21182166
t.Run("ErrorsOnInsertOptsWithV1UniqueOpts", func(t *testing.T) {
21192167
t.Parallel()
21202168

@@ -2381,6 +2429,20 @@ func Test_Client_InsertMany(t *testing.T) {
23812429
require.Len(t, results, 1)
23822430
})
23832431

2432+
t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
2433+
t.Parallel()
2434+
2435+
client, _ := setup(t)
2436+
2437+
client.config.SkipUnknownJobCheck = true
2438+
2439+
results, err := client.InsertMany(ctx, []InsertManyParams{
2440+
{Args: unregisteredJobArgs{}},
2441+
})
2442+
require.NoError(t, err)
2443+
require.Len(t, results, 1)
2444+
})
2445+
23842446
t.Run("ErrorsOnInsertOptsWithV1UniqueOpts", func(t *testing.T) {
23852447
t.Parallel()
23862448

@@ -2638,6 +2700,20 @@ func Test_Client_InsertManyTx(t *testing.T) {
26382700
require.Len(t, results, 1)
26392701
})
26402702

2703+
t.Run("AllowsUnknownJobKindWithSkipUnknownJobCheck", func(t *testing.T) {
2704+
t.Parallel()
2705+
2706+
client, bundle := setup(t)
2707+
2708+
client.config.SkipUnknownJobCheck = true
2709+
2710+
results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{
2711+
{Args: unregisteredJobArgs{}},
2712+
})
2713+
require.NoError(t, err)
2714+
require.Len(t, results, 1)
2715+
})
2716+
26412717
t.Run("ErrorsOnInsertOptsWithV1UniqueOpts", func(t *testing.T) {
26422718
t.Parallel()
26432719

@@ -4961,6 +5037,7 @@ func Test_NewClient_Defaults(t *testing.T) {
49615037
require.NotZero(t, client.baseService.Logger)
49625038
require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts)
49635039
require.IsType(t, &DefaultClientRetryPolicy{}, client.config.RetryPolicy)
5040+
require.False(t, client.config.SkipUnknownJobCheck)
49645041
}
49655042

49665043
func Test_NewClient_Overrides(t *testing.T) {
@@ -5000,6 +5077,7 @@ func Test_NewClient_Overrides(t *testing.T) {
50005077
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
50015078
ReindexerSchedule: &periodicIntervalSchedule{interval: time.Hour},
50025079
RetryPolicy: retryPolicy,
5080+
SkipUnknownJobCheck: true,
50035081
TestOnly: true, // disables staggered start in maintenance services
50045082
Workers: workers,
50055083
WorkerMiddleware: []rivertype.WorkerMiddleware{&noOpWorkerMiddleware{}},
@@ -5030,6 +5108,7 @@ func Test_NewClient_Overrides(t *testing.T) {
50305108
require.Equal(t, logger, client.baseService.Logger)
50315109
require.Equal(t, 5, client.config.MaxAttempts)
50325110
require.Equal(t, retryPolicy, client.config.RetryPolicy)
5111+
require.True(t, client.config.SkipUnknownJobCheck)
50335112
require.Len(t, client.config.WorkerMiddleware, 1)
50345113
}
50355114

0 commit comments

Comments
 (0)