Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: realisations of optionloadr's consul part #3

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions consul/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"github.com/cloudwego/kitex/pkg/retry"
"github.com/hashicorp/consul/api"
"go.uber.org/zap"
"text/template"
"time"
)

// Options etcd config options. All the fields have default value.
type ReaderOptions struct {
Addr string
Prefix string
PathFormat string
DataCenter string
TimeOut time.Duration
NamespaceId string
Token string
Partition string
LoggerConfig *zap.Config
ConfigParser ConfigParser
ConfigType ConfigType
MyConfig Config
}

func NewReader(opts ReaderOptions) (*ConsulReader, error) {
if opts.Addr == "" {
opts.Addr = ConsulDefaultConfigAddr
}
if opts.Prefix == "" {
opts.Prefix = ConsulDefaultConfigPrefix
}
if opts.ConfigParser == nil {
opts.ConfigParser = &defaultParser{}
}
if opts.TimeOut == 0 {
opts.TimeOut = ConsulDefaultTimeout
}
if opts.PathFormat == "" {
opts.PathFormat = ConsulDefaultClientPath
}
if opts.DataCenter == "" {
opts.DataCenter = ConsulDefaultDataCenter
}
if opts.ConfigType == "" {
opts.ConfigType = ConsulDefaultConfigType
}
consulClient, err := api.NewClient(&api.Config{
Address: opts.Addr,
Datacenter: opts.DataCenter,
Token: opts.Token,
Namespace: opts.NamespaceId,
Partition: opts.Partition,
})
if err != nil {
return nil, err
}
clientPathTemplate, err := template.New("clientName").Parse(opts.PathFormat)
if err != nil {
return nil, err
}
r := &ConsulReader{
config: &ConsulConfig{MyConfig: opts.MyConfig}, //配置文件读出结果
parser: opts.ConfigParser, //配置文件解码器
consulClient: consulClient,
prefix: opts.Prefix,
clientPathTemplate: clientPathTemplate,
consulTimeout: opts.TimeOut,
configType: opts.ConfigType,
}

return r, nil
}

type LoaderOptions struct {
MyTranslators map[string]Translator
MyStreamTranslators map[string]StreamTranslator
MyCallOptionMapTranslators map[string]CallOptionMapTranslator
MyCallOptionTranslators map[string]CallOptionTranslator
MyStreamCallOptionMapTranslators map[string]StreamCallOptionMapTranslator
MyStreamCallOptionTranslators map[string]StreamCallOptionTranslator
ShouldResultRetry *retry.ShouldResultRetry
}

