Skip to content

Commit

Permalink
feat: support nacos degradation
Browse files Browse the repository at this point in the history
  • Loading branch information
Madxf committed Jun 3, 2024
1 parent d34a844 commit 783d6b8
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 3 deletions.
7 changes: 4 additions & 3 deletions client/degradation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package client
import (
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/kitex-contrib/config-nacos/pkg/degradation"
"github.com/nacos-group/nacos-sdk-go/vo"

"github.com/kitex-contrib/config-nacos/nacos"
Expand All @@ -43,7 +44,7 @@ func WithDegradation(dest, src string, nacosClient nacos.Client, opts utils.Opti
dgContainer := initDegradation(param, dest, src, nacosClient, uniqueID)

return []client.Option{
client.WithACLRules(dgContainer.GetACLRule),
client.WithACLRules(dgContainer.GetACLRule()),
client.WithCloseCallbacks(func() error {
err := nacosClient.DeregisterConfig(param, uniqueID)
if err != nil {
Expand All @@ -61,8 +62,8 @@ func initDegradation(param vo.ConfigParam, dest, src string,
dgContainer := degradation.NewDeGradationContainer()

onChangeCallback := func(data string, parser nacos.ConfigParser) {
config := degradation.Config{}
err := parser.Decode(param.Type, data, &config)
config := &degradation.Config{}
err := parser.Decode(param.Type, data, config)
if err != nil {
klog.Warnf("[nacos] %s client nacos rpc degradation: unmarshal data %s failed: %s, skip...", dest, data, err)
return
Expand Down
89 changes: 89 additions & 0 deletions pkg/degradation/degradation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package degradation

import (
"context"
"github.com/bytedance/gopkg/lang/fastrand"
"github.com/cloudwego/kitex/pkg/acl"
"github.com/pkg/errors"
"sync"
"sync/atomic"
)

var errorDegradation = errors.New("rejected by client degradation config")

// DegradationConfig is policy config of degradator.
// DON'T FORGET to update DeepCopy() and Equals() if you add new fields.
type Config struct {
Enable bool `json:"enable"`
Percentage int `json:"percentage"`
}

type Container struct {
sync.RWMutex
dgConfig atomic.Value
}

var defaultDGConfig = &Config{Enable: false, Percentage: 0}

// GetDefaultDGConfig return defaultConfig of degradation.
func GetDefaultDGConfig() *Config {
return defaultDGConfig
}

func NewDeGradationContainer() *Container {
dgContainer := &Container{}
dgContainer.dgConfig.Store(GetDefaultDGConfig())
return dgContainer
}

func (s *Container) NotifyPolicyChange(cfg *Config) {
s.dgConfig.Store(cfg)
}

func (s *Container) GetACLRule() acl.RejectFunc {
return func(ctx context.Context, request interface{}) (reason error) {
config := s.dgConfig.Load().(*Config)
if !config.Enable {
return nil
}
if fastrand.Intn(100) < config.Percentage {
return errorDegradation
}
return nil
}
}

// DeepCopy returns a full copy of DGConfig.
func (c *Config) DeepCopy() *Config {
if c == nil {
return nil
}
return &Config{
Enable: c.Enable,
Percentage: c.Percentage,
}
}

func (c *Config) Equals(other *Config) bool {
if c == nil && other == nil {
return true
}
if c == nil || other == nil {
return false
}
return c.Enable == other.Enable && c.Percentage == other.Percentage
}
40 changes: 40 additions & 0 deletions pkg/degradation/degradation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package degradation

import (
"context"
"errors"
"testing"

"github.com/cloudwego/kitex/pkg/acl"
"github.com/cloudwego/thriftgo/pkg/test"
)

var errFake = errors.New("fake error")

func invoke(ctx context.Context, request, response interface{}) error {
return errFake
}

func TestNewContainer(t *testing.T) {
container := NewDeGradationContainer()
aclMiddleware := acl.NewACLMiddleware([]acl.RejectFunc{container.GetACLRule()})
test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake))
container.NotifyPolicyChange(&Config{Enable: false, Percentage: 100})
test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake))
container.NotifyPolicyChange(&Config{Enable: true, Percentage: 100})
test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errorDegradation))
}

0 comments on commit 783d6b8

Please sign in to comment.