Skip to content

Commit

Permalink
test: add trigger filter benchmark (#107)
Browse files Browse the repository at this point in the history
* test: add trigger filter benchmark

Signed-off-by: delu <delu.xu@linkall.com>

* fix: go mod tidy

Signed-off-by: delu <delu.xu@linkall.com>

* fix: fix golangci filter dupl

Signed-off-by: delu <delu.xu@linkall.com>

* test: add trigger filter test

Signed-off-by: delu <delu.xu@linkall.com>
  • Loading branch information
xdlbdy authored Jun 15, 2022
1 parent 3a436b2 commit 0e88d1a
Show file tree
Hide file tree
Showing 16 changed files with 105 additions and 77 deletions.
5 changes: 4 additions & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ linters-settings:
pop-directional-isolate: true # default true
cyclop:
max-complexity: 30 # the maximal code complexity to report # default 10
package-average: 10.0 # the maximal average package complexity. If it's higher than 0.0 (float) the check is enabled # default 0.0
package-average: 30.0 # the maximal average package complexity. If it's higher than 0.0 (float) the check is enabled # default 0.0
skip-tests: false # should ignore tests # default false
dupl:
threshold: 150 # default 150
Expand Down Expand Up @@ -374,6 +374,9 @@ issues:
- path: "convert.go"
linters:
- dupl
- path: "_filter.go"
linters:
- dupl
- text: "^(G401|G404|G501)"
linters:
- gosec
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ require (
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
Expand All @@ -73,6 +75,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
Expand All @@ -82,6 +85,7 @@ require (
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
Expand Down
5 changes: 0 additions & 5 deletions internal/trigger/filter/all_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
package filter

import (
"context"

"github.com/linkall-labs/vanus/observability/log"

ce "github.com/cloudevents/sdk-go/v2"
)

Expand All @@ -32,7 +28,6 @@ func NewAllFilter(filters ...Filter) Filter {
}

func (filter allFilter) Filter(event ce.Event) Result {
log.Debug(context.Background(), "all filter ", map[string]interface{}{"filter": filter, "event": event})
for _, f := range filter {
res := f.Filter(event)
if res == FailFilter {
Expand Down
5 changes: 0 additions & 5 deletions internal/trigger/filter/any_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
package filter

import (
"context"

"github.com/linkall-labs/vanus/observability/log"

ce "github.com/cloudevents/sdk-go/v2"
)

Expand All @@ -32,7 +28,6 @@ func NewAnyFilter(filters ...Filter) Filter {
}

func (filter anyFilter) Filter(event ce.Event) Result {
log.Debug(context.Background(), "any filter ", map[string]interface{}{"filter": filter, "event": event})
for _, f := range filter {
res := f.Filter(event)
if res == PassFilter {
Expand Down
6 changes: 1 addition & 5 deletions internal/trigger/filter/cel_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,9 @@ func NewCELFilter(expression string) Filter {
}

func (filter *CELFilter) Filter(event ce.Event) Result {
if filter == nil {
return FailFilter
}
log.Debug(context.Background(), "cel filter ", map[string]interface{}{"filter": filter, "event": event})
result, err := filter.parsedExpression.Eval(event)
if err != nil {
log.Warning(context.Background(), "cek eval error", map[string]interface{}{
log.Info(context.Background(), "cel eval error", map[string]interface{}{
log.KeyError: err,
})
return FailFilter
Expand Down
4 changes: 0 additions & 4 deletions internal/trigger/filter/cesql_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ func NewCESQLFilter(expression string) Filter {
}

func (filter *ceSQLFilter) Filter(event ce.Event) Result {
log.Debug(context.Background(), "cesql filter ", map[string]interface{}{
"filter": filter,
"event": event,
})
res, err := filter.parsedExpression.Evaluate(event)
if err != nil {
log.Info(context.Background(), "cesql filter evaluate error ", map[string]interface{}{
Expand Down
4 changes: 0 additions & 4 deletions internal/trigger/filter/exact_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ func NewExactFilter(exact map[string]string) Filter {
}

func (filter *exactFilter) Filter(event ce.Event) Result {
if filter == nil {
return FailFilter
}
log.Debug(context.Background(), "exact filter ", map[string]interface{}{"filter": filter, "event": event})
for attr, v := range filter.exact {
value, ok := util.LookupAttribute(event, attr)
if !ok || value != v {
Expand Down
46 changes: 46 additions & 0 deletions internal/trigger/filter/filter_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2022 Linkall Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filter_test

import (
"testing"

ce "github.com/cloudevents/sdk-go/v2"
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/linkall-labs/vanus/internal/trigger/filter"
)

func filterBenchmark(f filter.Filter, event ce.Event) func(b *testing.B) {
return func(b *testing.B) {
for i := 0; i < b.N; i++ {
f.Filter(event)
}
}
}

func BenchmarkFilter(b *testing.B) {
event := cetest.FullEvent()
b.Run("noFilter", filterBenchmark(filter.NewNoFilter(), event))
b.Run("exact", filterBenchmark(filter.NewExactFilter(map[string]string{"type": event.Type()}), event))
b.Run("not", filterBenchmark(filter.NewNotFilter(filter.NewExactFilter(map[string]string{"type": event.Type()})), event))
b.Run("suffix", filterBenchmark(filter.NewSuffixFilter(map[string]string{"type": event.Type()}), event))
b.Run("prefix", filterBenchmark(filter.NewPrefixFilter(map[string]string{"type": event.Type()}), event))
b.Run("ceSQL", filterBenchmark(filter.NewCESQLFilter("source = 'testSource'"), event))
event.SetData(ce.ApplicationJSON, map[string]interface{}{
"key": "value",
"num": 10,
})
b.Run("cel", filterBenchmark(filter.NewCELFilter("$key.(string) == 'value'"), event))
}
3 changes: 0 additions & 3 deletions internal/trigger/filter/not_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ func NewNotFilter(f Filter) Filter {
}

func (filter *notFilter) Filter(event ce.Event) Result {
if filter == nil {
return FailFilter
}
switch filter.filter.Filter(event) {
case FailFilter:
return PassFilter
Expand Down
10 changes: 1 addition & 9 deletions internal/trigger/filter/prefix_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,9 @@ func NewPrefixFilter(prefix map[string]string) Filter {
}

func (filter *prefixFilter) Filter(event ce.Event) Result {
if filter == nil {
return FailFilter
}
log.Debug(context.Background(), "prefix filter ", map[string]interface{}{"filter": filter, "event": event})
for attr, prefix := range filter.prefix {
value, ok := util.LookupAttribute(event, attr)
if !ok {
return FailFilter
}

if !strings.HasPrefix(fmt.Sprintf("%v", value), prefix) {
if !ok || !strings.HasPrefix(value, prefix) {
return FailFilter
}
}
Expand Down
6 changes: 1 addition & 5 deletions internal/trigger/filter/suffix_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,9 @@ func NewSuffixFilter(suffix map[string]string) Filter {
}

func (filter *suffixFilter) Filter(event ce.Event) Result {
log.Debug(context.TODO(), "suffix filter ", map[string]interface{}{"filter": filter, "event": event})
for attr, suffix := range filter.suffix {
value, ok := util.LookupAttribute(event, attr)
if !ok {
return FailFilter
}
if !strings.HasSuffix(fmt.Sprintf("%v", value), suffix) {
if !ok || !strings.HasSuffix(value, suffix) {
return FailFilter
}
}
Expand Down
10 changes: 9 additions & 1 deletion internal/trigger/filter/suffix_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,16 @@ func TestSuffixFilter(t *testing.T) {
So(result, ShouldEqual, filter.PassFilter)
})

Convey("suffix filter fail no exist filed", t, func() {
f := filter.NewSuffixFilter(map[string]string{
"abc": "value",
})
result := f.Filter(event)
So(result, ShouldEqual, filter.FailFilter)
})

Convey("suffix filter fail", t, func() {
f := filter.NewPrefixFilter(map[string]string{
f := filter.NewSuffixFilter(map[string]string{
"id": "un",
"source": "test",
})
Expand Down
35 changes: 7 additions & 28 deletions internal/trigger/filter/trigger_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
)

func extractFilter(subscriptionFilter *primitive.SubscriptionFilter) Filter {
ctx := context.Background()
if len(subscriptionFilter.Exact) > 0 {
return NewExactFilter(subscriptionFilter.Exact)
}
Expand All @@ -35,39 +34,19 @@ func extractFilter(subscriptionFilter *primitive.SubscriptionFilter) Filter {
return NewSuffixFilter(subscriptionFilter.Suffix)
}
if subscriptionFilter.Not != nil {
f := NewNotFilter(extractFilter(subscriptionFilter.Not))
if f == nil {
log.Debug(ctx, "new not filter is nil ", map[string]interface{}{"filter": subscriptionFilter.Not})
}
return f
return NewNotFilter(extractFilter(subscriptionFilter.Not))
}
if subscriptionFilter.CeSQL != "" {
f := NewCESQLFilter(subscriptionFilter.CeSQL)
if f == nil {
log.Debug(ctx, "new cesql filter is nil ", map[string]interface{}{"sql": subscriptionFilter.CeSQL})
}
return f
return NewCESQLFilter(subscriptionFilter.CeSQL)
}
if subscriptionFilter.CEL != "" {
f := NewCELFilter(subscriptionFilter.CEL)
if f == nil {
log.Debug(ctx, "new cel filter is nil ", map[string]interface{}{"cel-expression": subscriptionFilter.CEL})
}
return f
return NewCELFilter(subscriptionFilter.CEL)
}
if len(subscriptionFilter.All) > 0 {
f := NewAllFilter(extractFilters(subscriptionFilter.All)...)
if f == nil {
log.Debug(ctx, "new all filter is nil ", map[string]interface{}{"filters": subscriptionFilter.All})
}
return f
return NewAllFilter(extractFilters(subscriptionFilter.All)...)
}
if len(subscriptionFilter.Any) > 0 {
f := NewAnyFilter(extractFilters(subscriptionFilter.Any)...)
if f == nil {
log.Debug(ctx, "new any filter is nil ", map[string]interface{}{"filters": subscriptionFilter.Any})
}
return f
return NewAnyFilter(extractFilters(subscriptionFilter.Any)...)
}
return nil
}
Expand All @@ -77,7 +56,7 @@ func extractFilters(subscriptionFilters []*primitive.SubscriptionFilter) []Filte
for _, subscriptionFilter := range subscriptionFilters {
tf := extractFilter(subscriptionFilter)
if tf == nil {
log.Debug(context.Background(), "get filter is nil will ignore the filter", map[string]interface{}{
log.Info(context.Background(), "get filter is nil will ignore the filter", map[string]interface{}{
"filter": subscriptionFilter,
})
continue
Expand All @@ -90,7 +69,7 @@ func extractFilters(subscriptionFilters []*primitive.SubscriptionFilter) []Filte
func GetFilter(subscriptionFilters []*primitive.SubscriptionFilter) Filter {
filters := extractFilters(subscriptionFilters)
if len(filters) == 0 {
return NewNoFilter()
return nil
}
if len(filters) == 1 {
return filters[0]
Expand Down
21 changes: 20 additions & 1 deletion internal/trigger/filter/trigger_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,30 @@ func TestGetFilter(t *testing.T) {
"num": 10,
})
filters := make([]*primitive.SubscriptionFilter, 0)
Convey("trigger filter no filter", t, func() {
f := filter.GetFilter(filters)
So(f, ShouldBeNil)
result := filter.Run(f, event)
So(result, ShouldEqual, filter.PassFilter)
})
filters = append(filters, &primitive.SubscriptionFilter{})
Convey("trigger filter one filter no filed", t, func() {
f := filter.GetFilter(filters)
So(f, ShouldBeNil)
result := filter.Run(f, event)
So(result, ShouldEqual, filter.PassFilter)
})
filters = append(filters, &primitive.SubscriptionFilter{
Exact: map[string]string{
"id": "testID",
},
})
Convey("trigger filter one filter", t, func() {
f := filter.GetFilter(filters)
So(f, ShouldNotBeNil)
result := filter.Run(f, event)
So(result, ShouldEqual, filter.PassFilter)
})
filters = append(filters, &primitive.SubscriptionFilter{
Suffix: map[string]string{
"id": "ID",
Expand Down Expand Up @@ -79,7 +98,7 @@ func TestGetFilter(t *testing.T) {
},
},
})
Convey("suffix filter pass", t, func() {
Convey("trigger filter multi filter", t, func() {
f := filter.GetFilter(filters)
So(f, ShouldNotBeNil)
result := filter.Run(f, event)
Expand Down
4 changes: 1 addition & 3 deletions internal/trigger/transformation/input_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/tidwall/gjson"

ce "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/types"
)

type InputTransformer struct {
Expand Down Expand Up @@ -59,8 +58,7 @@ func (tf *InputTransformer) parseData(event *ce.Event) map[string]template.Data
dataMap[k] = template.NewNullData()
continue
}
s, _ := types.Format(v)
dataMap[k] = template.NewTextData([]byte(s))
dataMap[k] = template.NewTextData([]byte(v))
case define.DataVariable:
if n.Value == "" {
dataMap[k] = template.NewOtherData(event.Data())
Expand Down
14 changes: 11 additions & 3 deletions internal/trigger/util/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@

package util

import ce "github.com/cloudevents/sdk-go/v2"
import (
"fmt"

func LookupAttribute(event ce.Event, attr string) (interface{}, bool) {
ce "github.com/cloudevents/sdk-go/v2"
)

func LookupAttribute(event ce.Event, attr string) (string, bool) {
// Set standard context attributes. The attributes available may not be
// exactly the same as the attributes defined in the current version of the
// ce spec.
Expand All @@ -39,6 +43,10 @@ func LookupAttribute(event ce.Event, attr string) (interface{}, bool) {
return event.DataContentType(), true
default:
val, ok := event.Extensions()[attr]
return val, ok
var str string
if ok {
str = fmt.Sprintf("%v", val)
}
return str, ok
}
}

0 comments on commit 0e88d1a

Please sign in to comment.