Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add `panictoerror` middleware that recovers panics and returns them as errors to middlewares up the stack. [PR #32](https://github.com/riverqueue/rivercontrib/pull/32).

### Changed

- More complete example test for `nilerror` package. [PR #27](https://github.com/riverqueue/rivercontrib/pull/27).
Expand Down
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ See:
* [`datadogriver`](../datadogriver): Package containing examples of using `otelriver` with [DataDog](https://www.datadoghq.com/).
* [`nilerror`](../nilerror): Package containing a River hook for detecting a common accidental Go problem where a nil struct value is wrapped in a non-nil interface value.
* [`otelriver`](../otelriver): Package for use with [OpenTelemetry](https://opentelemetry.io/).
* [`panictoerror`](../panictoerror): Provides a middleware that recovers panics that may have occurred deeper in the middleware stack (i.e. an inner middleware or the worker itself), converts those panics to errors, and returns those errors up the stack.
1 change: 1 addition & 0 deletions go.work
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ use (
./datadogriver
./otelriver
./nilerror
./panictoerror
)
29 changes: 29 additions & 0 deletions panictoerror/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# panictoerror [![Build Status](https://github.com/riverqueue/rivercontrib/actions/workflows/ci.yaml/badge.svg?branch=master)](https://github.com/riverqueue/rivercontrib/actions) [![Go Reference](https://pkg.go.dev/badge/github.com/riverqueue/rivercontrib.svg)](https://pkg.go.dev/github.com/riverqueue/rivercontrib/nilerror)

Provides a `rivertype.WorkerMiddleware` that recovers panics that may have occurred deeper in the middleware stack (i.e. an inner middleware or the worker itself), converts those panics to errors, and returns those errors up the stack. This may be convenient in some cases so that middleware further up the stack need only have one way to handle either return errors or panic values.

``` go
// A worker implementation which will always panic.
func (w *PanicErrorWorker) Work(ctx context.Context, job *river.Job[PanicErrorArgs]) error {
panic("this worker always panics!")
}

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Middleware: []rivertype.Middleware{
// This middleware further up the stack always receives an error instead
// of a panic because `panictoerror.Middleware` is nested below it.
river.WorkerMiddlewareFunc(func(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
if err := doInner(ctx); err != nil {
panicErr := err.(*panictoerror.PanicError)
fmt.Printf("error from doInner: %s", panicErr.Cause)
}
return nil
}),

// This middleware coverts the panic to an error.
panictoerror.NewMiddleware(nil),
},
}
```

Based [on work](https://github.com/riverqueue/river/issues/1073#issuecomment-3515520394) from [@jerbob92](https://github.com/jerbob92).
94 changes: 94 additions & 0 deletions panictoerror/example_middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package panictoerror_test

import (
"context"
"errors"
"fmt"
"log/slog"
"os"

"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"github.com/riverqueue/river/rivertype"
"github.com/riverqueue/rivercontrib/panictoerror"
)

type PanicErrorArgs struct{}

func (PanicErrorArgs) Kind() string { return "custom_error" }

type PanicErrorWorker struct {
river.WorkerDefaults[PanicErrorArgs]
}

func (w *PanicErrorWorker) Work(ctx context.Context, job *river.Job[PanicErrorArgs]) error {
panic("this worker always panics!")
}

func ExampleMiddleware() {
ctx := context.Background()

dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()

workers := river.NewWorkers()
river.AddWorker(workers, &PanicErrorWorker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
Middleware: []rivertype.Middleware{
// Layer a middleware above panictoerror.Middleware that takes a
// return error and prints it to stdout for the purpose of this test.
river.WorkerMiddlewareFunc(func(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
var panicErr *panictoerror.PanicError
if err := doInner(ctx); errors.As(err, &panicErr) {
fmt.Printf("error from doInner: %s", panicErr.Cause)
}
return nil
}),

// This middleware coverts the panic to an error.
panictoerror.NewMiddleware(nil),
},
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
})
if err != nil {
panic(err)
}

// Out of example scope, but used to wait until a job is worked.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()

if _, err = riverClient.Insert(ctx, PanicErrorArgs{}, nil); err != nil {
panic(err)
}

if err := riverClient.Start(ctx); err != nil {
panic(err)
}

// Wait for jobs to complete. Only needed for purposes of the example test.
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

if err := riverClient.Stop(ctx); err != nil {
panic(err)
}

// Output:
// error from doInner: this worker always panics!
}
29 changes: 29 additions & 0 deletions panictoerror/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
module github.com/riverqueue/rivercontrib/panictoerror

go 1.24.2

require (
github.com/riverqueue/river v0.26.0
github.com/riverqueue/river/rivershared v0.26.0
github.com/riverqueue/river/rivertype v0.26.0
github.com/stretchr/testify v1.11.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.7.6 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/riverqueue/river/riverdriver v0.26.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/text v0.29.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
62 changes: 62 additions & 0 deletions panictoerror/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river v0.26.0 h1:Lykh7L6iDBNxku3NXrnL5RXUGk7FgEnk5CdN/ak3lko=
github.com/riverqueue/river v0.26.0/go.mod h1:w8+9lbnPQe/vlmBsIG7T1TObTm94Rvx63ZLUZHPmcR8=
github.com/riverqueue/river/riverdriver v0.26.0 h1:hMW/OOEjAkyvkTIzTf/zqZChThJCQQO0Mi2aMvgcFzg=
github.com/riverqueue/river/riverdriver v0.26.0/go.mod h1:qRLS0bFTrwmCevlpaMje5jhQK6aCDMJ9i8hRFbXAgTo=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.26.0 h1:M5t0t9wZJwOIO0f6Gsbn5LmNLUQlk9K1gL0DhkZvd6k=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.26.0/go.mod h1:+fkIOQtVOaUaDyJyVFK3R3bA1sg6DqGEQ0F9D47sG48=
github.com/riverqueue/river/rivershared v0.26.0 h1:tsMvxTIdG58GoYXd3788DwjNq87Y7CcfRlV7TAzeuhw=
github.com/riverqueue/river/rivershared v0.26.0/go.mod h1:/BEdbdGEqfcFP9FtChwK81e2AWF8e82RC6z5mwQ3y1g=
github.com/riverqueue/river/rivertype v0.26.0 h1:C3GdCMH8khTUUKH+OkTSQv1kdsSAXWL8n7M7Rq2r4yE=
github.com/riverqueue/river/rivertype v0.26.0/go.mod h1:rWpgI59doOWS6zlVocROcwc00fZ1RbzRwsRTU8CDguw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
124 changes: 124 additions & 0 deletions panictoerror/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Package panictoerror provides a rivertype.WorkerMiddleware that recovers
// panics that may have occurred deeper in the middleware stack (i.e. an inner
// middleware or the worker itself), converts those panics to errors, and
// returns those errors up the stack. This may be convenient in some cases so
// that middleware further up the stack need only have one way to handle either
// return errors or panic values.
package panictoerror

import (
"context"
"fmt"
"runtime"
"strings"

"github.com/riverqueue/river"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivertype"
)

// Verify interface compliance.
var _ rivertype.WorkerMiddleware = &Middleware{}

// PanicError is a panic that's been converted to an error.
type PanicError struct {
// Cause is the value recovered with `recover()`.
Cause any

// Trace up to the top 100 stack frames when the panic occurred. The
// middleware attempts to remove internal frames on top so that user code is
// the first stack frame.
Trace []*runtime.Frame
}

func (e *PanicError) Error() string {
var sb strings.Builder
for _, frame := range e.Trace {
sb.WriteString(fmt.Sprintf("%s\n\t%s:%d\n", frame.Function, frame.File, frame.Line))
}

return fmt.Sprintf("PanicError: %v\n%s", e.Cause, sb.String())
}

func (e *PanicError) Is(target error) bool {
_, ok := target.(*PanicError)
return ok
}

// MiddlewareConfig is configuration for the panictoerror middleware.
//
// Currently empty, but reserved for future use.
type MiddlewareConfig struct{}

// Middleware is a rivertype.WorkerMiddleware that recovers panics that may have
// occurred deeper in the middleware stack (i.e. an inner middleware or the
// worker itself), converts those panics to errors, and returns those errors up
// the stack.
type Middleware struct {
baseservice.BaseService
river.MiddlewareDefaults

config *MiddlewareConfig
}

// NewMiddleware initializes a new River panictoerror middleware.
//
// config may be nil.
func NewMiddleware(config *MiddlewareConfig) *Middleware {
if config == nil {
config = &MiddlewareConfig{}
}

return &Middleware{
config: config,
}
}

func (s *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) (err error) {
defer func() {
if recovery := recover(); recovery != nil {
err = &PanicError{
Cause: recovery,

// Skip (1) Callers, (2) captureStackTraceSkipFrames, (3) Work (this function), and (4) panic.go.
//
// runtime.Callers
// /opt/homebrew/Cellar/go/1.25.0/libexec/src/runtime/extern.go:345
// github.com/riverqueue/rivercontrib/panictoerror.captureStackTraceSkipFrames
// /Users/brandur/Documents/projects/rivercontrib/panictoerror/middleware.go:77
// github.com/riverqueue/rivercontrib/panictoerror.(*Middleware).Work.func1
// /Users/brandur/Documents/projects/rivercontrib/panictoerror/middleware.go:58
// runtime.gopanic
// /opt/homebrew/Cellar/go/1.25.0/libexec/src/runtime/panic.go:783
Trace: captureStackFrames(4),
}
}
}()

err = doInner(ctx)
return err
}

// captureStackFrames captures the current stack trace, skipping the top
// numSkipped frames.
func captureStackFrames(numSkipped int) []*runtime.Frame {
var (
// Allocate room for up to 100 callers; adjust as needed.
callers = make([]uintptr, 100)

// Skip the specified number of frames.
numFrames = runtime.Callers(numSkipped, callers)

frames = runtime.CallersFrames(callers[:numFrames])
)

trace := make([]*runtime.Frame, 0, numFrames)
for {
frame, more := frames.Next()
trace = append(trace, &frame)
if !more {
break
}
}
return trace
}
Loading
Loading