From 5cfc7288dd345907610730285a6b0df7db70a420 Mon Sep 17 00:00:00 2001 From: ngjaying Date: Tue, 30 Jul 2024 14:54:07 +0800 Subject: [PATCH] fix(rest): do not override content type (#3024) (#3055) Signed-off-by: Jiyong Huang --- internal/io/http/client.go | 3 +- internal/io/http/rest_sink.go | 17 +-- internal/io/http/rest_sink_test.go | 219 ++++++++++++++++++++++++++++- internal/pkg/httpx/http.go | 10 +- 4 files changed, 229 insertions(+), 20 deletions(-) diff --git a/internal/io/http/client.go b/internal/io/http/client.go index 498c38b114..90c37be792 100644 --- a/internal/io/http/client.go +++ b/internal/io/http/client.go @@ -74,7 +74,8 @@ type RawConf struct { Timeout cast.DurationConf `json:"timeout"` Incremental bool `json:"incremental"` - OAuth map[string]map[string]interface{} `json:"oauth"` + OAuth map[string]map[string]interface{} `json:"oauth"` + SendSingle bool `json:"sendSingle"` // Could be code or body ResponseType string `json:"responseType"` Compression string `json:"compression"` // Compression specifies the algorithms used to payload compression diff --git a/internal/io/http/rest_sink.go b/internal/io/http/rest_sink.go index f10df430f9..7e1e0d2485 100644 --- a/internal/io/http/rest_sink.go +++ b/internal/io/http/rest_sink.go @@ -44,28 +44,19 @@ func (r *RestSink) Connect(ctx api.StreamContext) error { } func (r *RestSink) Collect(ctx api.StreamContext, item api.MessageTuple) error { - return r.collect(ctx, item) + return r.collect(ctx, item, item.ToMap()) } func (r *RestSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error { - var err error - items.RangeOfTuples(func(_ int, tuple api.MessageTuple) bool { - err = r.collect(ctx, tuple) - if err != nil { - return false - } - return true - }) - return err + return r.collect(ctx, items, items.ToMaps()) } -func (r *RestSink) collect(ctx api.StreamContext, item api.MessageTuple) error { +func (r *RestSink) collect(ctx api.StreamContext, item any, data any) error { logger := ctx.GetLogger() headers := r.config.Headers bodyType := r.config.BodyType method := r.config.Method u := r.config.Url - data := item.ToMap() if dp, ok := item.(api.HasDynamicProps); ok { for k := range headers { nv, ok := dp.DynamicProps(k) @@ -86,7 +77,7 @@ func (r *RestSink) collect(ctx api.StreamContext, item api.MessageTuple) error { u = nu } } - resp, err := httpx.Send(ctx.GetLogger(), r.client, bodyType, method, u, headers, true, data) + resp, err := httpx.Send(ctx.GetLogger(), r.client, bodyType, method, u, headers, r.config.SendSingle, data) failpoint.Inject("recoverAbleErr", func() { err = errors.New("connection reset by peer") }) diff --git a/internal/io/http/rest_sink_test.go b/internal/io/http/rest_sink_test.go index 1191958f27..c06387cd83 100644 --- a/internal/io/http/rest_sink_test.go +++ b/internal/io/http/rest_sink_test.go @@ -16,9 +16,13 @@ package http import ( "fmt" + "io" + "net/http" + "net/http/httptest" "testing" "github.com/pingcap/failpoint" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/lf-edge/ekuiper/v2/internal/xsql" @@ -26,6 +30,215 @@ import ( mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" ) +type request struct { + Method string + Body string + ContentType string +} + +func TestRestSink_Apply(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + data []map[string]interface{} + result []request + }{ + { + name: "1", + config: map[string]interface{}{ + "method": "post", + //"url": "http://localhost/test", //set dynamically to the test server + "sendSingle": true, + }, + data: []map[string]interface{}{{ + "ab": "hello1", + }, { + "ab": "hello2", + }}, + result: []request{{ + Method: "POST", + Body: `{"ab":"hello1"}`, + ContentType: "application/json", + }, { + Method: "POST", + Body: `{"ab":"hello2"}`, + ContentType: "application/json", + }}, + }, { + name: "2", + config: map[string]interface{}{ + "method": "post", + //"url": "http://localhost/test", //set dynamically to the test server + "headers": map[string]any{ + "Content-Type": "application/vnd.microsoft.servicebus.json", + }, + }, + data: []map[string]interface{}{{ + "ab": "hello1", + }, { + "ab": "hello2", + }}, + result: []request{{ + Method: "POST", + Body: `[{"ab":"hello1"},{"ab":"hello2"}]`, + ContentType: "application/vnd.microsoft.servicebus.json", + }}, + }, { + name: "3", + config: map[string]interface{}{ + "method": "get", + //"url": "http://localhost/test", //set dynamically to the test server + }, + data: []map[string]interface{}{{ + "ab": "hello1", + }, { + "ab": "hello2", + }}, + result: []request{{ + Method: "GET", + ContentType: "", + }}, + }, { + name: "4", + config: map[string]interface{}{ + "method": "put", + //"url": "http://localhost/test", //set dynamically to the test server + "bodyType": "text", + }, + data: []map[string]interface{}{{ + "ab": "hello1", + }, { + "ab": "hello2", + }}, + result: []request{{ + Method: "PUT", + ContentType: "text/plain", + Body: `[{"ab":"hello1"},{"ab":"hello2"}]`, + }}, + }, { + name: "5", + config: map[string]interface{}{ + "method": "post", + //"url": "http://localhost/test", //set dynamically to the test server + "bodyType": "form", + }, + data: []map[string]interface{}{{ + "ab": "hello1", + }, { + "ab": "hello2", + }}, + result: []request{{ + Method: "POST", + ContentType: "application/x-www-form-urlencoded;param=value", + Body: `result=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`, + }}, + }, { + name: "6", + config: map[string]interface{}{ + "method": "post", + //"url": "http://localhost/test", //set dynamically to the test server + "bodyType": "form", + "sendSingle": true, + }, + data: []map[string]interface{}{{ + "ab": "hello1", + }, { + "ab": "hello2", + }}, + result: []request{{ + Method: "POST", + ContentType: "application/x-www-form-urlencoded;param=value", + Body: `ab=hello1`, + }, { + Method: "POST", + ContentType: "application/x-www-form-urlencoded;param=value", + Body: `ab=hello2`, + }}, + }, { + name: "7", + config: map[string]interface{}{ + "method": "post", + //"url": "http://localhost/test", //set dynamically to the test server + "bodyType": "json", + "sendSingle": true, + //"timeout": float64(1000), + }, + data: []map[string]interface{}{{ + "ab": "hello1", + }, { + "ab": "hello2", + }}, + result: []request{{ + Method: "POST", + Body: `{"ab":"hello1"}`, + ContentType: "application/json", + }, { + Method: "POST", + Body: `{"ab":"hello2"}`, + ContentType: "application/json", + }}, + }, + } + ctx := mockContext.NewMockContext("testApply", "op") + + var requests []request + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + fmt.Printf("Error reading body: %v", err) + http.Error(w, "can't read body", http.StatusBadRequest) + return + } + + requests = append(requests, request{ + Method: r.Method, + Body: string(body), + ContentType: r.Header.Get("Content-Type"), + }) + ctx.GetLogger().Debugf(string(body)) + fmt.Fprint(w, string(body)) + })) + defer ts.Close() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + requests = nil + ss, ok := tt.config["sendSingle"] + if !ok { + ss = false + } + s := &RestSink{} + tt.config["url"] = ts.URL + e := s.Provision(ctx, tt.config) + assert.NoError(t, e) + e = s.Connect(ctx) + assert.NoError(t, e) + if ss.(bool) { + for _, d := range tt.data { + e = s.Collect(ctx, &xsql.Tuple{ + Message: d, + }) + assert.NoError(t, e) + } + } else { + b := &xsql.WindowTuples{ + Content: make([]xsql.Row, 0, len(tt.data)), + } + for _, d := range tt.data { + b.Content = append(b.Content, &xsql.Tuple{ + Message: d, + }) + } + e = s.CollectList(ctx, b) + assert.NoError(t, e) + } + + err := s.Close(ctx) + assert.NoError(t, err) + assert.Equal(t, tt.result, requests) + }) + } +} + func TestRestSinkCollect(t *testing.T) { server := createServer() defer func() { @@ -44,7 +257,7 @@ func TestRestSinkCollect(t *testing.T) { }, } require.NoError(t, s.Connect(ctx)) - require.NoError(t, s.collect(ctx, data)) + require.NoError(t, s.collect(ctx, data, data.ToMap())) require.NoError(t, s.Close(ctx)) } @@ -65,7 +278,7 @@ func TestRestSinkRecoverErr(t *testing.T) { "method": "get", })) require.NoError(t, sErr.Connect(ctx)) - err := sErr.collect(ctx, data) + err := sErr.collect(ctx, data, data.ToMap()) require.Error(t, err) require.False(t, errorx.IsIOError(err)) s := &RestSink{} @@ -76,7 +289,7 @@ func TestRestSinkRecoverErr(t *testing.T) { failpoint.Enable("github.com/lf-edge/ekuiper/v2/internal/io/http/recoverAbleErr", "return(true)") defer failpoint.Disable("github.com/lf-edge/ekuiper/v2/internal/io/http/recoverAbleErr") require.NoError(t, s.Connect(ctx)) - err = s.collect(ctx, data) + err = s.collect(ctx, data, data.ToMap()) require.Error(t, err) require.True(t, errorx.IsIOError(err)) } diff --git a/internal/pkg/httpx/http.go b/internal/pkg/httpx/http.go index b2f458a121..531f855bb6 100644 --- a/internal/pkg/httpx/http.go +++ b/internal/pkg/httpx/http.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -63,7 +63,9 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string if err != nil { return nil, fmt.Errorf("fail to create request: %v", err) } - req.Header.Set("Content-Type", BodyTypeMap[bodyType]) + if req.Header.Get("Content-Type") == "" { + req.Header.Set("Content-Type", BodyTypeMap[bodyType]) + } case "form": form := url.Values{} im, err := convertToMap(v, sendSingle) @@ -89,7 +91,9 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string if err != nil { return nil, fmt.Errorf("fail to create request: %v", err) } - req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value") + if req.Header.Get("Content-Type") == "" { + req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value") + } default: return nil, fmt.Errorf("unsupported body type %s", bodyType) }