func NewLoader(clientServiceName, serverServiceName string, reader *ConsulReader, opts LoaderOptions) (*ConsulLoader, error) {

// Register all translators
translators := map[string]Translator{
"basicInfo": basicInfoTranslator,
"hostPorts": hostPortsTranslator,
"destService": destServiceTranslator,
"protocol": protocolTranslator,
"connection": connectionTranslator,
"failureRetry": failureRetryTranslator,
"specifiedResultRetry": specifiedResultRetryTranslator,
"backupRequest": backupRequestTranslator,
"rpcTimeout": rpcTimeoutTranslator,
"connectionTimeout": connectionTimeoutTranslator,
"tags": tagsTranslator,
"statsLevel": statsLevelTranslator,
"grpc": grpcTranslator,
}
streamTranslators := map[string]StreamTranslator{
"streamBasicInfo": streamBasicInfoTranslator,
"streamHostPorts": streamHostPortsTranslator,
"streamDestService": streamDestServiceTranslator,
"streamConnectionTimeout": streamConnectionTimeoutTranslator,
"streamTags": streamTagsTranslator,
"streamStatsLevel": streamStatsLevelTranslator,
"streamGrpc": streamGrpcTranslator,
}
callOptionMapTranslators := map[string]CallOptionMapTranslator{
"callOptionHostPorts": callOptionHostPortsTranslator,
"callOptionUrls": callOptionUrlsTranslator,
"callOptionTags": callOptionTagsTranslator,
}
callOptionTranslators := map[string]CallOptionTranslator{
"callOptionRPCTimeout": callOptionRPCTimeoutTranslator,
"callOptionConnectionTimeout": callOptionConnectionTimeoutTranslator,
"callOptionHTTPHostTimeout": callOptionHTTPHostTimeoutTranslator,
"callOptionRetryPolicy": callOptionRetryPolicyTranslator,
"callOptionGRPCCompressor": callOptionGRPCCompressorTranslator,
}
streamCallOptionMapTranslators := map[string]StreamCallOptionMapTranslator{
"streamCallOptionHostPorts": streamCallOptionHostPortsTranslator,
"streamCallOptionUrls": streamCallOptionUrlsTranslator,
"streamCallOptionTags": streamCallOptionTagsTranslator,
}
streamCallOptionTranslators := map[string]StreamCallOptionTranslator{
"streamCallOptionConnectionTimeout": streamCallOptionConnectionTimeoutTranslator,
"streamCallOptionGRPCCompressor": streamCallOptionGRPCCompressorTranslator,
}

loader := &ConsulLoader{
translators: translators,
streamTranslators: streamTranslators,
callOptionMapTranslators: callOptionMapTranslators,
callOptionTranslators: callOptionTranslators,
streamCallOptionMapTranslators: streamCallOptionMapTranslators,
streamCallOptionTranslators: streamCallOptionTranslators,
clientServiceName: clientServiceName,
serverServiceName: serverServiceName,
reader: reader,
shouldResultRetry: opts.ShouldResultRetry,
}

for name, translator := range opts.MyTranslators {
loader.RegisterTranslator(name, translator)
}
for name, translator := range opts.MyStreamTranslators {
loader.RegisterStreamTranslator(name, translator)
}
for name, translator := range opts.MyCallOptionMapTranslators {
loader.RegisterCallOptionMapTranslator(name, translator)
}
for name, translator := range opts.MyCallOptionTranslators {
loader.RegisterCallOptionTranslator(name, translator)
}
for name, translator := range opts.MyStreamCallOptionMapTranslators {
loader.RegisterStreamCallOptionMapTranslator(name, translator)
}
for name, translator := range opts.MyStreamCallOptionTranslators {
loader.RegisterStreamCallOptionTranslator(name, translator)
}

return loader, nil
}
167 changes: 167 additions & 0 deletions consul/client/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"encoding/json"
"github.com/cloudwego/kitex/pkg/retry"
"github.com/cloudwego/kitex/pkg/stats"
)

type Config interface {
}

type ConsulConfig struct {
ClientBasicInfo *EndpointBasicInfo `mapstructure:"ClientBasicInfo"`
HostPorts []string `mapstructure:"HostPorts"`
DestService *string `mapstructure:"DestService"`
Protocol *string `mapstructure:"Protocol"`
Connection *Connection `mapstructure:"Connection"`
FailureRetry *FailurePolicy `mapstructure:"FailureRetry"`
ShouldResultRetry *retry.ShouldResultRetry `mapstructure:"-"`
BackupRequest *BackupPolicy `mapstructure:"BackupRequest"`
RPCTimeout *string `mapstructure:"RPCTimeout"`
ConnectionTimeout *string `mapstructure:"ConnectionTimeout"`
Tags []Tag `mapstructure:"Tags"`
StatsLevel *stats.Level `mapstructure:"StatsLevel"`
GRPC *Grpc `mapstructure:"GRPC"`
CallOpt *CallOpt `mapstructure:"CallOpt"`
Stream *StreamConfig `mapstructure:"Stream"`
StreamCallOpt *StreamCallOpt `mapstructure:"StreamCallOpt"`
MyConfig Config `mapstructure:"MyConfig"`
}

func (c *ConsulConfig) String() string {
marshal, err := json.Marshal(c)
if err != nil {
return ""
}
return string(marshal)
}

type BackOffType string
type BackOffCfgKey string
type Type int

type EndpointBasicInfo struct {
ServiceName string `mapstructure:"ServiceName"`
Method string `mapstructure:"Method"`
Tags map[string]string `mapstructure:"Tags"`
}

type IdleConfig struct {
MinIdlePerAddress int `mapstructure:"MinIdlePerAddress"`
MaxIdlePerAddress int `mapstructure:"MaxIdlePerAddress"`
MaxIdleGlobal int `mapstructure:"MaxIdleGlobal"`
MaxIdleTimeout string `mapstructure:"MaxIdleTimeout"`
}

type MuxConnection struct {
ConnNum int `mapstructure:"ConnNum"`
}

type Connection struct {
Method string `mapstructure:"Method"`
LongConnection IdleConfig `mapstructure:"LongConnection"`
MuxConnection MuxConnection `mapstructure:"MuxConnection"`
}

type FailurePolicy struct {
StopPolicy StopPolicy `mapstructure:"StopPolicy"`
BackOffPolicy *BackOffPolicy `mapstructure:"BackOffPolicy"`
RetrySameNode bool `mapstructure:"RetrySameNode"`
ShouldResultRetry *retry.ShouldResultRetry `mapstructure:"-"`
Extra string `mapstructure:"Extra"`
}

