Skip to content

Commit

Permalink
fix(rest): do not override content type (#3024) (#3055)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying authored Jul 30, 2024
1 parent 5b1a37a commit 5cfc728
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 20 deletions.
3 changes: 2 additions & 1 deletion internal/io/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ type RawConf struct {
Timeout cast.DurationConf `json:"timeout"`
Incremental bool `json:"incremental"`

OAuth map[string]map[string]interface{} `json:"oauth"`
OAuth map[string]map[string]interface{} `json:"oauth"`
SendSingle bool `json:"sendSingle"`
// Could be code or body
ResponseType string `json:"responseType"`
Compression string `json:"compression"` // Compression specifies the algorithms used to payload compression
Expand Down
17 changes: 4 additions & 13 deletions internal/io/http/rest_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,19 @@ func (r *RestSink) Connect(ctx api.StreamContext) error {
}

func (r *RestSink) Collect(ctx api.StreamContext, item api.MessageTuple) error {
return r.collect(ctx, item)
return r.collect(ctx, item, item.ToMap())
}

func (r *RestSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error {
var err error
items.RangeOfTuples(func(_ int, tuple api.MessageTuple) bool {
err = r.collect(ctx, tuple)
if err != nil {
return false
}
return true
})
return err
return r.collect(ctx, items, items.ToMaps())
}

func (r *RestSink) collect(ctx api.StreamContext, item api.MessageTuple) error {
func (r *RestSink) collect(ctx api.StreamContext, item any, data any) error {
logger := ctx.GetLogger()
headers := r.config.Headers
bodyType := r.config.BodyType
method := r.config.Method
u := r.config.Url
data := item.ToMap()
if dp, ok := item.(api.HasDynamicProps); ok {
for k := range headers {
nv, ok := dp.DynamicProps(k)
Expand All @@ -86,7 +77,7 @@ func (r *RestSink) collect(ctx api.StreamContext, item api.MessageTuple) error {
u = nu
}
}
resp, err := httpx.Send(ctx.GetLogger(), r.client, bodyType, method, u, headers, true, data)
resp, err := httpx.Send(ctx.GetLogger(), r.client, bodyType, method, u, headers, r.config.SendSingle, data)
failpoint.Inject("recoverAbleErr", func() {
err = errors.New("connection reset by peer")
})
Expand Down
219 changes: 216 additions & 3 deletions internal/io/http/rest_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,229 @@ package http

import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

type request struct {
Method string
Body string
ContentType string
}

func TestRestSink_Apply(t *testing.T) {
tests := []struct {
name string
config map[string]interface{}
data []map[string]interface{}
result []request
}{
{
name: "1",
config: map[string]interface{}{
"method": "post",
//"url": "http://localhost/test", //set dynamically to the test server
"sendSingle": true,
},
data: []map[string]interface{}{{
"ab": "hello1",
}, {
"ab": "hello2",
}},
result: []request{{
Method: "POST",
Body: `{"ab":"hello1"}`,
ContentType: "application/json",
}, {
Method: "POST",
Body: `{"ab":"hello2"}`,
ContentType: "application/json",
}},
}, {
name: "2",
config: map[string]interface{}{
"method": "post",
//"url": "http://localhost/test", //set dynamically to the test server
"headers": map[string]any{
"Content-Type": "application/vnd.microsoft.servicebus.json",
},
},
data: []map[string]interface{}{{
"ab": "hello1",
}, {
"ab": "hello2",
}},
result: []request{{
Method: "POST",
Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
ContentType: "application/vnd.microsoft.servicebus.json",
}},
}, {
name: "3",
config: map[string]interface{}{
"method": "get",
//"url": "http://localhost/test", //set dynamically to the test server
},
data: []map[string]interface{}{{
"ab": "hello1",
}, {
"ab": "hello2",
}},
result: []request{{
Method: "GET",
ContentType: "",
}},
}, {
name: "4",
config: map[string]interface{}{
"method": "put",
//"url": "http://localhost/test", //set dynamically to the test server
"bodyType": "text",
},
data: []map[string]interface{}{{
"ab": "hello1",
}, {
"ab": "hello2",
}},
result: []request{{
Method: "PUT",
ContentType: "text/plain",
Body: `[{"ab":"hello1"},{"ab":"hello2"}]`,
}},
}, {
name: "5",
config: map[string]interface{}{
"method": "post",
//"url": "http://localhost/test", //set dynamically to the test server
"bodyType": "form",
},
data: []map[string]interface{}{{
"ab": "hello1",
}, {
"ab": "hello2",
}},
result: []request{{
Method: "POST",
ContentType: "application/x-www-form-urlencoded;param=value",
Body: `result=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`,
}},
}, {
name: "6",
config: map[string]interface{}{
"method": "post",
//"url": "http://localhost/test", //set dynamically to the test server
"bodyType": "form",
"sendSingle": true,
},
data: []map[string]interface{}{{
"ab": "hello1",
}, {
"ab": "hello2",
}},
result: []request{{
Method: "POST",
ContentType: "application/x-www-form-urlencoded;param=value",
Body: `ab=hello1`,
}, {
Method: "POST",
ContentType: "application/x-www-form-urlencoded;param=value",
Body: `ab=hello2`,
}},
}, {
name: "7",
config: map[string]interface{}{
"method": "post",
//"url": "http://localhost/test", //set dynamically to the test server
"bodyType": "json",
"sendSingle": true,
//"timeout": float64(1000),
},
data: []map[string]interface{}{{
"ab": "hello1",
}, {
"ab": "hello2",
}},
result: []request{{
Method: "POST",
Body: `{"ab":"hello1"}`,
ContentType: "application/json",
}, {
Method: "POST",
Body: `{"ab":"hello2"}`,
ContentType: "application/json",
}},
},
}
ctx := mockContext.NewMockContext("testApply", "op")

var requests []request
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
fmt.Printf("Error reading body: %v", err)
http.Error(w, "can't read body", http.StatusBadRequest)
return
}

requests = append(requests, request{
Method: r.Method,
Body: string(body),
ContentType: r.Header.Get("Content-Type"),
})
ctx.GetLogger().Debugf(string(body))
fmt.Fprint(w, string(body))
}))
defer ts.Close()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requests = nil
ss, ok := tt.config["sendSingle"]
if !ok {
ss = false
}
s := &RestSink{}
tt.config["url"] = ts.URL
e := s.Provision(ctx, tt.config)
assert.NoError(t, e)
e = s.Connect(ctx)
assert.NoError(t, e)
if ss.(bool) {
for _, d := range tt.data {
e = s.Collect(ctx, &xsql.Tuple{
Message: d,
})
assert.NoError(t, e)
}
} else {
b := &xsql.WindowTuples{
Content: make([]xsql.Row, 0, len(tt.data)),
}
for _, d := range tt.data {
b.Content = append(b.Content, &xsql.Tuple{
Message: d,
})
}
e = s.CollectList(ctx, b)
assert.NoError(t, e)
}

err := s.Close(ctx)
assert.NoError(t, err)
assert.Equal(t, tt.result, requests)
})
}
}

