Skip to content

Commit

Permalink
Fix for intermittency in unique job tests that use ByPeriod
Browse files Browse the repository at this point in the history
Fix for #392 in which under rare cases, a test can start right up
against the boundary of a period in use for unique opts, cross over that
period, and produce the wrong result as a new row is inserted.

For example, using a period of 15 minutes like in `db_unique_test.go`,
if the tests run at 14:59:59.998, there's a good chance that they'll
finish after 15:00:00, thereby spanning two separate 15-minute periods and
allowing separate jobs to be inserted. This is more likely to happen in
CI where tests take long to run.

The fix is to use a more stable time for this type of test. We use the
current hour, but assign a fixed minute and second that's right in the
middle and nowhere near any boundaries.

I gave `client_test.go`'s unique tests the same treatment since it's
possible for this problem to happen there too, although they're using a
24 hour period so it was already very unlikely. (The test case would
have to start at exactly the wrong moment before a UTC day boundary.)

Fixes #392.
  • Loading branch information
brandur committed Jun 19, 2024
1 parent c1aa4ee commit 4cd66eb
Show file tree
Hide file tree
Showing 26 changed files with 260 additions and 196 deletions.
34 changes: 26 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ type Config struct {
// Scheduler run interval. Shared between the scheduler and producer/job
// executors, but not currently exposed for configuration.
schedulerInterval time.Duration

// Time generator to make time stubbable in tests.
time baseservice.TimeGenerator
}

func (c *Config) validate() error {
Expand Down Expand Up @@ -448,16 +451,20 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
Workers: config.Workers,
disableStaggerStart: config.disableStaggerStart,
schedulerInterval: valutil.ValOrDefault(config.schedulerInterval, maintenance.JobSchedulerIntervalDefault),
time: config.time,
}

if err := config.validate(); err != nil {
return nil, err
}

archetype := &baseservice.Archetype{
Logger: config.Logger,
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
Logger: config.Logger,
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
Time: config.time,
}
if archetype.Time == nil {
archetype.Time = &baseservice.UnStubbableTimeGenerator{}
}

client := &Client[TTx]{
Expand Down Expand Up @@ -1042,7 +1049,7 @@ func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*ri
func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor, jobID int64) (*rivertype.JobRow, error) {
return exec.JobCancel(ctx, &riverdriver.JobCancelParams{
ID: jobID,
CancelAttemptedAt: c.baseService.TimeNowUTC(),
CancelAttemptedAt: c.baseService.Time.NowUTC(),
ControlTopic: string(notifier.NotificationTopicControl),
})
}
Expand Down Expand Up @@ -1112,7 +1119,7 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
Expand All @@ -1127,6 +1134,12 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp
jobInsertOpts = argsWithOpts.InsertOpts()
}

// If the time is stubbed (in a test), use that for `created_at`. Otherwise,
// leave an empty value which will use the database's `now()` value, which
// keeps the timestamps of jobs inserted across many different computers
// more consistent (i.e. in case of minor time drifts).
createdAt := archetype.Time.NowUTCOrNil()

maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts)
priority := valutil.FirstNonZero(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault)
queue := valutil.FirstNonZero(insertOpts.Queue, jobInsertOpts.Queue, rivercommon.QueueDefault)
Expand Down Expand Up @@ -1161,6 +1174,7 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp
}

