From 4cd66eb7829047c52eb3e3b9f1422ddfea9358cf Mon Sep 17 00:00:00 2001 From: Brandur Date: Thu, 13 Jun 2024 19:16:11 -0700 Subject: [PATCH] Fix for intermittency in unique job tests that use `ByPeriod` Fix for #392, in which, under rare circumstances, a test can start right up against the boundary of a period in use for unique opts, cross over that period, and produce the wrong result as a new row is inserted. For example, using a period of 15 minutes like in `db_unique_test.go`, if the tests run at 14:59:59.998, there's a good chance that they'll finish after 15:00:00, thereby spanning two separate 15-minute periods and allowing separate jobs to be inserted. This is more likely to happen in CI, where tests take longer to run. The fix is to use a more stable time for this type of test. We use the current hour, but assign a fixed minute and second that's right in the middle of it and nowhere near any period boundaries. I gave `client_test.go`'s unique tests the same treatment since it's possible for this problem to happen there too, although they're using a 24-hour period so it was already very unlikely. (The test case would have to start at exactly the wrong moment before a UTC day boundary.) Fixes #392. --- client.go | 34 +++++++--- client_test.go | 31 ++++++--- internal/baseservice/base_service.go | 43 ++++++++++-- internal/baseservice/base_service_test.go | 8 +-- internal/dbunique/db_unique.go | 2 +- internal/dbunique/db_unique_test.go | 47 ++++++++------ internal/jobcompleter/job_completer.go | 10 +-- internal/maintenance/job_scheduler.go | 4 +- internal/maintenance/periodic_job_enqueuer.go | 6 +- .../maintenance/periodic_job_enqueuer_test.go | 3 +- internal/maintenance/reindexer_test.go | 3 +- internal/notifylimiter/limiter.go | 2 +- internal/notifylimiter/limiter_test.go | 48 +++----------- .../riverdrivertest/riverdrivertest.go | 9 +-- .../riverinternaltest/riverinternaltest.go | 64 ++++++++++-------- .../riverinternaltest_test.go | 65 ++++++++++--------- job_executor.go | 10 +-- job_executor_test.go | 4 +- periodic_job.go | 2 +- producer_test.go | 7 +- riverdriver/river_driver_interface.go | 1 + .../internal/dbsqlc/river_job.sql.go | 22 ++++--- .../riverpgxv5/internal/dbsqlc/river_job.sql | 2 + .../internal/dbsqlc/river_job.sql.go | 22 ++++--- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 1 + rivermigrate/river_migrate.go | 6 +- 26 files changed, 260 insertions(+), 196 deletions(-) diff --git a/client.go b/client.go index 29323953..d1d897ee 100644 --- a/client.go +++ b/client.go @@ -217,6 +217,9 @@ type Config struct { // Scheduler run interval. Shared between the scheduler and producer/job // executors, but not currently exposed for configuration. schedulerInterval time.Duration + + // Time generator to make time stubbable in tests.
+ time baseservice.TimeGenerator } func (c *Config) validate() error { @@ -448,6 +451,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client Workers: config.Workers, disableStaggerStart: config.disableStaggerStart, schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.JobSchedulerIntervalDefault), + time: config.time, } if err := config.validate(); err != nil { @@ -455,9 +459,12 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } archetype := &baseservice.Archetype{ - Logger: config.Logger, - Rand: randutil.NewCryptoSeededConcurrentSafeRand(), - TimeNowUTC: func() time.Time { return time.Now().UTC() }, + Logger: config.Logger, + Rand: randutil.NewCryptoSeededConcurrentSafeRand(), + Time: config.time, + } + if archetype.Time == nil { + archetype.Time = &baseservice.UnStubbableTimeGenerator{} } client := &Client[TTx]{ @@ -1042,7 +1049,7 @@ func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*ri func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor, jobID int64) (*rivertype.JobRow, error) { return exec.JobCancel(ctx, &riverdriver.JobCancelParams{ ID: jobID, - CancelAttemptedAt: c.baseService.TimeNowUTC(), + CancelAttemptedAt: c.baseService.Time.NowUTC(), ControlTopic: string(notifier.NotificationTopicControl), }) } @@ -1112,7 +1119,7 @@ func (c *Client[TTx]) ID() string { return c.config.ID } -func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { +func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { encodedArgs, err := json.Marshal(args) if err != nil { return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err) @@ -1127,6 +1134,12 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp jobInsertOpts = argsWithOpts.InsertOpts() } + // If the time is stubbed (in a test), use that for `created_at`. Otherwise, + // leave an empty value which will use the database's `now()` value, which + // keeps the timestamps of jobs inserted across many different computers + // more consistent (i.e. in case of minor time drifts). + createdAt := archetype.Time.NowUTCOrNil() + maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts) priority := valutil.FirstNonZero(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault) queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault) @@ -1161,6 +1174,7 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp } insertParams := &riverdriver.JobInsertFastParams{ + CreatedAt: createdAt, EncodedArgs: encodedArgs, Kind: args.Kind(), MaxAttempts: maxAttempts, @@ -1171,7 +1185,11 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp Tags: tags, } - if !insertOpts.ScheduledAt.IsZero() { + if insertOpts.ScheduledAt.IsZero() { + // Use a stubbed time if there was one, but otherwise prefer the value + // generated by the database. createdAt is nil unless time is stubbed. 
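To make that fallback concrete, here's a tiny self-contained sketch (the helper name is hypothetical, not the project's API) of the coalesce behavior the insert query applies:

package main

import (
	"fmt"
	"time"
)

// coalesceNow mimics the query's coalesce($2::timestamptz, now()): a nil
// created_at defers to the database clock, while a stubbed test time wins.
func coalesceNow(createdAt *time.Time, dbNow time.Time) time.Time {
	if createdAt != nil {
		return *createdAt
	}
	return dbNow
}

func main() {
	dbNow := time.Now().UTC()
	fmt.Println(coalesceNow(nil, dbNow)) // production: database now() is used

	stub := time.Date(2024, 6, 13, 12, 37, 23, 0, time.UTC)
	fmt.Println(coalesceNow(&stub, dbNow)) // test: the stubbed time is used
}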
+ insertParams.ScheduledAt = createdAt + } else { insertParams.ScheduledAt = &insertOpts.ScheduledAt insertParams.State = rivertype.JobStateScheduled } @@ -1226,7 +1244,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg return nil, err } - params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(c.config, args, opts) + params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts) if err != nil { return nil, err } @@ -1382,7 +1400,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive } var err error - insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(c.config, param.Args, param.InsertOpts) + insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts) if err != nil { return nil, err } diff --git a/client_test.go b/client_test.go index 0701259a..392d3c00 100644 --- a/client_test.go +++ b/client_test.go @@ -145,6 +145,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config { Workers: workers, disableStaggerStart: true, // disables staggered start in maintenance services schedulerInterval: riverinternaltest.SchedulerShortInterval, + time: &riverinternaltest.TimeStub{}, } } @@ -2221,7 +2222,7 @@ func Test_Client_ErrorHandler(t *testing.T) { // Bypass the normal Insert function because that will error on an // unknown job. - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil) require.NoError(t, err) _, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams) require.NoError(t, err) @@ -3664,7 +3665,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) { subscribeChan, cancel := client.Subscribe(EventKindJobFailed) t.Cleanup(cancel) - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil) require.NoError(err) insertedJob, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams) require.NoError(err) @@ -4257,12 +4258,13 @@ func TestClient_JobTimeout(t *testing.T) { func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() + archetype := riverinternaltest.BaseServiceArchetype(t) config := newTestConfig(t, nil) t.Run("Defaults", func(t *testing.T) { t.Parallel() - insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, nil) + insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, nil) require.NoError(t, err) require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs)) require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind) @@ -4282,7 +4284,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { MaxAttempts: 34, } - insertParams, _, err := insertParamsFromConfigArgsAndOptions(overrideConfig, noOpArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, overrideConfig, noOpArgs{}, nil) require.NoError(t, err) require.Equal(t, overrideConfig.MaxAttempts, insertParams.MaxAttempts) }) @@ -4297,7 +4299,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { ScheduledAt: time.Now().Add(time.Hour), Tags: []string{"tag1", "tag2"}, } - insertParams, _, err := 
insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, opts) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, opts) require.NoError(t, err) require.Equal(t, 42, insertParams.MaxAttempts) require.Equal(t, 2, insertParams.Priority) @@ -4309,7 +4311,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Run("WorkerInsertOptsOverrides", func(t *testing.T) { t.Parallel() - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, &customInsertOptsJobArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, nil) require.NoError(t, err) // All these come from overrides in customInsertOptsJobArgs's definition: require.Equal(t, 42, insertParams.MaxAttempts) @@ -4328,7 +4330,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted}, } - _, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}) + _, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}) require.NoError(t, err) require.Equal(t, uniqueOpts.ByArgs, internalUniqueOpts.ByArgs) require.Equal(t, uniqueOpts.ByPeriod, internalUniqueOpts.ByPeriod) @@ -4339,7 +4341,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Run("PriorityIsLimitedTo4", func(t *testing.T) { t.Parallel() - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{Priority: 5}) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{Priority: 5}) require.ErrorContains(t, err, "priority must be between 1 and 4") require.Nil(t, insertParams) }) @@ -4348,7 +4350,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() args := timeoutTestArgs{TimeoutValue: time.Hour} - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, args, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, nil) require.NoError(t, err) require.Equal(t, `{"timeout_value":3600000000000}`, string(insertParams.EncodedArgs)) }) @@ -4360,6 +4362,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { // since we already have tests elsewhere for that. Just make sure validation // is running. insertParams, _, err := insertParamsFromConfigArgsAndOptions( + archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}}, @@ -4539,6 +4542,16 @@ func TestUniqueOpts(t *testing.T) { client := newTestClient(t, dbPool, newTestConfig(t, nil)) + // Tests that use ByPeriod below can be sensitive to intermittency: if + // the tests run at, say, 23:59:59.998, it's possible to accidentally + // cross a period threshold, even if it's very unlikely. So here, stub a + // time based on the current hour, nicened up a little so that it sits + // roughly in the middle of the hour and well clear of any period + // boundaries.
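For intuition on why a mid-hour anchor is safe, a short self-contained check (illustrative only) shows the offset used below keeps several minutes of margin to every 15-minute boundary:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Same anchoring scheme as the tests: top of the current hour plus a
	// fixed offset that lands mid-hour.
	anchor := time.Now().UTC().Truncate(time.Hour).
		Add(37*time.Minute + 23*time.Second + 123*time.Millisecond)

	// Margins to the surrounding 15-minute boundaries: ~7m23s below and
	// ~7m37s above, regardless of when the test suite happens to run.
	lower := anchor.Truncate(15 * time.Minute)
	fmt.Println(anchor.Sub(lower))                       // 7m23.123s
	fmt.Println(lower.Add(15 * time.Minute).Sub(anchor)) // 7m36.877s
}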
+ client.baseService.Time.StubNowUTC( + time.Now().UTC().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond), + ) + return client, &testBundle{} } diff --git a/internal/baseservice/base_service.go b/internal/baseservice/base_service.go index 80a9f4a3..e6cc5fd2 100644 --- a/internal/baseservice/base_service.go +++ b/internal/baseservice/base_service.go @@ -34,11 +34,12 @@ type Archetype struct { // secure randomness. Rand *rand.Rand - // TimeNowUTC returns the current time as UTC. Normally it's implemented as - // a call to `time.Now().UTC()`, but may be overridden in tests for time - // injection. Services should try to use this function instead of the - // vanilla ones from the `time` package for testing purposes. - TimeNowUTC func() time.Time + // Time is a time generator used to get the current time in UTC. Normally + // it's implemented as UnStubbableTimeGenerator, which just calls through to + // `time.Now().UTC()`, but is riverinternaltest.TimeStub in tests to allow + // the current time to be stubbed. Services should try to use this generator + // instead of the vanilla functions from the `time` package for testing purposes. + Time TimeGenerator } // BaseService is a struct that's meant to be embedded on "service-like" objects @@ -167,7 +168,37 @@ func Init[TService withBaseService](archetype *Archetype, service TService) TSer baseService.Logger = archetype.Logger baseService.Name = reflect.TypeOf(service).Elem().Name() baseService.Rand = archetype.Rand - baseService.TimeNowUTC = archetype.TimeNowUTC + baseService.Time = archetype.Time return service } + +// TimeGenerator generates a current time in UTC. In test environments it's +// implemented by riverinternaltest.TimeStub, which lets the current time be +// stubbed. Otherwise, it's implemented as UnStubbableTimeGenerator, which +// doesn't allow stubbing. +type TimeGenerator interface { + // NowUTC returns the current time. This may be a stubbed time if the time + // has been actively stubbed in a test. + NowUTC() time.Time + + // NowUTCOrNil returns the currently stubbed time _if_ the current time + // is stubbed, and returns nil otherwise. This is generally useful in cases + // where a component may want to use a stubbed time if the time is stubbed, + // but to fall back to a database time default otherwise. + NowUTCOrNil() *time.Time + + // StubNowUTC stubs the current time. It will panic if invoked outside of + // tests. Returns the same time passed as a parameter for convenience. + StubNowUTC(nowUTC time.Time) time.Time +} + +// UnStubbableTimeGenerator is a TimeGenerator implementation that can't be +// stubbed. It's always the generator used outside of tests.
+type UnStubbableTimeGenerator struct{} + +func (g *UnStubbableTimeGenerator) NowUTC() time.Time { return time.Now().UTC() } +func (g *UnStubbableTimeGenerator) NowUTCOrNil() *time.Time { return nil } +func (g *UnStubbableTimeGenerator) StubNowUTC(nowUTC time.Time) time.Time { + panic("time not stubbable outside tests") +} diff --git a/internal/baseservice/base_service_test.go b/internal/baseservice/base_service_test.go index 5ded00af..bfe520ef 100644 --- a/internal/baseservice/base_service_test.go +++ b/internal/baseservice/base_service_test.go @@ -20,7 +20,7 @@ func TestInit(t *testing.T) { myService := Init(archetype, &MyService{}) require.NotNil(t, myService.Logger) require.Equal(t, "MyService", myService.Name) - require.WithinDuration(t, time.Now().UTC(), myService.TimeNowUTC(), 2*time.Second) + require.WithinDuration(t, time.Now().UTC(), myService.Time.NowUTC(), 2*time.Second) } func TestBaseService_CancellableSleep(t *testing.T) { @@ -143,8 +143,8 @@ type MyService struct { func archetype() *Archetype { return &Archetype{ - Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)), - Rand: randutil.NewCryptoSeededConcurrentSafeRand(), - TimeNowUTC: func() time.Time { return time.Now().UTC() }, + Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)), + Rand: randutil.NewCryptoSeededConcurrentSafeRand(), + Time: &UnStubbableTimeGenerator{}, } } diff --git a/internal/dbunique/db_unique.go b/internal/dbunique/db_unique.go index 045b7ddc..7e2804a8 100644 --- a/internal/dbunique/db_unique.go +++ b/internal/dbunique/db_unique.go @@ -75,7 +75,7 @@ func (i *UniqueInserter) JobInsert(ctx context.Context, exec riverdriver.Executo } if uniqueOpts.ByPeriod != time.Duration(0) { - lowerPeriodBound := i.TimeNowUTC().Truncate(uniqueOpts.ByPeriod) + lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod) advisoryLockHash.Write([]byte("&period=" + lowerPeriodBound.Format(time.RFC3339))) diff --git a/internal/dbunique/db_unique_test.go b/internal/dbunique/db_unique_test.go index 87e6ebab..64485925 100644 --- a/internal/dbunique/db_unique_test.go +++ b/internal/dbunique/db_unique_test.go @@ -41,20 +41,27 @@ func TestUniqueInserter_JobInsert(t *testing.T) { ) bundle := &testBundle{ - baselineTime: time.Now(), - driver: driver, - exec: driver.UnwrapExecutor(tx), - tx: tx, + driver: driver, + exec: driver.UnwrapExecutor(tx), + tx: tx, } inserter := baseservice.Init(riverinternaltest.BaseServiceArchetype(t), &UniqueInserter{}) - inserter.TimeNowUTC = func() time.Time { return bundle.baselineTime } + + // Tests that use ByPeriod below can be sensitive to intermittency: if + // the tests run at, say, 14:59:59.998, it's possible to accidentally + // cross a period threshold, even if it's very unlikely. So here, stub a + // time based on the current hour, nicened up a little so that it sits + // roughly in the middle of the hour and well clear of any period + // boundaries.
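The stub-then-advance flow these tests rely on, pared down to a self-contained sketch (a stand-in for riverinternaltest.TimeStub, with its mutex omitted for brevity):

package main

import (
	"fmt"
	"time"
)

// tinyStub is a simplified stand-in for riverinternaltest.TimeStub: it
// returns the real clock until a test pins a specific time.
type tinyStub struct{ now *time.Time }

func (s *tinyStub) NowUTC() time.Time {
	if s.now == nil {
		return time.Now().UTC()
	}
	return *s.now
}

func (s *tinyStub) StubNowUTC(t time.Time) time.Time {
	s.now = &t
	return t
}

func main() {
	stub := &tinyStub{}
	base := stub.StubNowUTC(time.Date(2024, 6, 13, 12, 37, 23, 0, time.UTC))
	fmt.Println(stub.NowUTC()) // pinned baseline

	// Advance past a 15-minute unique period, as the ByPeriod tests do to
	// prove a second job becomes insertable.
	stub.StubNowUTC(base.Add(15*time.Minute + time.Second))
	fmt.Println(stub.NowUTC())
}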
+ bundle.baselineTime = inserter.Time.StubNowUTC(time.Now().UTC().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond)) return inserter, bundle } - makeInsertParams := func() *riverdriver.JobInsertFastParams { + makeInsertParams := func(bundle *testBundle) *riverdriver.JobInsertFastParams { return &riverdriver.JobInsertFastParams{ + CreatedAt: &bundle.baselineTime, EncodedArgs: []byte(`{}`), Kind: "fake_job", MaxAttempts: rivercommon.MaxAttemptsDefault, @@ -71,7 +78,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) res, err := inserter.JobInsert(ctx, bundle.exec, insertParams, nil) require.NoError(t, err) @@ -83,7 +90,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.Equal(t, 0, res.Job.Attempt) require.Nil(t, res.Job.AttemptedAt) require.Empty(t, res.Job.AttemptedBy) - require.WithinDuration(t, time.Now(), res.Job.CreatedAt, 2*time.Second) + require.Equal(t, bundle.baselineTime.Truncate(1*time.Microsecond), res.Job.CreatedAt) require.Empty(t, res.Job.Errors) require.Nil(t, res.Job.FinalizedAt) require.Equal(t, insertParams.Kind, res.Job.Kind) @@ -103,7 +110,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { const maxJobsToFetch = 8 - res, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(), nil) + res, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(bundle), nil) require.NoError(t, err) require.NotEqual(t, 0, res.Job.ID, "expected job ID to be set, got %d", res.Job.ID) require.WithinDuration(t, time.Now(), res.Job.ScheduledAt, 1*time.Second) @@ -120,7 +127,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { "expected selected job to be in running state, got %q", jobs[0].State) for i := 1; i < 10; i++ { - _, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(), nil) + _, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(bundle), nil) require.NoError(t, err) } @@ -152,7 +159,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByArgs: true, } @@ -184,7 +191,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByPeriod: 15 * time.Minute, } @@ -200,7 +207,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.Equal(t, res0.Job.ID, res1.Job.ID) require.True(t, res1.UniqueSkippedAsDuplicate) - inserter.TimeNowUTC = func() time.Time { return bundle.baselineTime.Add(uniqueOpts.ByPeriod).Add(1 * time.Second) } + inserter.Time.StubNowUTC(bundle.baselineTime.Add(uniqueOpts.ByPeriod).Add(1 * time.Second)) // Same operation again, except that because we've advanced time passed // the period within unique bounds, another job is allowed to be queued, @@ -216,7 +223,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByQueue: true, } @@ -248,7 +255,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateRunning}, } @@ -302,7 +309,7 @@ func TestUniqueInserter_JobInsert(t 
*testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByQueue: true, } @@ -372,7 +379,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByArgs: true, ByPeriod: 15 * time.Minute, @@ -406,7 +413,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { // With period modified { insertParams := *insertParams // dup - inserter.TimeNowUTC = func() time.Time { return bundle.baselineTime.Add(uniqueOpts.ByPeriod).Add(1 * time.Second) } + inserter.Time.StubNowUTC(bundle.baselineTime.Add(uniqueOpts.ByPeriod).Add(1 * time.Second)) // New job because a unique dimension has changed. res2, err := inserter.JobInsert(ctx, bundle.exec, &insertParams, uniqueOpts) @@ -415,7 +422,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.False(t, res2.UniqueSkippedAsDuplicate) // Make sure to change timeNow back - inserter.TimeNowUTC = func() time.Time { return bundle.baselineTime } + inserter.Time.StubNowUTC(bundle.baselineTime) } // With queue modified @@ -451,7 +458,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { bundle.driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, t)) bundle.exec = bundle.driver.GetExecutor() - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByPeriod: 15 * time.Minute, } diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index ad1c2630..5ad7a5a0 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -78,7 +78,7 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst c.wg.Add(1) defer c.wg.Done() - start := c.TimeNowUTC() + start := c.Time.NowUTC() job, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) (*rivertype.JobRow, error) { return c.exec.JobSetStateIfRunning(ctx, params) @@ -87,7 +87,7 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst return err } - stats.CompleteDuration = c.TimeNowUTC().Sub(start) + stats.CompleteDuration = c.Time.NowUTC().Sub(start) c.subscribeCh <- []CompleterJobUpdated{{Job: job, JobStats: stats}} return nil @@ -156,7 +156,7 @@ func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, exec Par func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { // Start clock outside of goroutine so that the time spent blocking waiting // for an errgroup slot is accurately measured. 
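Why the start reading happens before the goroutine is spawned can be seen in a minimal sketch (hypothetical clock and worker types, not the completer's real ones):

package main

import (
	"fmt"
	"time"
)

type clock interface{ NowUTC() time.Time }

type systemClock struct{}

func (systemClock) NowUTC() time.Time { return time.Now().UTC() }

// measure reads the clock synchronously, so time spent blocked waiting for
// an async slot still counts toward the reported duration.
func measure(c clock, op func()) time.Duration {
	start := c.NowUTC() // taken before any asynchronous hand-off
	done := make(chan struct{})
	go func() { defer close(done); op() }()
	<-done
	return c.NowUTC().Sub(start)
}

func main() {
	fmt.Println(measure(systemClock{}, func() { time.Sleep(10 * time.Millisecond) }))
}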
- start := c.TimeNowUTC() + start := c.Time.NowUTC() c.errGroup.Go(func() error { job, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) (*rivertype.JobRow, error) { @@ -166,7 +166,7 @@ func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta return err } - stats.CompleteDuration = c.TimeNowUTC().Sub(start) + stats.CompleteDuration = c.Time.NowUTC().Sub(start) c.subscribeCh <- []CompleterJobUpdated{{Job: job, JobStats: stats}} return nil @@ -413,7 +413,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated { setState := setStateBatch[jobRow.ID] - setState.Stats.CompleteDuration = c.TimeNowUTC().Sub(*setState.Params.FinalizedAt) + setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(*setState.Params.FinalizedAt) return CompleterJobUpdated{Job: jobRow, JobStats: setState.Stats} }) diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go index 419e81d0..2bfcb752 100644 --- a/internal/maintenance/job_scheduler.go +++ b/internal/maintenance/job_scheduler.go @@ -144,7 +144,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er } defer tx.Rollback(ctx) - now := s.TimeNowUTC() + now := s.Time.NowUTC() nowWithLookAhead := now.Add(s.config.Interval) scheduledJobs, err := tx.JobSchedule(ctx, &riverdriver.JobScheduleParams{ @@ -161,7 +161,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er // slightly in the future (this loop, the notify, and tx commit will take // a small amount of time). This isn't going to be perfect, but the goal // is to roughly try to guess when the clients will attempt to fetch jobs. - notificationHorizon := s.TimeNowUTC().Add(5 * time.Millisecond) + notificationHorizon := s.Time.NowUTC().Add(5 * time.Millisecond) for _, job := range scheduledJobs { if job.ScheduledAt.After(notificationHorizon) { diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index e5eb4836..8256b338 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -235,7 +235,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { insertParamsMany []*riverdriver.JobInsertFastParams insertParamsUnique []*insertParamsAndUniqueOpts - now = s.TimeNowUTC() + now = s.Time.NowUTC() ) // Handle periodic jobs in sorted order so we can correctly account @@ -289,7 +289,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { insertParamsUnique []*insertParamsAndUniqueOpts ) - now := s.TimeNowUTC() + now := s.Time.NowUTC() // Add a small margin to the current time so we're not only // running jobs that are already ready, but also ones ready at @@ -440,7 +440,7 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration { var ( firstNextRunAt time.Time - now = s.TimeNowUTC() + now = s.Time.NowUTC() ) for _, periodicJob := range s.periodicJobs { diff --git a/internal/maintenance/periodic_job_enqueuer_test.go b/internal/maintenance/periodic_job_enqueuer_test.go index 14c17260..ec519af9 100644 --- a/internal/maintenance/periodic_job_enqueuer_test.go +++ b/internal/maintenance/periodic_job_enqueuer_test.go @@ -247,8 +247,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc, _ := setup(t) - now := time.Now() - svc.TimeNowUTC = func() time.Time { return now } + now := svc.Time.StubNowUTC(time.Now()) svc.periodicJobs = 
make(map[rivertype.PeriodicJobHandle]*PeriodicJob) periodicJobHandles := svc.AddMany([]*PeriodicJob{ diff --git a/internal/maintenance/reindexer_test.go b/internal/maintenance/reindexer_test.go index 8ef6a8bb..34bdc7d6 100644 --- a/internal/maintenance/reindexer_test.go +++ b/internal/maintenance/reindexer_test.go @@ -29,11 +29,10 @@ func TestReindexer(t *testing.T) { dbPool := riverinternaltest.TestDB(ctx, t) bundle := &testBundle{ exec: riverpgxv5.New(dbPool).GetExecutor(), - now: time.Now(), } archetype := riverinternaltest.BaseServiceArchetype(t) - archetype.TimeNowUTC = func() time.Time { return bundle.now } + bundle.now = archetype.Time.StubNowUTC(time.Now()) fromNow := func(d time.Duration) func(time.Time) time.Time { return func(t time.Time) time.Time { diff --git a/internal/notifylimiter/limiter.go b/internal/notifylimiter/limiter.go index 443a9017..201e63d2 100644 --- a/internal/notifylimiter/limiter.go +++ b/internal/notifylimiter/limiter.go @@ -29,7 +29,7 @@ func NewLimiter(archetype *baseservice.Archetype, waitDuration time.Duration) *L func (l *Limiter) ShouldTrigger(topic string) bool { // Calculate this beforehand to reduce mutex duration. - now := l.TimeNowUTC() + now := l.Time.NowUTC() lastSentHorizon := now.Add(-l.waitDuration) l.mu.Lock() diff --git a/internal/notifylimiter/limiter_test.go b/internal/notifylimiter/limiter_test.go index c294065a..3dbd255d 100644 --- a/internal/notifylimiter/limiter_test.go +++ b/internal/notifylimiter/limiter_test.go @@ -1,7 +1,6 @@ package notifylimiter import ( - "sync" "sync/atomic" "testing" "time" @@ -11,43 +10,16 @@ import ( "github.com/riverqueue/river/internal/riverinternaltest" ) -type mockableTime struct { - mu sync.Mutex // protects now - now *time.Time -} - -func (m *mockableTime) Now() time.Time { - m.mu.Lock() - defer m.mu.Unlock() - - if m.now != nil { - return *m.now - } - return time.Now() -} - -func (m *mockableTime) SetNow(now time.Time) { - m.mu.Lock() - defer m.mu.Unlock() - - m.now = &now -} - func TestLimiter(t *testing.T) { t.Parallel() - type testBundle struct { - mockTime *mockableTime - } + type testBundle struct{} setup := func() (*Limiter, *testBundle) { - bundle := &testBundle{ - mockTime: &mockableTime{}, - } + bundle := &testBundle{} archetype := riverinternaltest.BaseServiceArchetype(t) limiter := NewLimiter(archetype, 10*time.Millisecond) - limiter.TimeNowUTC = bundle.mockTime.Now return limiter, bundle } @@ -55,22 +27,22 @@ func TestLimiter(t *testing.T) { t.Run("OnlySendsOncePerWaitDuration", func(t *testing.T) { t.Parallel() - limiter, bundle := setup() + limiter, _ := setup() now := time.Now() - bundle.mockTime.SetNow(now) + limiter.Time.StubNowUTC(now) require.True(t, limiter.ShouldTrigger("a")) for i := 0; i < 10; i++ { require.False(t, limiter.ShouldTrigger("a")) } // Move the time forward, by just less than waitDuration: - bundle.mockTime.SetNow(now.Add(9 * time.Millisecond)) + limiter.Time.StubNowUTC(now.Add(9 * time.Millisecond)) require.False(t, limiter.ShouldTrigger("a")) require.True(t, limiter.ShouldTrigger("b")) // First time being triggered on "b" // Move the time forward to just past the waitDuration: - bundle.mockTime.SetNow(now.Add(11 * time.Millisecond)) + limiter.Time.StubNowUTC(now.Add(11 * time.Millisecond)) require.True(t, limiter.ShouldTrigger("a")) for i := 0; i < 10; i++ { require.False(t, limiter.ShouldTrigger("a")) @@ -79,7 +51,7 @@ func TestLimiter(t *testing.T) { require.False(t, limiter.ShouldTrigger("b")) // has only been 2ms since last trigger of "b" // Move forward by 
another waitDuration (plus padding): - bundle.mockTime.SetNow(now.Add(22 * time.Millisecond)) + limiter.Time.StubNowUTC(now.Add(22 * time.Millisecond)) require.True(t, limiter.ShouldTrigger("a")) require.True(t, limiter.ShouldTrigger("b")) require.False(t, limiter.ShouldTrigger("b")) @@ -91,9 +63,9 @@ func TestLimiter(t *testing.T) { doneCh := make(chan struct{}) t.Cleanup(func() { close(doneCh) }) - limiter, bundle := setup() + limiter, _ := setup() now := time.Now() - bundle.mockTime.SetNow(now) + limiter.Time.StubNowUTC(now) counters := make(map[string]*atomic.Int64) for _, topic := range []string{"a", "b", "c"} { @@ -125,7 +97,7 @@ func TestLimiter(t *testing.T) { require.Equal(t, int64(1), counters["b"].Load()) require.Equal(t, int64(1), counters["c"].Load()) - bundle.mockTime.SetNow(now.Add(11 * time.Millisecond)) + limiter.Time.StubNowUTC(now.Add(11 * time.Millisecond)) <-time.After(100 * time.Millisecond) diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 1e0b1343..ea4f61db 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -722,23 +722,24 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv exec, _ := setupExecutor(ctx, t, driver, beginTx) - now := time.Now().UTC() + targetTime := time.Now().UTC().Add(-15 * time.Minute) job, err := exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{ + CreatedAt: &targetTime, EncodedArgs: []byte(`{"encoded": "args"}`), Kind: "test_kind", MaxAttempts: 6, Metadata: []byte(`{"meta": "data"}`), Priority: 2, Queue: "queue_name", - ScheduledAt: &now, + ScheduledAt: &targetTime, State: rivertype.JobStateRunning, Tags: []string{"tag"}, }) require.NoError(t, err) require.Equal(t, 0, job.Attempt) require.Nil(t, job.AttemptedAt) - require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second) + requireEqualTime(t, targetTime, job.CreatedAt) require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs) require.Empty(t, job.Errors) require.Nil(t, job.FinalizedAt) @@ -747,7 +748,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata) require.Equal(t, 2, job.Priority) require.Equal(t, "queue_name", job.Queue) - requireEqualTime(t, now, job.ScheduledAt) + requireEqualTime(t, targetTime, job.ScheduledAt) require.Equal(t, rivertype.JobStateRunning, job.State) require.Equal(t, []string{"tag"}, job.Tags) }) diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index 62a9cdd1..f1fcfc66 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -61,9 +61,9 @@ func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype { tb.Helper() return &baseservice.Archetype{ - Logger: Logger(tb), - Rand: rand, - TimeNowUTC: func() time.Time { return time.Now().UTC() }, + Logger: Logger(tb), + Rand: rand, + Time: &TimeStub{}, } } @@ -221,31 +221,6 @@ func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool { return testPool.Pool() } -// StubTime returns a pair of function for (getTime, setTime), the former of -// which is compatible with `TimeNowUTC` found in the service archetype. 
-// It's concurrent safe is so that a started service can access its stub -// time while the test case is setting it, and without the race detector -// triggering. -func StubTime(initialTime time.Time) (func() time.Time, func(t time.Time)) { - var ( - mu sync.RWMutex - stubbedTime = &initialTime - ) - - getTime := func() time.Time { - mu.RLock() - defer mu.RUnlock() - return *stubbedTime - } - setTime := func(newTime time.Time) { - mu.Lock() - defer mu.Unlock() - stubbedTime = &newTime - } - - return getTime, setTime -} - // A pool and mutex to protect it, lazily initialized by TestTx. Once open, this // pool is never explicitly closed, instead closing implicitly as the package // tests finish. @@ -433,3 +408,36 @@ func WrapTestMain(m *testing.M) { os.Exit(status) } + +// TimeStub implements baseservice.TimeGenerator to allow time to be stubbed in +// tests. +type TimeStub struct { + mu sync.RWMutex + nowUTC *time.Time +} + +func (t *TimeStub) NowUTC() time.Time { + t.mu.RLock() + defer t.mu.RUnlock() + + if t.nowUTC == nil { + return time.Now().UTC() + } + + return *t.nowUTC +} + +func (t *TimeStub) NowUTCOrNil() *time.Time { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.nowUTC +} + +func (t *TimeStub) StubNowUTC(nowUTC time.Time) time.Time { + t.mu.Lock() + defer t.mu.Unlock() + + t.nowUTC = &nowUTC + return nowUTC +} diff --git a/internal/riverinternaltest/riverinternaltest_test.go b/internal/riverinternaltest/riverinternaltest_test.go index 8d526156..c55dffb6 100644 --- a/internal/riverinternaltest/riverinternaltest_test.go +++ b/internal/riverinternaltest/riverinternaltest_test.go @@ -11,38 +11,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestStubTime(t *testing.T) { - t.Parallel() - - t.Run("BasicUsage", func(t *testing.T) { - t.Parallel() - - initialTime := time.Now() - - getTime, setTime := StubTime(initialTime) - require.Equal(t, initialTime, getTime()) - - newTime := initialTime.Add(1 * time.Second) - setTime(newTime) - require.Equal(t, newTime, getTime()) - }) - - t.Run("Stress", func(t *testing.T) { - t.Parallel() - - getTime, setTime := StubTime(time.Now()) - - for i := 0; i < 10; i++ { - go func() { - for j := 0; j < 50; j++ { - setTime(time.Now()) - _ = getTime() - } - }() - } - }) -} - // Implemented by `pgx.Tx` or `pgxpool.Pool`. Normally we'd use a similar type // from `dbsqlc` or `dbutil`, but riverinternaltest is extremely low level and // that would introduce a cyclic dependency. 
We could package as @@ -142,3 +110,36 @@ func TestWaitOrTimeoutN(t *testing.T) { nums := WaitOrTimeoutN(t, numChan, 3) require.Equal(t, []int{0, 1, 2}, nums) } + +func TestTimeStub(t *testing.T) { + t.Parallel() + + t.Run("BasicUsage", func(t *testing.T) { + t.Parallel() + + initialTime := time.Now().UTC() + + timeStub := &TimeStub{} + + timeStub.StubNowUTC(initialTime) + require.Equal(t, initialTime, timeStub.NowUTC()) + + newTime := timeStub.StubNowUTC(initialTime.Add(1 * time.Second)) + require.Equal(t, newTime, timeStub.NowUTC()) + }) + + t.Run("Stress", func(t *testing.T) { + t.Parallel() + + timeStub := &TimeStub{} + + for i := 0; i < 10; i++ { + go func() { + for j := 0; j < 50; j++ { + timeStub.StubNowUTC(time.Now().UTC()) + _ = timeStub.NowUTC() + } + }() + } + }) +} diff --git a/job_executor.go b/job_executor.go index 33747bf6..177caab9 100644 --- a/job_executor.go +++ b/job_executor.go @@ -145,7 +145,7 @@ func (e *jobExecutor) Execute(ctx context.Context) { // Ensure that the context is cancelled no matter what, or it will leak: defer e.CancelFunc(errExecutorDefaultCancel) - e.start = e.TimeNowUTC() + e.start = e.Time.NowUTC() e.stats = &jobstats.JobStatistics{ QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt), } @@ -179,7 +179,7 @@ func (e *jobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { PanicVal: recovery, } } - e.stats.RunDuration = e.TimeNowUTC().Sub(e.start) + e.stats.RunDuration = e.Time.NowUTC().Sub(e.start) }() if e.WorkUnit == nil { @@ -258,7 +258,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) // so we instead make the job immediately `available` if the snooze time is // smaller than the scheduler's run interval. var params *riverdriver.JobSetStateIfRunningParams - if nextAttemptScheduledAt.Sub(e.TimeNowUTC()) <= e.SchedulerInterval { + if nextAttemptScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval { params = riverdriver.JobSetStateSnoozedAvailable(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1) } else { params = riverdriver.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1) @@ -276,7 +276,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) return } - if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(e.JobRow.ID, e.TimeNowUTC())); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(e.JobRow.ID, e.Time.NowUTC())); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Error completing job", slog.String("err", err.Error()), slog.Int64("job_id", e.JobRow.ID), @@ -364,7 +364,7 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { // respected. Here, we offset that with a branch that makes jobs immediately // `available` if their retry was smaller than the scheduler's run interval. 
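Reduced to its essentials, the branch that follows makes this decision (a sketch; the state names mirror River's, but the helper is illustrative):

package main

import (
	"fmt"
	"time"
)

// nextJobState mirrors the executor's choice: a retry that lands before the
// scheduler's next pass goes straight to "available" so it isn't delayed by
// a full scheduler interval.
func nextJobState(retryAt, now time.Time, schedulerInterval time.Duration) string {
	if retryAt.Sub(now) <= schedulerInterval {
		return "available"
	}
	return "retryable"
}

func main() {
	now := time.Now().UTC()
	fmt.Println(nextJobState(now.Add(2*time.Second), now, 5*time.Second))  // available
	fmt.Println(nextJobState(now.Add(30*time.Second), now, 5*time.Second)) // retryable
}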
var params *riverdriver.JobSetStateIfRunningParams - if nextRetryScheduledAt.Sub(e.TimeNowUTC()) <= e.SchedulerInterval { + if nextRetryScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval { params = riverdriver.JobSetStateErrorAvailable(e.JobRow.ID, nextRetryScheduledAt, errData) } else { params = riverdriver.JobSetStateErrorRetryable(e.JobRow.ID, nextRetryScheduledAt, errData) diff --git a/job_executor_test.go b/job_executor_test.go index 4233072b..e1ed00b3 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -223,7 +223,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) - executor.Archetype.TimeNowUTC, _ = riverinternaltest.StubTime(time.Now().UTC()) + now := executor.Archetype.Time.StubNowUTC(time.Now().UTC()) workerErr := errors.New("job error") executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) @@ -236,7 +236,7 @@ func TestJobExecutor_Execute(t *testing.T) { require.WithinDuration(t, executor.ClientRetryPolicy.NextRetry(bundle.jobRow), job.ScheduledAt, 1*time.Second) require.Equal(t, rivertype.JobStateRetryable, job.State) require.Len(t, job.Errors, 1) - require.Equal(t, executor.Archetype.TimeNowUTC().Truncate(1*time.Microsecond), job.Errors[0].At.Truncate(1*time.Microsecond)) + require.Equal(t, now.Truncate(1*time.Microsecond), job.Errors[0].At.Truncate(1*time.Microsecond)) require.Equal(t, 1, job.Errors[0].Attempt) require.Equal(t, "job error", job.Errors[0].Error) require.Equal(t, "", job.Errors[0].Trace) diff --git a/periodic_job.go b/periodic_job.go index c2cea8df..48c3c6e7 100644 --- a/periodic_job.go +++ b/periodic_job.go @@ -184,7 +184,7 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe return &maintenance.PeriodicJob{ ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { - return insertParamsFromConfigArgsAndOptions(b.clientConfig, args, options) + return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options) }, RunOnStart: opts.RunOnStart, ScheduleFunc: periodicJob.scheduleFunc.Next, diff --git a/producer_test.go b/producer_test.go index 4b157d16..8520a5fb 100644 --- a/producer_test.go +++ b/producer_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/componentstatus" "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/maintenance" @@ -105,7 +106,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { params := make([]*riverdriver.JobInsertFastParams, maxJobCount) for i := range params { - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, WithJobNumArgs{JobNum: i}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, WithJobNumArgs{JobNum: i}, nil) require.NoError(err) params[i] = insertParams @@ -245,6 +246,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin ctx := context.Background() type testBundle struct { + archetype *baseservice.Archetype completer jobcompleter.JobCompleter config *Config exec riverdriver.Executor @@ -269,6 +271,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin }() return producer, &testBundle{ + archetype: &producer.Archetype, completer: producer.completer, config: config, exec: producer.exec, @@ -280,7 +283,7 @@ func testProducer(t 
*testing.T, makeProducer func(ctx context.Context, t *testin mustInsert := func(ctx context.Context, t *testing.T, bundle *testBundle, args JobArgs) { t.Helper() - insertParams, _, err := insertParamsFromConfigArgsAndOptions(bundle.config, args, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, args, nil) require.NoError(t, err) _, err = bundle.exec.JobInsertFast(ctx, insertParams) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 511d2221..3a3a3d71 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -201,6 +201,7 @@ type JobGetStuckParams struct { } type JobInsertFastParams struct { + CreatedAt *time.Time EncodedArgs []byte Kind string MaxAttempts int diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index d309c3c6..8105ddea 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -506,6 +506,7 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara const jobInsertFast = `-- name: JobInsertFast :one INSERT INTO river_job( args, + created_at, finalized_at, kind, max_attempts, @@ -517,20 +518,22 @@ INSERT INTO river_job( tags ) VALUES ( $1::jsonb, - $2, - $3::text, - $4::smallint, - coalesce($5::jsonb, '{}'), - $6::smallint, - $7::text, - coalesce($8::timestamptz, now()), - $9::river_job_state, - coalesce($10::varchar(255)[], '{}') + coalesce($2::timestamptz, now()), + $3, + $4::text, + $5::smallint, + coalesce($6::jsonb, '{}'), + $7::smallint, + $8::text, + coalesce($9::timestamptz, now()), + $10::river_job_state, + coalesce($11::varchar(255)[], '{}') ) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags ` type JobInsertFastParams struct { Args json.RawMessage + CreatedAt *time.Time FinalizedAt *time.Time Kind string MaxAttempts int16 @@ -545,6 +548,7 @@ type JobInsertFastParams struct { func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFastParams) (*RiverJob, error) { row := db.QueryRowContext(ctx, jobInsertFast, arg.Args, + arg.CreatedAt, arg.FinalizedAt, arg.Kind, arg.MaxAttempts, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index fd5b7a5f..d1b80af3 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -195,6 +195,7 @@ LIMIT @max; -- name: JobInsertFast :one INSERT INTO river_job( args, + created_at, finalized_at, kind, max_attempts, @@ -206,6 +207,7 @@ INSERT INTO river_job( tags ) VALUES ( @args::jsonb, + coalesce(sqlc.narg('created_at')::timestamptz, now()), @finalized_at, @kind::text, @max_attempts::smallint, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index a902750d..61f6b13e 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -491,6 +491,7 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara const jobInsertFast = `-- name: JobInsertFast :one INSERT INTO river_job( args, + created_at, finalized_at, kind, max_attempts, @@ -502,20 +503,22 @@ INSERT INTO river_job( 
tags ) VALUES ( $1::jsonb, - $2, - $3::text, - $4::smallint, - coalesce($5::jsonb, '{}'), - $6::smallint, - $7::text, - coalesce($8::timestamptz, now()), - $9::river_job_state, - coalesce($10::varchar(255)[], '{}') + coalesce($2::timestamptz, now()), + $3, + $4::text, + $5::smallint, + coalesce($6::jsonb, '{}'), + $7::smallint, + $8::text, + coalesce($9::timestamptz, now()), + $10::river_job_state, + coalesce($11::varchar(255)[], '{}') ) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags ` type JobInsertFastParams struct { Args []byte + CreatedAt *time.Time FinalizedAt *time.Time Kind string MaxAttempts int16 @@ -530,6 +533,7 @@ type JobInsertFastParams struct { func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFastParams) (*RiverJob, error) { row := db.QueryRow(ctx, jobInsertFast, arg.Args, + arg.CreatedAt, arg.FinalizedAt, arg.Kind, arg.MaxAttempts, diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 79114b6f..341b30fc 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -169,6 +169,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt func (e *Executor) JobInsertFast(ctx context.Context, params *riverdriver.JobInsertFastParams) (*rivertype.JobRow, error) { job, err := e.queries.JobInsertFast(ctx, e.dbtx, &dbsqlc.JobInsertFastParams{ Args: params.EncodedArgs, + CreatedAt: params.CreatedAt, Kind: params.Kind, MaxAttempts: int16(min(params.MaxAttempts, math.MaxInt16)), Metadata: params.Metadata, diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 7258d74e..c6cfd7f0 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -99,9 +99,9 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx] } archetype := &baseservice.Archetype{ - Logger: logger, - Rand: randutil.NewCryptoSeededConcurrentSafeRand(), - TimeNowUTC: func() time.Time { return time.Now().UTC() }, + Logger: logger, + Rand: randutil.NewCryptoSeededConcurrentSafeRand(), + Time: &baseservice.UnStubbableTimeGenerator{}, } return baseservice.Init(archetype, &Migrator[TTx]{