Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for intermittency in unique job tests that use ByPeriod #395

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ type Config struct {
// Scheduler run interval. Shared between the scheduler and producer/job
// executors, but not currently exposed for configuration.
schedulerInterval time.Duration

// Time generator to make time stubbable in tests.
time baseservice.TimeGenerator
}

func (c *Config) validate() error {
Expand Down Expand Up @@ -448,16 +451,20 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
Workers: config.Workers,
disableStaggerStart: config.disableStaggerStart,
schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.JobSchedulerIntervalDefault),
time: config.time,
}

if err := config.validate(); err != nil {
return nil, err
}

archetype := &baseservice.Archetype{
Logger: config.Logger,
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
Logger: config.Logger,
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
Time: config.time,
}
if archetype.Time == nil {
archetype.Time = &baseservice.UnStubbableTimeGenerator{}
}

client := &Client[TTx]{
Expand Down Expand Up @@ -1042,7 +1049,7 @@ func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*ri
func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor, jobID int64) (*rivertype.JobRow, error) {
return exec.JobCancel(ctx, &riverdriver.JobCancelParams{
ID: jobID,
CancelAttemptedAt: c.baseService.TimeNowUTC(),
CancelAttemptedAt: c.baseService.Time.NowUTC(),
ControlTopic: string(notifier.NotificationTopicControl),
})
}
Expand Down Expand Up @@ -1112,7 +1119,7 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we're passing in a couple of fields from Client to this (neither of which was needed 1mo ago), it might be cleaner and less confusing to make it a method on Client instead. Borderline 🤷‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah true.

I took a shot at it, but in the end, decided to leave as is.

The trouble is that insertParamsFromConfigArgsAndOptions gets called from a few other non-client components, which would require some annoying changes:

  • periodic_job.go needs it in PeriodicJobBundle so it can insert periodic jobs. We could embed a client on PeriodicJobBundle instead of a client config, but then PeriodicJobBundle needs to take a generic parameter so it can put a type on Client[TTx], which is super annoying.
  • producer_test.go uses it. It's workable just to create a client there and call the function on that, but sort of annoying to add the extra client that's not really used for anything else.

Let's keep an eye on this one. It's definitely a refactor that's possible, and we should do it if things get any more gnarly, but this might be the last addition it needs in a while, so maybe okay for now.

encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
Expand All @@ -1127,6 +1134,12 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp
jobInsertOpts = argsWithOpts.InsertOpts()
}

// If the time is stubbed (in a test), use that for `created_at`. Otherwise,
// leave an empty value which will use the database's `now()` value, which
// keeps the timestamps of jobs inserted across many different computers
// more consistent (i.e. in case of minor time drifts).
createdAt := archetype.Time.NowUTCOrNil()

maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts)
priority := valutil.FirstNonZero(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault)
queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault)
Expand Down Expand Up @@ -1161,6 +1174,7 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp
}

insertParams := &riverdriver.JobInsertFastParams{
CreatedAt: createdAt,
EncodedArgs: encodedArgs,
Kind: args.Kind(),
MaxAttempts: maxAttempts,
Expand All @@ -1171,7 +1185,11 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp
Tags: tags,
}

if !insertOpts.ScheduledAt.IsZero() {
if insertOpts.ScheduledAt.IsZero() {
// Use a stubbed time if there was one, but otherwise prefer the value
// generated by the database. createdAt is nil unless time is stubbed.
insertParams.ScheduledAt = createdAt
} else {
insertParams.ScheduledAt = &insertOpts.ScheduledAt
insertParams.State = rivertype.JobStateScheduled
}
Expand Down Expand Up @@ -1226,7 +1244,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(c.config, args, opts)
params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1382,7 +1400,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(c.config, param.Args, param.InsertOpts)
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
if err != nil {
return nil, err
}
Expand Down
31 changes: 22 additions & 9 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config {
Workers: workers,
disableStaggerStart: true, // disables staggered start in maintenance services
schedulerInterval: riverinternaltest.SchedulerShortInterval,
time: &riverinternaltest.TimeStub{},
}
}

