Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context functions to retry and recover #172

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions recovery/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ import (
// RecoveryHandlerFunc is a function that recovers from the panic `p` by returning an `error`.
type RecoveryHandlerFunc func(p interface{}) (err error)

// RecoveryHandlerFuncContext is a function that recovers from the panic `p` by returning an `error`.
johanbrandhorst marked this conversation as resolved.
Show resolved Hide resolved
// The context can be used to extract request scoped metadata and context values.
type RecoveryHandlerFuncContext func(ctx context.Context, p interface{}) (err error)

// UnaryServerInterceptor returns a new unary server interceptor for panic recovery.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
o := evaluateOptions(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, err error) {
defer func() {
if r := recover(); r != nil {
err = recoverFrom(r, o.recoveryHandlerFunc)
err = recoverFrom(ctx, r, o.recoveryHandlerFunc)
}
}()

Expand All @@ -32,17 +36,17 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
defer func() {
if r := recover(); r != nil {
err = recoverFrom(r, o.recoveryHandlerFunc)
err = recoverFrom(stream.Context(), r, o.recoveryHandlerFunc)
}
}()

return handler(srv, stream)
}
}

func recoverFrom(p interface{}, r RecoveryHandlerFunc) error {
func recoverFrom(ctx context.Context, p interface{}, r RecoveryHandlerFuncContext) error {
if r == nil {
return grpc.Errorf(codes.Internal, "%s", p)
}
return r(p)
return r(ctx, p)
}
13 changes: 12 additions & 1 deletion recovery/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@

package grpc_recovery

import "golang.org/x/net/context"
johanbrandhorst marked this conversation as resolved.
Show resolved Hide resolved

var (
defaultOptions = &options{
recoveryHandlerFunc: nil,
}
)

type options struct {
recoveryHandlerFunc RecoveryHandlerFunc
recoveryHandlerFunc RecoveryHandlerFuncContext
}

func evaluateOptions(opts []Option) *options {
Expand All @@ -26,6 +28,15 @@ type Option func(*options)

// WithRecoveryHandler customizes the function for recovering from a panic.
func WithRecoveryHandler(f RecoveryHandlerFunc) Option {
return func(o *options) {
o.recoveryHandlerFunc = RecoveryHandlerFuncContext(func(ctx context.Context, p interface{}) error {
return f(p)
})
}
}

// WithRecoveryHandlerContext customizes the function for recovering from a panic.
func WithRecoveryHandlerContext(f RecoveryHandlerFuncContext) Option {
return func(o *options) {
o.recoveryHandlerFunc = f
}
Expand Down
26 changes: 23 additions & 3 deletions retry/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package grpc_retry
import (
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
Expand All @@ -22,7 +23,9 @@ var (
perCallTimeout: 0, // disabled
includeHeader: true,
codes: DefaultRetriableCodes,
backoffFunc: BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt)
}),
}
)

Expand All @@ -34,6 +37,14 @@ var (
// with the next iteration.
type BackoffFunc func(attempt uint) time.Duration

// BackoffFuncContext denotes a family of functions that control the backoff duration between call retries.
//
// They are called with an identifier of the attempt, and should return a time the system client should
// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request
johanbrandhorst marked this conversation as resolved.
Show resolved Hide resolved
// the deadline of the request takes precedence and the wait will be interrupted before proceeding
// with the next iteration. The context can be used to extract request scoped metadata and context values.
type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration

// Disable disables the retry behaviour on this call, or this interceptor.
//
// Its semantically the same to `WithMax`
Expand All @@ -48,8 +59,17 @@ func WithMax(maxRetries uint) CallOption {
}}
}

// WithBackoff sets the `BackoffFunc `used to control time between retries.
// WithBackoff sets the `BackoffFunc` used to control time between retries.
func WithBackoff(bf BackoffFunc) CallOption {
return CallOption{applyFunc: func(o *options) {
o.backoffFunc = BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
return bf(attempt)
})
}}
}

// WithBackoffContext sets the `BackoffFuncContext` used to control time between retries.
func WithBackoffContext(bf BackoffFuncContext) CallOption {
return CallOption{applyFunc: func(o *options) {
o.backoffFunc = bf
}}
Expand Down Expand Up @@ -87,7 +107,7 @@ type options struct {
perCallTimeout time.Duration
includeHeader bool
codes []codes.Code
backoffFunc BackoffFunc
backoffFunc BackoffFuncContext
}

// CallOption is a grpc.CallOption that is local to grpc_retry.
Expand Down
2 changes: 1 addition & 1 deletion retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx
func waitRetryBackoff(attempt uint, parentCtx context.Context, callOpts *options) error {
var waitTime time.Duration = 0
if attempt > 0 {
waitTime = callOpts.backoffFunc(attempt)
waitTime = callOpts.backoffFunc(parentCtx, attempt)
}
if waitTime > 0 {
logTrace(parentCtx, "grpc_retry attempt: %d, backoff for %v", attempt, waitTime)
Expand Down