Skip to content

Commit

Permalink
feat: enhance transformer (#305)
Browse files Browse the repository at this point in the history
* feat: enhance transformer

Signed-off-by: delu <delu.xu@linkall.com>

* feat: transfrom function

Signed-off-by: delu <delu.xu@linkall.com>

* feat: transfrom function

Signed-off-by: delu <delu.xu@linkall.com>

* fix: fix golangci

Signed-off-by: delu <delu.xu@linkall.com>

* fix: fix ut

Signed-off-by: delu <delu.xu@linkall.com>

* fix: fix golangci

Signed-off-by: delu <delu.xu@linkall.com>

* fix: code review

Signed-off-by: delu <delu.xu@linkall.com>

* fix: code review

Signed-off-by: delu <delu.xu@linkall.com>

* test: add action benchmark

Signed-off-by: delu <delu.xu@linkall.com>

* fix: golangci-lint error

Signed-off-by: delu <delu.xu@linkall.com>

Signed-off-by: delu <delu.xu@linkall.com>
  • Loading branch information
xdlbdy authored Nov 23, 2022
1 parent 3c87a1f commit d0eb300
Show file tree
Hide file tree
Showing 55 changed files with 3,609 additions and 924 deletions.
3 changes: 3 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,9 @@ issues:
linters:
- gosec
- gomnd
- path: "action.go"
linters:
- gochecknoinits
- path: "^vsctl"
linters:
- gomnd
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ require (
github.com/huandu/skiplist v1.2.0
github.com/iceber/iouring-go v0.0.0-20220609112130-b1dc8dd9fbfd
github.com/jedib0t/go-pretty/v6 v6.3.1
github.com/json-iterator/go v1.1.12
github.com/labstack/echo/v4 v4.7.2
github.com/labstack/gommon v0.3.1
github.com/linkall-labs/embed-etcd v0.1.1
github.com/linkall-labs/vanus/client v0.4.0-alpha
github.com/linkall-labs/vanus/observability v0.4.0-alpha
Expand All @@ -31,6 +33,7 @@ require (
github.com/linkall-labs/vanus/raft v0.4.0-alpha
github.com/mwitkow/grpc-proxy v0.0.0
github.com/ncw/directio v1.0.5
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/prashantv/gostub v1.1.0
github.com/prometheus/client_golang v1.13.0
github.com/smartystreets/goconvey v1.7.2
Expand Down Expand Up @@ -86,10 +89,8 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/labstack/gommon v0.3.1 // indirect
github.com/mattn/go-colorable v0.1.11 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI=
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func (sf *snowflake) RegisterNode(ctx context.Context, in *wrapperspb.UInt32Valu

id := uint16(in.Value)
// TODO(wenfeng) find a good solution in future
//_, exist := sf.nodes[id]
// _, exist := sf.nodes[id]
//
//if exist {
// if exist {
// return nil, errors.New("node has been register")
//}

Expand Down
3 changes: 3 additions & 0 deletions internal/controller/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func (ctrl *controller) CreateSubscription(ctx context.Context,
}
err := validation.ValidateSubscriptionRequest(ctx, request.Subscription)
if err != nil {
log.Info(ctx, "create subscription validate fail", map[string]interface{}{
log.KeyError: err,
})
return nil, err
}
sub := convert.FromPbSubscriptionRequest(request.Subscription)
Expand Down
41 changes: 41 additions & 0 deletions internal/controller/trigger/validation/subscripton.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ package validation
import (
"context"
"fmt"
"net/url"

"github.com/linkall-labs/vanus/internal/primitive/transform/action"

"github.com/linkall-labs/vanus/internal/primitive/transform/arg"

"github.com/linkall-labs/vanus/internal/controller/errors"
"github.com/linkall-labs/vanus/internal/primitive"
Expand Down Expand Up @@ -49,6 +54,9 @@ func ValidateSubscriptionRequest(ctx context.Context, request *ctrlpb.Subscripti
if err := validateSubscriptionConfig(ctx, request.Config); err != nil {
return err
}
if err := validateTransformer(ctx, request.Transformer); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -86,6 +94,10 @@ func ValidateSinkAndProtocol(ctx context.Context,
WithMessage("protocol is gcloud functions, sink credential can not be nil and credential type is gcloud")
}
case metapb.Protocol_HTTP:
if _, err := url.Parse(sink); err != nil {
return errors.ErrInvalidRequest.
WithMessage("protocol is http, sink is url,url parse error").Wrap(err)
}
}
return nil
}
Expand Down Expand Up @@ -147,6 +159,35 @@ func validateSubscriptionConfig(ctx context.Context, cfg *metapb.SubscriptionCon
}
return nil
}

func validateTransformer(ctx context.Context, transformer *metapb.Transformer) error {
if transformer == nil {
return nil
}
if len(transformer.Define) > 0 {
for key, value := range transformer.Define {
_, err := arg.NewArg(value)
if err != nil {
return errors.ErrInvalidRequest.WithMessage(
fmt.Sprintf("transformer define %s:%s is invalid:[%s]", key, value, err.Error()))
}
}
}
if len(transformer.Pipeline) > 0 {
for n, a := range transformer.Pipeline {
commands := make([]interface{}, len(a.Command))
for i, command := range a.Command {
commands[i] = command.AsInterface()
}
if _, err := action.NewAction(commands); err != nil {
return errors.ErrInvalidRequest.WithMessage(
fmt.Sprintf("transformer pipeline %dst command %s is invalid:[%s]", n+1, commands[0], err.Error()))
}
}
}
return nil
}

func ValidateFilterList(ctx context.Context, filters []*metapb.Filter) error {
if len(filters) == 0 {
return nil
Expand Down
42 changes: 42 additions & 0 deletions internal/controller/trigger/validation/suscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"testing"

"google.golang.org/protobuf/types/known/structpb"

ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"

Expand Down Expand Up @@ -72,6 +74,46 @@ func TestValidateSubscriptionConfig(t *testing.T) {
})
}

func TestValidateTransformer(t *testing.T) {
ctx := context.Background()
Convey("test validate transformer ", t, func() {
Convey("test define valid", func() {
trans := &metapb.Transformer{
Define: map[string]string{
"var1": "var",
"var2": "$.id",
"var3": "$.data.id",
},
}
So(validateTransformer(ctx, trans), ShouldBeNil)
})
Convey("test define invalid", func() {
trans := &metapb.Transformer{
Define: map[string]string{
"var2": "$.a-bc",
},
}
So(validateTransformer(ctx, trans), ShouldNotBeNil)
})
Convey("test pipeline valid", func() {
trans := &metapb.Transformer{
Pipeline: []*metapb.Action{
{Command: []*structpb.Value{structpb.NewStringValue("delete"), structpb.NewStringValue("$.id")}},
},
}
So(validateTransformer(ctx, trans), ShouldBeNil)
})
Convey("test pipeline invalid", func() {
trans := &metapb.Transformer{
Pipeline: []*metapb.Action{
{Command: []*structpb.Value{structpb.NewStringValue("noExistActionName")}},
},
}
So(validateTransformer(ctx, trans), ShouldNotBeNil)
})
})
}

func TestValidateSinkAndProtocol(t *testing.T) {
ctx := context.Background()
Convey("sink is empty", t, func() {
Expand Down
46 changes: 43 additions & 3 deletions internal/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
ctrl "github.com/linkall-labs/vanus/proto/pkg/controller"
pb "github.com/linkall-labs/vanus/proto/pkg/meta"
pbtrigger "github.com/linkall-labs/vanus/proto/pkg/trigger"
"google.golang.org/protobuf/types/known/structpb"
)

func FromPbSubscriptionRequest(sub *ctrl.SubscriptionRequest) *metadata.Subscription {
Expand Down Expand Up @@ -259,7 +260,7 @@ func ToPbAddSubscription(sub *primitive.Subscription) *pbtrigger.AddSubscription
EventBus: sub.EventBus,
Offsets: ToPbOffsetInfos(sub.Offsets),
Filters: toPbFilters(sub.Filters),
Transformer: toPbTransformer(sub.Transformer),
Transformer: ToPbTransformer(sub.Transformer),
Config: toPbSubscriptionConfig(sub.Config),
Protocol: toPbProtocol(sub.Protocol),
ProtocolSettings: toPbProtocolSettings(sub.ProtocolSetting),
Expand All @@ -279,7 +280,7 @@ func ToPbSubscription(sub *metadata.Subscription, offsets info.ListOffsetInfo) *
ProtocolSettings: toPbProtocolSettings(sub.ProtocolSetting),
EventBus: sub.EventBus,
Filters: toPbFilters(sub.Filters),
Transformer: toPbTransformer(sub.Transformer),
Transformer: ToPbTransformer(sub.Transformer),
Offsets: ToPbOffsetInfos(offsets),
}
return to
Expand Down Expand Up @@ -409,15 +410,54 @@ func fromPbTransformer(transformer *pb.Transformer) *primitive.Transformer {
return &primitive.Transformer{
Define: transformer.Define,
Template: transformer.Template,
Pipeline: fromPbActions(transformer.Pipeline),
}
}

func toPbTransformer(transformer *primitive.Transformer) *pb.Transformer {
func fromPbActions(actions []*pb.Action) []*primitive.Action {
to := make([]*primitive.Action, 0, len(actions))
for _, action := range actions {
to = append(to, fromPbCommand(action))
}
return to
}

func fromPbCommand(action *pb.Action) *primitive.Action {
to := &primitive.Action{}
commands := make([]interface{}, len(action.Command))
for i, command := range action.Command {
commands[i] = command.AsInterface()
}
to.Command = commands
return to
}

func toPbActions(actions []*primitive.Action) []*pb.Action {
to := make([]*pb.Action, len(actions))
for i, action := range actions {
to[i] = toPbCommand(action)
}
return to
}

func toPbCommand(action *primitive.Action) *pb.Action {
to := &pb.Action{}
commands := make([]*structpb.Value, len(action.Command))
for i, command := range action.Command {
c, _ := structpb.NewValue(command)
commands[i] = c
}
to.Command = commands
return to
}

func ToPbTransformer(transformer *primitive.Transformer) *pb.Transformer {
if transformer == nil {
return nil
}
return &pb.Transformer{
Define: transformer.Define,
Template: transformer.Template,
Pipeline: toPbActions(transformer.Pipeline),
}
}
7 changes: 6 additions & 1 deletion internal/primitive/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (l SubscriptionFilterList) String() string {

type Transformer struct {
Define map[string]string `json:"define,omitempty"`
Pipeline []*Action `json:"pipeline,omitempty"`
Template string `json:"template,omitempty"`
}

Expand All @@ -119,12 +120,16 @@ func (t *Transformer) Exist() bool {
if t == nil {
return false
}
if t.Template == "" {
if t.Template == "" && len(t.Pipeline) == 0 {
return false
}
return true
}

type Action struct {
Command []interface{} `json:"command"`
}

/* annotation no use code .
type SinkSpec struct {
Type string
Expand Down
Loading

0 comments on commit d0eb300

Please sign in to comment.