diff --git a/client.go b/client.go index 29323953..d1d897ee 100644 --- a/client.go +++ b/client.go @@ -217,6 +217,9 @@ type Config struct { // Scheduler run interval. Shared between the scheduler and producer/job // executors, but not currently exposed for configuration. schedulerInterval time.Duration + + // Time generator to make time stubbable in tests. + time baseservice.TimeGenerator } func (c *Config) validate() error { @@ -448,6 +451,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client Workers: config.Workers, disableStaggerStart: config.disableStaggerStart, schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.JobSchedulerIntervalDefault), + time: config.time, } if err := config.validate(); err != nil { @@ -455,9 +459,12 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } archetype := &baseservice.Archetype{ - Logger: config.Logger, - Rand: randutil.NewCryptoSeededConcurrentSafeRand(), - TimeNowUTC: func() time.Time { return time.Now().UTC() }, + Logger: config.Logger, + Rand: randutil.NewCryptoSeededConcurrentSafeRand(), + Time: config.time, + } + if archetype.Time == nil { + archetype.Time = &baseservice.UnStubbableTimeGenerator{} } client := &Client[TTx]{ @@ -1042,7 +1049,7 @@ func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*ri func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor, jobID int64) (*rivertype.JobRow, error) { return exec.JobCancel(ctx, &riverdriver.JobCancelParams{ ID: jobID, - CancelAttemptedAt: c.baseService.TimeNowUTC(), + CancelAttemptedAt: c.baseService.Time.NowUTC(), ControlTopic: string(notifier.NotificationTopicControl), }) } @@ -1112,7 +1119,7 @@ func (c *Client[TTx]) ID() string { return c.config.ID } -func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { +func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { encodedArgs, err := json.Marshal(args) if err != nil { return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err) @@ -1127,6 +1134,12 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp jobInsertOpts = argsWithOpts.InsertOpts() } + // If the time is stubbed (in a test), use that for `created_at`. Otherwise, + // leave an empty value which will use the database's `now()` value, which + // keeps the timestamps of jobs inserted across many different computers + // more consistent (i.e. in case of minor time drifts). + createdAt := archetype.Time.NowUTCOrNil() + maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts) priority := valutil.FirstNonZero(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault) queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault) @@ -1161,6 +1174,7 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp } insertParams := &riverdriver.JobInsertFastParams{ + CreatedAt: createdAt, EncodedArgs: encodedArgs, Kind: args.Kind(), MaxAttempts: maxAttempts, @@ -1171,7 +1185,11 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp Tags: tags, } - if !insertOpts.ScheduledAt.IsZero() { + if insertOpts.ScheduledAt.IsZero() { + // Use a stubbed time if there was one, but otherwise prefer the value + // generated by the database. createdAt is nil unless time is stubbed. + insertParams.ScheduledAt = createdAt + } else { insertParams.ScheduledAt = &insertOpts.ScheduledAt insertParams.State = rivertype.JobStateScheduled } @@ -1226,7 +1244,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg return nil, err } - params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(c.config, args, opts) + params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts) if err != nil { return nil, err } @@ -1382,7 +1400,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive } var err error - insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(c.config, param.Args, param.InsertOpts) + insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts) if err != nil { return nil, err } diff --git a/client_test.go b/client_test.go index 0701259a..392d3c00 100644 --- a/client_test.go +++ b/client_test.go @@ -145,6 +145,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config { Workers: workers, disableStaggerStart: true, // disables staggered start in maintenance services schedulerInterval: riverinternaltest.SchedulerShortInterval, + time: &riverinternaltest.TimeStub{}, } } @@ -2221,7 +2222,7 @@ func Test_Client_ErrorHandler(t *testing.T) { // Bypass the normal Insert function because that will error on an // unknown job. - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil) require.NoError(t, err) _, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams) require.NoError(t, err) @@ -3664,7 +3665,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) { subscribeChan, cancel := client.Subscribe(EventKindJobFailed) t.Cleanup(cancel) - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil) require.NoError(err) insertedJob, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams) require.NoError(err) @@ -4257,12 +4258,13 @@ func TestClient_JobTimeout(t *testing.T) { func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() + archetype := riverinternaltest.BaseServiceArchetype(t) config := newTestConfig(t, nil) t.Run("Defaults", func(t *testing.T) { t.Parallel() - insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, nil) + insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, nil) require.NoError(t, err) require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs)) require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind) @@ -4282,7 +4284,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { MaxAttempts: 34, } - insertParams, _, err := insertParamsFromConfigArgsAndOptions(overrideConfig, noOpArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, overrideConfig, noOpArgs{}, nil) require.NoError(t, err) require.Equal(t, overrideConfig.MaxAttempts, insertParams.MaxAttempts) }) @@ -4297,7 +4299,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { ScheduledAt: time.Now().Add(time.Hour), Tags: []string{"tag1", "tag2"}, } - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, opts) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, opts) require.NoError(t, err) require.Equal(t, 42, insertParams.MaxAttempts) require.Equal(t, 2, insertParams.Priority) @@ -4309,7 +4311,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Run("WorkerInsertOptsOverrides", func(t *testing.T) { t.Parallel() - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, &customInsertOptsJobArgs{}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, nil) require.NoError(t, err) // All these come from overrides in customInsertOptsJobArgs's definition: require.Equal(t, 42, insertParams.MaxAttempts) @@ -4328,7 +4330,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted}, } - _, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}) + _, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}) require.NoError(t, err) require.Equal(t, uniqueOpts.ByArgs, internalUniqueOpts.ByArgs) require.Equal(t, uniqueOpts.ByPeriod, internalUniqueOpts.ByPeriod) @@ -4339,7 +4341,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Run("PriorityIsLimitedTo4", func(t *testing.T) { t.Parallel() - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{Priority: 5}) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{Priority: 5}) require.ErrorContains(t, err, "priority must be between 1 and 4") require.Nil(t, insertParams) }) @@ -4348,7 +4350,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Parallel() args := timeoutTestArgs{TimeoutValue: time.Hour} - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, args, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, nil) require.NoError(t, err) require.Equal(t, `{"timeout_value":3600000000000}`, string(insertParams.EncodedArgs)) }) @@ -4360,6 +4362,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { // since we already have tests elsewhere for that. Just make sure validation // is running. insertParams, _, err := insertParamsFromConfigArgsAndOptions( + archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}}, @@ -4539,6 +4542,16 @@ func TestUniqueOpts(t *testing.T) { client := newTestClient(t, dbPool, newTestConfig(t, nil)) + // Tests that use ByPeriod below can be sensitive to intermittency if + // the tests run at say 23:59:59.998, then it's possible to accidentally + // cross a period threshold, even if very unlikely. So here, seed mostly + // the current time, but make sure it's nicened up a little to be + // roughly in the middle of the hour and well clear of any period + // boundaries. + client.baseService.Time.StubNowUTC( + time.Now().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond), + ) + return client, &testBundle{} } diff --git a/internal/baseservice/base_service.go b/internal/baseservice/base_service.go index 80a9f4a3..e6cc5fd2 100644 --- a/internal/baseservice/base_service.go +++ b/internal/baseservice/base_service.go @@ -34,11 +34,12 @@ type Archetype struct { // secure randomness. Rand *rand.Rand - // TimeNowUTC returns the current time as UTC. Normally it's implemented as - // a call to `time.Now().UTC()`, but may be overridden in tests for time - // injection. Services should try to use this function instead of the - // vanilla ones from the `time` package for testing purposes. - TimeNowUTC func() time.Time + // Time returns a time generator to get the current time in UTC. Normally + // it's implemented as UnStubbableTimeGenerator which just calls through to + // `time.Now().UTC()`, but is riverinternaltest.timeStub in tests to allow + // the current time to be stubbed. Services should try to use this function + // instead of the vanilla ones from the `time` package for testing purposes. + Time TimeGenerator } // BaseService is a struct that's meant to be embedded on "service-like" objects @@ -167,7 +168,37 @@ func Init[TService withBaseService](archetype *Archetype, service TService) TSer baseService.Logger = archetype.Logger baseService.Name = reflect.TypeOf(service).Elem().Name() baseService.Rand = archetype.Rand - baseService.TimeNowUTC = archetype.TimeNowUTC + baseService.Time = archetype.Time return service } + +// TimeGenerator generates a current time in UTC. In test environments it's +// implemented by riverinternaltest.timeStub which lets the current time be +// stubbed. Otherwise, it's implemented as UnStubbableTimeGenerator which +// doesn't allow stubbing. +type TimeGenerator interface { + // NowUTC returns the current time. This may be a stubbed time if the time + // has been actively stubbed in a test. + NowUTC() time.Time + + // NowUTCOrNil returns if the currently stubbed time _if_ the current time + // is stubbed, and returns nil otherwise. This is generally useful in cases + // where a component may want to use a stubbed time if the time is stubbed, + // but to fall back to a database time default otherwise. + NowUTCOrNil() *time.Time + + // StubNowUTC stubs the current time. It will panic if invoked outside of + // tests. Returns the same time passed as parameter for convenience. + StubNowUTC(nowUTC time.Time) time.Time +} + +// UnStubbableTimeGenerator is a TimeGenerator implementation that can't be +// stubbed. It's always the generator used outside of tests. +type UnStubbableTimeGenerator struct{} + +func (g *UnStubbableTimeGenerator) NowUTC() time.Time { return time.Now() } +func (g *UnStubbableTimeGenerator) NowUTCOrNil() *time.Time { return nil } +func (g *UnStubbableTimeGenerator) StubNowUTC(nowUTC time.Time) time.Time { + panic("time not stubbable outside tests") +} diff --git a/internal/baseservice/base_service_test.go b/internal/baseservice/base_service_test.go index 5ded00af..bfe520ef 100644 --- a/internal/baseservice/base_service_test.go +++ b/internal/baseservice/base_service_test.go @@ -20,7 +20,7 @@ func TestInit(t *testing.T) { myService := Init(archetype, &MyService{}) require.NotNil(t, myService.Logger) require.Equal(t, "MyService", myService.Name) - require.WithinDuration(t, time.Now().UTC(), myService.TimeNowUTC(), 2*time.Second) + require.WithinDuration(t, time.Now().UTC(), myService.Time.NowUTC(), 2*time.Second) } func TestBaseService_CancellableSleep(t *testing.T) { @@ -143,8 +143,8 @@ type MyService struct { func archetype() *Archetype { return &Archetype{ - Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)), - Rand: randutil.NewCryptoSeededConcurrentSafeRand(), - TimeNowUTC: func() time.Time { return time.Now().UTC() }, + Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)), + Rand: randutil.NewCryptoSeededConcurrentSafeRand(), + Time: &UnStubbableTimeGenerator{}, } } diff --git a/internal/dbunique/db_unique.go b/internal/dbunique/db_unique.go index 045b7ddc..7e2804a8 100644 --- a/internal/dbunique/db_unique.go +++ b/internal/dbunique/db_unique.go @@ -75,7 +75,7 @@ func (i *UniqueInserter) JobInsert(ctx context.Context, exec riverdriver.Executo } if uniqueOpts.ByPeriod != time.Duration(0) { - lowerPeriodBound := i.TimeNowUTC().Truncate(uniqueOpts.ByPeriod) + lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod) advisoryLockHash.Write([]byte("&period=" + lowerPeriodBound.Format(time.RFC3339))) diff --git a/internal/dbunique/db_unique_test.go b/internal/dbunique/db_unique_test.go index 87e6ebab..64485925 100644 --- a/internal/dbunique/db_unique_test.go +++ b/internal/dbunique/db_unique_test.go @@ -41,20 +41,27 @@ func TestUniqueInserter_JobInsert(t *testing.T) { ) bundle := &testBundle{ - baselineTime: time.Now(), - driver: driver, - exec: driver.UnwrapExecutor(tx), - tx: tx, + driver: driver, + exec: driver.UnwrapExecutor(tx), + tx: tx, } inserter := baseservice.Init(riverinternaltest.BaseServiceArchetype(t), &UniqueInserter{}) - inserter.TimeNowUTC = func() time.Time { return bundle.baselineTime } + + // Tests that use ByPeriod below can be sensitive to intermittency if + // the tests run at say 14:59:59.998, then it's possible to accidentally + // cross a period threshold, even if very unlikely. So here, seed mostly + // the current time, but make sure it's nicened up a little to be + // roughly in the middle of the hour and well clear of any period + // boundaries. + bundle.baselineTime = inserter.Time.StubNowUTC(time.Now().UTC().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond)) return inserter, bundle } - makeInsertParams := func() *riverdriver.JobInsertFastParams { + makeInsertParams := func(bundle *testBundle) *riverdriver.JobInsertFastParams { return &riverdriver.JobInsertFastParams{ + CreatedAt: &bundle.baselineTime, EncodedArgs: []byte(`{}`), Kind: "fake_job", MaxAttempts: rivercommon.MaxAttemptsDefault, @@ -71,7 +78,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) res, err := inserter.JobInsert(ctx, bundle.exec, insertParams, nil) require.NoError(t, err) @@ -83,7 +90,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.Equal(t, 0, res.Job.Attempt) require.Nil(t, res.Job.AttemptedAt) require.Empty(t, res.Job.AttemptedBy) - require.WithinDuration(t, time.Now(), res.Job.CreatedAt, 2*time.Second) + require.Equal(t, bundle.baselineTime.Truncate(1*time.Microsecond), res.Job.CreatedAt) require.Empty(t, res.Job.Errors) require.Nil(t, res.Job.FinalizedAt) require.Equal(t, insertParams.Kind, res.Job.Kind) @@ -103,7 +110,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { const maxJobsToFetch = 8 - res, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(), nil) + res, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(bundle), nil) require.NoError(t, err) require.NotEqual(t, 0, res.Job.ID, "expected job ID to be set, got %d", res.Job.ID) require.WithinDuration(t, time.Now(), res.Job.ScheduledAt, 1*time.Second) @@ -120,7 +127,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { "expected selected job to be in running state, got %q", jobs[0].State) for i := 1; i < 10; i++ { - _, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(), nil) + _, err := inserter.JobInsert(ctx, bundle.exec, makeInsertParams(bundle), nil) require.NoError(t, err) } @@ -152,7 +159,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByArgs: true, } @@ -184,7 +191,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByPeriod: 15 * time.Minute, } @@ -200,7 +207,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.Equal(t, res0.Job.ID, res1.Job.ID) require.True(t, res1.UniqueSkippedAsDuplicate) - inserter.TimeNowUTC = func() time.Time { return bundle.baselineTime.Add(uniqueOpts.ByPeriod).Add(1 * time.Second) } + inserter.Time.StubNowUTC(bundle.baselineTime.Add(uniqueOpts.ByPeriod).Add(1 * time.Second)) // Same operation again, except that because we've advanced time passed // the period within unique bounds, another job is allowed to be queued, @@ -216,7 +223,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByQueue: true, } @@ -248,7 +255,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateRunning}, } @@ -302,7 +309,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByQueue: true, } @@ -372,7 +379,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { inserter, bundle := setup(t) - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByArgs: true, ByPeriod: 15 * time.Minute, @@ -406,7 +413,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { // With period modified { insertParams := *insertParams // dup - inserter.TimeNowUTC = func() time.Time { return bundle.baselineTime.Add(uniqueOpts.ByPeriod).Add(1 * time.Second) } + inserter.Time.StubNowUTC(bundle.baselineTime.Add(uniqueOpts.ByPeriod).Add(1 * time.Second)) // New job because a unique dimension has changed. res2, err := inserter.JobInsert(ctx, bundle.exec, &insertParams, uniqueOpts) @@ -415,7 +422,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { require.False(t, res2.UniqueSkippedAsDuplicate) // Make sure to change timeNow back - inserter.TimeNowUTC = func() time.Time { return bundle.baselineTime } + inserter.Time.StubNowUTC(bundle.baselineTime) } // With queue modified @@ -451,7 +458,7 @@ func TestUniqueInserter_JobInsert(t *testing.T) { bundle.driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, t)) bundle.exec = bundle.driver.GetExecutor() - insertParams := makeInsertParams() + insertParams := makeInsertParams(bundle) uniqueOpts := &UniqueOpts{ ByPeriod: 15 * time.Minute, } diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index ad1c2630..5ad7a5a0 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -78,7 +78,7 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst c.wg.Add(1) defer c.wg.Done() - start := c.TimeNowUTC() + start := c.Time.NowUTC() job, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) (*rivertype.JobRow, error) { return c.exec.JobSetStateIfRunning(ctx, params) @@ -87,7 +87,7 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst return err } - stats.CompleteDuration = c.TimeNowUTC().Sub(start) + stats.CompleteDuration = c.Time.NowUTC().Sub(start) c.subscribeCh <- []CompleterJobUpdated{{Job: job, JobStats: stats}} return nil @@ -156,7 +156,7 @@ func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, exec Par func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { // Start clock outside of goroutine so that the time spent blocking waiting // for an errgroup slot is accurately measured. - start := c.TimeNowUTC() + start := c.Time.NowUTC() c.errGroup.Go(func() error { job, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) (*rivertype.JobRow, error) { @@ -166,7 +166,7 @@ func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta return err } - stats.CompleteDuration = c.TimeNowUTC().Sub(start) + stats.CompleteDuration = c.Time.NowUTC().Sub(start) c.subscribeCh <- []CompleterJobUpdated{{Job: job, JobStats: stats}} return nil @@ -413,7 +413,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated { setState := setStateBatch[jobRow.ID] - setState.Stats.CompleteDuration = c.TimeNowUTC().Sub(*setState.Params.FinalizedAt) + setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(*setState.Params.FinalizedAt) return CompleterJobUpdated{Job: jobRow, JobStats: setState.Stats} }) diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go index 419e81d0..2bfcb752 100644 --- a/internal/maintenance/job_scheduler.go +++ b/internal/maintenance/job_scheduler.go @@ -144,7 +144,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er } defer tx.Rollback(ctx) - now := s.TimeNowUTC() + now := s.Time.NowUTC() nowWithLookAhead := now.Add(s.config.Interval) scheduledJobs, err := tx.JobSchedule(ctx, &riverdriver.JobScheduleParams{ @@ -161,7 +161,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er // slightly in the future (this loop, the notify, and tx commit will take // a small amount of time). This isn't going to be perfect, but the goal // is to roughly try to guess when the clients will attempt to fetch jobs. - notificationHorizon := s.TimeNowUTC().Add(5 * time.Millisecond) + notificationHorizon := s.Time.NowUTC().Add(5 * time.Millisecond) for _, job := range scheduledJobs { if job.ScheduledAt.After(notificationHorizon) { diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index e5eb4836..8256b338 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -235,7 +235,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { insertParamsMany []*riverdriver.JobInsertFastParams insertParamsUnique []*insertParamsAndUniqueOpts - now = s.TimeNowUTC() + now = s.Time.NowUTC() ) // Handle periodic jobs in sorted order so we can correctly account @@ -289,7 +289,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { insertParamsUnique []*insertParamsAndUniqueOpts ) - now := s.TimeNowUTC() + now := s.Time.NowUTC() // Add a small margin to the current time so we're not only // running jobs that are already ready, but also ones ready at @@ -440,7 +440,7 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration { var ( firstNextRunAt time.Time - now = s.TimeNowUTC() + now = s.Time.NowUTC() ) for _, periodicJob := range s.periodicJobs { diff --git a/internal/maintenance/periodic_job_enqueuer_test.go b/internal/maintenance/periodic_job_enqueuer_test.go index 14c17260..ec519af9 100644 --- a/internal/maintenance/periodic_job_enqueuer_test.go +++ b/internal/maintenance/periodic_job_enqueuer_test.go @@ -247,8 +247,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc, _ := setup(t) - now := time.Now() - svc.TimeNowUTC = func() time.Time { return now } + now := svc.Time.StubNowUTC(time.Now()) svc.periodicJobs = make(map[rivertype.PeriodicJobHandle]*PeriodicJob) periodicJobHandles := svc.AddMany([]*PeriodicJob{ diff --git a/internal/maintenance/reindexer_test.go b/internal/maintenance/reindexer_test.go index 8ef6a8bb..34bdc7d6 100644 --- a/internal/maintenance/reindexer_test.go +++ b/internal/maintenance/reindexer_test.go @@ -29,11 +29,10 @@ func TestReindexer(t *testing.T) { dbPool := riverinternaltest.TestDB(ctx, t) bundle := &testBundle{ exec: riverpgxv5.New(dbPool).GetExecutor(), - now: time.Now(), } archetype := riverinternaltest.BaseServiceArchetype(t) - archetype.TimeNowUTC = func() time.Time { return bundle.now } + bundle.now = archetype.Time.StubNowUTC(time.Now()) fromNow := func(d time.Duration) func(time.Time) time.Time { return func(t time.Time) time.Time { diff --git a/internal/notifylimiter/limiter.go b/internal/notifylimiter/limiter.go index 443a9017..201e63d2 100644 --- a/internal/notifylimiter/limiter.go +++ b/internal/notifylimiter/limiter.go @@ -29,7 +29,7 @@ func NewLimiter(archetype *baseservice.Archetype, waitDuration time.Duration) *L func (l *Limiter) ShouldTrigger(topic string) bool { // Calculate this beforehand to reduce mutex duration. - now := l.TimeNowUTC() + now := l.Time.NowUTC() lastSentHorizon := now.Add(-l.waitDuration) l.mu.Lock() diff --git a/internal/notifylimiter/limiter_test.go b/internal/notifylimiter/limiter_test.go index c294065a..3dbd255d 100644 --- a/internal/notifylimiter/limiter_test.go +++ b/internal/notifylimiter/limiter_test.go @@ -1,7 +1,6 @@ package notifylimiter import ( - "sync" "sync/atomic" "testing" "time" @@ -11,43 +10,16 @@ import ( "github.com/riverqueue/river/internal/riverinternaltest" ) -type mockableTime struct { - mu sync.Mutex // protects now - now *time.Time -} - -func (m *mockableTime) Now() time.Time { - m.mu.Lock() - defer m.mu.Unlock() - - if m.now != nil { - return *m.now - } - return time.Now() -} - -func (m *mockableTime) SetNow(now time.Time) { - m.mu.Lock() - defer m.mu.Unlock() - - m.now = &now -} - func TestLimiter(t *testing.T) { t.Parallel() - type testBundle struct { - mockTime *mockableTime - } + type testBundle struct{} setup := func() (*Limiter, *testBundle) { - bundle := &testBundle{ - mockTime: &mockableTime{}, - } + bundle := &testBundle{} archetype := riverinternaltest.BaseServiceArchetype(t) limiter := NewLimiter(archetype, 10*time.Millisecond) - limiter.TimeNowUTC = bundle.mockTime.Now return limiter, bundle } @@ -55,22 +27,22 @@ func TestLimiter(t *testing.T) { t.Run("OnlySendsOncePerWaitDuration", func(t *testing.T) { t.Parallel() - limiter, bundle := setup() + limiter, _ := setup() now := time.Now() - bundle.mockTime.SetNow(now) + limiter.Time.StubNowUTC(now) require.True(t, limiter.ShouldTrigger("a")) for i := 0; i < 10; i++ { require.False(t, limiter.ShouldTrigger("a")) } // Move the time forward, by just less than waitDuration: - bundle.mockTime.SetNow(now.Add(9 * time.Millisecond)) + limiter.Time.StubNowUTC(now.Add(9 * time.Millisecond)) require.False(t, limiter.ShouldTrigger("a")) require.True(t, limiter.ShouldTrigger("b")) // First time being triggered on "b" // Move the time forward to just past the waitDuration: - bundle.mockTime.SetNow(now.Add(11 * time.Millisecond)) + limiter.Time.StubNowUTC(now.Add(11 * time.Millisecond)) require.True(t, limiter.ShouldTrigger("a")) for i := 0; i < 10; i++ { require.False(t, limiter.ShouldTrigger("a")) @@ -79,7 +51,7 @@ func TestLimiter(t *testing.T) { require.False(t, limiter.ShouldTrigger("b")) // has only been 2ms since last trigger of "b" // Move forward by another waitDuration (plus padding): - bundle.mockTime.SetNow(now.Add(22 * time.Millisecond)) + limiter.Time.StubNowUTC(now.Add(22 * time.Millisecond)) require.True(t, limiter.ShouldTrigger("a")) require.True(t, limiter.ShouldTrigger("b")) require.False(t, limiter.ShouldTrigger("b")) @@ -91,9 +63,9 @@ func TestLimiter(t *testing.T) { doneCh := make(chan struct{}) t.Cleanup(func() { close(doneCh) }) - limiter, bundle := setup() + limiter, _ := setup() now := time.Now() - bundle.mockTime.SetNow(now) + limiter.Time.StubNowUTC(now) counters := make(map[string]*atomic.Int64) for _, topic := range []string{"a", "b", "c"} { @@ -125,7 +97,7 @@ func TestLimiter(t *testing.T) { require.Equal(t, int64(1), counters["b"].Load()) require.Equal(t, int64(1), counters["c"].Load()) - bundle.mockTime.SetNow(now.Add(11 * time.Millisecond)) + limiter.Time.StubNowUTC(now.Add(11 * time.Millisecond)) <-time.After(100 * time.Millisecond) diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 1e0b1343..ea4f61db 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -722,23 +722,24 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv exec, _ := setupExecutor(ctx, t, driver, beginTx) - now := time.Now().UTC() + targetTime := time.Now().UTC().Add(-15 * time.Minute) job, err := exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{ + CreatedAt: &targetTime, EncodedArgs: []byte(`{"encoded": "args"}`), Kind: "test_kind", MaxAttempts: 6, Metadata: []byte(`{"meta": "data"}`), Priority: 2, Queue: "queue_name", - ScheduledAt: &now, + ScheduledAt: &targetTime, State: rivertype.JobStateRunning, Tags: []string{"tag"}, }) require.NoError(t, err) require.Equal(t, 0, job.Attempt) require.Nil(t, job.AttemptedAt) - require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second) + requireEqualTime(t, targetTime, job.CreatedAt) require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs) require.Empty(t, job.Errors) require.Nil(t, job.FinalizedAt) @@ -747,7 +748,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata) require.Equal(t, 2, job.Priority) require.Equal(t, "queue_name", job.Queue) - requireEqualTime(t, now, job.ScheduledAt) + requireEqualTime(t, targetTime, job.ScheduledAt) require.Equal(t, rivertype.JobStateRunning, job.State) require.Equal(t, []string{"tag"}, job.Tags) }) diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index 62a9cdd1..f1fcfc66 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -61,9 +61,9 @@ func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype { tb.Helper() return &baseservice.Archetype{ - Logger: Logger(tb), - Rand: rand, - TimeNowUTC: func() time.Time { return time.Now().UTC() }, + Logger: Logger(tb), + Rand: rand, + Time: &TimeStub{}, } } @@ -221,31 +221,6 @@ func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool { return testPool.Pool() } -// StubTime returns a pair of function for (getTime, setTime), the former of -// which is compatible with `TimeNowUTC` found in the service archetype. -// It's concurrent safe is so that a started service can access its stub -// time while the test case is setting it, and without the race detector -// triggering. -func StubTime(initialTime time.Time) (func() time.Time, func(t time.Time)) { - var ( - mu sync.RWMutex - stubbedTime = &initialTime - ) - - getTime := func() time.Time { - mu.RLock() - defer mu.RUnlock() - return *stubbedTime - } - setTime := func(newTime time.Time) { - mu.Lock() - defer mu.Unlock() - stubbedTime = &newTime - } - - return getTime, setTime -} - // A pool and mutex to protect it, lazily initialized by TestTx. Once open, this // pool is never explicitly closed, instead closing implicitly as the package // tests finish. @@ -433,3 +408,36 @@ func WrapTestMain(m *testing.M) { os.Exit(status) } + +// TimeStub implements baseservice.TimeGenerator to allow time to be stubbed in +// tests. +type TimeStub struct { + mu sync.RWMutex + nowUTC *time.Time +} + +func (t *TimeStub) NowUTC() time.Time { + t.mu.RLock() + defer t.mu.RUnlock() + + if t.nowUTC == nil { + return time.Now().UTC() + } + + return *t.nowUTC +} + +func (t *TimeStub) NowUTCOrNil() *time.Time { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.nowUTC +} + +func (t *TimeStub) StubNowUTC(nowUTC time.Time) time.Time { + t.mu.Lock() + defer t.mu.Unlock() + + t.nowUTC = &nowUTC + return nowUTC +} diff --git a/internal/riverinternaltest/riverinternaltest_test.go b/internal/riverinternaltest/riverinternaltest_test.go index 8d526156..c55dffb6 100644 --- a/internal/riverinternaltest/riverinternaltest_test.go +++ b/internal/riverinternaltest/riverinternaltest_test.go @@ -11,38 +11,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestStubTime(t *testing.T) { - t.Parallel() - - t.Run("BasicUsage", func(t *testing.T) { - t.Parallel() - - initialTime := time.Now() - - getTime, setTime := StubTime(initialTime) - require.Equal(t, initialTime, getTime()) - - newTime := initialTime.Add(1 * time.Second) - setTime(newTime) - require.Equal(t, newTime, getTime()) - }) - - t.Run("Stress", func(t *testing.T) { - t.Parallel() - - getTime, setTime := StubTime(time.Now()) - - for i := 0; i < 10; i++ { - go func() { - for j := 0; j < 50; j++ { - setTime(time.Now()) - _ = getTime() - } - }() - } - }) -} - // Implemented by `pgx.Tx` or `pgxpool.Pool`. Normally we'd use a similar type // from `dbsqlc` or `dbutil`, but riverinternaltest is extremely low level and // that would introduce a cyclic dependency. We could package as @@ -142,3 +110,36 @@ func TestWaitOrTimeoutN(t *testing.T) { nums := WaitOrTimeoutN(t, numChan, 3) require.Equal(t, []int{0, 1, 2}, nums) } + +func TestTimeStub(t *testing.T) { + t.Parallel() + + t.Run("BasicUsage", func(t *testing.T) { + t.Parallel() + + initialTime := time.Now().UTC() + + timeStub := &TimeStub{} + + timeStub.StubNowUTC(initialTime) + require.Equal(t, initialTime, timeStub.NowUTC()) + + newTime := timeStub.StubNowUTC(initialTime.Add(1 * time.Second)) + require.Equal(t, newTime, timeStub.NowUTC()) + }) + + t.Run("Stress", func(t *testing.T) { + t.Parallel() + + timeStub := &TimeStub{} + + for i := 0; i < 10; i++ { + go func() { + for j := 0; j < 50; j++ { + timeStub.StubNowUTC(time.Now().UTC()) + _ = timeStub.NowUTC() + } + }() + } + }) +} diff --git a/job_executor.go b/job_executor.go index 33747bf6..177caab9 100644 --- a/job_executor.go +++ b/job_executor.go @@ -145,7 +145,7 @@ func (e *jobExecutor) Execute(ctx context.Context) { // Ensure that the context is cancelled no matter what, or it will leak: defer e.CancelFunc(errExecutorDefaultCancel) - e.start = e.TimeNowUTC() + e.start = e.Time.NowUTC() e.stats = &jobstats.JobStatistics{ QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt), } @@ -179,7 +179,7 @@ func (e *jobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { PanicVal: recovery, } } - e.stats.RunDuration = e.TimeNowUTC().Sub(e.start) + e.stats.RunDuration = e.Time.NowUTC().Sub(e.start) }() if e.WorkUnit == nil { @@ -258,7 +258,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) // so we instead make the job immediately `available` if the snooze time is // smaller than the scheduler's run interval. var params *riverdriver.JobSetStateIfRunningParams - if nextAttemptScheduledAt.Sub(e.TimeNowUTC()) <= e.SchedulerInterval { + if nextAttemptScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval { params = riverdriver.JobSetStateSnoozedAvailable(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1) } else { params = riverdriver.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1) @@ -276,7 +276,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) return } - if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(e.JobRow.ID, e.TimeNowUTC())); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(e.JobRow.ID, e.Time.NowUTC())); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Error completing job", slog.String("err", err.Error()), slog.Int64("job_id", e.JobRow.ID), @@ -364,7 +364,7 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { // respected. Here, we offset that with a branch that makes jobs immediately // `available` if their retry was smaller than the scheduler's run interval. var params *riverdriver.JobSetStateIfRunningParams - if nextRetryScheduledAt.Sub(e.TimeNowUTC()) <= e.SchedulerInterval { + if nextRetryScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval { params = riverdriver.JobSetStateErrorAvailable(e.JobRow.ID, nextRetryScheduledAt, errData) } else { params = riverdriver.JobSetStateErrorRetryable(e.JobRow.ID, nextRetryScheduledAt, errData) diff --git a/job_executor_test.go b/job_executor_test.go index 4233072b..e1ed00b3 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -223,7 +223,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) - executor.Archetype.TimeNowUTC, _ = riverinternaltest.StubTime(time.Now().UTC()) + now := executor.Archetype.Time.StubNowUTC(time.Now().UTC()) workerErr := errors.New("job error") executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) @@ -236,7 +236,7 @@ func TestJobExecutor_Execute(t *testing.T) { require.WithinDuration(t, executor.ClientRetryPolicy.NextRetry(bundle.jobRow), job.ScheduledAt, 1*time.Second) require.Equal(t, rivertype.JobStateRetryable, job.State) require.Len(t, job.Errors, 1) - require.Equal(t, executor.Archetype.TimeNowUTC().Truncate(1*time.Microsecond), job.Errors[0].At.Truncate(1*time.Microsecond)) + require.Equal(t, now.Truncate(1*time.Microsecond), job.Errors[0].At.Truncate(1*time.Microsecond)) require.Equal(t, 1, job.Errors[0].Attempt) require.Equal(t, "job error", job.Errors[0].Error) require.Equal(t, "", job.Errors[0].Trace) diff --git a/periodic_job.go b/periodic_job.go index c2cea8df..48c3c6e7 100644 --- a/periodic_job.go +++ b/periodic_job.go @@ -184,7 +184,7 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe return &maintenance.PeriodicJob{ ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { - return insertParamsFromConfigArgsAndOptions(b.clientConfig, args, options) + return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options) }, RunOnStart: opts.RunOnStart, ScheduleFunc: periodicJob.scheduleFunc.Next, diff --git a/producer_test.go b/producer_test.go index 4b157d16..8520a5fb 100644 --- a/producer_test.go +++ b/producer_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/componentstatus" "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/maintenance" @@ -105,7 +106,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { params := make([]*riverdriver.JobInsertFastParams, maxJobCount) for i := range params { - insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, WithJobNumArgs{JobNum: i}, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, WithJobNumArgs{JobNum: i}, nil) require.NoError(err) params[i] = insertParams @@ -245,6 +246,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin ctx := context.Background() type testBundle struct { + archetype *baseservice.Archetype completer jobcompleter.JobCompleter config *Config exec riverdriver.Executor @@ -269,6 +271,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin }() return producer, &testBundle{ + archetype: &producer.Archetype, completer: producer.completer, config: config, exec: producer.exec, @@ -280,7 +283,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin mustInsert := func(ctx context.Context, t *testing.T, bundle *testBundle, args JobArgs) { t.Helper() - insertParams, _, err := insertParamsFromConfigArgsAndOptions(bundle.config, args, nil) + insertParams, _, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, args, nil) require.NoError(t, err) _, err = bundle.exec.JobInsertFast(ctx, insertParams) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 511d2221..3a3a3d71 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -201,6 +201,7 @@ type JobGetStuckParams struct { } type JobInsertFastParams struct { + CreatedAt *time.Time EncodedArgs []byte Kind string MaxAttempts int diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index d309c3c6..8105ddea 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -506,6 +506,7 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara const jobInsertFast = `-- name: JobInsertFast :one INSERT INTO river_job( args, + created_at, finalized_at, kind, max_attempts, @@ -517,20 +518,22 @@ INSERT INTO river_job( tags ) VALUES ( $1::jsonb, - $2, - $3::text, - $4::smallint, - coalesce($5::jsonb, '{}'), - $6::smallint, - $7::text, - coalesce($8::timestamptz, now()), - $9::river_job_state, - coalesce($10::varchar(255)[], '{}') + coalesce($2::timestamptz, now()), + $3, + $4::text, + $5::smallint, + coalesce($6::jsonb, '{}'), + $7::smallint, + $8::text, + coalesce($9::timestamptz, now()), + $10::river_job_state, + coalesce($11::varchar(255)[], '{}') ) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags ` type JobInsertFastParams struct { Args json.RawMessage + CreatedAt *time.Time FinalizedAt *time.Time Kind string MaxAttempts int16 @@ -545,6 +548,7 @@ type JobInsertFastParams struct { func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFastParams) (*RiverJob, error) { row := db.QueryRowContext(ctx, jobInsertFast, arg.Args, + arg.CreatedAt, arg.FinalizedAt, arg.Kind, arg.MaxAttempts, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index fd5b7a5f..d1b80af3 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -195,6 +195,7 @@ LIMIT @max; -- name: JobInsertFast :one INSERT INTO river_job( args, + created_at, finalized_at, kind, max_attempts, @@ -206,6 +207,7 @@ INSERT INTO river_job( tags ) VALUES ( @args::jsonb, + coalesce(sqlc.narg('created_at')::timestamptz, now()), @finalized_at, @kind::text, @max_attempts::smallint, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index a902750d..61f6b13e 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -491,6 +491,7 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara const jobInsertFast = `-- name: JobInsertFast :one INSERT INTO river_job( args, + created_at, finalized_at, kind, max_attempts, @@ -502,20 +503,22 @@ INSERT INTO river_job( tags ) VALUES ( $1::jsonb, - $2, - $3::text, - $4::smallint, - coalesce($5::jsonb, '{}'), - $6::smallint, - $7::text, - coalesce($8::timestamptz, now()), - $9::river_job_state, - coalesce($10::varchar(255)[], '{}') + coalesce($2::timestamptz, now()), + $3, + $4::text, + $5::smallint, + coalesce($6::jsonb, '{}'), + $7::smallint, + $8::text, + coalesce($9::timestamptz, now()), + $10::river_job_state, + coalesce($11::varchar(255)[], '{}') ) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags ` type JobInsertFastParams struct { Args []byte + CreatedAt *time.Time FinalizedAt *time.Time Kind string MaxAttempts int16 @@ -530,6 +533,7 @@ type JobInsertFastParams struct { func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFastParams) (*RiverJob, error) { row := db.QueryRow(ctx, jobInsertFast, arg.Args, + arg.CreatedAt, arg.FinalizedAt, arg.Kind, arg.MaxAttempts, diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 79114b6f..341b30fc 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -169,6 +169,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt func (e *Executor) JobInsertFast(ctx context.Context, params *riverdriver.JobInsertFastParams) (*rivertype.JobRow, error) { job, err := e.queries.JobInsertFast(ctx, e.dbtx, &dbsqlc.JobInsertFastParams{ Args: params.EncodedArgs, + CreatedAt: params.CreatedAt, Kind: params.Kind, MaxAttempts: int16(min(params.MaxAttempts, math.MaxInt16)), Metadata: params.Metadata, diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 7258d74e..c6cfd7f0 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -99,9 +99,9 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx] } archetype := &baseservice.Archetype{ - Logger: logger, - Rand: randutil.NewCryptoSeededConcurrentSafeRand(), - TimeNowUTC: func() time.Time { return time.Now().UTC() }, + Logger: logger, + Rand: randutil.NewCryptoSeededConcurrentSafeRand(), + Time: &baseservice.UnStubbableTimeGenerator{}, } return baseservice.Init(archetype, &Migrator[TTx]{