From c8c0156c38fdc5dedf3269638681d9968898a026 Mon Sep 17 00:00:00 2001 From: BaiZe1998 <1157467179@qq.com> Date: Sat, 3 Aug 2024 14:04:22 +0800 Subject: [PATCH] feat: optionloader for stream call client --- config/streamcall.go | 9 +++ configloader/yaml/callopt.go | 2 +- configloader/yaml/streamcall.go | 27 ++++++++ examples/yaml/client/callopt/main.go | 2 +- .../client/callopt/streamcall/config.yaml | 9 +++ .../yaml/client/callopt/streamcall/main.go | 22 ++++++- .../client/callopt/streamcall/optionloader.go | 65 +++++++++++++++++++ .../client/callopt/streamcall/translator.go | 25 +++++++ 8 files changed, 158 insertions(+), 3 deletions(-) create mode 100644 examples/yaml/client/callopt/streamcall/config.yaml diff --git a/config/streamcall.go b/config/streamcall.go index d912156..c980374 100644 --- a/config/streamcall.go +++ b/config/streamcall.go @@ -1 +1,10 @@ package config + +type StreamCallConfig struct { + // from callopt + HostPort string `yaml:"host_port"` + URL string `yaml:"url"` + ConnectTimeout *TimeInterval `yaml:"connect_timeout"` + Tag *Tag `yaml:"tag"` + GRPCCompressor string `yaml:"grpc_compressor"` +} diff --git a/configloader/yaml/callopt.go b/configloader/yaml/callopt.go index 959c7a1..a7429ba 100644 --- a/configloader/yaml/callopt.go +++ b/configloader/yaml/callopt.go @@ -7,7 +7,7 @@ import ( "os" ) -func LoadCallopConfig(filePath string) (*config.CalloptConfig, error) { +func LoadCalloptConfig(filePath string) (*config.CalloptConfig, error) { if _, err := os.Stat(filePath); os.IsNotExist(err) { return nil, fmt.Errorf("file does not exist: %s", filePath) diff --git a/configloader/yaml/streamcall.go b/configloader/yaml/streamcall.go index 72f5087..71dc865 100644 --- a/configloader/yaml/streamcall.go +++ b/configloader/yaml/streamcall.go @@ -1 +1,28 @@ package yaml + +import ( + "fmt" + "github.com/kitex-contrib/optionloader/config" + "gopkg.in/yaml.v3" + "os" +) + +func LoadStreamCallConfig(filePath string) (*config.StreamCallConfig, error) { + + if _, err := os.Stat(filePath); os.IsNotExist(err) { + return nil, fmt.Errorf("file does not exist: %s", filePath) + } + data, err := os.ReadFile(filePath) + if err != nil { + return nil, err + } + + var cfg config.StreamCallConfig + + err = yaml.Unmarshal(data, &cfg) + if err != nil { + return nil, err + } + + return &cfg, nil +} diff --git a/examples/yaml/client/callopt/main.go b/examples/yaml/client/callopt/main.go index 6773776..4f9ab04 100644 --- a/examples/yaml/client/callopt/main.go +++ b/examples/yaml/client/callopt/main.go @@ -8,7 +8,7 @@ import ( func main() { filePath := "./examples/yaml/client/callopt/config.yaml" - cfg, err := yaml.LoadCallopConfig(filePath) + cfg, err := yaml.LoadCalloptConfig(filePath) if err != nil { panic(err) } diff --git a/examples/yaml/client/callopt/streamcall/config.yaml b/examples/yaml/client/callopt/streamcall/config.yaml new file mode 100644 index 0000000..6c97bc2 --- /dev/null +++ b/examples/yaml/client/callopt/streamcall/config.yaml @@ -0,0 +1,9 @@ +host_port: "localhost:8080" +url: "/your-service-name/your-method" +connect_timeout: + unit: "s" + value: 3 +tag: + key: "your-tag-key" + value: "your-tag-value" +grpc_compressor: "gzip" diff --git a/examples/yaml/client/callopt/streamcall/main.go b/examples/yaml/client/callopt/streamcall/main.go index da13c8e..addb0eb 100644 --- a/examples/yaml/client/callopt/streamcall/main.go +++ b/examples/yaml/client/callopt/streamcall/main.go @@ -1 +1,21 @@ -package client +package main + +import ( + "fmt" + "github.com/kitex-contrib/optionloader/configloader/yaml" + "github.com/kitex-contrib/optionloader/optionloader/client/callopt/streamcall" +) + +func main() { + filePath := "./examples/yaml/client/callopt/streamcall/config.yaml" + cfg, err := yaml.LoadStreamCallConfig(filePath) + if err != nil { + panic(err) + } + loader := streamcall.NewOptionLoader() + options, err := loader.Load(cfg) + if err != nil { + panic(err) + } + fmt.Println(len(options)) +} diff --git a/optionloader/client/callopt/streamcall/optionloader.go b/optionloader/client/callopt/streamcall/optionloader.go index 8ebfce8..ee2bc83 100644 --- a/optionloader/client/callopt/streamcall/optionloader.go +++ b/optionloader/client/callopt/streamcall/optionloader.go @@ -1 +1,66 @@ package streamcall + +import ( + "fmt" + "github.com/cloudwego/kitex/client/callopt/streamcall" + "github.com/kitex-contrib/optionloader/config" + translator "github.com/kitex-contrib/optionloader/translator/client/callopt/streamcall" +) + +type Translator func(config *config.StreamCallConfig) streamcall.Option + +type OptionLoader interface { + // RegisterTranslator registers a translator function. + RegisterTranslator(translator Translator) + // Load loads the server options from config and custom registered option translators. + Load(config *config.StreamCallConfig) ([]streamcall.Option, error) +} + +type DefaultOptionLoader struct { + translators []Translator +} + +func NewOptionLoader() OptionLoader { + return &DefaultOptionLoader{ + translators: make([]Translator, 0), + } +} + +// RegisterTranslator registers a translator function. +// If the translator function has been registered, both will be registered, +// and the translator functions will be called in the order of registration. +func (loader *DefaultOptionLoader) RegisterTranslator(translator Translator) { + loader.translators = append(loader.translators, translator) +} + +func (loader *DefaultOptionLoader) Load(config *config.StreamCallConfig) ([]streamcall.Option, error) { + if config == nil { + return nil, fmt.Errorf("client config not set") + } + var translatorsList []Translator + + if config.HostPort != "" { + translatorsList = append(translatorsList, translator.HostPortTranslator) + } + if config.URL != "" { + translatorsList = append(translatorsList, translator.URLTranslator) + } + if config.ConnectTimeout != nil { + translatorsList = append(translatorsList, translator.ConnectTimeoutTranslator) + } + if config.Tag != nil { + translatorsList = append(translatorsList, translator.TagTranslator) + } + if config.GRPCCompressor != "" { + translatorsList = append(translatorsList, translator.GRPCCompressorTranslator) + } + + // Add the custom registered option translators behind the default translators. + loader.translators = append(translatorsList, loader.translators...) + + var options []streamcall.Option + for _, trans := range loader.translators { + options = append(options, trans(config)) + } + return options, nil +} diff --git a/translator/client/callopt/streamcall/translator.go b/translator/client/callopt/streamcall/translator.go index 8ebfce8..41d3a6c 100644 --- a/translator/client/callopt/streamcall/translator.go +++ b/translator/client/callopt/streamcall/translator.go @@ -1 +1,26 @@ package streamcall + +import ( + "github.com/cloudwego/kitex/client/callopt/streamcall" + "github.com/kitex-contrib/optionloader/config" +) + +func HostPortTranslator(config *config.StreamCallConfig) streamcall.Option { + return streamcall.WithHostPort(config.HostPort) +} + +func URLTranslator(config *config.StreamCallConfig) streamcall.Option { + return streamcall.WithURL(config.URL) +} + +func ConnectTimeoutTranslator(config *config.StreamCallConfig) streamcall.Option { + return streamcall.WithConnectTimeout(config.ConnectTimeout.Transform()) +} + +func TagTranslator(config *config.StreamCallConfig) streamcall.Option { + return streamcall.WithTag(config.Tag.Key, config.Tag.Value) +} + +func GRPCCompressorTranslator(config *config.StreamCallConfig) streamcall.Option { + return streamcall.WithGRPCCompressor(config.GRPCCompressor) +}