Skip to content

Commit

Permalink
feat: update client
Browse files Browse the repository at this point in the history
  • Loading branch information
jiuxia211 committed Dec 5, 2023
1 parent 220170a commit 70e1d12
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 2 deletions.
114 changes: 114 additions & 0 deletions client/circuit_breaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2023 CloudWeGo Authors

Check failure on line 1 in client/circuit_breaker.go

View workflow job for this annotation

GitHub Actions / lint

: # github.com/kitex-contrib/config-zookeeper/client
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"
"strings"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/kitex-contrib/config-zookeeper/utils"
"github.com/kitex-contrib/config-zookeeper/zookeeper"
)

// WithCircuitBreaker sets the circuit breaker policy from zookeeper configuration center.
func WithCircuitBreaker(dest, src string, zookeeperClient zookeeper.Client, opts utils.Options) []client.Option {
param, err := zookeeperClient.ClientConfigParam(&zookeeper.ConfigParamConfig{
Category: circuitBreakerConfigName,
ServerServiceName: dest,
ClientServiceName: src,
})
if err != nil {
panic(err)
}

for _, f := range opts.ZookeeperCustomFunctions {
f(&param)
}

uid := zookeeper.GetUniqueID()
path := param.Prefix + "/" + param.Path

cbSuite := initCircuitBreaker(path, uid, dest, zookeeperClient)

return []client.Option{
client.WithCircuitBreaker(cbSuite),
client.WithCloseCallbacks(func() error {
// cancel the configuration listener when client is closed.
zookeeperClient.DeregisterConfig(path, uid)
err = cbSuite.Close()
if err != nil {
return err
}
return nil
}),
}
}

// keep consistent when initialising the circuit breaker suit and updating
// the circuit breaker policy.
func genServiceCBKeyWithRPCInfo(ri rpcinfo.RPCInfo) string {
if ri == nil {
return ""
}
return genServiceCBKey(ri.To().ServiceName(), ri.To().Method())
}

func genServiceCBKey(toService, method string) string {
sum := len(toService) + len(method) + 2
var buf strings.Builder
buf.Grow(sum)
buf.WriteString(toService)
buf.WriteByte('/')
buf.WriteString(method)
return buf.String()
}

func initCircuitBreaker(path string, uniqueID int64, dest string, zookeeperClient zookeeper.Client) *circuitbreak.CBSuite {
cb := circuitbreak.NewCBSuite(genServiceCBKeyWithRPCInfo)
lcb := utils.ThreadSafeSet{}

onChangeCallback := func(restoreDefault bool, data string, parser zookeeper.ConfigParser) {
set := utils.Set{}
configs := map[string]circuitbreak.CBConfig{}

if !restoreDefault {
err := parser.Decode(data, &configs)
if err != nil {
klog.Warnf("[zookeeper] %s client zookeeper circuit breaker: unmarshal data %s failed: %s, skip...", path, data, err)
return
}
}

for method, config := range configs {
set[method] = true
key := genServiceCBKey(dest, method)
cb.UpdateServiceCBConfig(key, config)
}

for _, method := range lcb.DiffAndEmplace(set) {
key := genServiceCBKey(dest, method)
// For deleted method configs, set to default policy
cb.UpdateServiceCBConfig(key, circuitbreak.GetDefaultCBConfig())
}
}

zookeeperClient.RegisterConfigCallback(context.Background(), path, uniqueID, onChangeCallback)

return cb
}
94 changes: 94 additions & 0 deletions client/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/retry"
"github.com/kitex-contrib/config-zookeeper/utils"
"github.com/kitex-contrib/config-zookeeper/zookeeper"
)

