From 9ea51290292eae5effd6fa6b534722c37e134297 Mon Sep 17 00:00:00 2001 From: Ken Liu Date: Mon, 20 Nov 2023 19:38:46 +0800 Subject: [PATCH] Triple client&server api (#2502) * client api * client api * update triple client and server api * code format * fix example * put version and group into CallOption * put version and group into CallOption * code fmt * remove redundant parameter * update api * mod tidy * metadata support Params * move MethodOption to config package * mod tidy * append tracing filters * rename, metric to metrics --- client/action.go | 132 ++-- client/client.go | 34 +- client/options.go | 576 ++++++++++++++---- client/options_test.go | 80 ++- .../internal/old_triple/oldTriple.go | 13 +- common/dubboutil/copier.go | 42 ++ compat.go | 19 +- config/config_loader.go | 14 +- config/metadata_report_config.go | 26 +- config/metric_config.go | 18 +- config/metric_config_test.go | 2 +- config/options.go | 123 ++++ config/otel_config.go | 2 +- config/reference_config.go | 23 +- config/root_config.go | 12 +- config/service_config.go | 4 +- config/tracing_config.go | 2 +- config_center/options.go | 6 + dubbo.go | 39 +- global/application_config.go | 56 -- global/consumer_config.go | 69 +-- global/custom_config.go | 8 - global/logger_config.go | 71 --- global/metadata_report_config.go | 17 +- global/metric_config.go | 8 +- global/otel_config.go | 4 +- global/provider_config.go | 63 +- global/service_config.go | 190 ------ global/shutdown_config.go | 44 -- global/tracing_config.go | 1 + go.mod | 1 + go.sum | 3 +- metadata/options.go | 12 + metrics/options.go | 36 +- options.go | 12 +- otel/trace/options.go | 44 +- protocol/options.go | 6 + .../health/triple_health/health.triple.go | 22 +- .../triple/internal/client/cmd_client/main.go | 18 +- .../client/cmd_client_with_registry/main.go | 2 +- .../internal/client/cmd_instance/main.go | 2 +- .../client/cmd_instance_with_registry/main.go | 2 +- .../internal/client/health_client/main.go | 2 +- .../triple_gen/greettriple/greet.triple.go | 24 +- .../triple/internal/server/cmd_server/main.go | 3 + registry/options.go | 6 + server/action.go | 10 +- server/options.go | 438 ++++++++++++- server/server.go | 2 +- 49 files changed, 1448 insertions(+), 895 deletions(-) create mode 100644 common/dubboutil/copier.go create mode 100644 config/options.go diff --git a/client/action.go b/client/action.go index 83d72d5e07..aa69089408 100644 --- a/client/action.go +++ b/client/action.go @@ -52,9 +52,9 @@ func getEnv(key, fallback string) string { return fallback } -func updateOrCreateMeshURL(opts *ClientOptions) { +func updateOrCreateMeshURL(opts *ReferenceOptions) { ref := opts.Reference - con := opts.Consumer + con := opts.cliOpts.Consumer if ref.URL != "" { logger.Infof("URL specified explicitly %v", ref.URL) @@ -84,30 +84,31 @@ func updateOrCreateMeshURL(opts *ClientOptions) { } // ReferWithService retrieves invokers from urls. -func (opts *ClientOptions) ReferWithService(srv common.RPCService) { - opts.refer(srv, nil) +func (refOpts *ReferenceOptions) ReferWithService(srv common.RPCService) { + refOpts.refer(srv, nil) } -func (opts *ClientOptions) ReferWithInfo(info *ClientInfo) { - opts.refer(nil, info) +func (refOpts *ReferenceOptions) ReferWithInfo(info *ClientInfo) { + refOpts.refer(nil, info) } -func (opts *ClientOptions) ReferWithServiceAndInfo(srv common.RPCService, info *ClientInfo) { - opts.refer(srv, info) +func (refOpts *ReferenceOptions) ReferWithServiceAndInfo(srv common.RPCService, info *ClientInfo) { + refOpts.refer(srv, info) } -func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) { - ref := opts.Reference - con := opts.Consumer +func (refOpts *ReferenceOptions) refer(srv common.RPCService, info *ClientInfo) { + ref := refOpts.Reference + clientOpts := refOpts.cliOpts + con := clientOpts.Consumer var methods []string if info != nil { ref.InterfaceName = info.InterfaceName methods = info.MethodNames - opts.id = info.InterfaceName - opts.info = info + refOpts.id = info.InterfaceName + refOpts.info = info } else { - opts.id = common.GetReference(srv) + refOpts.id = common.GetReference(srv) } // If adaptive service is enabled, // the cluster and load balance should be overridden to "adaptivesvc" and "p2c" respectively. @@ -121,9 +122,9 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) { common.WithPath(ref.InterfaceName), common.WithProtocol(ref.Protocol), common.WithMethods(methods), - common.WithParams(opts.getURLMap()), - common.WithParamsValue(constant.BeanNameKey, opts.id), - common.WithParamsValue(constant.MetadataTypeKey, opts.metaDataType), + common.WithParams(refOpts.getURLMap()), + common.WithParamsValue(constant.BeanNameKey, refOpts.id), + common.WithParamsValue(constant.MetadataTypeKey, refOpts.metaDataType), ) if info != nil { cfgURL.SetAttribute(constant.ClientInfoKey, info) @@ -132,23 +133,23 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) { if ref.ForceTag { cfgURL.AddParam(constant.ForceUseTag, "true") } - opts.postProcessConfig(cfgURL) + refOpts.postProcessConfig(cfgURL) // if mesh-enabled is set - updateOrCreateMeshURL(opts) + updateOrCreateMeshURL(refOpts) - // retrieving urls from config, and appending the urls to opts.urls - if err := opts.processURL(cfgURL); err != nil { + // retrieving urls from config, and appending the urls to refOpts.urls + if err := refOpts.processURL(cfgURL); err != nil { panic(err) } - // Get invokers according to opts.urls + // Get invokers according to refOpts.urls var ( invoker protocol.Invoker regURL *common.URL ) - invokers := make([]protocol.Invoker, len(opts.urls)) - for i, u := range opts.urls { + invokers := make([]protocol.Invoker, len(refOpts.urls)) + for i, u := range refOpts.urls { if u.Protocol == constant.ServiceRegistryProtocol { invoker = extension.GetProtocol(constant.RegistryProtocol).Refer(u) } else { @@ -167,17 +168,17 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) { // TODO(hxmhlt): decouple from directory, config should not depend on directory module if len(invokers) == 1 { - opts.invoker = invokers[0] + refOpts.invoker = invokers[0] if ref.URL != "" { hitClu := constant.ClusterKeyFailover - if u := opts.invoker.GetURL(); u != nil { + if u := refOpts.invoker.GetURL(); u != nil { hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware) } cluster, err := extension.GetCluster(hitClu) if err != nil { panic(err) } else { - opts.invoker = cluster.Join(static.NewDirectory(invokers)) + refOpts.invoker = cluster.Join(static.NewDirectory(invokers)) } } } else { @@ -196,7 +197,7 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) { if err != nil { panic(err) } else { - opts.invoker = cluster.Join(static.NewDirectory(invokers)) + refOpts.invoker = cluster.Join(static.NewDirectory(invokers)) } } @@ -214,29 +215,29 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) { if asyncSrv, ok := srv.(common.AsyncCallbackService); ok { callback = asyncSrv.CallBack } - opts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetAsyncProxy(opts.invoker, callback, cfgURL) + refOpts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetAsyncProxy(refOpts.invoker, callback, cfgURL) } else { - opts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetProxy(opts.invoker, cfgURL) + refOpts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetProxy(refOpts.invoker, cfgURL) } - opts.pxy.Implement(srv) + refOpts.pxy.Implement(srv) } // this protocol would be destroyed in graceful_shutdown // please refer to (https://github.com/apache/dubbo-go/issues/2429) graceful_shutdown.RegisterProtocol(ref.Protocol) } -func (opts *ClientOptions) processURL(cfgURL *common.URL) error { - ref := opts.Reference +func (refOpts *ReferenceOptions) processURL(cfgURL *common.URL) error { + ref := refOpts.Reference if ref.URL != "" { // use user-specific urls /* - Two types of URL are allowed for opts.URL: + Two types of URL are allowed for refOpts.URL: 1. direct url: server IP, that is, no need for a registry anymore 2. registry url They will be handled in different ways: For example, we have a direct url and a registry url: 1. "tri://localhost:10000" is a direct url 2. "registry://localhost:2181" is a registry url. - Then, opts.URL looks like a string separated by semicolon: "tri://localhost:10000;registry://localhost:2181". + Then, refOpts.URL looks like a string separated by semicolon: "tri://localhost:10000;registry://localhost:2181". The result of urlStrings is a string array: []string{"tri://localhost:10000", "registry://localhost:2181"}. */ urlStrings := gxstrings.RegSplit(ref.URL, "\\s*[;]+\\s*") @@ -247,7 +248,7 @@ func (opts *ClientOptions) processURL(cfgURL *common.URL) error { } if serviceURL.Protocol == constant.RegistryProtocol { // serviceURL in this branch is a registry protocol serviceURL.SubURL = cfgURL - opts.urls = append(opts.urls, serviceURL) + refOpts.urls = append(refOpts.urls, serviceURL) } else { // serviceURL in this branch is the target endpoint IP address if serviceURL.Path == "" { serviceURL.Path = "/" + ref.InterfaceName @@ -256,26 +257,26 @@ func (opts *ClientOptions) processURL(cfgURL *common.URL) error { // other stuff, e.g. IP, port, etc., are same as serviceURL newURL := common.MergeURL(serviceURL, cfgURL) newURL.AddParam("peer", "true") - opts.urls = append(opts.urls, newURL) + refOpts.urls = append(refOpts.urls, newURL) } } } else { // use registry configs - opts.urls = config.LoadRegistries(ref.RegistryIDs, opts.registriesCompat, common.CONSUMER) + refOpts.urls = config.LoadRegistries(ref.RegistryIDs, refOpts.registriesCompat, common.CONSUMER) // set url to regURLs - for _, regURL := range opts.urls { + for _, regURL := range refOpts.urls { regURL.SubURL = cfgURL } } return nil } -func (opts *ClientOptions) CheckAvailable() bool { - ref := opts.Reference - if opts.invoker == nil { +func (refOpts *ReferenceOptions) CheckAvailable() bool { + ref := refOpts.Reference + if refOpts.invoker == nil { logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", ref.InterfaceName) return false } - if !opts.invoker.IsAvailable() { + if !refOpts.invoker.IsAvailable() { return false } return true @@ -283,30 +284,33 @@ func (opts *ClientOptions) CheckAvailable() bool { // Implement // @v is service provider implemented RPCService -func (opts *ClientOptions) Implement(v common.RPCService) { - if opts.pxy != nil { - opts.pxy.Implement(v) - } else if opts.info != nil { - opts.info.ClientInjectFunc(v, &Client{ - invoker: opts.invoker, - info: opts.info, +func (refOpts *ReferenceOptions) Implement(v common.RPCService) { + if refOpts.pxy != nil { + refOpts.pxy.Implement(v) + } else if refOpts.info != nil { + refOpts.info.ClientInjectFunc(v, &Client{ + cliOpts: refOpts.cliOpts, + info: refOpts.info, + refOpts: map[string]*ReferenceOptions{}, }) } } // GetRPCService gets RPCService from proxy -func (opts *ClientOptions) GetRPCService() common.RPCService { - return opts.pxy.Get() +func (refOpts *ReferenceOptions) GetRPCService() common.RPCService { + return refOpts.pxy.Get() } // GetProxy gets proxy -func (opts *ClientOptions) GetProxy() *proxy.Proxy { - return opts.pxy +func (refOpts *ReferenceOptions) GetProxy() *proxy.Proxy { + return refOpts.pxy } -func (opts *ClientOptions) getURLMap() url.Values { - ref := opts.Reference - app := opts.applicationCompat +func (refOpts *ReferenceOptions) getURLMap() url.Values { + ref := refOpts.Reference + app := refOpts.applicationCompat + metrics := refOpts.cliOpts.Metrics + tracing := refOpts.cliOpts.Otel.TracingConfig urlMap := url.Values{} // first set user params @@ -352,6 +356,12 @@ func (opts *ClientOptions) getURLMap() url.Values { if ref.Generic != "" { defaultReferenceFilter = constant.GenericFilterKey + "," + defaultReferenceFilter } + if metrics.Enable != nil && *metrics.Enable { + defaultReferenceFilter += fmt.Sprintf(",%s", constant.MetricsFilterKey) + } + if tracing.Enable != nil && *tracing.Enable { + defaultReferenceFilter += fmt.Sprintf(",%s", constant.OTELClientTraceKey) + } urlMap.Set(constant.ReferenceFilterKey, commonCfg.MergeValue(ref.Filter, "", defaultReferenceFilter)) for _, v := range ref.Methods { @@ -368,7 +378,7 @@ func (opts *ClientOptions) getURLMap() url.Values { // todo: figure this out //// GenericLoad ... -//func (opts *ClientOptions) GenericLoad(id string) { +//func (opts *ReferenceOptions) GenericLoad(id string) { // genericService := generic.NewGenericService(opts.id) // config.SetConsumerService(genericService) // opts.id = id @@ -377,12 +387,12 @@ func (opts *ClientOptions) getURLMap() url.Values { //} // GetInvoker get invoker from ReferenceConfigs -func (opts *ClientOptions) GetInvoker() protocol.Invoker { - return opts.invoker +func (refOpts *ReferenceOptions) GetInvoker() protocol.Invoker { + return refOpts.invoker } // postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfigs. -func (opts *ClientOptions) postProcessConfig(url *common.URL) { +func (refOpts *ReferenceOptions) postProcessConfig(url *common.URL) { for _, p := range extension.GetConfigPostProcessors() { p.PostProcessReferenceConfig(url) } diff --git a/client/client.go b/client/client.go index 1f7fd4c7e6..a3b540e944 100644 --- a/client/client.go +++ b/client/client.go @@ -20,6 +20,7 @@ package client import ( "context" + "fmt" ) import ( @@ -27,16 +28,17 @@ import ( ) import ( + "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/protocol" invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation" ) type Client struct { - invoker protocol.Invoker - info *ClientInfo + info *ClientInfo cliOpts *ClientOptions + refOpts map[string]*ReferenceOptions } type ClientInfo struct { @@ -58,8 +60,13 @@ func (cli *Client) call(ctx context.Context, paramsRawVals []interface{}, interf if err != nil { return nil, err } - // todo: move timeout into context or invocation - return cli.invoker.Invoke(ctx, inv), nil + + refOption := cli.refOpts[common.ServiceKey(interfaceName, options.Group, options.Version)] + if refOption == nil { + return nil, fmt.Errorf("no service found for %s/%s:%s, please check if the service has been registered", options.Group, interfaceName, options.Version) + } + + return refOption.invoker.Invoke(ctx, inv), nil } @@ -95,15 +102,23 @@ func (cli *Client) CallBidiStream(ctx context.Context, interfaceName, methodName return res.Result(), res.Error() } -func (cli *Client) Init(info *ClientInfo) error { +func (cli *Client) Init(info *ClientInfo, opts ...ReferenceOption) (string, string, error) { if info == nil { - return errors.New("ClientInfo is nil") + return "", "", errors.New("ClientInfo is nil") } - cli.cliOpts.ReferWithInfo(info) - cli.invoker = cli.cliOpts.invoker + newRefOptions := defaultReferenceOptions() + err := newRefOptions.init(cli, opts...) + if err != nil { + return "", "", err + } + + ref := newRefOptions.Reference + cli.refOpts[common.ServiceKey(info.InterfaceName, ref.Group, ref.Version)] = newRefOptions + + newRefOptions.ReferWithInfo(info) - return nil + return ref.Group, ref.Version, nil } func generateInvocation(methodName string, paramsRawVals []interface{}, callType string, opts *CallOptions) (protocol.Invocation, error) { @@ -125,5 +140,6 @@ func NewClient(opts ...ClientOption) (*Client, error) { } return &Client{ cliOpts: newCliOpts, + refOpts: make(map[string]*ReferenceOptions), }, nil } diff --git a/client/options.go b/client/options.go index acf1956e97..76f562918e 100644 --- a/client/options.go +++ b/client/options.go @@ -18,6 +18,7 @@ package client import ( + "reflect" "strconv" "time" ) @@ -30,6 +31,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common" commonCfg "dubbo.apache.org/dubbo-go/v3/common/config" "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/dubboutil" "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/global" "dubbo.apache.org/dubbo-go/v3/graceful_shutdown" @@ -38,12 +40,10 @@ import ( "dubbo.apache.org/dubbo-go/v3/registry" ) -type ClientOptions struct { - Application *global.ApplicationConfig - Consumer *global.ConsumerConfig - Reference *global.ReferenceConfig - Registries map[string]*global.RegistryConfig - Shutdown *global.ShutdownConfig +type ReferenceOptions struct { + Reference *global.ReferenceConfig + cliOpts *ClientOptions + Registries map[string]*global.RegistryConfig pxy *proxy.Proxy id string @@ -57,51 +57,50 @@ type ClientOptions struct { registriesCompat map[string]*config.RegistryConfig } -func defaultClientOptions() *ClientOptions { - return &ClientOptions{ - Application: global.DefaultApplicationConfig(), - Consumer: global.DefaultConsumerConfig(), - Reference: global.DefaultReferenceConfig(), - Shutdown: global.DefaultShutdownConfig(), +func defaultReferenceOptions() *ReferenceOptions { + return &ReferenceOptions{ + Reference: global.DefaultReferenceConfig(), } } -func (cliOpts *ClientOptions) init(opts ...ClientOption) error { +func (refOpts *ReferenceOptions) init(cli *Client, opts ...ReferenceOption) error { for _, opt := range opts { - opt(cliOpts) + opt(refOpts) } - if err := defaults.Set(cliOpts); err != nil { + if err := defaults.Set(refOpts); err != nil { return err } - ref := cliOpts.Reference + refOpts.cliOpts = cli.cliOpts + dubboutil.CopyFields(reflect.ValueOf(refOpts.cliOpts.Consumer).Elem(), reflect.ValueOf(refOpts.Reference).Elem()) + + ref := refOpts.Reference // init method methods := ref.Methods if length := len(methods); length > 0 { - cliOpts.methodsCompat = make([]*config.MethodConfig, length) + refOpts.methodsCompat = make([]*config.MethodConfig, length) for i, method := range methods { - cliOpts.methodsCompat[i] = compatMethodConfig(method) - if err := cliOpts.methodsCompat[i].Init(); err != nil { + refOpts.methodsCompat[i] = compatMethodConfig(method) + if err := refOpts.methodsCompat[i].Init(); err != nil { return err } } - } // init application - application := cliOpts.Application + application := refOpts.cliOpts.Application if application != nil { - cliOpts.applicationCompat = compatApplicationConfig(application) - if err := cliOpts.applicationCompat.Init(); err != nil { + refOpts.applicationCompat = compatApplicationConfig(application) + if err := refOpts.applicationCompat.Init(); err != nil { return err } - cliOpts.metaDataType = cliOpts.applicationCompat.MetadataType + refOpts.metaDataType = refOpts.applicationCompat.MetadataType if ref.Group == "" { - ref.Group = cliOpts.applicationCompat.Group + ref.Group = refOpts.applicationCompat.Group } if ref.Version == "" { - ref.Version = cliOpts.applicationCompat.Version + ref.Version = refOpts.applicationCompat.Version } } // init cluster @@ -115,12 +114,12 @@ func (cliOpts *ClientOptions) init(opts ...ClientOption) error { if ref.RegistryIDs == nil || len(ref.RegistryIDs) <= 0 { emptyRegIDsFlag = true } - regs := cliOpts.Registries + regs := refOpts.Registries if regs != nil { - cliOpts.registriesCompat = make(map[string]*config.RegistryConfig) + refOpts.registriesCompat = make(map[string]*config.RegistryConfig) for key, reg := range regs { - cliOpts.registriesCompat[key] = compatRegistryConfig(reg) - if err := cliOpts.registriesCompat[key].Init(); err != nil { + refOpts.registriesCompat[key] = compatRegistryConfig(reg) + if err := refOpts.registriesCompat[key].Init(); err != nil { return err } if emptyRegIDsFlag { @@ -131,45 +130,349 @@ func (cliOpts *ClientOptions) init(opts ...ClientOption) error { ref.RegistryIDs = commonCfg.TranslateIds(ref.RegistryIDs) // init graceful_shutdown - graceful_shutdown.Init(graceful_shutdown.SetShutdown_Config(cliOpts.Shutdown)) + graceful_shutdown.Init(graceful_shutdown.SetShutdown_Config(refOpts.cliOpts.Shutdown)) - return commonCfg.Verify(cliOpts) + return commonCfg.Verify(refOpts) } -type ClientOption func(*ClientOptions) +type ReferenceOption func(*ReferenceOptions) // ---------- For user ---------- -func WithCheck() ClientOption { - return func(opts *ClientOptions) { +func WithCheck() ReferenceOption { + return func(opts *ReferenceOptions) { check := true opts.Reference.Check = &check } } -func WithURL(url string) ClientOption { - return func(opts *ClientOptions) { +func WithURL(url string) ReferenceOption { + return func(opts *ReferenceOptions) { opts.Reference.URL = url } } // todo(DMwangnima): change Filter Option like Cluster and LoadBalance -func WithFilter(filter string) ClientOption { - return func(opts *ClientOptions) { +func WithFilter(filter string) ReferenceOption { + return func(opts *ReferenceOptions) { opts.Reference.Filter = filter } } // todo(DMwangnima): think about a more ideal configuration style -func WithRegistryIDs(registryIDs []string) ClientOption { - return func(opts *ClientOptions) { +func WithRegistryIDs(registryIDs []string) ReferenceOption { + return func(opts *ReferenceOptions) { if len(registryIDs) > 0 { opts.Reference.RegistryIDs = registryIDs } } } -func WithRegistry(opts ...registry.Option) ClientOption { +func WithRegistry(opts ...registry.Option) ReferenceOption { + regOpts := registry.NewOptions(opts...) + + return func(refOpts *ReferenceOptions) { + if refOpts.Registries == nil { + refOpts.Registries = make(map[string]*global.RegistryConfig) + } + refOpts.Registries[regOpts.ID] = regOpts.Registry + } +} + +// ========== Cluster Strategy ========== + +func WithClusterAvailable() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Cluster = constant.ClusterKeyAvailable + } +} + +func WithClusterBroadcast() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Cluster = constant.ClusterKeyBroadcast + } +} + +func WithClusterFailBack() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Cluster = constant.ClusterKeyFailback + } +} + +func WithClusterFailFast() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Cluster = constant.ClusterKeyFailfast + } +} + +func WithClusterFailOver() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Cluster = constant.ClusterKeyFailover + } +} + +func WithClusterFailSafe() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Cluster = constant.ClusterKeyFailsafe + } +} + +func WithClusterForking() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Cluster = constant.ClusterKeyForking + } +} + +func WithClusterZoneAware() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Cluster = constant.ClusterKeyZoneAware + } +} + +func WithClusterAdaptiveService() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Cluster = constant.ClusterKeyAdaptiveService + } +} + +func WithCluster(cluster string) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Cluster = cluster + } +} + +// ========== LoadBalance Strategy ========== + +func WithLoadBalanceConsistentHashing() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Loadbalance = constant.LoadBalanceKeyConsistentHashing + } +} + +func WithLoadBalanceLeastActive() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Loadbalance = constant.LoadBalanceKeyLeastActive + } +} + +func WithLoadBalanceRandom() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Loadbalance = constant.LoadBalanceKeyRandom + } +} + +func WithLoadBalanceRoundRobin() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Loadbalance = constant.LoadBalanceKeyRoundRobin + } +} + +func WithLoadBalanceP2C() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Loadbalance = constant.LoadBalanceKeyP2C + } +} + +func WithLoadBalance(lb string) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Loadbalance = lb + } +} + +func WithRetries(retries int) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Retries = strconv.Itoa(retries) + } +} + +func WithGroup(group string) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Group = group + } +} + +func WithVersion(version string) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Version = version + } +} + +func WithJSON() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Serialization = constant.JSONSerialization + } +} + +func WithProvidedBy(providedBy string) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.ProvidedBy = providedBy + } +} + +func WithAsync() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Async = true + } +} + +func WithParams(params map[string]string) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Params = params + } +} + +func WithGeneric() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Generic = "true" + } +} + +func WithSticky(sticky bool) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Sticky = sticky + } +} + +// ========== Protocol to consume ========== + +func WithProtocolDubbo() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Protocol = constant.Dubbo + } +} + +func WithProtocolTriple() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Protocol = "tri" + } +} + +func WithProtocolJsonRPC() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Protocol = "jsonrpc" + } +} + +func WithProtocol(protocol string) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.Protocol = protocol + } +} + +func WithRequestTimeout(timeout time.Duration) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.RequestTimeout = timeout.String() + } +} + +func WithForceTag() ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.ForceTag = true + } +} + +func WithMeshProviderPort(port int) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference.MeshProviderPort = port + } +} + +func WithMethod(opts ...config.MethodOption) ReferenceOption { + regOpts := config.NewMethodOptions(opts...) + + return func(opts *ReferenceOptions) { + if len(opts.Reference.Methods) == 0 { + opts.Reference.Methods = make([]*global.MethodConfig, 0) + } + opts.Reference.Methods = append(opts.Reference.Methods, regOpts.Method) + } +} + +func WithParam(k, v string) ReferenceOption { + return func(opts *ReferenceOptions) { + if opts.Reference.Params == nil { + opts.Reference.Params = make(map[string]string) + } + opts.Reference.Params[k] = v + } +} + +// ---------- For framework ---------- +// These functions should not be invoked by users + +func SetRegistries(regs map[string]*global.RegistryConfig) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Registries = regs + } +} + +func SetReference(reference *global.ReferenceConfig) ReferenceOption { + return func(opts *ReferenceOptions) { + opts.Reference = reference + } +} + +type ClientOptions struct { + Consumer *global.ConsumerConfig + Application *global.ApplicationConfig + Registries map[string]*global.RegistryConfig + Shutdown *global.ShutdownConfig + Metrics *global.MetricsConfig + Otel *global.OtelConfig +} + +func defaultClientOptions() *ClientOptions { + return &ClientOptions{ + Consumer: global.DefaultConsumerConfig(), + Registries: make(map[string]*global.RegistryConfig), + Application: global.DefaultApplicationConfig(), + Shutdown: global.DefaultShutdownConfig(), + Metrics: global.DefaultMetricsConfig(), + Otel: global.DefaultOtelConfig(), + } +} + +func (cliOpts *ClientOptions) init(opts ...ClientOption) error { + for _, opt := range opts { + opt(cliOpts) + } + if err := defaults.Set(cliOpts); err != nil { + return err + } + return nil +} + +type ClientOption func(*ClientOptions) + +func WithClientCheck() ClientOption { + return func(opts *ClientOptions) { + opts.Consumer.Check = true + } +} + +func WithClientURL(url string) ClientOption { + return func(opts *ClientOptions) { + opts.Consumer.URL = url + } +} + +// todo(DMwangnima): change Filter Option like Cluster and LoadBalance +func WithClientFilter(filter string) ClientOption { + return func(opts *ClientOptions) { + opts.Consumer.Filter = filter + } +} + +// todo(DMwangnima): think about a more ideal configuration style +func WithClientRegistryIDs(registryIDs []string) ClientOption { + return func(opts *ClientOptions) { + if len(registryIDs) > 0 { + opts.Consumer.RegistryIDs = registryIDs + } + } +} + +func WithClientRegistry(opts ...registry.Option) ClientOption { regOpts := registry.NewOptions(opts...) return func(cliOpts *ClientOptions) { @@ -180,7 +483,7 @@ func WithRegistry(opts ...registry.Option) ClientOption { } } -func WithShutdown(opts ...graceful_shutdown.Option) ClientOption { +func WithClientShutdown(opts ...graceful_shutdown.Option) ClientOption { sdOpts := graceful_shutdown.NewOptions(opts...) return func(cliOpts *ClientOptions) { @@ -190,180 +493,218 @@ func WithShutdown(opts ...graceful_shutdown.Option) ClientOption { // ========== Cluster Strategy ========== -func WithClusterAvailable() ClientOption { +func WithClientClusterAvailable() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Cluster = constant.ClusterKeyAvailable + opts.Consumer.Cluster = constant.ClusterKeyAvailable } } -func WithClusterBroadcast() ClientOption { +func WithClientClusterBroadcast() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Cluster = constant.ClusterKeyBroadcast + opts.Consumer.Cluster = constant.ClusterKeyBroadcast } } -func WithClusterFailBack() ClientOption { +func WithClientClusterFailBack() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Cluster = constant.ClusterKeyFailback + opts.Consumer.Cluster = constant.ClusterKeyFailback } } -func WithClusterFailFast() ClientOption { +func WithClientClusterFailFast() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Cluster = constant.ClusterKeyFailfast + opts.Consumer.Cluster = constant.ClusterKeyFailfast } } -func WithClusterFailOver() ClientOption { +func WithClientClusterFailOver() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Cluster = constant.ClusterKeyFailover + opts.Consumer.Cluster = constant.ClusterKeyFailover } } -func WithClusterFailSafe() ClientOption { +func WithClientClusterFailSafe() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Cluster = constant.ClusterKeyFailsafe + opts.Consumer.Cluster = constant.ClusterKeyFailsafe } } -func WithClusterForking() ClientOption { +func WithClientClusterForking() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Cluster = constant.ClusterKeyForking + opts.Consumer.Cluster = constant.ClusterKeyForking } } -func WithClusterZoneAware() ClientOption { +func WithClientClusterZoneAware() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Cluster = constant.ClusterKeyZoneAware + opts.Consumer.Cluster = constant.ClusterKeyZoneAware } } -func WithClusterAdaptiveService() ClientOption { +func WithClientClusterAdaptiveService() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Cluster = constant.ClusterKeyAdaptiveService + opts.Consumer.Cluster = constant.ClusterKeyAdaptiveService } } // ========== LoadBalance Strategy ========== -func WithLoadBalanceConsistentHashing() ClientOption { +func WithClientLoadBalanceConsistentHashing() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Loadbalance = constant.LoadBalanceKeyConsistentHashing + opts.Consumer.Loadbalance = constant.LoadBalanceKeyConsistentHashing } } -func WithLoadBalanceLeastActive() ClientOption { +func WithClientLoadBalanceLeastActive() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Loadbalance = constant.LoadBalanceKeyLeastActive + opts.Consumer.Loadbalance = constant.LoadBalanceKeyLeastActive } } -func WithLoadBalanceRandom() ClientOption { +func WithClientLoadBalanceRandom() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Loadbalance = constant.LoadBalanceKeyRandom + opts.Consumer.Loadbalance = constant.LoadBalanceKeyRandom } } -func WithLoadBalanceRoundRobin() ClientOption { +func WithClientLoadBalanceRoundRobin() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Loadbalance = constant.LoadBalanceKeyRoundRobin + opts.Consumer.Loadbalance = constant.LoadBalanceKeyRoundRobin } } -func WithLoadBalanceP2C() ClientOption { +func WithClientLoadBalanceP2C() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Loadbalance = constant.LoadBalanceKeyP2C + opts.Consumer.Loadbalance = constant.LoadBalanceKeyP2C } } -func WithLoadBalanceXDSRingHash() ClientOption { +func WithClientLoadBalance(lb string) ClientOption { return func(opts *ClientOptions) { - opts.Reference.Loadbalance = constant.LoadBalanceKeyLeastActive + opts.Consumer.Loadbalance = lb } } -func WithRetries(retries int) ClientOption { +func WithClientRetries(retries int) ClientOption { return func(opts *ClientOptions) { - opts.Reference.Retries = strconv.Itoa(retries) + opts.Consumer.Retries = strconv.Itoa(retries) } } -func WithGroup(group string) ClientOption { +func WithClientGroup(group string) ClientOption { return func(opts *ClientOptions) { - opts.Reference.Group = group + opts.Consumer.Group = group } } -func WithVersion(version string) ClientOption { +func WithClientVersion(version string) ClientOption { return func(opts *ClientOptions) { - opts.Reference.Version = version + opts.Consumer.Version = version } } -func WithJSON() ClientOption { +func WithClientSerializationJSON() ClientOption { return func(opts *ClientOptions) { - opts.Reference.Serialization = constant.JSONSerialization + opts.Consumer.Serialization = constant.JSONSerialization } } -func WithProvidedBy(providedBy string) ClientOption { +func WithClientSerialization(ser string) ClientOption { return func(opts *ClientOptions) { - opts.Reference.ProvidedBy = providedBy + opts.Consumer.Serialization = ser + } +} + +func WithClientProvidedBy(providedBy string) ClientOption { + return func(opts *ClientOptions) { + opts.Consumer.ProvidedBy = providedBy } } // todo(DMwangnima): implement this functionality //func WithAsync() ClientOption { // return func(opts *ClientOptions) { -// opts.Reference.Async = true +// opts.Consumer.Async = true // } //} -func WithParams(params map[string]string) ClientOption { +func WithClientParams(params map[string]string) ClientOption { return func(opts *ClientOptions) { - opts.Reference.Params = params + opts.Consumer.Params = params + } +} + +func WithClientParam(k, v string) ClientOption { + return func(opts *ClientOptions) { + if opts.Consumer.Params == nil { + opts.Consumer.Params = make(map[string]string) + } + opts.Consumer.Params[k] = v } } // todo(DMwangnima): implement this functionality -//func WithGeneric(generic bool) ClientOption { +//func WithClientGeneric(generic bool) ClientOption { // return func(opts *ClientOptions) { // if generic { -// opts.Reference.Generic = "true" +// opts.Consumer.Generic = "true" // } else { -// opts.Reference.Generic = "false" +// opts.Consumer.Generic = "false" // } // } //} -func WithSticky(sticky bool) ClientOption { +func WithClientSticky(sticky bool) ClientOption { return func(opts *ClientOptions) { - opts.Reference.Sticky = sticky + opts.Consumer.Sticky = sticky + } +} + +// ========== Protocol to consume ========== + +func WithClientProtocolDubbo() ClientOption { + return func(opts *ClientOptions) { + opts.Consumer.Protocol = constant.Dubbo } } -func WithRequestTimeout(timeout time.Duration) ClientOption { +func WithClientProtocolTriple() ClientOption { return func(opts *ClientOptions) { - opts.Reference.RequestTimeout = timeout.String() + opts.Consumer.Protocol = "tri" } } -func WithForce(force bool) ClientOption { +func WithClientProtocolJsonRPC() ClientOption { return func(opts *ClientOptions) { - opts.Reference.ForceTag = force + opts.Consumer.Protocol = "jsonrpc" } } -func WithMeshProviderPort(port int) ClientOption { +func WithClientProtocol(protocol string) ClientOption { return func(opts *ClientOptions) { - opts.Reference.MeshProviderPort = port + opts.Consumer.Protocol = protocol } } -// ---------- For framework ---------- -// These functions should not be invoked by users +func WithClientRequestTimeout(timeout time.Duration) ClientOption { + return func(opts *ClientOptions) { + opts.Consumer.RequestTimeout = timeout.String() + } +} -func SetRegistries(regs map[string]*global.RegistryConfig) ClientOption { +func WithClientForceTag() ClientOption { + return func(opts *ClientOptions) { + opts.Consumer.ForceTag = true + } +} + +func WithClientMeshProviderPort(port int) ClientOption { + return func(opts *ClientOptions) { + opts.Consumer.MeshProviderPort = port + } +} + +func SetClientRegistries(regs map[string]*global.RegistryConfig) ClientOption { return func(opts *ClientOptions) { opts.Registries = regs } @@ -375,21 +716,27 @@ func SetApplication(application *global.ApplicationConfig) ClientOption { } } -func SetConsumer(consumer *global.ConsumerConfig) ClientOption { +func SetClientConsumer(consumer *global.ConsumerConfig) ClientOption { return func(opts *ClientOptions) { opts.Consumer = consumer } } -func SetReference(reference *global.ReferenceConfig) ClientOption { +func SetClientShutdown(shutdown *global.ShutdownConfig) ClientOption { return func(opts *ClientOptions) { - opts.Reference = reference + opts.Shutdown = shutdown } } -func SetShutdown(shutdown *global.ShutdownConfig) ClientOption { +func SetClientMetrics(metrics *global.MetricsConfig) ClientOption { return func(opts *ClientOptions) { - opts.Shutdown = shutdown + opts.Metrics = metrics + } +} + +func SetClientOtel(otel *global.OtelConfig) ClientOption { + return func(opts *ClientOptions) { + opts.Otel = otel } } @@ -397,15 +744,14 @@ func SetShutdown(shutdown *global.ShutdownConfig) ClientOption { type CallOptions struct { RequestTimeout string Retries string + Group string + Version string } type CallOption func(*CallOptions) func newDefaultCallOptions() *CallOptions { - return &CallOptions{ - RequestTimeout: "", - Retries: "", - } + return &CallOptions{} } // WithCallRequestTimeout the maximum waiting time for one specific call, only works for 'tri' and 'dubbo' protocol @@ -421,3 +767,15 @@ func WithCallRetries(retries int) CallOption { opts.Retries = strconv.Itoa(retries) } } + +func WithCallGroup(group string) CallOption { + return func(opts *CallOptions) { + opts.Group = group + } +} + +func WithCallVersion(version string) CallOption { + return func(opts *CallOptions) { + opts.Version = version + } +} diff --git a/client/options_test.go b/client/options_test.go index 5a8ff5f010..0016b916d4 100644 --- a/client/options_test.go +++ b/client/options_test.go @@ -17,49 +17,37 @@ package client -import ( - "testing" -) - -import ( - "github.com/stretchr/testify/assert" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" -) - -func TestWithURL(t *testing.T) { - tests := []struct { - opts []ClientOption - justify func(t *testing.T, opts *ClientOptions) - }{ - { - opts: []ClientOption{ - WithURL("127.0.0.1:20000"), - }, - justify: func(t *testing.T, opts *ClientOptions) { - urls := opts.urls - assert.Equal(t, 1, len(urls)) - assert.Equal(t, "tri", urls[0].Protocol) - }, - }, - { - opts: []ClientOption{ - WithURL("tri://127.0.0.1:20000"), - }, - justify: func(t *testing.T, opts *ClientOptions) { - urls := opts.urls - assert.Equal(t, 1, len(urls)) - assert.Equal(t, "tri", urls[0].Protocol) - }, - }, - } - - for _, test := range tests { - newOpts := defaultClientOptions() - assert.Nil(t, newOpts.init(test.opts...)) - assert.Nil(t, newOpts.processURL(&common.URL{})) - test.justify(t, newOpts) - } -} +//func TestWithURL(t *testing.T) { +// tests := []struct { +// opts []ClientOption +// justify func(t *testing.T, opts *ClientOptions) +// }{ +// { +// opts: []ClientOption{ +// WithClientURL("127.0.0.1:20000"), +// }, +// justify: func(t *testing.T, opts *ClientOptions) { +// urls := opts.urls +// assert.Equal(t, 1, len(urls)) +// assert.Equal(t, "tri", urls[0].Protocol) +// }, +// }, +// { +// opts: []ClientOption{ +// WithClientURL("tri://127.0.0.1:20000"), +// }, +// justify: func(t *testing.T, opts *ClientOptions) { +// urls := opts.urls +// assert.Equal(t, 1, len(urls)) +// assert.Equal(t, "tri", urls[0].Protocol) +// }, +// }, +// } +// +// for _, test := range tests { +// newOpts := defaultClientOptions() +// assert.Nil(t, newOpts.init(test.opts...)) +// assert.Nil(t, newOpts.processURL(&common.URL{})) +// test.justify(t, newOpts) +// } +//} diff --git a/cmd/protoc-gen-go-triple/internal/old_triple/oldTriple.go b/cmd/protoc-gen-go-triple/internal/old_triple/oldTriple.go index 5bb7305f6a..d264a3ae33 100644 --- a/cmd/protoc-gen-go-triple/internal/old_triple/oldTriple.go +++ b/cmd/protoc-gen-go-triple/internal/old_triple/oldTriple.go @@ -1,10 +1,10 @@ /* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -13,7 +13,6 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ package old_triple diff --git a/common/dubboutil/copier.go b/common/dubboutil/copier.go new file mode 100644 index 0000000000..999c107004 --- /dev/null +++ b/common/dubboutil/copier.go @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dubboutil + +import ( + "reflect" +) + +func CopyFields(sourceValue reflect.Value, targetValue reflect.Value) { + for i := 0; i < sourceValue.NumField(); i++ { + sourceField := sourceValue.Type().Field(i) + sourceFieldValue := sourceValue.Field(i) + + // embedded ReferenceConfig + if sourceFieldValue.Kind() == reflect.Struct && sourceField.Anonymous { + CopyFields(sourceFieldValue, targetValue) + continue + } + + if sourceFieldValue.CanInterface() && sourceFieldValue.CanSet() { + targetField := targetValue.FieldByName(sourceField.Name) + if targetField.IsValid() && targetField.CanSet() && targetField.Type() == sourceFieldValue.Type() && targetField.IsZero() { + targetField.Set(sourceFieldValue) + } + } + } +} diff --git a/compat.go b/compat.go index ff65c2f516..29e1962b3b 100644 --- a/compat.go +++ b/compat.go @@ -48,7 +48,7 @@ func compatRootConfig(c *InstanceOptions) *config.RootConfig { MetadataReport: compatMetadataReportConfig(c.MetadataReport), Provider: compatProviderConfig(c.Provider), Consumer: compatConsumerConfig(c.Consumer), - Metric: compatMetricConfig(c.Metric), + Metrics: compatMetricConfig(c.Metrics), Otel: compatOtelConfig(c.Otel), Logger: compatLoggerConfig(c.Logger), Shutdown: compatShutdownConfig(c.Shutdown), @@ -147,6 +147,7 @@ func compatMetadataReportConfig(c *global.MetadataReportConfig) *config.Metadata Timeout: c.Timeout, Group: c.Group, Namespace: c.Namespace, + Params: c.Params, } } @@ -263,11 +264,11 @@ func compatConsumerConfig(c *global.ConsumerConfig) *config.ConsumerConfig { } } -func compatMetricConfig(c *global.MetricConfig) *config.MetricConfig { +func compatMetricConfig(c *global.MetricsConfig) *config.MetricsConfig { if c == nil { return nil } - return &config.MetricConfig{ + return &config.MetricsConfig{ Enable: c.Enable, Port: c.Port, Path: c.Path, @@ -286,12 +287,12 @@ func compatOtelConfig(c *global.OtelConfig) *config.OtelConfig { } return &config.OtelConfig{ TraceConfig: &config.OtelTraceConfig{ - Enable: c.TraceConfig.Enable, - Exporter: c.TraceConfig.Exporter, - Endpoint: c.TraceConfig.Endpoint, - Propagator: c.TraceConfig.Propagator, - SampleMode: c.TraceConfig.SampleMode, - SampleRatio: c.TraceConfig.SampleRatio, + Enable: c.TracingConfig.Enable, + Exporter: c.TracingConfig.Exporter, + Endpoint: c.TracingConfig.Endpoint, + Propagator: c.TracingConfig.Propagator, + SampleMode: c.TracingConfig.SampleMode, + SampleRatio: c.TracingConfig.SampleRatio, }, } } diff --git a/config/config_loader.go b/config/config_loader.go index 239de5b806..eda382d8fd 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -158,22 +158,22 @@ func RPCService(service common.RPCService) { rootConfig.Consumer.References[ref].Implement(service) } -// GetMetricConfig find the MetricConfig +// GetMetricConfig find the MetricsConfig // if it is nil, create a new one // we use double-check to reduce race condition // In general, it will be locked 0 or 1 time. // So you don't need to worry about the race condition -func GetMetricConfig() *MetricConfig { +func GetMetricConfig() *MetricsConfig { // todo - //if GetBaseConfig().Metric == nil { + //if GetBaseConfig().Metrics == nil { // configAccessMutex.Lock() // defer configAccessMutex.Unlock() - // if GetBaseConfig().Metric == nil { - // GetBaseConfig().Metric = &metric.Metric{} + // if GetBaseConfig().Metrics == nil { + // GetBaseConfig().Metrics = &metric.Metrics{} // } //} - //return GetBaseConfig().Metric - return rootConfig.Metric + //return GetBaseConfig().Metrics + return rootConfig.Metrics } func GetTracingConfig(tracingKey string) *TracingConfig { diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go index a84df18e90..bde3732e7f 100644 --- a/config/metadata_report_config.go +++ b/config/metadata_report_config.go @@ -32,13 +32,14 @@ import ( // MetadataReportConfig is app level configuration type MetadataReportConfig struct { - Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` - Address string `required:"true" yaml:"address" json:"address"` - Username string `yaml:"username" json:"username,omitempty"` - Password string `yaml:"password" json:"password,omitempty"` - Timeout string `yaml:"timeout" json:"timeout,omitempty"` - Group string `yaml:"group" json:"group,omitempty"` - Namespace string `yaml:"namespace" json:"namespace,omitempty"` + Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` + Address string `required:"true" yaml:"address" json:"address"` + Username string `yaml:"username" json:"username,omitempty"` + Password string `yaml:"password" json:"password,omitempty"` + Timeout string `yaml:"timeout" json:"timeout,omitempty"` + Group string `yaml:"group" json:"group,omitempty"` + Namespace string `yaml:"namespace" json:"namespace,omitempty"` + Params map[string]string `yaml:"params" json:"parameters,omitempty"` // metadataType of this application is defined by application config, local or remote metadataType string } @@ -72,6 +73,9 @@ func (mc *MetadataReportConfig) ToUrl() (*common.URL, error) { return nil, perrors.New("Invalid MetadataReport Config.") } res.SetParam("metadata", res.Protocol) + for key, val := range mc.Params { + res.SetParam(key, val) + } return res, nil } @@ -137,7 +141,7 @@ type MetadataReportConfigBuilder struct { } func NewMetadataReportConfigBuilder() *MetadataReportConfigBuilder { - return &MetadataReportConfigBuilder{metadataReportConfig: &MetadataReportConfig{}} + return &MetadataReportConfigBuilder{metadataReportConfig: newEmptyMetadataReportConfig()} } func (mrcb *MetadataReportConfigBuilder) SetProtocol(protocol string) *MetadataReportConfigBuilder { @@ -173,3 +177,9 @@ func (mrcb *MetadataReportConfigBuilder) SetGroup(group string) *MetadataReportC func (mrcb *MetadataReportConfigBuilder) Build() *MetadataReportConfig { return mrcb.metadataReportConfig } + +func newEmptyMetadataReportConfig() *MetadataReportConfig { + return &MetadataReportConfig{ + Params: make(map[string]string), + } +} diff --git a/config/metric_config.go b/config/metric_config.go index 145200072f..628b0f6a11 100644 --- a/config/metric_config.go +++ b/config/metric_config.go @@ -33,8 +33,8 @@ import ( "dubbo.apache.org/dubbo-go/v3/metrics" ) -// MetricConfig This is the config struct for all metrics implementation -type MetricConfig struct { +// MetricsConfig This is the config struct for all metrics implementation +type MetricsConfig struct { Enable *bool `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"` Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"` Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"` @@ -71,7 +71,7 @@ type PushgatewayConfig struct { PushInterval int `default:"30" yaml:"push-interval" json:"push-interval,omitempty" property:"push-interval"` } -func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig { +func (mc *MetricsConfig) ToReporterConfig() *metrics.ReporterConfig { defaultMetricsReportConfig := metrics.NewReporterConfig() defaultMetricsReportConfig.Enable = *mc.Enable @@ -81,7 +81,7 @@ func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig { return defaultMetricsReportConfig } -func (mc *MetricConfig) Init(rc *RootConfig) error { +func (mc *MetricsConfig) Init(rc *RootConfig) error { if mc == nil { return errors.New("metrics config is null") } @@ -99,11 +99,11 @@ func (mc *MetricConfig) Init(rc *RootConfig) error { } type MetricConfigBuilder struct { - metricConfig *MetricConfig + metricConfig *MetricsConfig } func NewMetricConfigBuilder() *MetricConfigBuilder { - return &MetricConfigBuilder{metricConfig: &MetricConfig{}} + return &MetricConfigBuilder{metricConfig: &MetricsConfig{}} } func (mcb *MetricConfigBuilder) SetMetadataEnabled(enabled bool) *MetricConfigBuilder { @@ -121,17 +121,17 @@ func (mcb *MetricConfigBuilder) SetConfigCenterEnabled(enabled bool) *MetricConf return mcb } -func (mcb *MetricConfigBuilder) Build() *MetricConfig { +func (mcb *MetricConfigBuilder) Build() *MetricsConfig { return mcb.metricConfig } // DynamicUpdateProperties dynamically update properties. -func (mc *MetricConfig) DynamicUpdateProperties(newMetricConfig *MetricConfig) { +func (mc *MetricsConfig) DynamicUpdateProperties(newMetricConfig *MetricsConfig) { // TODO update } // prometheus://localhost:9090?&histogram.enabled=false&prometheus.exporter.enabled=false -func (mc *MetricConfig) toURL() *common.URL { +func (mc *MetricsConfig) toURL() *common.URL { url, _ := common.NewURL("localhost", common.WithProtocol(mc.Protocol)) url.SetParam(constant.PrometheusExporterMetricsPortKey, mc.Port) url.SetParam(constant.PrometheusExporterMetricsPathKey, mc.Path) diff --git a/config/metric_config_test.go b/config/metric_config_test.go index 1dfd3af636..e87bd18304 100644 --- a/config/metric_config_test.go +++ b/config/metric_config_test.go @@ -32,7 +32,7 @@ func TestMetricConfigBuilder(t *testing.T) { SetRegistryEnabled(false). Build() enable := false - assert.Equal(t, &MetricConfig{ + assert.Equal(t, &MetricsConfig{ EnableConfigCenter: &enable, EnableMetadata: &enable, EnableRegistry: &enable, diff --git a/config/options.go b/config/options.go new file mode 100644 index 0000000000..7d01b0d2ef --- /dev/null +++ b/config/options.go @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "strconv" + "time" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/global" +) + +type MethodOption func(*MethodOptions) + +func WithInterfaceId(id string) MethodOption { + return func(opts *MethodOptions) { + opts.Method.InterfaceId = id + } +} + +func WithInterfaceName(name string) MethodOption { + return func(opts *MethodOptions) { + opts.Method.InterfaceName = name + } +} + +func WithName(name string) MethodOption { + return func(opts *MethodOptions) { + opts.Method.Name = name + } +} + +func WithRetries(retries int) MethodOption { + return func(opts *MethodOptions) { + opts.Method.Retries = strconv.Itoa(retries) + } +} + +func WithLoadBalance(lb string) MethodOption { + return func(opts *MethodOptions) { + opts.Method.LoadBalance = lb + } +} + +func WithWeight(weight int64) MethodOption { + return func(opts *MethodOptions) { + opts.Method.Weight = weight + } +} + +func WithTpsLimitInterval(interval int) MethodOption { + return func(opts *MethodOptions) { + opts.Method.TpsLimitInterval = strconv.Itoa(interval) + } +} + +func WithTpsLimitRate(rate int) MethodOption { + return func(opts *MethodOptions) { + opts.Method.TpsLimitRate = strconv.Itoa(rate) + } +} + +func WithTpsLimitStrategy(strategy string) MethodOption { + return func(opts *MethodOptions) { + opts.Method.TpsLimitStrategy = strategy + } +} + +func WithExecuteLimit(limit int) MethodOption { + return func(opts *MethodOptions) { + opts.Method.ExecuteLimit = strconv.Itoa(limit) + } +} + +func WithExecuteLimitRejectedHandler(handler string) MethodOption { + return func(opts *MethodOptions) { + opts.Method.ExecuteLimitRejectedHandler = handler + } +} + +func WithSticky() MethodOption { + return func(opts *MethodOptions) { + opts.Method.Sticky = true + } +} + +func WithRequestTimeout(millSeconds time.Duration) MethodOption { + return func(opts *MethodOptions) { + opts.Method.RequestTimeout = millSeconds.String() + } +} + +type MethodOptions struct { + Method *global.MethodConfig +} + +func defaultMethodOptions() *MethodOptions { + return &MethodOptions{Method: &global.MethodConfig{}} +} + +func NewMethodOptions(opts ...MethodOption) *MethodOptions { + defOpts := defaultMethodOptions() + for _, opt := range opts { + opt(defOpts) + } + return defOpts +} diff --git a/config/otel_config.go b/config/otel_config.go index 762a0897f3..9e0c600a01 100644 --- a/config/otel_config.go +++ b/config/otel_config.go @@ -33,7 +33,7 @@ import ( ) type OtelConfig struct { - TraceConfig *OtelTraceConfig `yaml:"trace" json:"trace,omitempty" property:"trace"` + TraceConfig *OtelTraceConfig `yaml:"tracing" json:"trace,omitempty" property:"trace"` } type OtelTraceConfig struct { diff --git a/config/reference_config.go b/config/reference_config.go index 6b14313f2a..c48bc5d9bf 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -47,7 +47,11 @@ import ( // ReferenceConfig is the configuration of service consumer type ReferenceConfig struct { - pxy *proxy.Proxy + pxy *proxy.Proxy + invoker protocol.Invoker + urls []*common.URL + rootConfig *RootConfig + id string InterfaceName string `yaml:"interface" json:"interface,omitempty" property:"interface"` Check *bool `yaml:"check" json:"check,omitempty" property:"check"` @@ -65,14 +69,11 @@ type ReferenceConfig struct { Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` Async bool `yaml:"async" json:"async,omitempty" property:"async"` Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` - invoker protocol.Invoker - urls []*common.URL - Generic string `yaml:"generic" json:"generic,omitempty" property:"generic"` - Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` - RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` - ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"` - TracingKey string `yaml:"tracing-key" json:"tracing-key,omitempty" propertiy:"tracing-key"` - rootConfig *RootConfig + Generic string `yaml:"generic" json:"generic,omitempty" property:"generic"` + Sticky bool `yaml:"sticky" json:"sticky,omitempty" property:"sticky"` + RequestTimeout string `yaml:"timeout" json:"timeout,omitempty" property:"timeout"` + ForceTag bool `yaml:"force.tag" json:"force.tag,omitempty" property:"force.tag"` + TracingKey string `yaml:"tracing-key" json:"tracing-key,omitempty" propertiy:"tracing-key"` metaDataType string metricsEnable bool MeshProviderPort int `yaml:"mesh-provider-port" json:"mesh-provider-port,omitempty" propertiy:"mesh-provider-port"` @@ -122,8 +123,8 @@ func (rc *ReferenceConfig) Init(root *RootConfig) error { if rc.Cluster == "" { rc.Cluster = "failover" } - if root.Metric.Enable != nil { - rc.metricsEnable = *root.Metric.Enable + if root.Metrics.Enable != nil { + rc.metricsEnable = *root.Metrics.Enable } return verify(rc) diff --git a/config/root_config.go b/config/root_config.go index d904b78124..8239b1e109 100644 --- a/config/root_config.go +++ b/config/root_config.go @@ -56,7 +56,7 @@ type RootConfig struct { Provider *ProviderConfig `yaml:"provider" json:"provider" property:"provider"` Consumer *ConsumerConfig `yaml:"consumer" json:"consumer" property:"consumer"` Otel *OtelConfig `yaml:"otel" json:"otel,omitempty" property:"otel"` - Metric *MetricConfig `yaml:"metrics" json:"metrics,omitempty" property:"metrics"` + Metrics *MetricsConfig `yaml:"metrics" json:"metrics,omitempty" property:"metrics"` Tracing map[string]*TracingConfig `yaml:"tracing" json:"tracing,omitempty" property:"tracing"` Logger *LoggerConfig `yaml:"logger" json:"logger,omitempty" property:"logger"` Shutdown *ShutdownConfig `yaml:"shutdown" json:"shutdown,omitempty" property:"shutdown"` @@ -180,7 +180,7 @@ func (rc *RootConfig) Init() error { if err := rc.Otel.Init(rc.Application); err != nil { return err } - if err := rc.Metric.Init(rc); err != nil { + if err := rc.Metrics.Init(rc); err != nil { return err } for _, t := range rc.Tracing { @@ -230,7 +230,7 @@ func newEmptyRootConfig() *RootConfig { Provider: NewProviderConfigBuilder().Build(), Consumer: NewConsumerConfigBuilder().Build(), Otel: NewOtelConfigBuilder().Build(), - Metric: NewMetricConfigBuilder().Build(), + Metrics: NewMetricConfigBuilder().Build(), Logger: NewLoggerConfigBuilder().Build(), Custom: NewCustomConfigBuilder().Build(), Shutdown: NewShutDownConfigBuilder().Build(), @@ -292,8 +292,8 @@ func (rb *RootConfigBuilder) SetOtel(otel *OtelConfig) *RootConfigBuilder { return rb } -func (rb *RootConfigBuilder) SetMetric(metric *MetricConfig) *RootConfigBuilder { - rb.rootConfig.Metric = metric +func (rb *RootConfigBuilder) SetMetric(metric *MetricsConfig) *RootConfigBuilder { + rb.rootConfig.Metrics = metric return rb } @@ -420,5 +420,5 @@ func (rc *RootConfig) Process(event *config_center.ConfigChangeEvent) { rc.Logger.DynamicUpdateProperties(updateRootConfig.Logger) // dynamically update metric - rc.Metric.DynamicUpdateProperties(updateRootConfig.Metric) + rc.Metrics.DynamicUpdateProperties(updateRootConfig.Metrics) } diff --git a/config/service_config.go b/config/service_config.go index 7285ad85a4..461546b72e 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -145,8 +145,8 @@ func (s *ServiceConfig) Init(rc *RootConfig) error { if s.TracingKey == "" { s.TracingKey = rc.Provider.TracingKey } - if rc.Metric.Enable != nil { - s.metricsEnable = *rc.Metric.Enable + if rc.Metrics.Enable != nil { + s.metricsEnable = *rc.Metrics.Enable } err := s.check() if err != nil { diff --git a/config/tracing_config.go b/config/tracing_config.go index 0150eb4252..ba85fe5c3b 100644 --- a/config/tracing_config.go +++ b/config/tracing_config.go @@ -26,7 +26,7 @@ import ( ) // TracingConfig is the configuration of the tracing. -// It's designed to be replaced with config.OtelConfig +// Deprecated: It's designed to be replaced with config.OtelConfig type TracingConfig struct { Name string `default:"jaeger" yaml:"name" json:"name,omitempty" property:"name"` // jaeger or zipkin(todo) ServiceName string `yaml:"serviceName" json:"serviceName,omitempty" property:"serviceName"` diff --git a/config_center/options.go b/config_center/options.go index 3fa9e1b38e..be8c343d86 100644 --- a/config_center/options.go +++ b/config_center/options.go @@ -59,6 +59,12 @@ func WithNacos() Option { } } +func WithConfigCenter(cc string) Option { + return func(opts *Options) { + opts.Center.Protocol = cc + } +} + func WithAddress(address string) Option { return func(opts *Options) { if i := strings.Index(address, "://"); i > 0 { diff --git a/dubbo.go b/dubbo.go index ad24597ff0..155a17bd64 100644 --- a/dubbo.go +++ b/dubbo.go @@ -59,29 +59,39 @@ func (ins *Instance) NewClient(opts ...client.ClientOption) (*client.Client, err appCfg := ins.insOpts.Application regsCfg := ins.insOpts.Registries sdCfg := ins.insOpts.Shutdown + metricsCfg := ins.insOpts.Metrics + otelCfg := ins.insOpts.Otel + if conCfg != nil { if conCfg.Check { - cliOpts = append(cliOpts, client.WithCheck()) + cliOpts = append(cliOpts, client.WithClientCheck()) } // these options come from Consumer and Root. // for dubbo-go developers, referring config/ConsumerConfig.Init and config/ReferenceConfig cliOpts = append(cliOpts, - client.WithFilter(conCfg.Filter), + client.WithClientFilter(conCfg.Filter), // todo(DMwangnima): deal with Protocol - client.WithRegistryIDs(conCfg.RegistryIDs), + client.WithClientRegistryIDs(conCfg.RegistryIDs), // todo(DMwangnima): deal with TracingKey - client.SetConsumer(conCfg), + client.SetClientConsumer(conCfg), ) } if appCfg != nil { cliOpts = append(cliOpts, client.SetApplication(appCfg)) } if regsCfg != nil { - cliOpts = append(cliOpts, client.SetRegistries(regsCfg)) + cliOpts = append(cliOpts, client.SetClientRegistries(regsCfg)) } if sdCfg != nil { - cliOpts = append(cliOpts, client.SetShutdown(sdCfg)) + cliOpts = append(cliOpts, client.SetClientShutdown(sdCfg)) + } + if metricsCfg != nil { + cliOpts = append(cliOpts, client.SetClientMetrics(metricsCfg)) } + if otelCfg != nil { + cliOpts = append(cliOpts, client.SetClientOtel(otelCfg)) + } + // options passed by users has higher priority cliOpts = append(cliOpts, opts...) @@ -104,9 +114,12 @@ func (ins *Instance) NewServer(opts ...server.ServerOption) (*server.Server, err regsCfg := ins.insOpts.Registries prosCfg := ins.insOpts.Protocols sdCfg := ins.insOpts.Shutdown + metricsCfg := ins.insOpts.Metrics + otelCfg := ins.insOpts.Otel + if appCfg != nil { srvOpts = append(srvOpts, - server.SetServer_Application(appCfg), + server.SetServerApplication(appCfg), //server.WithServer_ApplicationConfig( // global.WithApplication_Name(appCfg.Name), // global.WithApplication_Organization(appCfg.Organization), @@ -118,13 +131,19 @@ func (ins *Instance) NewServer(opts ...server.ServerOption) (*server.Server, err ) } if regsCfg != nil { - srvOpts = append(srvOpts, server.SetServer_Registries(regsCfg)) + srvOpts = append(srvOpts, server.SetServerRegistries(regsCfg)) } if prosCfg != nil { - srvOpts = append(srvOpts, server.SetServer_Protocols(prosCfg)) + srvOpts = append(srvOpts, server.SetServerProtocols(prosCfg)) } if sdCfg != nil { - srvOpts = append(srvOpts, server.SetServer_Shutdown(sdCfg)) + srvOpts = append(srvOpts, server.SetServerShutdown(sdCfg)) + } + if metricsCfg != nil { + srvOpts = append(srvOpts, server.SetServerMetrics(metricsCfg)) + } + if otelCfg != nil { + srvOpts = append(srvOpts, server.SetServerOtel(otelCfg)) } // options passed by users have higher priority diff --git a/global/application_config.go b/global/application_config.go index 9a197e7d22..426378cafc 100644 --- a/global/application_config.go +++ b/global/application_config.go @@ -35,59 +35,3 @@ func DefaultApplicationConfig() *ApplicationConfig { // return a new config without setting any field means there is not any default value for initialization return &ApplicationConfig{} } - -type ApplicationOption func(*ApplicationConfig) - -func WithApplication_Organization(organization string) ApplicationOption { - return func(cfg *ApplicationConfig) { - cfg.Organization = organization - } -} - -func WithApplication_Name(name string) ApplicationOption { - return func(cfg *ApplicationConfig) { - cfg.Name = name - } -} - -func WithApplication_Module(module string) ApplicationOption { - return func(cfg *ApplicationConfig) { - cfg.Module = module - } -} - -func WithApplication_Group(group string) ApplicationOption { - return func(cfg *ApplicationConfig) { - cfg.Group = group - } -} - -func WithApplication_Version(version string) ApplicationOption { - return func(cfg *ApplicationConfig) { - cfg.Version = version - } -} - -func WithApplication_Owner(owner string) ApplicationOption { - return func(cfg *ApplicationConfig) { - cfg.Owner = owner - } -} - -func WithApplication_Environment(environment string) ApplicationOption { - return func(cfg *ApplicationConfig) { - cfg.Environment = environment - } -} - -func WithApplication_MetadataType(metadataType string) ApplicationOption { - return func(cfg *ApplicationConfig) { - cfg.MetadataType = metadataType - } -} - -func WithApplication_Tag(tag string) ApplicationOption { - return func(cfg *ApplicationConfig) { - cfg.Tag = tag - } -} diff --git a/global/consumer_config.go b/global/consumer_config.go index d866e5d4f4..d59bc63813 100644 --- a/global/consumer_config.go +++ b/global/consumer_config.go @@ -18,6 +18,7 @@ package global type ConsumerConfig struct { + ReferenceConfig Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` RegistryIDs []string `yaml:"registry-ids" json:"registry-ids,omitempty" property:"registry-ids"` Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"` @@ -39,71 +40,3 @@ func DefaultConsumerConfig() *ConsumerConfig { Check: true, } } - -type ConsumerOption func(*ConsumerConfig) - -func WithConsumer_Filter(filter string) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.Filter = filter - } -} - -func WithConsumer_RegistryIDs(ids []string) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.RegistryIDs = ids - } -} - -func WithConsumer_Protocol(protocol string) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.Protocol = protocol - } -} - -func WithConsumer_RequestTimeout(timeout string) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.RequestTimeout = timeout - } -} - -func WithConsumer_ProxyFactory(factory string) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.ProxyFactory = factory - } -} - -func WithConsumer_Check(flag bool) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.Check = flag - } -} - -func WithConsumer_AdaptiveService(flag bool) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.AdaptiveService = flag - } -} - -func WithConsumer_TracingKey(key string) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.TracingKey = key - } -} - -func WithConsumer_FilterConf(conf interface{}) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.FilterConf = conf - } -} - -func WithConsumer_MaxWaitTimeForServiceDiscovery(time string) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.MaxWaitTimeForServiceDiscovery = time - } -} - -func WithConsumer_MeshEnabled(flag bool) ConsumerOption { - return func(cfg *ConsumerConfig) { - cfg.MeshEnabled = flag - } -} diff --git a/global/custom_config.go b/global/custom_config.go index 3fcb8927fa..0e418a7791 100644 --- a/global/custom_config.go +++ b/global/custom_config.go @@ -30,11 +30,3 @@ type CustomConfig struct { func DefaultCustomConfig() *CustomConfig { return &CustomConfig{} } - -type CustomOption func(*CustomConfig) - -func WithCustom_ConfigMap(cfgMap map[string]interface{}) CustomOption { - return func(cfg *CustomConfig) { - cfg.ConfigMap = cfgMap - } -} diff --git a/global/logger_config.go b/global/logger_config.go index f55ba27f53..0f94a3dcd3 100644 --- a/global/logger_config.go +++ b/global/logger_config.go @@ -63,74 +63,3 @@ func DefaultLoggerConfig() *LoggerConfig { return cfg } - -type LoggerOption func(*LoggerConfig) - -func WithLogger_Driver(driver string) LoggerOption { - return func(cfg *LoggerConfig) { - cfg.Driver = driver - } -} - -func WithLogger_Level(level string) LoggerOption { - return func(cfg *LoggerConfig) { - cfg.Level = level - } -} - -func WithLogger_Format(format string) LoggerOption { - return func(cfg *LoggerConfig) { - cfg.Format = format - } -} - -func WithLogger_Appender(appender string) LoggerOption { - return func(cfg *LoggerConfig) { - cfg.Appender = appender - } -} - -func WithLogger_File_Name(name string) LoggerOption { - return func(cfg *LoggerConfig) { - if cfg.File == nil { - cfg.File = new(File) - } - cfg.File.Name = name - } -} - -func WithLogger_File_MaxSize(size int) LoggerOption { - return func(cfg *LoggerConfig) { - if cfg.File == nil { - cfg.File = new(File) - } - cfg.File.MaxSize = size - } -} - -func WithLogger_File_MaxBackups(backups int) LoggerOption { - return func(cfg *LoggerConfig) { - if cfg.File == nil { - cfg.File = new(File) - } - cfg.File.MaxBackups = backups - } -} - -func WithLogger_File_MaxAge(age int) LoggerOption { - return func(cfg *LoggerConfig) { - if cfg.File == nil { - cfg.File = new(File) - } - cfg.File.MaxAge = age - } -} - -func WithLogger_File_Compress(flag bool) LoggerOption { - return func(cfg *LoggerConfig) { - if cfg.File == nil { - cfg.File = new(File) - } - cfg.File.Compress = &flag - } -} diff --git a/global/metadata_report_config.go b/global/metadata_report_config.go index 718ea19f4a..bac446e849 100644 --- a/global/metadata_report_config.go +++ b/global/metadata_report_config.go @@ -19,18 +19,19 @@ package global // MetadataReportConfig is app level configuration type MetadataReportConfig struct { - Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` - Address string `required:"true" yaml:"address" json:"address"` - Username string `yaml:"username" json:"username,omitempty"` - Password string `yaml:"password" json:"password,omitempty"` - Timeout string `yaml:"timeout" json:"timeout,omitempty"` - Group string `yaml:"group" json:"group,omitempty"` - Namespace string `yaml:"namespace" json:"namespace,omitempty"` + Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` + Address string `required:"true" yaml:"address" json:"address"` + Username string `yaml:"username" json:"username,omitempty"` + Password string `yaml:"password" json:"password,omitempty"` + Timeout string `yaml:"timeout" json:"timeout,omitempty"` + Group string `yaml:"group" json:"group,omitempty"` + Namespace string `yaml:"namespace" json:"namespace,omitempty"` + Params map[string]string `yaml:"params" json:"parameters,omitempty"` // metadataType of this application is defined by application config, local or remote metadataType string } func DefaultMetadataReportConfig() *MetadataReportConfig { // return a new config without setting any field means there is not any default value for initialization - return &MetadataReportConfig{} + return &MetadataReportConfig{Params: map[string]string{}} } diff --git a/global/metric_config.go b/global/metric_config.go index be2842587f..44f4c6b39a 100644 --- a/global/metric_config.go +++ b/global/metric_config.go @@ -17,8 +17,8 @@ package global -// MetricConfig This is the config struct for all metrics implementation -type MetricConfig struct { +// MetricsConfig This is the config struct for all metrics implementation +type MetricsConfig struct { Enable *bool `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"` Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"` Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"` @@ -55,9 +55,9 @@ type PushgatewayConfig struct { PushInterval int `default:"30" yaml:"push-interval" json:"push-interval,omitempty" property:"push-interval"` } -func DefaultMetricConfig() *MetricConfig { +func DefaultMetricsConfig() *MetricsConfig { // return a new config without setting any field means there is not any default value for initialization - return &MetricConfig{Prometheus: defaultPrometheusConfig(), Aggregation: defaultAggregateConfig()} + return &MetricsConfig{Prometheus: defaultPrometheusConfig(), Aggregation: defaultAggregateConfig()} } func defaultPrometheusConfig() *PrometheusConfig { diff --git a/global/otel_config.go b/global/otel_config.go index c84b00821f..f41f8e773b 100644 --- a/global/otel_config.go +++ b/global/otel_config.go @@ -19,7 +19,7 @@ package global // OtelConfig is the configuration of the tracing. type OtelConfig struct { - TraceConfig *OtelTraceConfig `yaml:"trace" json:"trace,omitempty" property:"trace"` + TracingConfig *OtelTraceConfig `yaml:"tracing" json:"trace,omitempty" property:"trace"` } type OtelTraceConfig struct { @@ -33,6 +33,6 @@ type OtelTraceConfig struct { func DefaultOtelConfig() *OtelConfig { return &OtelConfig{ - TraceConfig: &OtelTraceConfig{}, + TracingConfig: &OtelTraceConfig{}, } } diff --git a/global/provider_config.go b/global/provider_config.go index ffba16ec26..3d7e30dd12 100644 --- a/global/provider_config.go +++ b/global/provider_config.go @@ -19,6 +19,7 @@ package global // ProviderConfig is the default configuration of service provider type ProviderConfig struct { + ServiceConfig Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` // Deprecated Register whether registration is required Register bool `yaml:"register" json:"register" property:"register"` @@ -46,65 +47,3 @@ func DefaultProviderConfig() *ProviderConfig { Services: make(map[string]*ServiceConfig), } } - -type ProviderOption func(*ProviderConfig) - -func WithProvider_Filter(filter string) ProviderOption { - return func(cfg *ProviderConfig) { - cfg.Filter = filter - } -} - -func WithProvider_Register(flag bool) ProviderOption { - return func(cfg *ProviderConfig) { - cfg.Register = flag - } -} - -func WithProvider_RegistryIDs(ids []string) ProviderOption { - return func(cfg *ProviderConfig) { - cfg.RegistryIDs = ids - } -} - -func WithProvider_ProtocolIDs(ids []string) ProviderOption { - return func(cfg *ProviderConfig) { - cfg.ProtocolIDs = ids - } -} - -func WithProvider_TracingKey(key string) ProviderOption { - return func(cfg *ProviderConfig) { - cfg.TracingKey = key - } -} - -func WithProvider_ProxyFactory(factory string) ProviderOption { - return func(cfg *ProviderConfig) { - cfg.ProxyFactory = factory - } -} - -func WithProvider_FilterConf(conf []interface{}) ProviderOption { - return func(cfg *ProviderConfig) { - cfg.FilterConf = conf - } -} - -func WithProvider_ConfigType(typ map[string]string) ProviderOption { - return func(cfg *ProviderConfig) { - cfg.ConfigType = typ - } -} - -func WithProvider_AdaptiveService(flag bool) ProviderOption { - return func(cfg *ProviderConfig) { - cfg.AdaptiveService = flag - } -} - -func WithProvider_AdaptiveServiceVerbose(flag bool) ProviderOption { - return func(cfg *ProviderConfig) { - cfg.AdaptiveServiceVerbose = flag - } -} diff --git a/global/service_config.go b/global/service_config.go index 5ed2e55e86..8fd3a3c3e2 100644 --- a/global/service_config.go +++ b/global/service_config.go @@ -62,193 +62,3 @@ func DefaultServiceConfig() *ServiceConfig { RCRegistriesMap: make(map[string]*RegistryConfig), } } - -func WithService_Filter(filter string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Filter = filter - } -} - -func WithService_ProtocolIDs(protocolIDs []string) ServiceOption { - return func(cfg *ServiceConfig) { - if len(protocolIDs) <= 0 { - cfg.ProtocolIDs = protocolIDs - } - } -} - -func WithService_Interface(name string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Interface = name - } -} - -func WithService_RegistryIDs(registryIDs []string) ServiceOption { - return func(cfg *ServiceConfig) { - if len(registryIDs) <= 0 { - cfg.RegistryIDs = registryIDs - } - } -} - -func WithService_Cluster(cluster string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Cluster = cluster - } -} - -func WithService_LoadBalance(loadBalance string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Loadbalance = loadBalance - } -} - -func WithService_Group(group string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Group = group - } -} - -func WithService_Version(version string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Version = version - } -} - -func WithService_Methods(methods []*MethodConfig) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Methods = methods - } -} - -func WithService_WarmUp(warmUp string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Warmup = warmUp - } -} - -func WithService_Retries(retries string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Retries = retries - } -} - -func WithService_Serialization(serialization string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Serialization = serialization - } -} - -func WithService_Params(params map[string]string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Params = params - } -} - -func WithService_Token(token string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Token = token - } -} - -func WithService_AccessLog(accessLog string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.AccessLog = accessLog - } -} - -func WithService_TpsLimiter(tpsLimiter string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.TpsLimiter = tpsLimiter - } -} - -func WithService_TpsLimitInterval(tpsLimitInterval string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.TpsLimitInterval = tpsLimitInterval - } -} - -func WithService_TpsLimitRate(tpsLimitRate string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.TpsLimitRate = tpsLimitRate - } -} - -func WithService_TpsLimitStrategy(tpsLimitStrategy string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.TpsLimitStrategy = tpsLimitStrategy - } -} - -func WithService_TpsLimitRejectedHandler(tpsLimitRejectedHandler string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.TpsLimitRejectedHandler = tpsLimitRejectedHandler - } -} - -func WithService_ExecuteLimit(executeLimit string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.ExecuteLimit = executeLimit - } -} - -func WithService_ExecuteLimitRejectedHandler(executeLimitRejectedHandler string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.ExecuteLimitRejectedHandler = executeLimitRejectedHandler - } -} - -func WithService_Auth(auth string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Auth = auth - } -} - -func WithService_NotRegister(notRegister bool) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.NotRegister = notRegister - } -} - -func WithService_ParamSign(paramSign string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.ParamSign = paramSign - } -} - -func WithService_Tag(tag string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.Tag = tag - } -} - -func WithService_TracingKey(tracingKey string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.TracingKey = tracingKey - } -} - -func WithService_RCProtocol(name string, protocol *ProtocolConfig) ServiceOption { - return func(cfg *ServiceConfig) { - if cfg.RCProtocolsMap == nil { - cfg.RCProtocolsMap = make(map[string]*ProtocolConfig) - } - cfg.RCProtocolsMap[name] = protocol - } -} - -func WithService_RCRegistry(name string, registry *RegistryConfig) ServiceOption { - return func(cfg *ServiceConfig) { - if cfg.RCRegistriesMap == nil { - cfg.RCRegistriesMap = make(map[string]*RegistryConfig) - } - cfg.RCRegistriesMap[name] = registry - } -} - -func WithService_ProxyFactoryKey(factory string) ServiceOption { - return func(cfg *ServiceConfig) { - cfg.ProxyFactoryKey = factory - } -} diff --git a/global/shutdown_config.go b/global/shutdown_config.go index 60dcc1a709..e55448912d 100644 --- a/global/shutdown_config.go +++ b/global/shutdown_config.go @@ -68,47 +68,3 @@ func DefaultShutdownConfig() *ShutdownConfig { return cfg } - -type ShutdownOption func(*ShutdownConfig) - -func WithShutdown_Timeout(timeout string) ShutdownOption { - return func(cfg *ShutdownConfig) { - cfg.Timeout = timeout - } -} - -func WithShutdown_StepTimeout(timeout string) ShutdownOption { - return func(cfg *ShutdownConfig) { - cfg.StepTimeout = timeout - } -} - -func WithShutdown_ConsumerUpdateWaitTime(duration string) ShutdownOption { - return func(cfg *ShutdownConfig) { - cfg.ConsumerUpdateWaitTime = duration - } -} - -func WithShutdown_RejectRequestHandler(handler string) ShutdownOption { - return func(cfg *ShutdownConfig) { - cfg.RejectRequestHandler = handler - } -} - -func WithShutdown_InternalSignal(signal bool) ShutdownOption { - return func(cfg *ShutdownConfig) { - cfg.InternalSignal = &signal - } -} - -func WithShutdown_OfflineRequestWindowTimeout(timeout string) ShutdownOption { - return func(cfg *ShutdownConfig) { - cfg.OfflineRequestWindowTimeout = timeout - } -} - -func WithShutdown_RejectRequest(flag bool) ShutdownOption { - return func(cfg *ShutdownConfig) { - cfg.RejectRequest.Store(flag) - } -} diff --git a/global/tracing_config.go b/global/tracing_config.go index 1429c8c3cb..d4c9b423fe 100644 --- a/global/tracing_config.go +++ b/global/tracing_config.go @@ -18,6 +18,7 @@ package global // TracingConfig is the configuration of the tracing. +// Deprecated: it's designed to be replaced with global.OtelConfig type TracingConfig struct { Name string `default:"jaeger" yaml:"name" json:"name,omitempty" property:"name"` // jaeger or zipkin(todo) ServiceName string `yaml:"serviceName" json:"serviceName,omitempty" property:"serviceName"` diff --git a/go.mod b/go.mod index c86190391b..1c6c705454 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,7 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd + github.com/nacos-group/nacos-sdk-go v1.0.9 // indirect github.com/nacos-group/nacos-sdk-go/v2 v2.2.2 github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 github.com/opentracing/opentracing-go v1.2.0 diff --git a/go.sum b/go.sum index 0bf5bb8444..965544e020 100644 --- a/go.sum +++ b/go.sum @@ -1004,8 +1004,9 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nacos-group/nacos-sdk-go v1.0.8 h1:8pEm05Cdav9sQgJSv5kyvlgfz0SzFUUGI3pWX6SiSnM= github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= +github.com/nacos-group/nacos-sdk-go v1.0.9 h1:sMvrp6tZj4LdhuHRsS4GCqASB81k3pjmT2ykDQQpwt0= +github.com/nacos-group/nacos-sdk-go v1.0.9/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= github.com/nacos-group/nacos-sdk-go/v2 v2.1.2/go.mod h1:ys/1adWeKXXzbNWfRNbaFlX/t6HVLWdpsNDvmoWTw0g= github.com/nacos-group/nacos-sdk-go/v2 v2.2.2 h1:FI+7vr1fvCA4jbgx36KezmP3zlU/WoP/7wAloaSd1Ew= github.com/nacos-group/nacos-sdk-go/v2 v2.2.2/go.mod h1:ys/1adWeKXXzbNWfRNbaFlX/t6HVLWdpsNDvmoWTw0g= diff --git a/metadata/options.go b/metadata/options.go index bec1fa91a3..34ea25d991 100644 --- a/metadata/options.go +++ b/metadata/options.go @@ -64,6 +64,12 @@ func WithEtcdV3() Option { } } +func WithMetadata(meta string) Option { + return func(opts *Options) { + opts.Metadata.Protocol = meta + } +} + func WithAddress(address string) Option { return func(opts *Options) { if i := strings.Index(address, "://"); i > 0 { @@ -102,3 +108,9 @@ func WithNamespace(namespace string) Option { opts.Metadata.Namespace = namespace } } + +func WithParams(params map[string]string) Option { + return func(opts *Options) { + opts.Metadata.Params = params + } +} diff --git a/metrics/options.go b/metrics/options.go index fb43a0e48a..8824df6576 100644 --- a/metrics/options.go +++ b/metrics/options.go @@ -27,11 +27,11 @@ import ( ) type Options struct { - Metric *global.MetricConfig + Metrics *global.MetricsConfig } func defaultOptions() *Options { - return &Options{Metric: global.DefaultMetricConfig()} + return &Options{Metrics: global.DefaultMetricsConfig()} } func NewOptions(opts ...Option) *Options { @@ -47,82 +47,82 @@ type Option func(*Options) func WithAggregationEnabled() Option { return func(opts *Options) { enabled := true - opts.Metric.Aggregation.Enabled = &enabled + opts.Metrics.Aggregation.Enabled = &enabled } } func WithAggregationBucketNum(num int) Option { return func(opts *Options) { - opts.Metric.Aggregation.BucketNum = num + opts.Metrics.Aggregation.BucketNum = num } } func WithAggregationTimeWindowSeconds(seconds int) Option { return func(opts *Options) { - opts.Metric.Aggregation.TimeWindowSeconds = seconds + opts.Metrics.Aggregation.TimeWindowSeconds = seconds } } func WithPrometheus() Option { return func(opts *Options) { - opts.Metric.Protocol = "prometheus" + opts.Metrics.Protocol = "prometheus" } } func WithPrometheusExporterEnabled() Option { return func(opts *Options) { enabled := true - opts.Metric.Prometheus.Exporter.Enabled = &enabled + opts.Metrics.Prometheus.Exporter.Enabled = &enabled } } func WithPrometheusGatewayUrl(url string) Option { return func(opts *Options) { - opts.Metric.Prometheus.Pushgateway.BaseUrl = url + opts.Metrics.Prometheus.Pushgateway.BaseUrl = url } } func WithPrometheusGatewayJob(job string) Option { return func(opts *Options) { - opts.Metric.Prometheus.Pushgateway.Job = job + opts.Metrics.Prometheus.Pushgateway.Job = job } } func WithPrometheusGatewayUsername(username string) Option { return func(opts *Options) { - opts.Metric.Prometheus.Pushgateway.Username = username + opts.Metrics.Prometheus.Pushgateway.Username = username } } func WithPrometheusGatewayPassword(password string) Option { return func(opts *Options) { - opts.Metric.Prometheus.Pushgateway.Password = password + opts.Metrics.Prometheus.Pushgateway.Password = password } } func WithPrometheusGatewayInterval(interval time.Duration) Option { return func(opts *Options) { - opts.Metric.Prometheus.Pushgateway.PushInterval = int(interval.Seconds()) + opts.Metrics.Prometheus.Pushgateway.PushInterval = int(interval.Seconds()) } } func WithConfigCenterEnabled() Option { return func(opts *Options) { b := true - opts.Metric.EnableConfigCenter = &b + opts.Metrics.EnableConfigCenter = &b } } func WithMetadataEnabled() Option { return func(opts *Options) { b := true - opts.Metric.EnableMetadata = &b + opts.Metrics.EnableMetadata = &b } } func WithRegistryEnabled() Option { return func(opts *Options) { b := true - opts.Metric.EnableRegistry = &b + opts.Metrics.EnableRegistry = &b } } @@ -130,18 +130,18 @@ func WithRegistryEnabled() Option { func WithEnabled() Option { return func(opts *Options) { b := true - opts.Metric.Enable = &b + opts.Metrics.Enable = &b } } func WithPort(port int) Option { return func(opts *Options) { - opts.Metric.Port = strconv.Itoa(port) + opts.Metrics.Port = strconv.Itoa(port) } } func WithPath(path string) Option { return func(opts *Options) { - opts.Metric.Path = path + opts.Metrics.Path = path } } diff --git a/options.go b/options.go index 55bc57fadf..0e6719473c 100644 --- a/options.go +++ b/options.go @@ -43,7 +43,7 @@ type InstanceOptions struct { MetadataReport *global.MetadataReportConfig `yaml:"metadata-report" json:"metadata-report,omitempty" property:"metadata-report"` Provider *global.ProviderConfig `yaml:"provider" json:"provider" property:"provider"` Consumer *global.ConsumerConfig `yaml:"consumer" json:"consumer" property:"consumer"` - Metric *global.MetricConfig `yaml:"metrics" json:"metrics,omitempty" property:"metrics"` + Metrics *global.MetricsConfig `yaml:"metrics" json:"metrics,omitempty" property:"metrics"` Otel *global.OtelConfig `yaml:"otel" json:"otel,omitempty" property:"otel"` Logger *global.LoggerConfig `yaml:"logger" json:"logger,omitempty" property:"logger"` Shutdown *global.ShutdownConfig `yaml:"shutdown" json:"shutdown,omitempty" property:"shutdown"` @@ -65,7 +65,7 @@ func defaultInstanceOptions() *InstanceOptions { MetadataReport: global.DefaultMetadataReportConfig(), Provider: global.DefaultProviderConfig(), Consumer: global.DefaultConsumerConfig(), - Metric: global.DefaultMetricConfig(), + Metrics: global.DefaultMetricsConfig(), Otel: global.DefaultOtelConfig(), Logger: global.DefaultLoggerConfig(), Shutdown: global.DefaultShutdownConfig(), @@ -131,7 +131,7 @@ func (rc *InstanceOptions) init(opts ...InstanceOption) error { if err := rcCompat.MetadataReport.Init(rcCompat); err != nil { return err } - if err := rcCompat.Metric.Init(rcCompat); err != nil { + if err := rcCompat.Metrics.Init(rcCompat); err != nil { return err } if err := rcCompat.Otel.Init(rcCompat.Application); err != nil { @@ -250,7 +250,7 @@ func WithTracing(opts ...trace.Option) InstanceOption { traceOpts := trace.NewOptions(opts...) return func(insOpts *InstanceOptions) { - insOpts.Otel.TraceConfig = traceOpts.Otel.TraceConfig + insOpts.Otel.TracingConfig = traceOpts.Otel.TracingConfig } } @@ -270,11 +270,11 @@ func WithMetadataReport(opts ...metadata.Option) InstanceOption { } } -func WithMetric(opts ...metrics.Option) InstanceOption { +func WithMetrics(opts ...metrics.Option) InstanceOption { metricOpts := metrics.NewOptions(opts...) return func(cfg *InstanceOptions) { - cfg.Metric = metricOpts.Metric + cfg.Metrics = metricOpts.Metrics } } diff --git a/otel/trace/options.go b/otel/trace/options.go index ca33676f57..be310879d7 100644 --- a/otel/trace/options.go +++ b/otel/trace/options.go @@ -42,81 +42,99 @@ type Option func(*Options) func WithEnabled() Option { return func(opts *Options) { b := true - opts.Otel.TraceConfig.Enable = &b + opts.Otel.TracingConfig.Enable = &b } } func WithStdoutExporter() Option { return func(opts *Options) { - opts.Otel.TraceConfig.Exporter = "stdout" + opts.Otel.TracingConfig.Exporter = "stdout" } } func WithJaegerExporter() Option { return func(opts *Options) { - opts.Otel.TraceConfig.Exporter = "jaeger" + opts.Otel.TracingConfig.Exporter = "jaeger" } } func WithZipkinExporter() Option { return func(opts *Options) { - opts.Otel.TraceConfig.Exporter = "zipkin" + opts.Otel.TracingConfig.Exporter = "zipkin" } } func WithOtlpHttpExporter() Option { return func(opts *Options) { - opts.Otel.TraceConfig.Exporter = "otlp-http" + opts.Otel.TracingConfig.Exporter = "otlp-http" } } func WithOtlpGrpcExporter() Option { return func(opts *Options) { - opts.Otel.TraceConfig.Exporter = "otlp-grpc" + opts.Otel.TracingConfig.Exporter = "otlp-grpc" + } +} + +func WithExporter(exporter string) Option { + return func(opts *Options) { + opts.Otel.TracingConfig.Exporter = exporter } } // WithW3cPropagator w3c(standard) func WithW3cPropagator() Option { return func(opts *Options) { - opts.Otel.TraceConfig.Propagator = "w3c" + opts.Otel.TracingConfig.Propagator = "w3c" } } // WithB3Propagator b3(for zipkin) func WithB3Propagator() Option { return func(opts *Options) { - opts.Otel.TraceConfig.Propagator = "b3" + opts.Otel.TracingConfig.Propagator = "b3" + } +} + +func WithPropagator(propagator string) Option { + return func(opts *Options) { + opts.Otel.TracingConfig.Propagator = propagator } } // WithRatio only takes effect when WithRatioMode is set func WithRatio(ratio float64) Option { return func(opts *Options) { - opts.Otel.TraceConfig.SampleRatio = ratio + opts.Otel.TracingConfig.SampleRatio = ratio } } func WithRatioMode() Option { return func(opts *Options) { - opts.Otel.TraceConfig.SampleMode = "ratio" + opts.Otel.TracingConfig.SampleMode = "ratio" } } func WithAlwaysMode() Option { return func(opts *Options) { - opts.Otel.TraceConfig.SampleMode = "always" + opts.Otel.TracingConfig.SampleMode = "always" } } func WithNeverMode() Option { return func(opts *Options) { - opts.Otel.TraceConfig.SampleMode = "never" + opts.Otel.TracingConfig.SampleMode = "never" + } +} + +func WithMode(mode string) Option { + return func(opts *Options) { + opts.Otel.TracingConfig.SampleMode = mode } } func WithEndpoint(endpoint string) Option { return func(opts *Options) { - opts.Otel.TraceConfig.Endpoint = endpoint + opts.Otel.TracingConfig.Endpoint = endpoint } } diff --git a/protocol/options.go b/protocol/options.go index 58683af5b6..4531f915a4 100644 --- a/protocol/options.go +++ b/protocol/options.go @@ -79,6 +79,12 @@ func WithTriple() Option { } } +func WithProtocol(p string) Option { + return func(opts *Options) { + opts.Protocol.Name = p + } +} + // WithID specifies the id of protocol.Options. Then you could configure server.WithProtocolIDs and // server.WithServer_ProtocolIDs to specify which protocol you need to use in multi-protocols scenario. func WithID(id string) Option { diff --git a/protocol/triple/health/triple_health/health.triple.go b/protocol/triple/health/triple_health/health.triple.go index 8d6e447810..79de1641f6 100644 --- a/protocol/triple/health/triple_health/health.triple.go +++ b/protocol/triple/health/triple_health/health.triple.go @@ -73,21 +73,28 @@ type Health interface { } // NewHealth constructs a client for the grpc.health.v1.Health service. -func NewHealth(cli *client.Client) (Health, error) { - if err := cli.Init(&Health_ClientInfo); err != nil { +func NewHealth(cli *client.Client, opts ...client.ReferenceOption) (Health, error) { + group, version, err := cli.Init(&Health_ClientInfo, opts...) + if err != nil { return nil, err } + return &HealthImpl{ - cli: cli, + cli: cli, + group: group, + version: version, }, nil } // HealthImpl implements Health. type HealthImpl struct { - cli *client.Client + cli *client.Client + group string + version string } func (c *HealthImpl) Check(ctx context.Context, req *HealthCheckRequest, opts ...client.CallOption) (*HealthCheckResponse, error) { + opts = appendGroupVersion(opts, c) resp := new(HealthCheckResponse) if err := c.cli.CallUnary(ctx, req, resp, "grpc.health.v1.Health", "Check", opts...); err != nil { return nil, err @@ -96,6 +103,7 @@ func (c *HealthImpl) Check(ctx context.Context, req *HealthCheckRequest, opts .. } func (c *HealthImpl) Watch(ctx context.Context, req *HealthCheckRequest, opts ...client.CallOption) (Health_WatchClient, error) { + opts = appendGroupVersion(opts, c) stream, err := c.cli.CallServerStream(ctx, req, "grpc.health.v1.Health", "Watch", opts...) if err != nil { return nil, err @@ -104,6 +112,12 @@ func (c *HealthImpl) Watch(ctx context.Context, req *HealthCheckRequest, opts .. return &HealthWatchClient{rawStream}, nil } +func appendGroupVersion(opts []client.CallOption, c *HealthImpl) []client.CallOption { + opts = append(opts, client.WithCallGroup(c.group)) + opts = append(opts, client.WithCallVersion(c.version)) + return opts +} + type Health_WatchClient interface { Recv() bool ResponseHeader() http.Header diff --git a/protocol/triple/internal/client/cmd_client/main.go b/protocol/triple/internal/client/cmd_client/main.go index 289b929c5a..b304285e22 100644 --- a/protocol/triple/internal/client/cmd_client/main.go +++ b/protocol/triple/internal/client/cmd_client/main.go @@ -17,25 +17,41 @@ package main +import ( + "context" + "fmt" + "time" +) + import ( "dubbo.apache.org/dubbo-go/v3/client" _ "dubbo.apache.org/dubbo-go/v3/imports" "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/client/common" + greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto" "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple" ) func main() { // for the most brief RPC case cli, err := client.NewClient( - client.WithURL("127.0.0.1:20000"), + client.WithClientURL("127.0.0.1:20000"), ) + if err != nil { panic(err) } svc, err := greettriple.NewGreetService(cli) + if err != nil { panic(err) } + response, err := svc.Greet(context.Background(), &greet.GreetRequest{Name: "greet"}, client.WithCallRequestTimeout(5*time.Second)) + if err != nil { + panic(err) + } + + fmt.Printf("result: %s", response.Greeting) + common.TestClient(svc) } diff --git a/protocol/triple/internal/client/cmd_client_with_registry/main.go b/protocol/triple/internal/client/cmd_client_with_registry/main.go index 42d4349520..39902a11de 100644 --- a/protocol/triple/internal/client/cmd_client_with_registry/main.go +++ b/protocol/triple/internal/client/cmd_client_with_registry/main.go @@ -29,7 +29,7 @@ func main() { // for the most brief RPC case with Registry cli, err := client.NewClient( - client.WithRegistry( + client.WithClientRegistry( registry.WithZookeeper(), registry.WithAddress("127.0.0.1:2181"), ), diff --git a/protocol/triple/internal/client/cmd_instance/main.go b/protocol/triple/internal/client/cmd_instance/main.go index 098f99c0b0..cd0eeed3f5 100644 --- a/protocol/triple/internal/client/cmd_instance/main.go +++ b/protocol/triple/internal/client/cmd_instance/main.go @@ -36,7 +36,7 @@ func main() { } // configure the params that only client layer cares cli, err := ins.NewClient( - client.WithURL("127.0.0.1:20000"), + client.WithClientURL("127.0.0.1:20000"), ) if err != nil { panic(err) diff --git a/protocol/triple/internal/client/cmd_instance_with_registry/main.go b/protocol/triple/internal/client/cmd_instance_with_registry/main.go index 219ae71685..b3e0b0de48 100644 --- a/protocol/triple/internal/client/cmd_instance_with_registry/main.go +++ b/protocol/triple/internal/client/cmd_instance_with_registry/main.go @@ -42,7 +42,7 @@ func main() { } // configure the params that only client layer cares cli, err := ins.NewClient( - client.WithRegistryIDs([]string{"zk"}), + client.WithClientRegistryIDs([]string{"zk"}), ) if err != nil { panic(err) diff --git a/protocol/triple/internal/client/health_client/main.go b/protocol/triple/internal/client/health_client/main.go index 9d1363941f..7fc0dc2bdb 100644 --- a/protocol/triple/internal/client/health_client/main.go +++ b/protocol/triple/internal/client/health_client/main.go @@ -33,7 +33,7 @@ import ( func main() { cli, err := client.NewClient( - client.WithURL("tri://127.0.0.1:20000"), + client.WithClientURL("tri://127.0.0.1:20000"), ) if err != nil { panic(err) diff --git a/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go b/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go index 3a9c685d23..a88f9a4b36 100644 --- a/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go +++ b/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go @@ -87,22 +87,29 @@ type GreetService interface { } // NewGreetService constructs a client for the greet.GreetService service. -func NewGreetService(cli *client.Client) (GreetService, error) { - if err := cli.Init(&GreetService_ClientInfo); err != nil { +func NewGreetService(cli *client.Client, opts ...client.ReferenceOption) (GreetService, error) { + group, version, err := cli.Init(&GreetService_ClientInfo, opts...) + if err != nil { return nil, err } + return &GreetServiceImpl{ - cli: cli, + cli: cli, + group: group, + version: version, }, nil } // GreetServiceImpl implements GreetService. type GreetServiceImpl struct { - cli *client.Client + cli *client.Client + group string + version string } func (c *GreetServiceImpl) Greet(ctx context.Context, req *proto.GreetRequest, opts ...client.CallOption) (*proto.GreetResponse, error) { resp := new(proto.GreetResponse) + opts = appendGroupVersion(opts, c) if err := c.cli.CallUnary(ctx, req, resp, "greet.GreetService", "Greet", opts...); err != nil { return nil, err } @@ -110,6 +117,7 @@ func (c *GreetServiceImpl) Greet(ctx context.Context, req *proto.GreetRequest, o } func (c *GreetServiceImpl) GreetStream(ctx context.Context, opts ...client.CallOption) (GreetService_GreetStreamClient, error) { + opts = appendGroupVersion(opts, c) stream, err := c.cli.CallBidiStream(ctx, "greet.GreetService", "GreetStream", opts...) if err != nil { return nil, err @@ -119,6 +127,7 @@ func (c *GreetServiceImpl) GreetStream(ctx context.Context, opts ...client.CallO } func (c *GreetServiceImpl) GreetClientStream(ctx context.Context, opts ...client.CallOption) (GreetService_GreetClientStreamClient, error) { + opts = appendGroupVersion(opts, c) stream, err := c.cli.CallClientStream(ctx, "greet.GreetService", "GreetClientStream", opts...) if err != nil { return nil, err @@ -128,6 +137,7 @@ func (c *GreetServiceImpl) GreetClientStream(ctx context.Context, opts ...client } func (c *GreetServiceImpl) GreetServerStream(ctx context.Context, req *proto.GreetServerStreamRequest, opts ...client.CallOption) (GreetService_GreetServerStreamClient, error) { + opts = appendGroupVersion(opts, c) stream, err := c.cli.CallServerStream(ctx, req, "greet.GreetService", "GreetServerStream", opts...) if err != nil { return nil, err @@ -136,6 +146,12 @@ func (c *GreetServiceImpl) GreetServerStream(ctx context.Context, req *proto.Gre return &GreetServiceGreetServerStreamClient{rawStream}, nil } +func appendGroupVersion(opts []client.CallOption, c *GreetServiceImpl) []client.CallOption { + opts = append(opts, client.WithCallGroup(c.group)) + opts = append(opts, client.WithCallVersion(c.version)) + return opts +} + type GreetService_GreetStreamClient interface { Spec() triple_protocol.Spec Peer() triple_protocol.Peer diff --git a/protocol/triple/internal/server/cmd_server/main.go b/protocol/triple/internal/server/cmd_server/main.go index c32e854037..dc5fc0de89 100644 --- a/protocol/triple/internal/server/cmd_server/main.go +++ b/protocol/triple/internal/server/cmd_server/main.go @@ -31,13 +31,16 @@ func main() { protocol.WithTriple(), protocol.WithPort(20000), ), + server.WithServerVersion("1.0.0"), ) + if err != nil { panic(err) } if err := greettriple.RegisterGreetServiceHandler(srv, &api.GreetTripleServer{}); err != nil { panic(err) } + if err := srv.Serve(); err != nil { panic(err) } diff --git a/registry/options.go b/registry/options.go index 0ff58654f5..0957cf968b 100644 --- a/registry/options.go +++ b/registry/options.go @@ -89,6 +89,12 @@ func WithZookeeper() Option { } } +func WithRegistry(r string) Option { + return func(opts *Options) { + opts.Registry.Protocol = r + } +} + // WithID specifies the id of registry.Options. Then you could configure client.WithRegistryIDs and // server.WithServer_RegistryIDs to specify which registry you need to use in multi-registries scenario. func WithID(id string) Option { diff --git a/server/action.go b/server/action.go index 6f93321750..46f41ac3a3 100644 --- a/server/action.go +++ b/server/action.go @@ -315,6 +315,8 @@ func (svcOpts *ServiceOptions) Implement(rpcService common.RPCService) { func (svcOpts *ServiceOptions) getUrlMap() url.Values { srv := svcOpts.Service app := svcOpts.applicationCompat + metrics := svcOpts.srvOpts.Metrics + tracing := svcOpts.srvOpts.Otel.TracingConfig urlMap := url.Values{} // first set user params @@ -355,7 +357,13 @@ func (svcOpts *ServiceOptions) getUrlMap() url.Values { filters = srv.Filter } if svcOpts.adaptiveService { - filters += fmt.Sprintf(",%svcOpts", constant.AdaptiveServiceProviderFilterKey) + filters += fmt.Sprintf(",%s", constant.AdaptiveServiceProviderFilterKey) + } + if metrics.Enable != nil && *metrics.Enable { + filters += fmt.Sprintf(",%s", constant.MetricsFilterKey) + } + if tracing.Enable != nil && *tracing.Enable { + filters += fmt.Sprintf(",%s", constant.OTELServerTraceKey) } urlMap.Set(constant.ServiceFilterKey, filters) diff --git a/server/options.go b/server/options.go index 51a38a0f16..1798b5b0e2 100644 --- a/server/options.go +++ b/server/options.go @@ -18,6 +18,7 @@ package server import ( + "reflect" "strconv" "sync" "time" @@ -37,6 +38,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common" commonCfg "dubbo.apache.org/dubbo-go/v3/common/config" "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/dubboutil" "dubbo.apache.org/dubbo-go/v3/config" aslimiter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter" "dubbo.apache.org/dubbo-go/v3/global" @@ -51,6 +53,8 @@ type ServerOptions struct { Registries map[string]*global.RegistryConfig Protocols map[string]*global.ProtocolConfig Shutdown *global.ShutdownConfig + Metrics *global.MetricsConfig + Otel *global.OtelConfig providerCompat *config.ProviderConfig } @@ -60,6 +64,8 @@ func defaultServerOptions() *ServerOptions { Application: global.DefaultApplicationConfig(), Provider: global.DefaultProviderConfig(), Shutdown: global.DefaultShutdownConfig(), + Metrics: global.DefaultMetricsConfig(), + Otel: global.DefaultOtelConfig(), } } @@ -106,6 +112,232 @@ type ServerOption func(*ServerOptions) // ---------- For user ---------- +// ========== LoadBalance Strategy ========== + +func WithServerLoadBalanceConsistentHashing() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Loadbalance = constant.LoadBalanceKeyConsistentHashing + } +} + +func WithServerLoadBalanceLeastActive() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Loadbalance = constant.LoadBalanceKeyLeastActive + } +} + +func WithServerLoadBalanceRandom() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Loadbalance = constant.LoadBalanceKeyRandom + } +} + +func WithServerLoadBalanceRoundRobin() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Loadbalance = constant.LoadBalanceKeyRoundRobin + } +} + +func WithServerLoadBalanceP2C() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Loadbalance = constant.LoadBalanceKeyP2C + } +} + +func WithServerLoadBalance(lb string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Loadbalance = lb + } +} + +// warmUp is in seconds +func WithServerWarmUp(warmUp time.Duration) ServerOption { + return func(opts *ServerOptions) { + warmUpSec := int(warmUp / time.Second) + opts.Provider.Warmup = strconv.Itoa(warmUpSec) + } +} + +// ========== Cluster Strategy ========== + +func WithServerClusterAvailable() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Cluster = constant.ClusterKeyAvailable + } +} + +func WithServerClusterBroadcast() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Cluster = constant.ClusterKeyBroadcast + } +} + +func WithServerClusterFailBack() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Cluster = constant.ClusterKeyFailback + } +} + +func WithServerClusterFailFast() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Cluster = constant.ClusterKeyFailfast + } +} + +func WithServerClusterFailOver() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Cluster = constant.ClusterKeyFailover + } +} + +func WithServerClusterFailSafe() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Cluster = constant.ClusterKeyFailsafe + } +} + +func WithServerClusterForking() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Cluster = constant.ClusterKeyForking + } +} + +func WithServerClusterZoneAware() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Cluster = constant.ClusterKeyZoneAware + } +} + +func WithServerClusterAdaptiveService() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Cluster = constant.ClusterKeyAdaptiveService + } +} + +func WithServerCluster(cluster string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Cluster = cluster + } +} + +func WithServerGroup(group string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Group = group + } +} + +func WithServerVersion(version string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Version = version + } +} + +func WithServerJSON() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Serialization = constant.JSONSerialization + } +} + +// WithToken should be used with WithFilter("token") +func WithServerToken(token string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Token = token + } +} + +func WithServerNotRegister() ServerOption { + return func(opts *ServerOptions) { + opts.Provider.NotRegister = true + } +} + +func WithServerWarmup(milliSeconds time.Duration) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Warmup = milliSeconds.String() + } +} + +func WithServerRetries(retries int) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Retries = strconv.Itoa(retries) + } +} + +func WithServerSerialization(ser string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Serialization = ser + } +} + +func WithServerAccesslog(accesslog string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.AccessLog = accesslog + } +} + +func WithServerTpsLimiter(limiter string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.TpsLimiter = limiter + } +} + +func WithServerTpsLimitRate(rate int) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.TpsLimitRate = strconv.Itoa(rate) + } +} + +func WithServerTpsLimitStrategy(strategy string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.TpsLimitStrategy = strategy + } +} + +func WithServerTpsLimitRejectedHandler(rejHandler string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.TpsLimitRejectedHandler = rejHandler + } +} + +func WithServerExecuteLimit(exeLimit string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.ExecuteLimit = exeLimit + } +} + +func WithServerExecuteLimitRejectedHandler(exeRejHandler string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.ExecuteLimitRejectedHandler = exeRejHandler + } +} + +func WithServerAuth(auth string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Auth = auth + } +} + +func WithServerParamSign(paramSign string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.ParamSign = paramSign + } +} + +func WithServerTag(tag string) ServerOption { + return func(opts *ServerOptions) { + opts.Provider.Tag = tag + } +} + +func WithServerParam(k, v string) ServerOption { + return func(opts *ServerOptions) { + if opts.Provider.Params == nil { + opts.Provider.Params = make(map[string]string) + } + opts.Provider.Params[k] = v + } +} + // todo(DMwangnima): change Filter Option like Cluster and LoadBalance func WithServerFilter(filter string) ServerOption { return func(opts *ServerOptions) { @@ -172,30 +404,42 @@ func WithServerAdaptiveServiceVerbose() ServerOption { // ========== For framework ========== // These functions should not be invoked by users -func SetServer_Application(application *global.ApplicationConfig) ServerOption { +func SetServerApplication(application *global.ApplicationConfig) ServerOption { return func(opts *ServerOptions) { opts.Application = application } } -func SetServer_Registries(regs map[string]*global.RegistryConfig) ServerOption { +func SetServerRegistries(regs map[string]*global.RegistryConfig) ServerOption { return func(opts *ServerOptions) { opts.Registries = regs } } -func SetServer_Protocols(pros map[string]*global.ProtocolConfig) ServerOption { +func SetServerProtocols(pros map[string]*global.ProtocolConfig) ServerOption { return func(opts *ServerOptions) { opts.Protocols = pros } } -func SetServer_Shutdown(shutdown *global.ShutdownConfig) ServerOption { +func SetServerShutdown(shutdown *global.ShutdownConfig) ServerOption { return func(opts *ServerOptions) { opts.Shutdown = shutdown } } +func SetServerMetrics(metrics *global.MetricsConfig) ServerOption { + return func(opts *ServerOptions) { + opts.Metrics = metrics + } +} + +func SetServerOtel(otel *global.OtelConfig) ServerOption { + return func(opts *ServerOptions) { + opts.Otel = otel + } +} + type ServiceOptions struct { Application *global.ApplicationConfig Provider *global.ProviderConfig @@ -203,6 +447,8 @@ type ServiceOptions struct { Registries map[string]*global.RegistryConfig Protocols map[string]*global.ProtocolConfig + srvOpts *ServerOptions + Id string unexported *atomic.Bool exported *atomic.Bool @@ -233,7 +479,7 @@ func defaultServiceOptions() *ServiceOptions { } } -func (svcOpts *ServiceOptions) init(opts ...ServiceOption) error { +func (svcOpts *ServiceOptions) init(srv *Server, opts ...ServiceOption) error { for _, opt := range opts { opt(svcOpts) } @@ -241,7 +487,9 @@ func (svcOpts *ServiceOptions) init(opts ...ServiceOption) error { return err } - srv := svcOpts.Service + svcOpts.srvOpts = srv.cfg + svc := svcOpts.Service + dubboutil.CopyFields(reflect.ValueOf(srv.cfg.Provider).Elem(), reflect.ValueOf(svc).Elem()) svcOpts.exported = atomic.NewBool(false) @@ -256,22 +504,22 @@ func (svcOpts *ServiceOptions) init(opts ...ServiceOption) error { // since many modules would retrieve this information directly. config.GetRootConfig().Application = svcOpts.applicationCompat svcOpts.metadataType = svcOpts.applicationCompat.MetadataType - if srv.Group == "" { - srv.Group = svcOpts.applicationCompat.Group + if svc.Group == "" { + svc.Group = svcOpts.applicationCompat.Group } - if srv.Version == "" { - srv.Version = svcOpts.applicationCompat.Version + if svc.Version == "" { + svc.Version = svcOpts.applicationCompat.Version } } svcOpts.unexported = atomic.NewBool(false) // initialize Registries - if len(srv.RCRegistriesMap) == 0 { - srv.RCRegistriesMap = svcOpts.Registries + if len(svc.RCRegistriesMap) == 0 { + svc.RCRegistriesMap = svcOpts.Registries } - if len(srv.RCRegistriesMap) > 0 { + if len(svc.RCRegistriesMap) > 0 { svcOpts.registriesCompat = make(map[string]*config.RegistryConfig) - for key, reg := range srv.RCRegistriesMap { + for key, reg := range svc.RCRegistriesMap { svcOpts.registriesCompat[key] = compatRegistryConfig(reg) if err := svcOpts.registriesCompat[key].Init(); err != nil { return err @@ -280,12 +528,12 @@ func (svcOpts *ServiceOptions) init(opts ...ServiceOption) error { } // initialize Protocols - if len(srv.RCProtocolsMap) == 0 { - srv.RCProtocolsMap = svcOpts.Protocols + if len(svc.RCProtocolsMap) == 0 { + svc.RCProtocolsMap = svcOpts.Protocols } - if len(srv.RCProtocolsMap) > 0 { + if len(svc.RCProtocolsMap) > 0 { svcOpts.protocolsCompat = make(map[string]*config.ProtocolConfig) - for key, pro := range srv.RCProtocolsMap { + for key, pro := range svc.RCProtocolsMap { svcOpts.protocolsCompat[key] = compatProtocolConfig(pro) if err := svcOpts.protocolsCompat[key].Init(); err != nil { return err @@ -293,26 +541,26 @@ func (svcOpts *ServiceOptions) init(opts ...ServiceOption) error { } } - srv.RegistryIDs = commonCfg.TranslateIds(srv.RegistryIDs) - if len(srv.RegistryIDs) <= 0 { - srv.RegistryIDs = svcOpts.Provider.RegistryIDs + svc.RegistryIDs = commonCfg.TranslateIds(svc.RegistryIDs) + if len(svc.RegistryIDs) <= 0 { + svc.RegistryIDs = svcOpts.Provider.RegistryIDs } - if srv.RegistryIDs == nil || len(srv.RegistryIDs) <= 0 { - srv.NotRegister = true + if svc.RegistryIDs == nil || len(svc.RegistryIDs) <= 0 { + svc.NotRegister = true } - srv.ProtocolIDs = commonCfg.TranslateIds(srv.ProtocolIDs) - if len(srv.ProtocolIDs) <= 0 { - srv.ProtocolIDs = svcOpts.Provider.ProtocolIDs + svc.ProtocolIDs = commonCfg.TranslateIds(svc.ProtocolIDs) + if len(svc.ProtocolIDs) <= 0 { + svc.ProtocolIDs = svcOpts.Provider.ProtocolIDs } - if len(srv.ProtocolIDs) <= 0 { + if len(svc.ProtocolIDs) <= 0 { for name := range svcOpts.Protocols { - srv.ProtocolIDs = append(srv.ProtocolIDs, name) + svc.ProtocolIDs = append(svc.ProtocolIDs, name) } } - if srv.TracingKey == "" { - srv.TracingKey = svcOpts.Provider.TracingKey + if svc.TracingKey == "" { + svc.TracingKey = svcOpts.Provider.TracingKey } err := svcOpts.check() @@ -384,9 +632,9 @@ func WithLoadBalanceP2C() ServiceOption { } } -func WithLoadBalanceXDSRingHash() ServiceOption { +func WithLoadBalance(lb string) ServiceOption { return func(opts *ServiceOptions) { - opts.Service.Loadbalance = constant.LoadBalanceKeyLeastActive + opts.Service.Loadbalance = lb } } @@ -454,6 +702,12 @@ func WithClusterAdaptiveService() ServiceOption { } } +func WithCluster(cluster string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.Cluster = cluster + } +} + func WithGroup(group string) ServiceOption { return func(cfg *ServiceOptions) { cfg.Service.Group = group @@ -485,6 +739,126 @@ func WithNotRegister() ServiceOption { } } +func WithWarmup(milliSeconds time.Duration) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.Warmup = milliSeconds.String() + } +} + +func WithRetries(retries int) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.Retries = strconv.Itoa(retries) + } +} + +func WithSerialization(ser string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.Serialization = ser + } +} + +func WithAccesslog(accesslog string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.AccessLog = accesslog + } +} + +func WithTpsLimiter(limiter string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.TpsLimiter = limiter + } +} + +func WithTpsLimitRate(rate int) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.TpsLimitRate = strconv.Itoa(rate) + } +} + +func WithTpsLimitStrategy(strategy string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.TpsLimitStrategy = strategy + } +} + +func WithTpsLimitRejectedHandler(rejHandler string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.TpsLimitRejectedHandler = rejHandler + } +} + +func WithExecuteLimit(exeLimit string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.ExecuteLimit = exeLimit + } +} + +func WithExecuteLimitRejectedHandler(exeRejHandler string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.ExecuteLimitRejectedHandler = exeRejHandler + } +} + +func WithAuth(auth string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.Auth = auth + } +} + +func WithParamSign(paramSign string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.ParamSign = paramSign + } +} + +func WithTag(tag string) ServiceOption { + return func(opts *ServiceOptions) { + opts.Service.Tag = tag + } +} + +func WithProtocol(opts ...protocol.Option) ServiceOption { + proOpts := protocol.NewOptions(opts...) + + return func(opts *ServiceOptions) { + if opts.Protocols == nil { + opts.Protocols = make(map[string]*global.ProtocolConfig) + } + opts.Protocols[proOpts.ID] = proOpts.Protocol + } +} + +func WithRegistry(opts ...registry.Option) ServiceOption { + regOpts := registry.NewOptions(opts...) + + return func(opts *ServiceOptions) { + if opts.Registries == nil { + opts.Registries = make(map[string]*global.RegistryConfig) + } + opts.Registries[regOpts.ID] = regOpts.Registry + } +} + +func WithMethod(opts ...config.MethodOption) ServiceOption { + regOpts := config.NewMethodOptions(opts...) + + return func(opts *ServiceOptions) { + if len(opts.Service.Methods) == 0 { + opts.Service.Methods = make([]*global.MethodConfig, 0) + } + opts.Service.Methods = append(opts.Service.Methods, regOpts.Method) + } +} + +func WithParam(k, v string) ServiceOption { + return func(opts *ServiceOptions) { + if opts.Service.Params == nil { + opts.Service.Params = make(map[string]string) + } + opts.Service.Params[k] = v + } +} + // ----------For framework---------- // These functions should not be invoked by users diff --git a/server/server.go b/server/server.go index 0e4ff2ee5f..711aca5f6a 100644 --- a/server/server.go +++ b/server/server.go @@ -153,7 +153,7 @@ func (s *Server) Register(handler interface{}, info *ServiceInfo, opts ...Servic // options passed by users have higher priority svcOpts = append(svcOpts, opts...) - if err := newSvcOpts.init(svcOpts...); err != nil { + if err := newSvcOpts.init(s, svcOpts...); err != nil { return err } newSvcOpts.Implement(handler)