Skip to content

Commit

Permalink
retry: re-enable retrying on non-IO transport errors (#3691)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Jun 16, 2020
1 parent 6f5ecbe commit 3b63c2b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
15 changes: 14 additions & 1 deletion internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,13 +563,26 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
return callAuthData, nil
}

// PerformedIOError wraps an error to indicate IO may have been performed
// before the error occurred.
type PerformedIOError struct {
Err error
}

// Error implements error.
func (p PerformedIOError) Error() string {
return p.Err.Error()
}

// NewStream creates a stream and registers it into the transport as "active"
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
ctx = peer.NewContext(ctx, t.getPeer())
headerFields, err := t.createHeaderFields(ctx, callHdr)
if err != nil {
return nil, err
// We may have performed I/O in the per-RPC creds callback, so do not
// allow transparent retry.
return nil, PerformedIOError{err}
}
s := t.newStream(ctx, callHdr)
cleanup := func(err error) {
Expand Down
30 changes: 26 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ func (a *csAttempt) newStream() error {
cs.callHdr.PreviousAttempts = cs.numRetries
s, err := a.t.NewStream(cs.ctx, cs.callHdr)
if err != nil {
if _, ok := err.(transport.PerformedIOError); ok {
// Return without converting to an RPC error so retry code can
// inspect.
return err
}
return toRPCErr(err)
}
cs.attempt.s = s
Expand Down Expand Up @@ -459,17 +464,34 @@ func (cs *clientStream) commitAttempt() {
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
func (cs *clientStream) shouldRetry(err error) error {
unprocessed := false
if cs.attempt.s == nil {
pioErr, ok := err.(transport.PerformedIOError)
if ok {
// Unwrap error.
err = toRPCErr(pioErr.Err)
} else {
unprocessed = true
}
if !ok && !cs.callInfo.failFast {
// In the event of a non-IO operation error from NewStream, we
// never attempted to write anything to the wire, so we can retry
// indefinitely for non-fail-fast RPCs.
return nil
}
}
if cs.finished || cs.committed {
// RPC is finished or committed; cannot retry.
return err
}
// Wait for the trailers.
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
if cs.firstAttempt && cs.attempt.s.Unprocessed() {
// First attempt, stream unprocessed: transparently retry.
return nil
}
unprocessed = cs.attempt.s.Unprocessed()
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
return nil
}
if cs.cc.dopts.disableRetry {
return err
Expand Down

0 comments on commit 3b63c2b

Please sign in to comment.