Expand Down Expand Up @@ -2221,7 +2222,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

// Bypass the normal Insert function because that will error on an
// unknown job.
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
require.NoError(t, err)
_, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(t, err)
Expand Down Expand Up @@ -3664,7 +3665,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) {
subscribeChan, cancel := client.Subscribe(EventKindJobFailed)
t.Cleanup(cancel)

insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
require.NoError(err)
insertedJob, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(err)
Expand Down Expand Up @@ -4257,12 +4258,13 @@ func TestClient_JobTimeout(t *testing.T) {
func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

archetype := riverinternaltest.BaseServiceArchetype(t)
config := newTestConfig(t, nil)

t.Run("Defaults", func(t *testing.T) {
t.Parallel()

insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, nil)
insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, nil)
require.NoError(t, err)
require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs))
require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind)
Expand All @@ -4282,7 +4284,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
MaxAttempts: 34,
}

insertParams, _, err := insertParamsFromConfigArgsAndOptions(overrideConfig, noOpArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, overrideConfig, noOpArgs{}, nil)
require.NoError(t, err)
require.Equal(t, overrideConfig.MaxAttempts, insertParams.MaxAttempts)
})
Expand All @@ -4297,7 +4299,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ScheduledAt: time.Now().Add(time.Hour),
Tags: []string{"tag1", "tag2"},
}
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, opts)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, opts)
require.NoError(t, err)
require.Equal(t, 42, insertParams.MaxAttempts)
require.Equal(t, 2, insertParams.Priority)
Expand All @@ -4309,7 +4311,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("WorkerInsertOptsOverrides", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, &customInsertOptsJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, nil)
require.NoError(t, err)
// All these come from overrides in customInsertOptsJobArgs's definition:
require.Equal(t, 42, insertParams.MaxAttempts)
Expand All @@ -4328,7 +4330,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted},
}

_, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
_, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
require.NoError(t, err)
require.Equal(t, uniqueOpts.ByArgs, internalUniqueOpts.ByArgs)
require.Equal(t, uniqueOpts.ByPeriod, internalUniqueOpts.ByPeriod)
Expand All @@ -4339,7 +4341,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("PriorityIsLimitedTo4", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{Priority: 5})
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{Priority: 5})
require.ErrorContains(t, err, "priority must be between 1 and 4")
require.Nil(t, insertParams)
})
Expand All @@ -4348,7 +4350,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

args := timeoutTestArgs{TimeoutValue: time.Hour}
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, args, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, nil)
require.NoError(t, err)
require.Equal(t, `{"timeout_value":3600000000000}`, string(insertParams.EncodedArgs))
})
Expand All @@ -4360,6 +4362,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
// since we already have tests elsewhere for that. Just make sure validation
// is running.
insertParams, _, err := insertParamsFromConfigArgsAndOptions(
archetype,
config,
noOpArgs{},
&InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}},
Expand Down Expand Up @@ -4539,6 +4542,16 @@ func TestUniqueOpts(t *testing.T) {

client := newTestClient(t, dbPool, newTestConfig(t, nil))

// Tests that use ByPeriod below can be intermittent: if they run at, say,
// 23:59:59.998, it's possible (even if very unlikely) to accidentally
// cross a period threshold. So here, stub a time based on the current
// time, but adjusted to sit roughly in the middle of the hour and well
// clear of any period boundaries.
client.baseService.Time.StubNowUTC(
time.Now().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond),
)

return client, &testBundle{}
}