insertParams := &riverdriver.JobInsertFastParams{
CreatedAt: createdAt,
EncodedArgs: encodedArgs,
Kind: args.Kind(),
MaxAttempts: maxAttempts,
Expand All @@ -1171,7 +1185,11 @@ func insertParamsFromConfigArgsAndOptions(config *Config, args JobArgs, insertOp
Tags: tags,
}

if !insertOpts.ScheduledAt.IsZero() {
if insertOpts.ScheduledAt.IsZero() {
// Use a stubbed time if there was one, but otherwise prefer the value
// generated by the database. createdAt is nil unless time is stubbed.
insertParams.ScheduledAt = createdAt
} else {
insertParams.ScheduledAt = &insertOpts.ScheduledAt
insertParams.State = rivertype.JobStateScheduled
}
Expand Down Expand Up @@ -1226,7 +1244,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(c.config, args, opts)
params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1382,7 +1400,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(c.config, param.Args, param.InsertOpts)
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
if err != nil {
return nil, err
}
Expand Down
31 changes: 22 additions & 9 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func newTestConfig(t *testing.T, callback callbackFunc) *Config {
Workers: workers,
disableStaggerStart: true, // disables staggered start in maintenance services
schedulerInterval: riverinternaltest.SchedulerShortInterval,
time: &riverinternaltest.TimeStub{},
}
}

Expand Down Expand Up @@ -2221,7 +2222,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

// Bypass the normal Insert function because that will error on an
// unknown job.
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
require.NoError(t, err)
_, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(t, err)
Expand Down Expand Up @@ -3664,7 +3665,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) {
subscribeChan, cancel := client.Subscribe(EventKindJobFailed)
t.Cleanup(cancel)

insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
require.NoError(err)
insertedJob, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(err)
Expand Down Expand Up @@ -4257,12 +4258,13 @@ func TestClient_JobTimeout(t *testing.T) {
func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

archetype := riverinternaltest.BaseServiceArchetype(t)
config := newTestConfig(t, nil)

t.Run("Defaults", func(t *testing.T) {
t.Parallel()

insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, nil)
insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, nil)
require.NoError(t, err)
require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs))
require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind)
Expand All @@ -4282,7 +4284,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
MaxAttempts: 34,
}

insertParams, _, err := insertParamsFromConfigArgsAndOptions(overrideConfig, noOpArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, overrideConfig, noOpArgs{}, nil)
require.NoError(t, err)
require.Equal(t, overrideConfig.MaxAttempts, insertParams.MaxAttempts)
})
Expand All @@ -4297,7 +4299,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ScheduledAt: time.Now().Add(time.Hour),
Tags: []string{"tag1", "tag2"},
}
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, opts)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, opts)
require.NoError(t, err)
require.Equal(t, 42, insertParams.MaxAttempts)
require.Equal(t, 2, insertParams.Priority)
Expand All @@ -4309,7 +4311,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("WorkerInsertOptsOverrides", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, &customInsertOptsJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, nil)
require.NoError(t, err)
// All these come from overrides in customInsertOptsJobArgs's definition:
require.Equal(t, 42, insertParams.MaxAttempts)
Expand All @@ -4328,7 +4330,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted},
}

_, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
_, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
require.NoError(t, err)
require.Equal(t, uniqueOpts.ByArgs, internalUniqueOpts.ByArgs)
require.Equal(t, uniqueOpts.ByPeriod, internalUniqueOpts.ByPeriod)
Expand All @@ -4339,7 +4341,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("PriorityIsLimitedTo4", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, noOpArgs{}, &InsertOpts{Priority: 5})
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{Priority: 5})
require.ErrorContains(t, err, "priority must be between 1 and 4")
require.Nil(t, insertParams)
})
Expand All @@ -4348,7 +4350,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

args := timeoutTestArgs{TimeoutValue: time.Hour}
insertParams, _, err := insertParamsFromConfigArgsAndOptions(config, args, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, nil)
require.NoError(t, err)
require.Equal(t, `{"timeout_value":3600000000000}`, string(insertParams.EncodedArgs))
})
Expand All @@ -4360,6 +4362,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
// since we already have tests elsewhere for that. Just make sure validation
// is running.
insertParams, _, err := insertParamsFromConfigArgsAndOptions(
archetype,
config,
noOpArgs{},
&InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}},
Expand Down Expand Up @@ -4539,6 +4542,16 @@ func TestUniqueOpts(t *testing.T) {

client := newTestClient(t, dbPool, newTestConfig(t, nil))

// Tests that use ByPeriod below can be sensitive to intermittency if
// the tests run at say 23:59:59.998, then it's possible to accidentally
// cross a period threshold, even if very unlikely. So here, seed mostly
// the current time, but make sure it's nicened up a little to be
// roughly in the middle of the hour and well clear of any period
// boundaries.
client.baseService.Time.StubNowUTC(
time.Now().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond),
)

return client, &testBundle{}
}

