Skip to content

Commit

Permalink
Merge pull request apache#748 from louyuting/feature/sentinel-integra…
Browse files Browse the repository at this point in the history
…tion

Ftr: add sentinel-golang  flow control/circuit breaker
# Conflicts:
#	go.sum
  • Loading branch information
zouyx committed Sep 22, 2020
1 parent 657a710 commit 295f936
Show file tree
Hide file tree
Showing 4 changed files with 676 additions and 15 deletions.
237 changes: 237 additions & 0 deletions filter/filter_impl/sentinel_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package filter_impl

import (
"context"
"fmt"
"strings"
)

import (
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
)

func init() {
extension.SetFilter(SentinelProviderFilterName, GetSentinelProviderFilter)
extension.SetFilter(SentinelConsumerFilterName, GetSentinelConsumerFilter)

if err := sentinel.InitDefault(); err != nil {
logger.Errorf("[Sentinel Filter] fail to initialize Sentinel")
}
if err := logging.ResetGlobalLogger(DubboLoggerWrapper{Logger: logger.GetLogger()}); err != nil {
logger.Errorf("[Sentinel Filter] fail to ingest dubbo logger into sentinel")
}
}

type DubboLoggerWrapper struct {
logger.Logger
}

func (d DubboLoggerWrapper) Fatal(v ...interface{}) {
d.Logger.Error(v...)
}

func (d DubboLoggerWrapper) Fatalf(format string, v ...interface{}) {
d.Logger.Errorf(format, v...)
}

func (d DubboLoggerWrapper) Panic(v ...interface{}) {
d.Logger.Error(v...)
}

func (d DubboLoggerWrapper) Panicf(format string, v ...interface{}) {
d.Logger.Errorf(format, v...)
}

func GetSentinelConsumerFilter() filter.Filter {
return &SentinelConsumerFilter{}
}

func GetSentinelProviderFilter() filter.Filter {
return &SentinelProviderFilter{}
}

func sentinelExit(ctx context.Context, result protocol.Result) {
if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil {
e := methodEntry.(*base.SentinelEntry)
sentinel.TraceError(e, result.Error())
e.Exit()
}
if interfaceEntry := ctx.Value(InterfaceEntryKey); interfaceEntry != nil {
e := interfaceEntry.(*base.SentinelEntry)
sentinel.TraceError(e, result.Error())
e.Exit()
}
}

type SentinelProviderFilter struct{}

func (d *SentinelProviderFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
interfaceResourceName, methodResourceName := getResourceName(invoker, invocation, getProviderPrefix())

var (
interfaceEntry *base.SentinelEntry
methodEntry *base.SentinelEntry
b *base.BlockError
)
interfaceEntry, b = sentinel.Entry(interfaceResourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Inbound))
if b != nil {
// interface blocked
return sentinelDubboProviderFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)

methodEntry, b = sentinel.Entry(methodResourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Inbound),
sentinel.WithArgs(invocation.Arguments()...))
if b != nil {
// method blocked
return sentinelDubboProviderFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
return invoker.Invoke(ctx, invocation)
}

func (d *SentinelProviderFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
sentinelExit(ctx, result)
return result
}

type SentinelConsumerFilter struct{}

func (d *SentinelConsumerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
interfaceResourceName, methodResourceName := getResourceName(invoker, invocation, getConsumerPrefix())
var (
interfaceEntry *base.SentinelEntry
methodEntry *base.SentinelEntry
b *base.BlockError
)

interfaceEntry, b = sentinel.Entry(interfaceResourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Outbound))
if b != nil {
// interface blocked
return sentinelDubboConsumerFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)

methodEntry, b = sentinel.Entry(methodResourceName, sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Outbound), sentinel.WithArgs(invocation.Arguments()...))
if b != nil {
// method blocked
return sentinelDubboConsumerFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)

return invoker.Invoke(ctx, invocation)
}

func (d *SentinelConsumerFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
sentinelExit(ctx, result)
return result
}

var (
sentinelDubboConsumerFallback = getDefaultDubboFallback()
sentinelDubboProviderFallback = getDefaultDubboFallback()
)

