Skip to content

Commit

Permalink
Use go-retryablehttp
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Jul 16, 2020
1 parent db0c833 commit 728436c
Show file tree
Hide file tree
Showing 40 changed files with 3,611 additions and 307 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/google/gofuzz v1.1.0
github.com/google/mako v0.0.0-20190821191249-122f8dcef9e3
github.com/google/uuid v1.1.1
github.com/hashicorp/go-retryablehttp v0.6.6
github.com/influxdata/tdigest v0.0.0-20191024211133-5d87a7585faa // indirect
github.com/json-iterator/go v1.1.9 // indirect
github.com/kelseyhightower/envconfig v1.4.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -624,14 +624,17 @@ github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBt
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-retryablehttp v0.6.4/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-retryablehttp v0.6.6 h1:HJunrbHTDDbBb/ay4kxa1n+dLmttUlnP3V9oNE4hmsM=
github.com/hashicorp/go-retryablehttp v0.6.6/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
Expand Down
63 changes: 43 additions & 20 deletions pkg/channel/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ import (
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/apis"

duckv1 "knative.dev/pkg/apis/duck/v1"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
Expand All @@ -44,14 +48,9 @@ type MessageDispatcher interface {
// MessageDispatcherImpl is the 'real' MessageDispatcher used everywhere except unit tests.
var _ MessageDispatcher = &MessageDispatcherImpl{}

type MessageSender interface {
NewCloudEventRequestWithTarget(ctx context.Context, target string) (*nethttp.Request, error)
Send(req *nethttp.Request) (*nethttp.Response, error)
}

// MessageDispatcherImpl dispatches events to a destination over HTTP.
type MessageDispatcherImpl struct {
sender MessageSender
sender *kncloudevents.HttpMessageSender
supportedSchemes sets.String

logger *zap.Logger
Expand All @@ -73,15 +72,21 @@ func NewMessageDispatcherFromConfig(logger *zap.Logger, config EventDispatcherCo
}

// NewMessageDispatcherFromConfig creates a new event dispatcher.
func NewMessageDispatcherFromSender(logger *zap.Logger, sender MessageSender) *MessageDispatcherImpl {
func NewMessageDispatcherFromSender(logger *zap.Logger, sender *kncloudevents.HttpMessageSender) *MessageDispatcherImpl {
return &MessageDispatcherImpl{
sender: sender,
supportedSchemes: sets.NewString("http", "https"),
logger: logger,
}
}

func (d *MessageDispatcherImpl) DispatchMessage(ctx context.Context, initialMessage cloudevents.Message, initialAdditionalHeaders nethttp.Header, destination *url.URL, reply *url.URL, deadLetter *url.URL) error {
func (d *MessageDispatcherImpl) Dispatch(ctx context.Context, message cloudevents.Message, additionalHeaders nethttp.Header, destination *url.URL, reply *url.URL, deadLetter *eventingduckv1.DeliverySpec) error {

var deadLetterURL *url.URL
if deadLetter != nil && deadLetter.DeadLetterSink != nil && deadLetter.DeadLetterSink.URI != nil {
deadLetterURL = deadLetter.DeadLetterSink.URI.URL()
}

// All messages that should be finished at the end of this function
// are placed in this slice
var messagesToFinish []binding.Message
Expand All @@ -94,7 +99,7 @@ func (d *MessageDispatcherImpl) DispatchMessage(ctx context.Context, initialMess
// sanitize eventual host-only URLs
destination = d.sanitizeURL(destination)
reply = d.sanitizeURL(reply)
deadLetter = d.sanitizeURL(deadLetter)
deadLetterURL = d.sanitizeURL(deadLetterURL)

// If there is a destination, variables response* are filled with the response of the destination
// Otherwise, they are filled with the original message
Expand All @@ -104,15 +109,20 @@ func (d *MessageDispatcherImpl) DispatchMessage(ctx context.Context, initialMess
if destination != nil {
var err error
// Try to send to destination
messagesToFinish = append(messagesToFinish, initialMessage)
messagesToFinish = append(messagesToFinish, message)

ctx, responseMessage, responseAdditionalHeaders, err = d.executeRequest(ctx, destination, initialMessage, initialAdditionalHeaders)
ctx, responseMessage, responseAdditionalHeaders, err = d.executeRequest(ctx, destination, message, additionalHeaders)
if err != nil {
// DeadLetter is configured, send the message to it
if deadLetter != nil {
_, deadLetterResponse, _, deadLetterErr := d.executeRequest(ctx, deadLetter, initialMessage, initialAdditionalHeaders)
if deadLetter != nil && deadLetterURL != nil {
retryConfig, err := kncloudevents.RetryConfigFromDeliverySpec(*deadLetter)
if err != nil {
return fmt.Errorf("failed to create retry config from delivery spec: %w", err)
}

_, deadLetterResponse, _, deadLetterErr := d.executeRequestWithRetries(ctx, deadLetterURL, message, additionalHeaders, retryConfig)
if deadLetterErr != nil {
return fmt.Errorf("unable to complete request to either %s (%v) or %s (%v)", destination, err, deadLetter, deadLetterErr)
return fmt.Errorf("unable to complete request to either %s (%v) or %s (%v)", destination, err, deadLetterURL, deadLetterErr)
}
if deadLetterResponse != nil {
messagesToFinish = append(messagesToFinish, deadLetterResponse)
Expand All @@ -125,8 +135,8 @@ func (d *MessageDispatcherImpl) DispatchMessage(ctx context.Context, initialMess
}
} else {
// No destination url, try to send to reply if available
responseMessage = initialMessage
responseAdditionalHeaders = initialAdditionalHeaders
responseMessage = message
responseAdditionalHeaders = additionalHeaders
}

// No response, dispatch completed
Expand All @@ -144,10 +154,10 @@ func (d *MessageDispatcherImpl) DispatchMessage(ctx context.Context, initialMess
ctx, responseResponseMessage, _, err := d.executeRequest(ctx, reply, responseMessage, responseAdditionalHeaders)
if err != nil {
// DeadLetter is configured, send the message to it
if deadLetter != nil {
_, deadLetterResponse, _, deadLetterErr := d.executeRequest(ctx, deadLetter, initialMessage, responseAdditionalHeaders)
if deadLetterURL != nil {
_, deadLetterResponse, _, deadLetterErr := d.executeRequest(ctx, deadLetterURL, message, responseAdditionalHeaders)
if deadLetterErr != nil {
return fmt.Errorf("failed to forward reply to %s (%v) and failed to send it to the dead letter sink %s (%v)", reply, err, deadLetter, deadLetterErr)
return fmt.Errorf("failed to forward reply to %s (%v) and failed to send it to the dead letter sink %s (%v)", reply, err, deadLetterURL, deadLetterErr)
}
if deadLetterResponse != nil {
messagesToFinish = append(messagesToFinish, deadLetterResponse)
Expand All @@ -163,9 +173,22 @@ func (d *MessageDispatcherImpl) DispatchMessage(ctx context.Context, initialMess
}

return nil

}

func (d *MessageDispatcherImpl) DispatchMessage(ctx context.Context, initialMessage cloudevents.Message, initialAdditionalHeaders nethttp.Header, destination *url.URL, reply *url.URL, deadLetter *url.URL) error {
return d.Dispatch(ctx, initialMessage, initialAdditionalHeaders, destination, reply, &eventingduckv1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: (*apis.URL)(deadLetter),
},
})
}

func (d *MessageDispatcherImpl) executeRequest(ctx context.Context, url *url.URL, message cloudevents.Message, additionalHeaders nethttp.Header) (context.Context, cloudevents.Message, nethttp.Header, error) {
return d.executeRequestWithRetries(ctx, url, message, additionalHeaders, kncloudevents.NoRetries())
}

func (d *MessageDispatcherImpl) executeRequestWithRetries(ctx context.Context, url *url.URL, message cloudevents.Message, additionalHeaders nethttp.Header, configs kncloudevents.RetryConfig) (context.Context, cloudevents.Message, nethttp.Header, error) {
d.logger.Debug("Dispatching event", zap.String("url", url.String()))

ctx, span := trace.StartSpan(ctx, "knative.dev", trace.WithSpanKind(trace.SpanKindClient))
Expand All @@ -185,7 +208,7 @@ func (d *MessageDispatcherImpl) executeRequest(ctx context.Context, url *url.URL
return ctx, nil, nil, err
}

response, err := d.sender.Send(req)
response, err := d.sender.SendWithRetries(req, configs)
if err != nil {
return ctx, nil, nil, err
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/channel/message_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ import (
"strings"
"testing"

"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/eventing/pkg/kncloudevents"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/transformer"
Expand Down Expand Up @@ -656,9 +653,7 @@ func TestDispatchMessage(t *testing.T) {

ctx := context.Background()

messageSender, _ := kncloudevents.NewHttpMessageSender(&defaultEventDispatcherConfig.ConnectionArgs, "")
sender := kncloudevents.NewHttpMessageSenderWithRetries(messageSender, wait.Backoff{Duration: 10, Steps: 3})
md := NewMessageDispatcherFromSender(zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), sender)
md := NewMessageDispatcher(zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())))

destination := getOnlyDomainURL(t, tc.sendToDestination, destServer.URL)
reply := getOnlyDomainURL(t, tc.sendToReply, replyServer.URL)
Expand Down
83 changes: 0 additions & 83 deletions pkg/kncloudevents/message_client_test.go

This file was deleted.

33 changes: 0 additions & 33 deletions pkg/kncloudevents/message_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
nethttp "net/http"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"go.opencensus.io/plugin/ochttp"
"knative.dev/pkg/tracing/propagation/tracecontextb3"
)
Expand All @@ -32,37 +30,6 @@ const (
DefaultShutdownTimeout = time.Minute * 1
)

type MessageSender interface {
Send(req *nethttp.Request) (*nethttp.Response, error)
NewCloudEventRequestWithTarget(ctx context.Context, target string) (*nethttp.Request, error)
}

type HttpMessageSenderWithRetries struct {
sender MessageSender
backoff wait.Backoff
}

func (s *HttpMessageSenderWithRetries) NewCloudEventRequestWithTarget(ctx context.Context, target string) (*nethttp.Request, error) {
return s.sender.NewCloudEventRequestWithTarget(ctx, target)
}

func NewHttpMessageSenderWithRetries(sender MessageSender, backoff wait.Backoff) *HttpMessageSenderWithRetries {
return &HttpMessageSenderWithRetries{sender: sender, backoff: backoff}
}

func (s *HttpMessageSenderWithRetries) Send(req *nethttp.Request) (*nethttp.Response, error) {
var response *nethttp.Response
err := wait.ExponentialBackoff(s.backoff, func() (ok bool, err error) {
response, err = s.sender.Send(req)
if err != nil {
// retry
return false, nil
}
return true, err
})
return response, err
}

type HttpMessageReceiver struct {
port int

Expand Down
Loading

0 comments on commit 728436c

Please sign in to comment.