Expand Down
43 changes: 37 additions & 6 deletions internal/baseservice/base_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ type Archetype struct {
// secure randomness.
Rand *rand.Rand

// TimeNowUTC returns the current time as UTC. Normally it's implemented as
// a call to `time.Now().UTC()`, but may be overridden in tests for time
// injection. Services should try to use this function instead of the
// vanilla ones from the `time` package for testing purposes.
TimeNowUTC func() time.Time
// Time is a time generator used to get the current time in UTC. Normally
// it's implemented as UnStubbableTimeGenerator which just calls through to
// `time.Now().UTC()`, but is riverinternaltest.TimeStub in tests to allow
// the current time to be stubbed. Services should try to use this generator
// instead of the vanilla functions from the `time` package for testing purposes.
Time TimeGenerator
}

// BaseService is a struct that's meant to be embedded on "service-like" objects
Expand Down Expand Up @@ -167,7 +168,37 @@ func Init[TService withBaseService](archetype *Archetype, service TService) TSer
baseService.Logger = archetype.Logger
baseService.Name = reflect.TypeOf(service).Elem().Name()
baseService.Rand = archetype.Rand
baseService.TimeNowUTC = archetype.TimeNowUTC
baseService.Time = archetype.Time

return service
}

// TimeGenerator generates a current time in UTC. In test environments it's
// implemented by riverinternaltest.TimeStub which lets the current time be
// stubbed. Otherwise, it's implemented as UnStubbableTimeGenerator which
// doesn't allow stubbing.
type TimeGenerator interface {
// NowUTC returns the current time in UTC. This may be a stubbed time if the
// time has been actively stubbed in a test.
NowUTC() time.Time

// NowUTCOrNil returns the currently stubbed time _if_ the current time is
// stubbed, and returns nil otherwise. This is generally useful in cases
// where a component may want to use a stubbed time if the time is stubbed,
// but to fall back to a database time default otherwise.
NowUTCOrNil() *time.Time

// StubNowUTC stubs the current time. It will panic if invoked outside of
// tests. Returns the same time passed as parameter for convenience.
StubNowUTC(nowUTC time.Time) time.Time
}

// UnStubbableTimeGenerator is a TimeGenerator implementation that can't be
// stubbed. It's always the generator used outside of tests.
type UnStubbableTimeGenerator struct{}

// NowUTC returns the current wall-clock time with its location set to UTC.
func (g *UnStubbableTimeGenerator) NowUTC() time.Time {
	// time.Now() carries the local location. Convert explicitly so that any
	// formatting of the returned value is identical regardless of the host's
	// time zone, as the method name promises.
	return time.Now().UTC()
}

// NowUTCOrNil always returns nil because this generator can never be stubbed.
func (g *UnStubbableTimeGenerator) NowUTCOrNil() *time.Time { return nil }

// StubNowUTC always panics: stubbing the time is only supported by the
// test-only generator, never by this one.
func (g *UnStubbableTimeGenerator) StubNowUTC(nowUTC time.Time) time.Time {
	panic("time not stubbable outside tests")
}
8 changes: 4 additions & 4 deletions internal/baseservice/base_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestInit(t *testing.T) {
myService := Init(archetype, &MyService{})
require.NotNil(t, myService.Logger)
require.Equal(t, "MyService", myService.Name)
require.WithinDuration(t, time.Now().UTC(), myService.TimeNowUTC(), 2*time.Second)
require.WithinDuration(t, time.Now().UTC(), myService.Time.NowUTC(), 2*time.Second)
}

func TestBaseService_CancellableSleep(t *testing.T) {
Expand Down Expand Up @@ -143,8 +143,8 @@ type MyService struct {

func archetype() *Archetype {
return &Archetype{
Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)),
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)),
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
Time: &UnStubbableTimeGenerator{},
}
}
2 changes: 1 addition & 1 deletion internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (i *UniqueInserter) JobInsert(ctx context.Context, exec riverdriver.Executo
}

if uniqueOpts.ByPeriod != time.Duration(0) {
lowerPeriodBound := i.TimeNowUTC().Truncate(uniqueOpts.ByPeriod)
lowerPeriodBound := i.Time.NowUTC().Truncate(uniqueOpts.ByPeriod)

advisoryLockHash.Write([]byte("&period=" + lowerPeriodBound.Format(time.RFC3339)))

Expand Down
Loading

0 comments on commit 4cd66eb

Please sign in to comment.