func TestRestSinkCollect(t *testing.T) {
server := createServer()
defer func() {
Expand All @@ -44,7 +257,7 @@ func TestRestSinkCollect(t *testing.T) {
},
}
require.NoError(t, s.Connect(ctx))
require.NoError(t, s.collect(ctx, data))
require.NoError(t, s.collect(ctx, data, data.ToMap()))
require.NoError(t, s.Close(ctx))
}

Expand All @@ -65,7 +278,7 @@ func TestRestSinkRecoverErr(t *testing.T) {
"method": "get",
}))
require.NoError(t, sErr.Connect(ctx))
err := sErr.collect(ctx, data)
err := sErr.collect(ctx, data, data.ToMap())
require.Error(t, err)
require.False(t, errorx.IsIOError(err))
s := &RestSink{}
Expand All @@ -76,7 +289,7 @@ func TestRestSinkRecoverErr(t *testing.T) {
failpoint.Enable("github.com/lf-edge/ekuiper/v2/internal/io/http/recoverAbleErr", "return(true)")
defer failpoint.Disable("github.com/lf-edge/ekuiper/v2/internal/io/http/recoverAbleErr")
require.NoError(t, s.Connect(ctx))
err = s.collect(ctx, data)
err = s.collect(ctx, data, data.ToMap())
require.Error(t, err)
require.True(t, errorx.IsIOError(err))
}
10 changes: 7 additions & 3 deletions internal/pkg/httpx/http.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,7 +63,9 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string
if err != nil {
return nil, fmt.Errorf("fail to create request: %v", err)
}
req.Header.Set("Content-Type", BodyTypeMap[bodyType])
if req.Header.Get("Content-Type") == "" {
req.Header.Set("Content-Type", BodyTypeMap[bodyType])
}
case "form":
form := url.Values{}
im, err := convertToMap(v, sendSingle)
Expand All @@ -89,7 +91,9 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string
if err != nil {
return nil, fmt.Errorf("fail to create request: %v", err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
if req.Header.Get("Content-Type") == "" {
req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
}
default:
return nil, fmt.Errorf("unsupported body type %s", bodyType)
}
Expand Down

0 comments on commit 5cfc728

Please sign in to comment.