Merged
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -7,11 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ Version 0.19.0 has minor breaking changes for the `Worker.Middleware`, introduced fairly recently in 0.17.0. We tried not to make this change, but found the existing middleware interface insufficient to provide the necessary range of functionality we wanted, and this is a secondary middleware facility that won't be in use for many users, so it seemed worthwhile.
⚠️ Version 0.19.0 has minor breaking changes for `Worker.Middleware`, which was introduced fairly recently in 0.17.0: a worker's `Middleware` function now takes a non-generic `JobRow` parameter instead of a generic `Job[T]`. We tried not to make this change, but found the existing middleware interface insufficient to provide the range of functionality we wanted. This is a secondary middleware facility that many users won't have in use, so the change seemed worthwhile.

### Added

- Added a new "hooks" API for tying into River functionality at various points like job inserts or working. Differs from middleware in that it doesn't go on the stack and can't modify context, but in some cases is able to run at a more granular level (e.g. for each job insert rather than each _batch_ of inserts). [PR #789](https://github.com/riverqueue/river/pull/789).
- `river.Config` has a generic `Middleware` setting that can be used as a convenient way to configure middlewares that implement multiple middleware interfaces (e.g. `JobInsertMiddleware` _and_ `WorkerMiddleware`). Use of this setting is preferred over `Config.JobInsertMiddleware` and `Config.WorkerMiddleware`, which have been deprecated. [PR #804](https://github.com/riverqueue/river/pull/804).

### Changed

156 changes: 116 additions & 40 deletions client.go
@@ -18,6 +18,7 @@ import (
"github.com/riverqueue/river/internal/jobcompleter"
"github.com/riverqueue/river/internal/leadership"
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/internal/middlewarelookup"
"github.com/riverqueue/river/internal/notifier"
"github.com/riverqueue/river/internal/notifylimiter"
"github.com/riverqueue/river/internal/rivercommon"
@@ -157,6 +158,9 @@ type Config struct {

// JobInsertMiddleware are optional functions that can be called around job
// insertion.
//
// Deprecated: Prefer the use of Middleware instead (which may contain
// instances of rivertype.JobInsertMiddleware).
JobInsertMiddleware []rivertype.JobInsertMiddleware

// JobTimeout is the maximum amount of time a job is allowed to run before its
@@ -168,8 +172,24 @@ type Config struct {
JobTimeout time.Duration

// Hooks are functions that may activate at certain points during a job's
// lifecycle (see rivertype.Hook), installed globally. Jobs may have their
// own specific hooks by implementing the JobArgsWithHooks interface.
// lifecycle (see rivertype.Hook), installed globally.
//
// The effect of hooks in this list will depend on the specific hook
// interfaces they implement, so for example implementing
// rivertype.HookInsertBegin will cause the hook to be invoked before a job
// is inserted, or implementing rivertype.HookWorkBegin will cause it to be
// invoked before a job is worked. Hook structs may implement multiple hook
// interfaces.
//
// Order in this list is significant. A hook that appears first will be
// entered before a hook that appears later. For any particular phase, order
// is relevant only for hooks that will run for that phase. For example, if
// two rivertype.HookInsertBegin are separated by a rivertype.HookWorkBegin,
// during job insertion those two outer hooks will run one after another,
// and the work hook between them will not run. When a job is worked, the
// work hook runs and the insertion hooks on either side of it are skipped.
//
// Jobs may have their own specific hooks by implementing JobArgsWithHooks.
Hooks []rivertype.Hook

// Logger is the structured logger to use for logging purposes. If none is
@@ -185,6 +205,26 @@ type Config struct {
// If not specified, defaults to 25 (MaxAttemptsDefault).
MaxAttempts int

// Middleware contains middleware that may activate at certain points during
// a job's lifecycle (see rivertype.Middleware), installed globally.
//
// The effect of middleware in this list will depend on the specific
// middleware interfaces they implement, so for example implementing
// rivertype.JobInsertMiddleware will cause the middleware to be invoked
// when jobs are inserted, and implementing rivertype.WorkerMiddleware will
// cause it to be invoked when a job is worked. Middleware structs may
// implement multiple middleware interfaces.
//
// Order in this list is significant. Middleware that appears first will be
// entered before middleware that appears later. For any particular phase,
// order is relevant only for middlewares that will run for that phase. For
// example, if two rivertype.JobInsertMiddleware are separated by a
// rivertype.WorkerMiddleware, during job insertion those two outer
// middlewares will run one after another, and the work middleware between
// them will not run. When a job is worked, the work middleware runs and the
// insertion middlewares on either side of it are skipped.
Middleware []rivertype.Middleware

// PeriodicJobs are a set of periodic jobs to run at the specified intervals
// in the client.
PeriodicJobs []*PeriodicJob
@@ -276,6 +316,9 @@ type Config struct {

// WorkerMiddleware are optional functions that can be called around
// all job executions.
//
// Deprecated: Prefer the use of Middleware instead (which may contain
// instances of rivertype.WorkerMiddleware).
WorkerMiddleware []rivertype.WorkerMiddleware

// Scheduler run interval. Shared between the scheduler and producer/job
@@ -325,6 +368,7 @@ func (c *Config) WithDefaults() *Config {
JobTimeout: valutil.ValOrDefault(c.JobTimeout, JobTimeoutDefault),
Logger: logger,
MaxAttempts: valutil.ValOrDefault(c.MaxAttempts, MaxAttemptsDefault),
Middleware: c.Middleware,
PeriodicJobs: c.PeriodicJobs,
PollOnly: c.PollOnly,
Queues: c.Queues,
@@ -334,8 +378,8 @@ func (c *Config) WithDefaults() *Config {
SkipUnknownJobCheck: c.SkipUnknownJobCheck,
Test: c.Test,
TestOnly: c.TestOnly,
Workers: c.Workers,
WorkerMiddleware: c.WorkerMiddleware,
Workers: c.Workers,
schedulerInterval: valutil.ValOrDefault(c.schedulerInterval, maintenance.JobSchedulerIntervalDefault),
}
}
@@ -368,6 +412,9 @@ func (c *Config) validate() error {
if c.MaxAttempts < 0 {
return errors.New("MaxAttempts cannot be less than zero")
}
if len(c.Middleware) > 0 && (len(c.JobInsertMiddleware) > 0 || len(c.WorkerMiddleware) > 0) {
return errors.New("only one of the pair JobInsertMiddleware/WorkerMiddleware or Middleware may be provided (Middleware is recommended, and may contain both job insert and worker middleware)")
}
if c.RescueStuckJobsAfter < 0 {
return errors.New("RescueStuckJobsAfter cannot be less than zero")
}
@@ -430,23 +477,24 @@ type Client[TTx any] struct {
baseService baseservice.BaseService
baseStartStop startstop.BaseStartStop

completer jobcompleter.JobCompleter
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
hookLookupByJob *hooklookup.JobHookLookup
hookLookupGlobal hooklookup.HookLookupInterface
insertNotifyLimiter *notifylimiter.Limiter
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
pilot riverpilot.Pilot
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queues *QueueBundle
services []startstop.Service
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals
completer jobcompleter.JobCompleter
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
hookLookupByJob *hooklookup.JobHookLookup
hookLookupGlobal hooklookup.HookLookupInterface
insertNotifyLimiter *notifylimiter.Limiter
middlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
pilot riverpilot.Pilot
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queues *QueueBundle
services []startstop.Service
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
@@ -564,6 +612,33 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is
client.insertNotifyLimiter = notifylimiter.NewLimiter(archetype, config.FetchCooldown)

// Validation ensures that either config.JobInsertMiddleware/WorkerMiddleware
// or the more general config.Middleware is set, but not both, so in practice
// we never append all three of these to each other.
{
middleware := config.Middleware
for _, jobInsertMiddleware := range config.JobInsertMiddleware {
middleware = append(middleware, jobInsertMiddleware)
}
outerLoop:
for _, workerMiddleware := range config.WorkerMiddleware {
// Don't add the middleware if it also implements JobInsertMiddleware
// and the instance has been added to config.JobInsertMiddleware. This
// is a hedge to make sure we don't accidentally double add middleware
// as we've converted over to the unified config.Middleware setting.
if workerMiddlewareAsJobInsertMiddleware, ok := workerMiddleware.(rivertype.JobInsertMiddleware); ok {
for _, jobInsertMiddleware := range config.JobInsertMiddleware {
if workerMiddlewareAsJobInsertMiddleware == jobInsertMiddleware {
continue outerLoop
}
}
}

middleware = append(middleware, workerMiddleware)
}
client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middleware)
}

pluginDriver, _ := driver.(driverPlugin[TTx])
if pluginDriver != nil {
pluginDriver.PluginInit(archetype)
@@ -1559,12 +1634,13 @@ func (c *Client[TTx]) insertManyShared(
return results, nil
}

if len(c.config.JobInsertMiddleware) > 0 {
jobInsertMiddleware := c.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert)
if len(jobInsertMiddleware) > 0 {
// Wrap middlewares in reverse order so the one defined first is wrapped
// as the outermost function and is first to receive the operation.
for i := len(c.config.JobInsertMiddleware) - 1; i >= 0; i-- {
middlewareItem := c.config.JobInsertMiddleware[i] // capture the current middleware item
previousDoInner := doInner // Capture the current doInner function
for i := len(jobInsertMiddleware) - 1; i >= 0; i-- {
middlewareItem := jobInsertMiddleware[i].(rivertype.JobInsertMiddleware) //nolint:forcetypeassert // capture the current middleware item
previousDoInner := doInner // Capture the current doInner function
doInner = func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
return middlewareItem.InsertMany(ctx, insertParams, previousDoInner)
}
@@ -1778,22 +1854,22 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error {

func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *producer {
producer := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
ClientID: c.config.ID,
Completer: c.completer,
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
HookLookupByJob: c.hookLookupByJob,
HookLookupGlobal: c.hookLookupGlobal,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: c.notifier,
Queue: queueName,
QueueEventCallback: c.subscriptionManager.distributeQueueEvent,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
Workers: c.config.Workers,
WorkerMiddleware: c.config.WorkerMiddleware,
ClientID: c.config.ID,
Completer: c.completer,
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
HookLookupByJob: c.hookLookupByJob,
HookLookupGlobal: c.hookLookupGlobal,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
MiddlewareLookupGlobal: c.middlewareLookupGlobal,
Notifier: c.notifier,
Queue: queueName,
QueueEventCallback: c.subscriptionManager.distributeQueueEvent,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
Workers: c.config.Workers,
})
c.producersByQueueName[queueName] = producer
return producer
Expand Down
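The interaction between list order, middleware kind, and the reverse wrapping loop in `insertManyShared` can be illustrated in isolation. What follows is a minimal standalone sketch, not River's actual code: `insertMiddleware`, `workMiddleware`, `traceInsert`, `traceWork`, `runInsert`, and `insertTrace` are hypothetical stand-ins with simplified signatures. It filters a mixed list by interface, then wraps in reverse so that the first-listed middleware becomes the outermost call.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Hypothetical, simplified stand-ins for rivertype.JobInsertMiddleware and
// rivertype.WorkerMiddleware. The real interfaces take more parameters.
type insertMiddleware interface {
	InsertMany(ctx context.Context, doInner func(context.Context) error) error
}

type workMiddleware interface {
	Work(ctx context.Context, doInner func(context.Context) error) error
}

// traceInsert records when it is entered and exited during insertion.
type traceInsert struct {
	name string
	log  *[]string
}

func (m traceInsert) InsertMany(ctx context.Context, doInner func(context.Context) error) error {
	*m.log = append(*m.log, m.name+":enter")
	err := doInner(ctx)
	*m.log = append(*m.log, m.name+":exit")
	return err
}

// traceWork only implements the work interface, so insertion skips it.
type traceWork struct {
	name string
	log  *[]string
}

func (m traceWork) Work(ctx context.Context, doInner func(context.Context) error) error {
	*m.log = append(*m.log, m.name+":enter")
	err := doInner(ctx)
	*m.log = append(*m.log, m.name+":exit")
	return err
}

var _ workMiddleware = traceWork{} // compile-time interface check

// runInsert keeps only the insert middleware (preserving list order), then
// wraps in reverse so the first one listed is the first to be entered.
func runInsert(ctx context.Context, middleware []any, log *[]string) error {
	doInner := func(ctx context.Context) error {
		*log = append(*log, "insert")
		return nil
	}

	var inserts []insertMiddleware
	for _, m := range middleware {
		if im, ok := m.(insertMiddleware); ok {
			inserts = append(inserts, im)
		}
	}

	for i := len(inserts) - 1; i >= 0; i-- {
		item := inserts[i]  // capture the current middleware
		previous := doInner // capture the current inner function
		doInner = func(ctx context.Context) error {
			return item.InsertMany(ctx, previous)
		}
	}
	return doInner(ctx)
}

// insertTrace runs an insertion through a mixed list and returns the trace.
func insertTrace() string {
	var log []string
	middleware := []any{
		traceInsert{"first", &log},
		traceWork{"work", &log},
		traceInsert{"second", &log},
	}
	if err := runInsert(context.Background(), middleware, &log); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(log, " ")
}

func main() {
	fmt.Println(insertTrace())
	// prints "first:enter second:enter insert second:exit first:exit"
}
```

The work middleware sitting between the two insert middlewares never appears in the trace, which matches the ordering rule described in the `Config.Middleware` doc comment.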
52 changes: 51 additions & 1 deletion client_test.go
@@ -26,6 +26,7 @@ import (
"github.com/riverqueue/river/internal/dbunique"
"github.com/riverqueue/river/internal/jobexecutor"
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/internal/middlewarelookup"
"github.com/riverqueue/river/internal/notifier"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/riverinternaltest"
@@ -736,7 +737,7 @@ func Test_Client(t *testing.T) {
return doInner(ctx)
},
}
bundle.config.WorkerMiddleware = []rivertype.WorkerMiddleware{middleware}
bundle.config.Middleware = []rivertype.Middleware{middleware}

AddWorker(bundle.config.Workers, WorkFunc(func(ctx context.Context, job *Job[callbackArgs]) error {
require.Equal(t, "called", ctx.Value(privateKey("middleware")))
@@ -5464,6 +5465,55 @@ func Test_NewClient_Validations(t *testing.T) {
require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts)
},
},
{
name: "Middleware can be configured independently",
configFunc: func(config *Config) {
config.Middleware = []rivertype.Middleware{&overridableJobMiddleware{}}
},
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1)
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
},
},
{
name: "JobInsertMiddleware and WorkerMiddleware may be configured together with separate middlewares",
configFunc: func(config *Config) {
config.JobInsertMiddleware = []rivertype.JobInsertMiddleware{&overridableJobMiddleware{}}
config.WorkerMiddleware = []rivertype.WorkerMiddleware{&overridableJobMiddleware{}}
},
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 2)
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
},
},
{
name: "JobInsertMiddleware and WorkerMiddleware may be configured together with same middleware",
configFunc: func(config *Config) {
middleware := &overridableJobMiddleware{}
config.JobInsertMiddleware = []rivertype.JobInsertMiddleware{middleware}
config.WorkerMiddleware = []rivertype.WorkerMiddleware{middleware}
},
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1)
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
},
},
{
name: "Middleware not allowed with JobInsertMiddleware",
configFunc: func(config *Config) {
config.JobInsertMiddleware = []rivertype.JobInsertMiddleware{&overridableJobMiddleware{}}
config.Middleware = []rivertype.Middleware{&overridableJobMiddleware{}}
},
wantErr: errors.New("only one of the pair JobInsertMiddleware/WorkerMiddleware or Middleware may be provided (Middleware is recommended, and may contain both job insert and worker middleware)"),
},
{
name: "Middleware not allowed with WorkerMiddleware",
configFunc: func(config *Config) {
config.Middleware = []rivertype.Middleware{&overridableJobMiddleware{}}
config.WorkerMiddleware = []rivertype.WorkerMiddleware{&overridableJobMiddleware{}}
},
wantErr: errors.New("only one of the pair JobInsertMiddleware/WorkerMiddleware or Middleware may be provided (Middleware is recommended, and may contain both job insert and worker middleware)"),
},
{
name: "RescueStuckJobsAfter may be overridden",
configFunc: func(config *Config) {
Expand Down
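The counts asserted in the two "configured together" cases above (two entries per kind for separate instances, one per kind for a shared instance) follow from the de-duplication step in `NewClient`. The following is a rough standalone sketch of that step under simplifying assumptions: `insertMiddleware`, `workMiddleware`, `both`, `merge`, `countKinds`, and `countFor` are hypothetical names, and the interfaces are reduced to empty marker methods.

```go
package main

import "fmt"

// Hypothetical marker interfaces standing in for the two middleware kinds.
type insertMiddleware interface{ InsertMany() }
type workMiddleware interface{ Work() }

// both implements both interfaces, like the test's overridableJobMiddleware.
// The name field keeps the struct non-zero-sized: Go does not guarantee that
// pointers to distinct zero-size values compare unequal.
type both struct{ name string }

func (*both) InsertMany() {}
func (*both) Work()       {}

// merge combines the two legacy lists into one, skipping a worker middleware
// when the very same instance was already added as an insert middleware.
func merge(inserts []insertMiddleware, workers []workMiddleware) []any {
	var merged []any
	for _, m := range inserts {
		merged = append(merged, m)
	}
outerLoop:
	for _, w := range workers {
		if asInsert, ok := w.(insertMiddleware); ok {
			for _, m := range inserts {
				// Interface equality: same dynamic type and same pointer.
				if asInsert == m {
					continue outerLoop
				}
			}
		}
		merged = append(merged, w)
	}
	return merged
}

// countKinds reports how many merged entries satisfy each interface.
func countKinds(merged []any) (numInsert, numWork int) {
	for _, m := range merged {
		if _, ok := m.(insertMiddleware); ok {
			numInsert++
		}
		if _, ok := m.(workMiddleware); ok {
			numWork++
		}
	}
	return numInsert, numWork
}

// countFor builds the two lists from either one shared instance or two
// separate instances and returns the per-kind counts after merging.
func countFor(shared bool) (int, int) {
	a := &both{name: "a"}
	b := a
	if !shared {
		b = &both{name: "b"}
	}
	return countKinds(merge([]insertMiddleware{a}, []workMiddleware{b}))
}

func main() {
	fmt.Println(countFor(false)) // prints "2 2"
	fmt.Println(countFor(true))  // prints "1 1"
}
```

With separate instances, each one satisfies both interfaces, so both kinds report two entries. With a shared instance the second copy is dropped, so each kind reports one.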
14 changes: 14 additions & 0 deletions common_test.go
@@ -1,6 +1,7 @@
package river_test

import (
"context"
"fmt"
"time"

@@ -13,6 +14,19 @@ import (
// helpers aren't included in Godoc and keep each example more succinct.
//

type NoOpArgs struct{}

func (NoOpArgs) Kind() string { return "no_op" }

type NoOpWorker struct {
river.WorkerDefaults[NoOpArgs]
}

func (w *NoOpWorker) Work(ctx context.Context, job *river.Job[NoOpArgs]) error {
fmt.Printf("NoOpWorker.Work ran\n")
return nil
}
Comment on lines +17 to +28
Contributor


At one point (during the extraction of the internal/jobexecutor package) I had an internal package of testworker which might be a useful home for these if they can be leveraged elsewhere. Of course you can't use them from package river tests because of the circular dependency due to the river.Job argument and river.WorkerDefaults, so maybe limited on where that would be useful.

Contributor Author


Ah right. Yeah, I struggle a little knowing what the right thing to do for this type of thing is. A key is that the args/worker is dead simple, i.e. no callback functions or anything to complicate the example.

And even here, I'm not sure that common_test.go is the right home for it. It maybe should just be duplicated into each example's file so that a user reading it gets to see the entire code sample for easiest copy/pasta.


// Wait on the given subscription channel for numJobs. Times out with a panic if
// jobs take too long to be received.
func waitForNJobs(subscribeChan <-chan *river.Event, numJobs int) {