req.Body not reset in doWithRetry #773

Closed
embano1 opened this issue Jun 3, 2022 · 3 comments · Fixed by #774

Comments

@embano1
Member

embano1 commented Jun 3, 2022

Scenario:

When the remote receiver is a plain net/http server (instead of the receiver from this package), ceClient fails when it retries after a non-successful HTTP response.

Error: "msg":"Invalid result type, not HTTP Result: Post \"http://always-fail.default.127.0.0.1.sslip.io\": http: ContentLength=34 with Body length 0 (type: *protocol.Receipt)"}

This happens when the remote server returns a message in the response body, e.g. "server down" (not a CloudEvent), but not when it returns only a status code with an empty response body.

The cause is func (p *Protocol) doWithRetry(), which does not reset req.Body before subsequent attempts, so a retry is sent with a body that has already been consumed.
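To illustrate the mechanism outside the SDK, here is a minimal sketch using only net/http. The helpers drain, rewind and attempts are hypothetical names for this illustration, and the URL is a placeholder that is never contacted. drain stands in for whatever consumes the body while sending; the sketch compares what a naive second attempt sees with what it sees after req.GetBody is used to rewind.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// drain reads and closes the request body, much like a transport does while
// sending, and returns the number of bytes it saw.
func drain(req *http.Request) int {
	if req.Body == nil {
		return 0
	}
	defer req.Body.Close()
	b, _ := io.ReadAll(req.Body)
	return len(b)
}

// rewind replaces req.Body with a fresh reader obtained from req.GetBody.
// http.NewRequest sets GetBody for *strings.Reader, *bytes.Reader and
// *bytes.Buffer bodies; for other body types it stays nil.
func rewind(req *http.Request) error {
	if req.GetBody == nil {
		return nil // nothing to rewind with
	}
	body, err := req.GetBody()
	if err != nil {
		return err
	}
	req.Body = body
	return nil
}

// attempts reports the bytes seen on the first attempt, on a naive retry with
// the same request, and on a retry after rewinding the body.
func attempts(payload string) (first, naive, rewound int, err error) {
	// Placeholder URL (assumption); no network call is made in this sketch.
	req, err := http.NewRequest(http.MethodPost, "http://example.invalid", strings.NewReader(payload))
	if err != nil {
		return 0, 0, 0, err
	}
	first = drain(req)
	naive = drain(req) // body already consumed
	if err := rewind(req); err != nil {
		return 0, 0, 0, err
	}
	rewound = drain(req)
	return first, naive, rewound, nil
}

func main() {
	first, naive, rewound, err := attempts(`{"id":1,"message":"Hello, World!"}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("first=%d naive-retry=%d after-rewind=%d\n", first, naive, rewound)
}
```

The naive retry sees an empty body while req.ContentLength still carries the original length, which is the mismatch the error message complains about.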

Expected behavior:

Return the encoding error, but with req.Body correctly reset on each attempt, e.g. return "failed to convert response into event: unknown Message encoding\n500: (3x)"

Client code:

/*
 Copyright 2021 The CloudEvents Authors
 SPDX-License-Identifier: Apache-2.0
*/

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
	"go.uber.org/zap"
	"knative.dev/eventing/pkg/kncloudevents"
	"knative.dev/pkg/logging"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/protocol"
	cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

func main() {
	ctx := cloudevents.ContextWithTarget(context.Background(), "http://always-fail.default.127.0.0.1.sslip.io")

	topts := []cehttp.Option{cehttp.WithIsRetriableFunc(func(statusCode int) bool {
		retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: statusCode}, nil)
		return retry
	})}

	ceClient, err := client.NewClientHTTP(topts, nil)
	if err != nil {
		panic(err)
	}

	e := cloudevents.NewEvent()
	e.SetType("com.cloudevents.sample.sent")
	e.SetSource("https://github.com/cloudevents/sdk-go/v2/samples/httpb/sender")
	_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
		"id":      1,
		"message": "Hello, World!",
	})

	ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, 10*time.Millisecond, 3)
	response, result := ceClient.Request(ctx, e)

	if !isSuccess(ctx, result) {
		logging.FromContext(ctx).Errorw("Failed to deliver", zap.Error(result))
		return
	}

	fmt.Printf("got: %v", response)
}

func isSuccess(ctx context.Context, result protocol.Result) bool {
	var retriesResult *cehttp.RetriesResult
	if cloudevents.ResultAs(result, &retriesResult) {
		var httpResult *cehttp.Result
		if cloudevents.ResultAs(retriesResult.Result, &httpResult) {
			retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: httpResult.StatusCode}, nil)
			return !retry
		}
		logging.FromContext(ctx).Warnf("Invalid result type, not HTTP Result: %v (type: %T)", retriesResult.Result, retriesResult.Result)
		return false
	}

	logging.FromContext(ctx).Warnf("Invalid result type, not RetriesResult")
	return false
}

HTTP server code (note: just using net/http)

package main

import (
	"context"
	"errors"
	"net/http"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
	"knative.dev/pkg/logging"
	"knative.dev/pkg/signals"
)

const (
	addr    = ":8080"
	timeout = 5 * time.Second
)

func main() {
	ctx := signals.NewContext()

	r := chi.NewRouter()
	r.Use(middleware.Logger)
	r.Post("/", func(w http.ResponseWriter, r *http.Request) {
		// w.WriteHeader(http.StatusInternalServerError) <- just doing this works!
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
	})

	srv := http.Server{
		Addr:         addr,
		Handler:      r,
		ReadTimeout:  timeout,
		WriteTimeout: timeout,
	}

	go func() {
		<-ctx.Done()
		logging.FromContext(ctx).Info("shutting down")
		if err := srv.Shutdown(context.Background()); err != nil {
			logging.FromContext(ctx).Warnf("could not shutdown: %v", err)
		}
	}()

	logging.FromContext(ctx).Infow("listening", "address", addr)
	if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
		logging.FromContext(ctx).Fatal(err)
	}
}

Patch

diff --git a/v2/protocol/http/protocol_retry.go b/v2/protocol/http/protocol_retry.go
index fb7bcd2..9050469 100644
--- a/v2/protocol/http/protocol_retry.go
+++ b/v2/protocol/http/protocol_retry.go
@@ -6,8 +6,10 @@
 package http
 
 import (
+	"bytes"
 	"context"
 	"errors"
+	"io"
 	"net/http"
 	"net/url"
 	"time"
@@ -53,6 +55,17 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam
 	retry := 0
 	results := make([]protocol.Result, 0)
 
+	var (
+		body []byte
+		err  error
+	)
+	if req.Body != nil {
+		body, err = io.ReadAll(req.Body)
+		if err != nil {
+			panic(err)
+		}
+	}
+
 	for {
 		msg, result := p.doOnce(req)
 
@@ -90,6 +103,13 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam
 		}
 
 	DoBackoff:
+		if req.Body != nil {
+			req.Body = io.NopCloser(bytes.NewReader(body))
+			req.GetBody = func() (io.ReadCloser, error) {
+				return io.NopCloser(bytes.NewReader(body)), nil
+			}
+		}
+
 		// Wait for the correct amount of backoff time.
 
 		// total tries = retry + 1
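As a variation on the patch above, the buffering step could return an error instead of panicking. This is only a sketch and not necessarily what was merged: makeReplayable and replay are hypothetical names, and the URL is a placeholder that is never contacted. The body is wrapped in io.NopCloser on purpose so that http.NewRequest does not set GetBody by itself.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// makeReplayable (hypothetical helper) buffers req.Body once and installs
// req.GetBody, so every retry can start from a fresh reader.
func makeReplayable(req *http.Request) error {
	if req.Body == nil || req.Body == http.NoBody {
		return nil
	}
	body, err := io.ReadAll(req.Body)
	if err != nil {
		return fmt.Errorf("buffer request body: %w", err)
	}
	if err := req.Body.Close(); err != nil {
		return fmt.Errorf("close request body: %w", err)
	}
	req.GetBody = func() (io.ReadCloser, error) {
		return io.NopCloser(bytes.NewReader(body)), nil
	}
	req.Body = io.NopCloser(bytes.NewReader(body))
	// The length is known once the body is buffered.
	req.ContentLength = int64(len(body))
	return nil
}

// replay reads the body once per attempt, rewinding via GetBody before every
// attempt after the first, and returns what each attempt saw.
func replay(payload string, attempts int) ([]string, error) {
	// Placeholder URL (assumption); the request is never sent.
	req, err := http.NewRequest(http.MethodPost, "http://example.invalid",
		io.NopCloser(strings.NewReader(payload)))
	if err != nil {
		return nil, err
	}
	if err := makeReplayable(req); err != nil {
		return nil, err
	}
	var got []string
	for i := 0; i < attempts; i++ {
		if i > 0 {
			body, err := req.GetBody()
			if err != nil {
				return nil, err
			}
			req.Body = body
		}
		b, err := io.ReadAll(req.Body)
		if err != nil {
			return nil, err
		}
		got = append(got, string(b))
	}
	return got, nil
}

func main() {
	got, err := replay("server, are you there?", 3)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(got), "attempts, each saw:", got[0])
}
```

Buffering trades memory for replayability, which seems acceptable for small event payloads but is worth keeping in mind for large bodies.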
@n3wscott
Member

n3wscott commented Jun 7, 2022

nice! Want to make a PR?

@embano1
Member Author

embano1 commented Jun 7, 2022

> nice! Want to make a PR?

Sure, I just need to dig a little deeper as there might be another anomaly in the way requests are handled. Not sure I get the full picture yet.

@embano1
Member Author

embano1 commented Jun 8, 2022

Update:

I can reproduce the issue with net/http using the following handlers:

func(w http.ResponseWriter, r *http.Request) {
	http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
}
func(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusInternalServerError)
}

The second handler does not always produce the "http: ContentLength=34 with Body length 0" *url.Error, but I managed to catch it several times with a debugger.

I was also able to catch it once using the cloudevents receiver and a simple event handler:

func receive(ctx context.Context, event cloudevents.Event) error {
	fmt.Printf("got event: %v\n", event)
	return http.NewResult(nethttp.StatusInternalServerError, nethttp.StatusText(nethttp.StatusInternalServerError))
}

Still not sure where and how it gets triggered in all these (rare) scenarios, but perhaps there's a race which only shows during debugging - typical Heisenbug :D

Also, I was wondering why the unit tests did not exhibit the problem: they only use a RoundTripper mock instead of a "full" HTTP server which reads/modifies the body. The tests also did not send a body, which is another reason we did not see this.
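One way a test double could surface this without a full server is a RoundTripper that drains the body before answering. The sketch below is an assumption about how such a test might look, not the SDK's actual test code: drainingTransport and sendTwice are hypothetical names, and the URL is a placeholder since the fake transport never touches the network.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// drainingTransport is a hypothetical test double: unlike a mock that ignores
// the request, it reads and closes the body, then answers with a 500.
type drainingTransport struct {
	seen []int // body size observed per attempt
}

func (t *drainingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	n := 0
	if req.Body != nil {
		b, err := io.ReadAll(req.Body)
		req.Body.Close()
		if err != nil {
			return nil, err
		}
		n = len(b)
	}
	t.seen = append(t.seen, n)
	return &http.Response{
		Status:     "500 Internal Server Error",
		StatusCode: http.StatusInternalServerError,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     make(http.Header),
		Body:       io.NopCloser(strings.NewReader("server down")),
		Request:    req,
	}, nil
}

// sendTwice sends the same request twice through the fake transport. With
// reset=true the body is rewound via GetBody before the second attempt.
func sendTwice(payload string, reset bool) ([]int, error) {
	rt := &drainingTransport{}
	c := &http.Client{Transport: rt}
	// Placeholder URL (assumption); the fake transport never dials it.
	req, err := http.NewRequest(http.MethodPost, "http://example.invalid", strings.NewReader(payload))
	if err != nil {
		return nil, err
	}
	for attempt := 0; attempt < 2; attempt++ {
		if attempt > 0 && reset {
			body, err := req.GetBody()
			if err != nil {
				return nil, err
			}
			req.Body = body
		}
		resp, err := c.Do(req)
		if err != nil {
			return nil, err
		}
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
	return rt.seen, nil
}

func main() {
	naive, _ := sendTwice("hello", false)
	fixed, _ := sendTwice("hello", true)
	fmt.Println("without reset:", naive, "with reset:", fixed)
}
```

A test built on this could assert that every attempt observes the full payload, which fails for a retry loop that does not reset the body.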

embano1 pushed a commit to embano1/sdk-go that referenced this issue Jun 9, 2022
Closes: cloudevents#773
Signed-off-by: Michael Gasch <mgasch@vmware.com>
@embano1 embano1 mentioned this issue Jun 9, 2022
3 tasks
embano1 pushed a commit to embano1/sdk-go that referenced this issue Jun 9, 2022
Closes: cloudevents#773
Signed-off-by: Michael Gasch <mgasch@vmware.com>
n3wscott pushed a commit that referenced this issue Jun 10, 2022
Closes: #773
Signed-off-by: Michael Gasch <mgasch@vmware.com>
n3wscott added a commit that referenced this issue Jun 10, 2022
Closes: #773
Signed-off-by: Michael Gasch <mgasch@vmware.com>

Co-authored-by: Michael Gasch <mgasch@vmware.com>