type DubboFallback func(context.Context, protocol.Invoker, protocol.Invocation, *base.BlockError) protocol.Result

func SetDubboConsumerFallback(f DubboFallback) {
sentinelDubboConsumerFallback = f
}
func SetDubboProviderFallback(f DubboFallback) {
sentinelDubboProviderFallback = f
}
func getDefaultDubboFallback() DubboFallback {
return func(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, blockError *base.BlockError) protocol.Result {
result := &protocol.RPCResult{}
result.SetResult(nil)
result.SetError(blockError)
return result
}
}

const (
SentinelProviderFilterName = "sentinel-provider"
SentinelConsumerFilterName = "sentinel-consumer"

DefaultProviderPrefix = "dubbo:provider:"
DefaultConsumerPrefix = "dubbo:consumer:"

MethodEntryKey = "$$sentinelMethodEntry"
InterfaceEntryKey = "$$sentinelInterfaceEntry"
)

func getResourceName(invoker protocol.Invoker, invocation protocol.Invocation, prefix string) (interfaceResourceName, methodResourceName string) {
var sb strings.Builder

sb.WriteString(prefix)
if getInterfaceGroupAndVersionEnabled() {
interfaceResourceName = getColonSeparatedKey(invoker.GetUrl())
} else {
interfaceResourceName = invoker.GetUrl().Service()
}
sb.WriteString(interfaceResourceName)
sb.WriteString(":")
sb.WriteString(invocation.MethodName())
sb.WriteString("(")
isFirst := true
for _, v := range invocation.ParameterTypes() {
if !isFirst {
sb.WriteString(",")
}
sb.WriteString(v.Name())
isFirst = false
}
sb.WriteString(")")
methodResourceName = sb.String()
return
}

func getConsumerPrefix() string {
return DefaultConsumerPrefix
}

func getProviderPrefix() string {
return DefaultProviderPrefix
}

func getInterfaceGroupAndVersionEnabled() bool {
return true
}

func getColonSeparatedKey(url common.URL) string {
return fmt.Sprintf("%s:%s:%s",
url.Service(),
url.GetParam(constant.GROUP_KEY, ""),
url.GetParam(constant.VERSION_KEY, ""))
}
127 changes: 127 additions & 0 deletions filter/filter_impl/sentinel_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package filter_impl

import (
"context"
"sync"
"sync/atomic"
"testing"
)

import (
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)

func TestSentinelFilter_QPS(t *testing.T) {
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"version=1.0.0&group=myGroup&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
interfaceResourceName, _ := getResourceName(mockInvoker,
invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{})), "prefix_")
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{}))

_, err = flow.LoadRules([]*flow.Rule{
{
Resource: interfaceResourceName,
MetricType: flow.QPS,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
Count: 100,
RelationStrategy: flow.CurrentResource,
},
})
assert.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(10)
f := GetSentinelProviderFilter()
pass := int64(0)
block := int64(0)
for i := 0; i < 10; i++ {
go func() {
for j := 0; j < 30; j++ {
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
if result.Error() == nil {
atomic.AddInt64(&pass, 1)
} else {
atomic.AddInt64(&block, 1)
}
}
wg.Done()
}()
}
wg.Wait()
assert.True(t, atomic.LoadInt64(&pass) == 100)
assert.True(t, atomic.LoadInt64(&block) == 200)
}

func TestConsumerFilter_Invoke(t *testing.T) {
f := GetSentinelConsumerFilter()
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{}))
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
assert.NoError(t, result.Error())
}

func TestProviderFilter_Invoke(t *testing.T) {
f := GetSentinelProviderFilter()
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{}))
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
assert.NoError(t, result.Error())
}

func TestGetResourceName(t *testing.T) {
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"version=1.0.0&group=myGroup&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
interfaceResourceName, methodResourceName := getResourceName(mockInvoker,
invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{})), "prefix_")
assert.Equal(t, "com.ikurento.user.UserProvider:myGroup:1.0.0", interfaceResourceName)
assert.Equal(t, "prefix_com.ikurento.user.UserProvider:myGroup:1.0.0:hello()", methodResourceName)
}
Loading

0 comments on commit 295f936

Please sign in to comment.