Skip to content

Commit

Permalink
feat(format): support urlencoded format (#3276)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying authored Oct 9, 2024
1 parent ba4d475 commit 464f01a
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 11 deletions.
4 changes: 4 additions & 0 deletions internal/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/converter/binary"
"github.com/lf-edge/ekuiper/v2/internal/converter/delimited"
"github.com/lf-edge/ekuiper/v2/internal/converter/json"
"github.com/lf-edge/ekuiper/v2/internal/converter/urlencoded"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
"github.com/lf-edge/ekuiper/v2/pkg/message"
Expand All @@ -39,6 +40,9 @@ func init() {
modules.RegisterConverter(message.FormatDelimited, func(_ api.StreamContext, _ string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return delimited.NewConverter(props)
})
modules.RegisterConverter(message.FormatUrlEncoded, func(_ api.StreamContext, _ string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return urlencoded.NewConverter(props)
})
}

func GetOrCreateConverter(ctx api.StreamContext, format string, schemaId string, schema map[string]*ast.JsonStreamField, props map[string]any) (c message.Converter, err error) {
Expand Down
77 changes: 77 additions & 0 deletions internal/converter/urlencoded/converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package urlencoded

import (
"fmt"
"net/url"

"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
"github.com/lf-edge/ekuiper/v2/pkg/message"
)

type Converter struct{}

func (c *Converter) Encode(_ api.StreamContext, d any) (b []byte, err error) {
defer func() {
if err != nil {
err = errorx.NewWithCode(errorx.CovnerterErr, err.Error())
}
}()
switch m := d.(type) {
case map[string]any:
form := url.Values{}
for key, value := range m {
switch vt := value.(type) {
case []any:
for _, item := range vt {
ss, _ := cast.ToString(item, cast.CONVERT_ALL)
form.Add(key, ss)
}
default:
ss, _ := cast.ToString(value, cast.CONVERT_ALL)
form.Set(key, ss)
}
}
return []byte(form.Encode()), nil
default:
return nil, fmt.Errorf("unsupported type %v, must be a map", d)
}
}

func (c *Converter) Decode(ctx api.StreamContext, b []byte) (any, error) {
values, err := url.ParseQuery(string(b))
if err != nil {
return nil, fmt.Errorf("fail to parse url encoded value %s", string(b))
}
queryMap := make(map[string]interface{})
for key, value := range values {
if len(value) == 1 {
queryMap[key] = value[0]
} else {
queryMap[key] = value
}
}
return queryMap, nil
}

var c = &Converter{}

func NewConverter(props map[string]any) (message.Converter, error) {
return c, nil
}
82 changes: 82 additions & 0 deletions internal/converter/urlencoded/converter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package urlencoded

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/topo/context"
)

func TestEncodeDecode(t *testing.T) {
tt := []struct {
name string
m any
s string
nm map[string]any
err string
}{
{
name: "normal",
m: map[string]any{
"a": "b",
"c": 20,
},
s: `a=b&c=20`,
nm: map[string]any{
"a": "b",
"c": "20",
},
},
{
name: "nested",
m: map[string]any{
"a": []any{10, 20, 40},
"b": []map[string]any{{"a": "b"}},
"c": map[string]any{"a": "b"},
},
s: `a=10&a=20&a=40&b=%5Bmap%5Ba%3Ab%5D%5D&c=map%5Ba%3Ab%5D`,
nm: map[string]any{
"a": []string{"10", "20", "40"},
"b": "[map[a:b]]",
"c": "map[a:b]",
},
},
{
name: "unsupport",
m: []map[string]any{{"a": 1}},
err: "unsupported type [map[a:1]], must be a map",
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
cc, err := NewConverter(nil)
require.NoError(t, err)
encode, err := cc.Encode(context.Background(), tc.m)
if tc.err != "" {
require.EqualError(t, err, tc.err)
return
} else {
require.NoError(t, err)
require.Equal(t, tc.s, string(encode))
}
rr, err := cc.Decode(context.Background(), []byte(tc.s))
require.NoError(t, err)
require.Equal(t, tc.nm, rr)
})
}
}
1 change: 1 addition & 0 deletions internal/io/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type RawConf struct {
Method string `json:"method"`
Body string `json:"body"`
BodyType string `json:"bodyType"`
Format string `json:"format"`
Headers map[string]string `json:"headers"`
Timeout cast.DurationConf `json:"timeout"`
Incremental bool `json:"incremental"`
Expand Down
17 changes: 16 additions & 1 deletion internal/io/http/rest_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,24 @@ type RestSink struct {
*ClientConf
}

var bodyTypeFormat = map[string]string{
"json": "json",
"form": "urlencoded",
}

func (r *RestSink) Provision(ctx api.StreamContext, configs map[string]any) error {
r.ClientConf = &ClientConf{}
return r.InitConf("", configs)
err := r.InitConf("", configs)
if err != nil {
return err
}
if r.ClientConf.config.Format == "" {
r.ClientConf.config.Format = "json"
}
if rf, ok := bodyTypeFormat[r.ClientConf.config.BodyType]; ok && r.ClientConf.config.Format != rf {
return fmt.Errorf("format must be %s if bodyType is %s", rf, r.ClientConf.config.BodyType)
}
return nil
}

func (r *RestSink) Close(ctx api.StreamContext) error {
Expand Down
12 changes: 12 additions & 0 deletions internal/io/http/rest_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
Expand Down Expand Up @@ -99,6 +100,7 @@ func TestRestSink_Apply(t *testing.T) {
"method": "post",
//"url": "http://localhost/test", //set dynamically to the test server
"bodyType": "form",
"format": "urlencoded",
"sendSingle": true,
},
data: []map[string]interface{}{{
Expand Down Expand Up @@ -188,6 +190,16 @@ func TestRestSink_Apply(t *testing.T) {
}
}

func TestRestSinkProvision(t *testing.T) {
s := &RestSink{}
require.EqualError(t, s.Provision(context.Background(), map[string]any{
"url": "http://localhost/test",
"method": "get",
"bodyType": "form",
"format": "json",
}), "format must be urlencoded if bodyType is form")
}

func TestRestSinkCollect(t *testing.T) {
server := createServer()
defer func() {
Expand Down
4 changes: 0 additions & 4 deletions internal/topo/node/props.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/message"
)

type SinkConf struct {
Expand Down Expand Up @@ -62,9 +61,6 @@ func ParseConf(logger api.Logger, props map[string]any) (*SinkConf, error) {
}
if sconf.Format == "" {
sconf.Format = "json"
} else if sconf.Format != message.FormatJson && sconf.Format != message.FormatProtobuf && sconf.Format != message.FormatBinary && sconf.Format != message.FormatCustom && sconf.Format != message.FormatDelimited {
logger.Warnf("invalid type for format property, should be json protobuf or binary but found %s", sconf.Format)
sconf.Format = "json"
}
err = cast.MapToStruct(props, &sconf.SinkConf)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/xsql/format_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ package xsql
import "github.com/lf-edge/ekuiper/v2/pkg/message"

func IsTextFormat(format string) bool {
return format == message.FormatJson || format == message.FormatDelimited
return format == message.FormatJson || format == message.FormatDelimited || format == message.FormatUrlEncoded
}
11 changes: 6 additions & 5 deletions pkg/message/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
)

const (
FormatBinary = "binary"
FormatJson = "json"
FormatProtobuf = "protobuf"
FormatDelimited = "delimited"
FormatCustom = "custom"
FormatBinary = "binary"
FormatJson = "json"
FormatProtobuf = "protobuf"
FormatDelimited = "delimited"
FormatUrlEncoded = "urlencoded"
FormatCustom = "custom"

DefaultField = "self"
MetaKey = "__meta"
Expand Down

0 comments on commit 464f01a

Please sign in to comment.