// WithRetryPolicy sets the retry policy from zookeeper configuration center.
func WithRetryPolicy(dest, src string, zookeeperClient zookeeper.Client, opts utils.Options) []client.Option {
param, err := zookeeperClient.ClientConfigParam(&zookeeper.ConfigParamConfig{
Category: retryConfigName,
ServerServiceName: dest,
ClientServiceName: src,
})
if err != nil {
panic(err)
}

for _, f := range opts.ZookeeperCustomFunctions {
f(&param)
}

uid := zookeeper.GetUniqueID()
path := param.Prefix + "/" + param.Path
rc := initRetryContainer(path, uid, dest, zookeeperClient)
return []client.Option{
client.WithRetryContainer(rc),
client.WithCloseCallbacks(func() error {
// cancel the configuration listener when client is closed.
zookeeperClient.DeregisterConfig(path, uid)
return rc.Close()
}),
}
}
func initRetryContainer(path string, uniqueID int64, dest string, zookeeperClient zookeeper.Client) *retry.Container {
retryContainer := retry.NewRetryContainerWithPercentageLimit()

ts := utils.ThreadSafeSet{}

onChangeCallback := func(restoreDefault bool, data string, parser zookeeper.ConfigParser) {
// the key is method name, wildcard "*" can match anything.
rcs := map[string]*retry.Policy{}
if !restoreDefault && data != "" {
err := parser.Decode(data, &rcs)
if err != nil {
klog.Warnf("[zookeeper] %s client zookeeper retry: unmarshal data %s failed: %s, skip...", path, data, err)
return
}
}

set := utils.Set{}
for method, policy := range rcs {
set[method] = true
if policy.BackupPolicy != nil && policy.FailurePolicy != nil {
klog.Warnf("[zookeeper] %s client policy for method %s BackupPolicy and FailurePolicy must not be set at same time",
dest, method)
continue
}
if policy.BackupPolicy == nil && policy.FailurePolicy == nil {
klog.Warnf("[zookeeper] %s client policy for method %s BackupPolicy and FailurePolicy must not be empty at same time",
dest, method)
continue
}
retryContainer.NotifyPolicyChange(method, *policy)
}

for _, method := range ts.DiffAndEmplace(set) {
retryContainer.DeletePolicy(method)
}
}

zookeeperClient.RegisterConfigCallback(context.Background(), path, uniqueID, onChangeCallback)

return retryContainer
}
75 changes: 75 additions & 0 deletions client/rpc_timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/rpctimeout"
"github.com/kitex-contrib/config-zookeeper/utils"
"github.com/kitex-contrib/config-zookeeper/zookeeper"
)

// WithRPCTimeout sets the RPC timeout policy from zookeeper configuration center.
func WithRPCTimeout(dest, src string, zookeeperClient zookeeper.Client, opts utils.Options) []client.Option {
param, err := zookeeperClient.ClientConfigParam(&zookeeper.ConfigParamConfig{
Category: rpcTimeoutConfigName,
ServerServiceName: dest,
ClientServiceName: src,
})
if err != nil {
panic(err)
}

for _, f := range opts.ZookeeperCustomFunctions {
f(&param)
}

uid := zookeeper.GetUniqueID()
path := param.Prefix + "/" + param.Path

return []client.Option{
client.WithTimeoutProvider(initRPCTimeoutContainer(path, uid, dest, zookeeperClient)),
client.WithCloseCallbacks(func() error {
// cancel the configuration listener when client is closed.
zookeeperClient.DeregisterConfig(path, uid)
return nil
}),
}
}

func initRPCTimeoutContainer(path string, uniqueID int64, dest string, zookeeperClient zookeeper.Client) rpcinfo.TimeoutProvider {
rpcTimeoutContainer := rpctimeout.NewContainer()

onChangeCallback := func(restoreDefault bool, data string, parser zookeeper.ConfigParser) {
configs := map[string]*rpctimeout.RPCTimeout{}
if !restoreDefault {
err := parser.Decode(data, &configs)
if err != nil {
klog.Warnf("[zookeeper] %s client zookeeper rpc timeout: unmarshal data %s failed: %s, skip...", path, data, err)
return
}
}

rpcTimeoutContainer.NotifyPolicyChange(configs)
}

zookeeperClient.RegisterConfigCallback(context.Background(), path, uniqueID, onChangeCallback)

return rpcTimeoutContainer
}
57 changes: 57 additions & 0 deletions client/suite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2023 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"github.com/cloudwego/kitex/client"
"github.com/kitex-contrib/config-zookeeper/utils"
"github.com/kitex-contrib/config-zookeeper/zookeeper"
)

