Skip to content

Commit

Permalink
Add capability to schedule periodic jobs dynamically (#288)
Browse files Browse the repository at this point in the history
Here, attempt to resolve #242 by providing a way for new periodic jobs
to be added dynamically after a client's already started (i.e. not only
by passing them to the client's initial constructor).

The new API puts all functions on a separate `PeriodicJobs` bundle to
avoid further polluting `Client`'s namespace, and to make all related
functions easy to find.

    riverClient.PeriodicJobs().Add(
        river.NewPeriodicJob(
            river.PeriodicInterval(15*time.Minute),
            func() (river.JobArgs, *river.InsertOpts) {
                return PeriodicJobArgs{}, nil
            },
            nil,
        ),
    )

Additions return a periodic job "handle" which can be used to remove an
added job:

    periodicJobHandle := riverClient.PeriodicJobs().Add(...)

    riverClient.PeriodicJobs().Remove(periodicJobHandle)

I used a handle because the `river.PeriodicJob` construct isn't
particularly pretty to keep around, and contains a number of fields that
are aren't comparable so its unsuitable for use in equality checks or as
a map key.

Adding a new periodic job bumps the enqueuer's run loop so that it
enqueues the job immediately if it's configured with `RunOnStart`, and
schedules its initial target run time. In other words, adding or
removing periodic jobs should take effect ~instantly.

Fixes #242.
  • Loading branch information
brandur committed Mar 28, 2024
1 parent fa3a4b8 commit 9043557
Show file tree
Hide file tree
Showing 10 changed files with 737 additions and 223 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- New periodic jobs can now be added after a client's already started using `Client.PeriodicJobs().Add()` and removed with `Remove()`. [PR #288](https://github.com/riverqueue/river/pull/288).

### Changed

- The level of some of River's common log statements has changed, most often demoting `info` statements to `debug` so that `info`-level logging is overall less verbose. [PR #275](https://github.com/riverqueue/river/pull/275).
Expand Down
28 changes: 8 additions & 20 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ type Client[TTx any] struct {

monitor *clientMonitor
notifier *notifier.Notifier
periodicJobs *PeriodicJobBundle
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
services []startstop.Service
Expand Down Expand Up @@ -513,31 +514,14 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
}

{
emptyOpts := PeriodicJobOpts{}
periodicJobs := make([]*maintenance.PeriodicJob, 0, len(config.PeriodicJobs))
for _, periodicJob := range config.PeriodicJobs {
periodicJob := periodicJob // capture range var

opts := &emptyOpts
if periodicJob.opts != nil {
opts = periodicJob.opts
}

periodicJobs = append(periodicJobs, &maintenance.PeriodicJob{
ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
return insertParamsFromArgsAndOptions(periodicJob.constructorFunc())
},
RunOnStart: opts.RunOnStart,
ScheduleFunc: periodicJob.scheduleFunc.Next,
})
}

periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
PeriodicJobs: periodicJobs,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, periodicJobEnqueuer)
client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals

client.periodicJobs = newPeriodicJobBundle(periodicJobEnqueuer)
client.periodicJobs.AddMany(config.PeriodicJobs)
}

{
Expand Down Expand Up @@ -1495,6 +1479,10 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
return dblist.JobList(ctx, c.driver.UnwrapExecutor(tx), dbParams)
}

// PeriodicJobs returns the currently configured set of periodic jobs for the
// client, and can be used to add new ones or remove existing ones.
func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs }

// Generates a default client ID using the current hostname and time.
func defaultClientID(startedAt time.Time) string {
host, _ := os.Hostname()
Expand Down
88 changes: 86 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1967,7 +1967,7 @@ func Test_Client_Maintenance(t *testing.T) {
requireJobHasState(jobInFuture3.ID, jobInFuture3.State)
})