type StopPolicy struct {
MaxRetryTimes int `mapstructure:"MaxRetryTimes"`
MaxDurationMS uint32 `mapstructure:"MaxDurationMS"`
DisableChainStop bool `mapstructure:"DisableChainStop"`
DDLStop bool `mapstructure:"DDLStop"`
CBPolicy CBPolicy `mapstructure:"CBPolicy"`
}

type CBPolicy struct {
ErrorRate float64 `mapstructure:"ErrorRate"`
}

type BackOffPolicy struct {
BackOffType BackOffType `mapstructure:"BackOffType"`
CfgItems map[BackOffCfgKey]float64 `mapstructure:"CfgItems"`
}

type BackupPolicy struct {
RetryDelayMS uint32 `mapstructure:"RetryDelayMS"`
StopPolicy StopPolicy `mapstructure:"StopPolicy"`
RetrySameNode bool `mapstructure:"RetrySameNode"`
}

type Tag struct {
Key string `mapstructure:"Key"`
Value string `mapstructure:"Value"`
}

type Grpc struct {
GRPCConnPoolSize *uint32 `mapstructure:"GRPCConnPoolSize"`
GRPCWriteBufferSize *uint32 `mapstructure:"GRPCWriteBufferSize"`
GRPCReadBufferSize *uint32 `mapstructure:"GRPCReadBufferSize"`
GRPCInitialWindowSize *uint32 `mapstructure:"GRPCInitialWindowSize"`
GRPCInitialConnWindowSize *uint32 `mapstructure:"GRPCInitialConnWindowSize"`
GRPCMaxHeaderListSize *uint32 `mapstructure:"GRPCMaxHeaderListSize"`
GRPCKeepaliveParams *GRPCClientKeepalive `mapstructure:"GRPCKeepaliveParams"`
}

type GRPCClientKeepalive struct {
Time string `mapstructure:"Time"`
Timeout string `mapstructure:"Timeout"`
PermitWithoutStream bool `mapstructure:"PermitWithoutStream"`
}

type CallOpt struct {
HostPorts *map[string]string `mapstructure:"HostPorts"`
Urls *map[string]string `mapstructure:"Urls"`
Tags *map[string]Tag `mapstructure:"Tags"`
RPCTimeout *string `mapstructure:"RPCTimeout"`
ConnectionTimeout *string `mapstructure:"ConnectionTimeout"`
HTTPHost *string `mapstructure:"HTTPHost"`
RetryPolicy *Policy `mapstructure:"RetryPolicy"`
CompressorName *string `mapstructure:"CompressorName"`
}

type StreamConfig struct {
ClientBasicInfo *EndpointBasicInfo `mapstructure:"ClientBasicInfo"`
HostPorts []string `mapstructure:"HostPorts"`
DestService *string `mapstructure:"DestService"`
ConnectionTimeout *string `mapstructure:"ConnectionTimeout"`
Tags []Tag `mapstructure:"Tags"`
StatsLevel *stats.Level `mapstructure:"StatsLevel"`
GRPC *Grpc `mapstructure:"GRPC"`
}

type StreamCallOpt struct {
HostPorts *map[string]string `mapstructure:"HostPorts"`
Urls *map[string]string `mapstructure:"Urls"`
Tags *map[string]Tag `mapstructure:"Tags"`
ConnectionTimeout *string `mapstructure:"ConnectionTimeout"`
CompressorName *string `mapstructure:"CompressorName"`
}

type Policy struct {
Enable bool `mapstructure:"Enable"`
Type Type `mapstructure:"Type"`
FailurePolicy *FailurePolicy `mapstructure:"FailurePolicy"`
BackupPolicy *BackupPolicy `mapstructure:"BackupPolicy"`
}
39 changes: 39 additions & 0 deletions consul/client/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"encoding/json"
"fmt"
"gopkg.in/yaml.v3"
)

type ConfigParser interface {
Decode(configType ConfigType, data []byte, config *ConsulConfig) error
}

type defaultParser struct {
}

func (p *defaultParser) Decode(configType ConfigType, data []byte, config *ConsulConfig) error {
switch configType {
case JSON:
return json.Unmarshal(data, config)
case YAML:
return yaml.Unmarshal(data, config)
default:
return fmt.Errorf("unsupported config data type %s", configType)
}
}
Loading
Loading