const (
retryConfigName = "retry"
rpcTimeoutConfigName = "rpc_timeout"
circuitBreakerConfigName = "circuit_break"
)

// ZookeeperClientSuite zookeeper client config suite, configure retry timeout limit and circuitbreak dynamically from zookeeper.
type ZookeeperClientSuite struct {
zookeeperClient zookeeper.Client
service string
client string
opts utils.Options
}

// NewSuite service is the destination service name and client is the local identity.
func NewSuite(service, client string, cli zookeeper.Client, opts ...utils.Option) *ZookeeperClientSuite {
su := &ZookeeperClientSuite{
service: service,
client: client,
zookeeperClient: cli,
}
for _, f := range opts {
f.Apply(&su.opts)
}
return su
}

// Options return a list client.Option
func (s *ZookeeperClientSuite) Options() []client.Option {
opts := make([]client.Option, 0, 7)
opts = append(opts, WithRetryPolicy(s.service, s.client, s.zookeeperClient, s.opts)...)
opts = append(opts, WithRPCTimeout(s.service, s.client, s.zookeeperClient, s.opts)...)
opts = append(opts, WithCircuitBreaker(s.service, s.client, s, s.opts)...)

Check failure on line 55 in client/suite.go

View workflow job for this annotation

GitHub Actions / unit-benchmark-test (1.17, X64)

cannot use s (type *ZookeeperClientSuite) as type zookeeper.Client in argument to WithCircuitBreaker:

Check failure on line 55 in client/suite.go

View workflow job for this annotation

GitHub Actions / unit-benchmark-test (1.17, ARM64)

cannot use s (type *ZookeeperClientSuite) as type zookeeper.Client in argument to WithCircuitBreaker:

Check failure on line 55 in client/suite.go

View workflow job for this annotation

GitHub Actions / unit-benchmark-test (1.18, X64)

cannot use s (variable of type *ZookeeperClientSuite) as type zookeeper.Client in argument to WithCircuitBreaker:

Check failure on line 55 in client/suite.go

View workflow job for this annotation

GitHub Actions / lint

cannot use s (variable of type *ZookeeperClientSuite) as type zookeeper.Client in argument to WithCircuitBreaker:

Check failure on line 55 in client/suite.go

View workflow job for this annotation

GitHub Actions / unit-benchmark-test (1.18, ARM64)

cannot use s (variable of type *ZookeeperClientSuite) as type zookeeper.Client in argument to WithCircuitBreaker:

Check failure on line 55 in client/suite.go

View workflow job for this annotation

GitHub Actions / unit-benchmark-test (1.19, X64)

cannot use s (variable of type *ZookeeperClientSuite) as type zookeeper.Client in argument to WithCircuitBreaker:

Check failure on line 55 in client/suite.go

View workflow job for this annotation

GitHub Actions / unit-benchmark-test (1.19, ARM64)

cannot use s (variable of type *ZookeeperClientSuite) as type zookeeper.Client in argument to WithCircuitBreaker:
return opts
}
3 changes: 1 addition & 2 deletions example/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,11 @@ func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Respon
}

func main() {
klog.SetLevel(klog.LevelDebug)
zookeeperClient, err := zookeeper.NewClient(zookeeper.Options{})
if err != nil {
panic(err)
}
serviceName := "ServiceName"
serviceName := "ServiceName" // your server-side service name
svr := echo.NewServer(
new(EchoImpl),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
Expand Down
Loading

0 comments on commit 70e1d12

Please sign in to comment.