t.Run("PeriodicJobEnqueuerWithOpts", func(t *testing.T) {
t.Run("PeriodicJobEnqueuerWithInsertOpts", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)
Expand All @@ -1992,7 +1992,7 @@ func Test_Client_Maintenance(t *testing.T) {
require.Len(t, jobs, 1, "Expected to find exactly one job of kind: "+(periodicJobArgs{}).Kind())
})

t.Run("PeriodicJobEnqueuerNoOpts", func(t *testing.T) {
t.Run("PeriodicJobEnqueuerNoInsertOpts", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)
Expand All @@ -2018,6 +2018,90 @@ func Test_Client_Maintenance(t *testing.T) {
require.Empty(t, jobs)
})

t.Run("PeriodicJobEnqueuerAddDynamically", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)

worker := &periodicJobWorker{}
AddWorker(config.Workers, worker)

client := runNewTestClient(ctx, t, config)
exec := client.driver.GetExecutor()

client.testSignals.electedLeader.WaitOrTimeout()

client.PeriodicJobs().Add(
NewPeriodicJob(cron.Every(15*time.Minute), func() (JobArgs, *InsertOpts) {
return periodicJobArgs{}, nil
}, &PeriodicJobOpts{RunOnStart: true}),
)

svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
svc.TestSignals.EnteredLoop.WaitOrTimeout()
svc.TestSignals.InsertedJobs.WaitOrTimeout()

// We get a queued job because RunOnStart was specified.
jobs, err := exec.JobGetByKindMany(ctx, []string{(periodicJobArgs{}).Kind()})
require.NoError(t, err)
require.Len(t, jobs, 1)
})

t.Run("PeriodicJobEnqueuerRemoveDynamically", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)

worker := &periodicJobWorker{}
AddWorker(config.Workers, worker)

client := newTestClient(t, riverinternaltest.TestDB(ctx, t), config)
exec := client.driver.GetExecutor()

handle := client.PeriodicJobs().Add(
NewPeriodicJob(cron.Every(15*time.Minute), func() (JobArgs, *InsertOpts) {
return periodicJobArgs{}, nil
}, &PeriodicJobOpts{RunOnStart: true}),
)

startClient(ctx, t, client)

client.testSignals.electedLeader.WaitOrTimeout()

svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
svc.TestSignals.EnteredLoop.WaitOrTimeout()
svc.TestSignals.InsertedJobs.WaitOrTimeout()

client.PeriodicJobs().Remove(handle)

type OtherPeriodicArgs struct {
JobArgsReflectKind[OtherPeriodicArgs]
}

client.PeriodicJobs().Add(
NewPeriodicJob(cron.Every(15*time.Minute), func() (JobArgs, *InsertOpts) {
return OtherPeriodicArgs{}, nil
}, &PeriodicJobOpts{RunOnStart: true}),
)

svc.TestSignals.InsertedJobs.WaitOrTimeout()

// One of each because the first periodic job was inserted on the first
// go around due to RunOnStart, but then subsequently removed. The next
// periodic job was inserted also due to RunOnStart, but only after the
// first was removed.
{
jobs, err := exec.JobGetByKindMany(ctx, []string{(periodicJobArgs{}).Kind()})
require.NoError(t, err)
require.Len(t, jobs, 1)
}
{
jobs, err := exec.JobGetByKindMany(ctx, []string{(OtherPeriodicArgs{}).Kind()})
require.NoError(t, err)
require.Len(t, jobs, 1)
}
})

t.Run("Reindexer", func(t *testing.T) {
t.Parallel()
t.Skip("Reindexer is disabled for further development")
Expand Down
13 changes: 13 additions & 0 deletions example_periodic_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ func Example_periodicJob() {

waitForNJobs(subscribeChan, 1)

// Periodic jobs can also be configured dynamically after a client has
// already started. Added jobs are scheduled for run immediately.
riverClient.PeriodicJobs().Clear()
riverClient.PeriodicJobs().Add(
river.NewPeriodicJob(
river.PeriodicInterval(15*time.Minute),
func() (river.JobArgs, *river.InsertOpts) {
return PeriodicJobArgs{}, nil
},
nil,
),
)

if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
Expand Down
Loading

0 comments on commit 9043557

Please sign in to comment.