diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 28b0c1662415..b43e21ffaf73 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -563,13 +563,26 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call return callAuthData, nil } +// PerformedIOError wraps an error to indicate IO may have been performed +// before the error occurred. +type PerformedIOError struct { + Err error +} + +// Error implements error. +func (p PerformedIOError) Error() string { + return p.Err.Error() +} + // NewStream creates a stream and registers it into the transport as "active" // streams. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { ctx = peer.NewContext(ctx, t.getPeer()) headerFields, err := t.createHeaderFields(ctx, callHdr) if err != nil { - return nil, err + // We may have performed I/O in the per-RPC creds callback, so do not + // allow transparent retry. + return nil, PerformedIOError{err} } s := t.newStream(ctx, callHdr) cleanup := func(err error) { diff --git a/stream.go b/stream.go index 68d5c14506d8..629af76bdfab 100644 --- a/stream.go +++ b/stream.go @@ -364,6 +364,11 @@ func (a *csAttempt) newStream() error { cs.callHdr.PreviousAttempts = cs.numRetries s, err := a.t.NewStream(cs.ctx, cs.callHdr) if err != nil { + if _, ok := err.(transport.PerformedIOError); ok { + // Return without converting to an RPC error so retry code can + // inspect. + return err + } return toRPCErr(err) } cs.attempt.s = s @@ -459,6 +464,22 @@ func (cs *clientStream) commitAttempt() { // shouldRetry returns nil if the RPC should be retried; otherwise it returns // the error that should be returned by the operation. func (cs *clientStream) shouldRetry(err error) error { + unprocessed := false + if cs.attempt.s == nil { + pioErr, ok := err.(transport.PerformedIOError) + if ok { + // Unwrap error. + err = toRPCErr(pioErr.Err) + } else { + unprocessed = true + } + if !ok && !cs.callInfo.failFast { + // In the event of a non-IO operation error from NewStream, we + // never attempted to write anything to the wire, so we can retry + // indefinitely for non-fail-fast RPCs. + return nil + } + } if cs.finished || cs.committed { // RPC is finished or committed; cannot retry. return err @@ -466,10 +487,11 @@ func (cs *clientStream) shouldRetry(err error) error { // Wait for the trailers. if cs.attempt.s != nil { <-cs.attempt.s.Done() - if cs.firstAttempt && cs.attempt.s.Unprocessed() { - // First attempt, stream unprocessed: transparently retry. - return nil - } + unprocessed = cs.attempt.s.Unprocessed() + } + if cs.firstAttempt && unprocessed { + // First attempt, stream unprocessed: transparently retry. + return nil } if cs.cc.dopts.disableRetry { return err