Expand Down
43 changes: 37 additions & 6 deletions internal/baseservice/base_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ type Archetype struct {
// secure randomness.
Rand *rand.Rand

// TimeNowUTC returns the current time as UTC. Normally it's implemented as
// a call to `time.Now().UTC()`, but may be overridden in tests for time
// injection. Services should try to use this function instead of the
// vanilla ones from the `time` package for testing purposes.
TimeNowUTC func() time.Time
// Time is a time generator used to get the current time in UTC. Normally
// it's implemented as UnStubbableTimeGenerator which just calls through to
// `time.Now().UTC()`, but is riverinternaltest.TimeStub in tests to allow
// the current time to be stubbed. Services should try to use this generator
// instead of the vanilla functions from the `time` package for testing purposes.
Time TimeGenerator
}

// BaseService is a struct that's meant to be embedded on "service-like" objects
Expand Down Expand Up @@ -167,7 +168,37 @@ func Init[TService withBaseService](archetype *Archetype, service TService) TSer
baseService.Logger = archetype.Logger
baseService.Name = reflect.TypeOf(service).Elem().Name()
baseService.Rand = archetype.Rand
baseService.TimeNowUTC = archetype.TimeNowUTC
baseService.Time = archetype.Time

return service
}

// TimeGenerator generates a current time in UTC. In test environments it's
// implemented by riverinternaltest.TimeStub which lets the current time be
// stubbed. Otherwise, it's implemented as UnStubbableTimeGenerator which
// doesn't allow stubbing.
type TimeGenerator interface {
// NowUTC returns the current time in UTC. This may be a stubbed time if
// the time has been actively stubbed in a test.
NowUTC() time.Time

// NowUTCOrNil returns the currently stubbed time _if_ the current time is
// stubbed, and returns nil otherwise. This is generally useful in cases
// where a component may want to use a stubbed time if the time is stubbed,
// but to fall back to a database time default otherwise.
NowUTCOrNil() *time.Time

// StubNowUTC stubs the current time. It will panic if invoked outside of
// tests. Returns the same time passed as parameter for convenience.
StubNowUTC(nowUTC time.Time) time.Time
}

// UnStubbableTimeGenerator is a TimeGenerator implementation that can't be
// stubbed. It's always the generator used outside of tests.
type UnStubbableTimeGenerator struct{}

// NowUTC returns the current wall-clock time normalized to UTC. The explicit
// UTC conversion matters: time.Now() alone carries the process's local
// location, which would leak into formatted timestamps and period
// calculations performed by callers that expect UTC.
func (g *UnStubbableTimeGenerator) NowUTC() time.Time { return time.Now().UTC() }

// NowUTCOrNil always returns nil because this generator never holds a stubbed
// time, letting callers fall back to their own default.
func (g *UnStubbableTimeGenerator) NowUTCOrNil() *time.Time { return nil }

// StubNowUTC always panics; this generator does not support stubbing.
func (g *UnStubbableTimeGenerator) StubNowUTC(nowUTC time.Time) time.Time {
	panic("time not stubbable outside tests")
}
8 changes: 4 additions & 4 deletions internal/baseservice/base_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestInit(t *testing.T) {
myService := Init(archetype, &MyService{})
require.NotNil(t, myService.Logger)
require.Equal(t, "MyService", myService.Name)
require.WithinDuration(t, time.Now().UTC(), myService.TimeNowUTC(), 2*time.Second)
require.WithinDuration(t, time.Now().UTC(), myService.Time.NowUTC(), 2*time.Second)
}

func TestBaseService_CancellableSleep(t *testing.T) {
Expand Down Expand Up @@ -143,8 +143,8 @@ type MyService struct {

func archetype() *Archetype {
return &Archetype{
Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)),
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)),
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
Time: &UnStubbableTimeGenerator{},
}
}
2 changes: 1 addition & 1 deletion internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (i *UniqueInserter) JobInsert(ctx context.Context, exec riverdriver.Executo
}

if uniqueOpts.ByPeriod != time.Duration(0) {
lowerPeriodBound := i.TimeNowUTC().Truncate(uniqueOpts.ByPeriod)
lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod)

advisoryLockHash.Write([]byte("&period=" + lowerPeriodBound.Format(time.RFC3339)))

Expand Down
Loading
Loading