Skip to content

Commit

Permalink
Triple client&server api (#2502)
Browse files Browse the repository at this point in the history
* client api

* client api

* update triple client and server api

* code format

* fix example

* put version and group into CallOption

* put version and group into CallOption

* code fmt

* remove redundant parameter

* update api

* mod tidy

* metadata support Params

* move MethodOption to config package

* mod tidy

* append tracing filters

* rename, metric to metrics
  • Loading branch information
chickenlj authored Nov 20, 2023
1 parent f9eecf4 commit 9ea5129
Show file tree
Hide file tree
Showing 49 changed files with 1,448 additions and 895 deletions.
132 changes: 71 additions & 61 deletions client/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func getEnv(key, fallback string) string {
return fallback
}

func updateOrCreateMeshURL(opts *ClientOptions) {
func updateOrCreateMeshURL(opts *ReferenceOptions) {
ref := opts.Reference
con := opts.Consumer
con := opts.cliOpts.Consumer

if ref.URL != "" {
logger.Infof("URL specified explicitly %v", ref.URL)
Expand Down Expand Up @@ -84,30 +84,31 @@ func updateOrCreateMeshURL(opts *ClientOptions) {
}

// ReferWithService retrieves invokers from urls.
func (opts *ClientOptions) ReferWithService(srv common.RPCService) {
opts.refer(srv, nil)
func (refOpts *ReferenceOptions) ReferWithService(srv common.RPCService) {
refOpts.refer(srv, nil)
}

func (opts *ClientOptions) ReferWithInfo(info *ClientInfo) {
opts.refer(nil, info)
func (refOpts *ReferenceOptions) ReferWithInfo(info *ClientInfo) {
refOpts.refer(nil, info)
}

func (opts *ClientOptions) ReferWithServiceAndInfo(srv common.RPCService, info *ClientInfo) {
opts.refer(srv, info)
func (refOpts *ReferenceOptions) ReferWithServiceAndInfo(srv common.RPCService, info *ClientInfo) {
refOpts.refer(srv, info)
}

func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
ref := opts.Reference
con := opts.Consumer
func (refOpts *ReferenceOptions) refer(srv common.RPCService, info *ClientInfo) {
ref := refOpts.Reference
clientOpts := refOpts.cliOpts
con := clientOpts.Consumer

var methods []string
if info != nil {
ref.InterfaceName = info.InterfaceName
methods = info.MethodNames
opts.id = info.InterfaceName
opts.info = info
refOpts.id = info.InterfaceName
refOpts.info = info
} else {
opts.id = common.GetReference(srv)
refOpts.id = common.GetReference(srv)
}
// If adaptive service is enabled,
// the cluster and load balance should be overridden to "adaptivesvc" and "p2c" respectively.
Expand All @@ -121,9 +122,9 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
common.WithPath(ref.InterfaceName),
common.WithProtocol(ref.Protocol),
common.WithMethods(methods),
common.WithParams(opts.getURLMap()),
common.WithParamsValue(constant.BeanNameKey, opts.id),
common.WithParamsValue(constant.MetadataTypeKey, opts.metaDataType),
common.WithParams(refOpts.getURLMap()),
common.WithParamsValue(constant.BeanNameKey, refOpts.id),
common.WithParamsValue(constant.MetadataTypeKey, refOpts.metaDataType),
)
if info != nil {
cfgURL.SetAttribute(constant.ClientInfoKey, info)
Expand All @@ -132,23 +133,23 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
if ref.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
opts.postProcessConfig(cfgURL)
refOpts.postProcessConfig(cfgURL)

// if mesh-enabled is set
updateOrCreateMeshURL(opts)
updateOrCreateMeshURL(refOpts)

// retrieving urls from config, and appending the urls to opts.urls
if err := opts.processURL(cfgURL); err != nil {
// retrieving urls from config, and appending the urls to refOpts.urls
if err := refOpts.processURL(cfgURL); err != nil {
panic(err)
}

// Get invokers according to opts.urls
// Get invokers according to refOpts.urls
var (
invoker protocol.Invoker
regURL *common.URL
)
invokers := make([]protocol.Invoker, len(opts.urls))
for i, u := range opts.urls {
invokers := make([]protocol.Invoker, len(refOpts.urls))
for i, u := range refOpts.urls {
if u.Protocol == constant.ServiceRegistryProtocol {
invoker = extension.GetProtocol(constant.RegistryProtocol).Refer(u)
} else {
Expand All @@ -167,17 +168,17 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {

// TODO(hxmhlt): decouple from directory, config should not depend on directory module
if len(invokers) == 1 {
opts.invoker = invokers[0]
refOpts.invoker = invokers[0]
if ref.URL != "" {
hitClu := constant.ClusterKeyFailover
if u := opts.invoker.GetURL(); u != nil {
if u := refOpts.invoker.GetURL(); u != nil {
hitClu = u.GetParam(constant.ClusterKey, constant.ClusterKeyZoneAware)
}
cluster, err := extension.GetCluster(hitClu)
if err != nil {
panic(err)
} else {
opts.invoker = cluster.Join(static.NewDirectory(invokers))
refOpts.invoker = cluster.Join(static.NewDirectory(invokers))
}
}
} else {
Expand All @@ -196,7 +197,7 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
if err != nil {
panic(err)
} else {
opts.invoker = cluster.Join(static.NewDirectory(invokers))
refOpts.invoker = cluster.Join(static.NewDirectory(invokers))
}
}

Expand All @@ -214,29 +215,29 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
if asyncSrv, ok := srv.(common.AsyncCallbackService); ok {
callback = asyncSrv.CallBack
}
opts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetAsyncProxy(opts.invoker, callback, cfgURL)
refOpts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetAsyncProxy(refOpts.invoker, callback, cfgURL)
} else {
opts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetProxy(opts.invoker, cfgURL)
refOpts.pxy = extension.GetProxyFactory(con.ProxyFactory).GetProxy(refOpts.invoker, cfgURL)
}
opts.pxy.Implement(srv)
refOpts.pxy.Implement(srv)
}
// this protocol would be destroyed in graceful_shutdown
// please refer to (https://github.com/apache/dubbo-go/issues/2429)
graceful_shutdown.RegisterProtocol(ref.Protocol)
}

func (opts *ClientOptions) processURL(cfgURL *common.URL) error {
ref := opts.Reference
func (refOpts *ReferenceOptions) processURL(cfgURL *common.URL) error {
ref := refOpts.Reference
if ref.URL != "" { // use user-specific urls
/*
Two types of URL are allowed for opts.URL:
Two types of URL are allowed for refOpts.URL:
1. direct url: server IP, that is, no need for a registry anymore
2. registry url
They will be handled in different ways:
For example, we have a direct url and a registry url:
1. "tri://localhost:10000" is a direct url
2. "registry://localhost:2181" is a registry url.
Then, opts.URL looks like a string separated by semicolon: "tri://localhost:10000;registry://localhost:2181".
Then, refOpts.URL looks like a string separated by semicolon: "tri://localhost:10000;registry://localhost:2181".
The result of urlStrings is a string array: []string{"tri://localhost:10000", "registry://localhost:2181"}.
*/
urlStrings := gxstrings.RegSplit(ref.URL, "\\s*[;]+\\s*")
Expand All @@ -247,7 +248,7 @@ func (opts *ClientOptions) processURL(cfgURL *common.URL) error {
}
if serviceURL.Protocol == constant.RegistryProtocol { // serviceURL in this branch is a registry protocol
serviceURL.SubURL = cfgURL
opts.urls = append(opts.urls, serviceURL)
refOpts.urls = append(refOpts.urls, serviceURL)
} else { // serviceURL in this branch is the target endpoint IP address
if serviceURL.Path == "" {
serviceURL.Path = "/" + ref.InterfaceName
Expand All @@ -256,57 +257,60 @@ func (opts *ClientOptions) processURL(cfgURL *common.URL) error {
// other stuff, e.g. IP, port, etc., are same as serviceURL
newURL := common.MergeURL(serviceURL, cfgURL)
newURL.AddParam("peer", "true")
opts.urls = append(opts.urls, newURL)
refOpts.urls = append(refOpts.urls, newURL)
}
}
} else { // use registry configs
opts.urls = config.LoadRegistries(ref.RegistryIDs, opts.registriesCompat, common.CONSUMER)
refOpts.urls = config.LoadRegistries(ref.RegistryIDs, refOpts.registriesCompat, common.CONSUMER)
// set url to regURLs
for _, regURL := range opts.urls {
for _, regURL := range refOpts.urls {
regURL.SubURL = cfgURL
}
}
return nil
}

func (opts *ClientOptions) CheckAvailable() bool {
ref := opts.Reference
if opts.invoker == nil {
func (refOpts *ReferenceOptions) CheckAvailable() bool {
ref := refOpts.Reference
if refOpts.invoker == nil {
logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", ref.InterfaceName)
return false
}
if !opts.invoker.IsAvailable() {
if !refOpts.invoker.IsAvailable() {
return false
}
return true
}

// Implement
// @v is service provider implemented RPCService
func (opts *ClientOptions) Implement(v common.RPCService) {
if opts.pxy != nil {
opts.pxy.Implement(v)
} else if opts.info != nil {
opts.info.ClientInjectFunc(v, &Client{
invoker: opts.invoker,
info: opts.info,
func (refOpts *ReferenceOptions) Implement(v common.RPCService) {
if refOpts.pxy != nil {
refOpts.pxy.Implement(v)
} else if refOpts.info != nil {
refOpts.info.ClientInjectFunc(v, &Client{
cliOpts: refOpts.cliOpts,
info: refOpts.info,
refOpts: map[string]*ReferenceOptions{},
})
}
}

// GetRPCService gets RPCService from proxy
func (opts *ClientOptions) GetRPCService() common.RPCService {
return opts.pxy.Get()
func (refOpts *ReferenceOptions) GetRPCService() common.RPCService {
return refOpts.pxy.Get()
}

// GetProxy gets proxy
func (opts *ClientOptions) GetProxy() *proxy.Proxy {
return opts.pxy
func (refOpts *ReferenceOptions) GetProxy() *proxy.Proxy {
return refOpts.pxy
}

func (opts *ClientOptions) getURLMap() url.Values {
ref := opts.Reference
app := opts.applicationCompat
func (refOpts *ReferenceOptions) getURLMap() url.Values {
ref := refOpts.Reference
app := refOpts.applicationCompat
metrics := refOpts.cliOpts.Metrics
tracing := refOpts.cliOpts.Otel.TracingConfig

urlMap := url.Values{}
// first set user params
Expand Down Expand Up @@ -352,6 +356,12 @@ func (opts *ClientOptions) getURLMap() url.Values {
if ref.Generic != "" {
defaultReferenceFilter = constant.GenericFilterKey + "," + defaultReferenceFilter
}
if metrics.Enable != nil && *metrics.Enable {
defaultReferenceFilter += fmt.Sprintf(",%s", constant.MetricsFilterKey)
}
if tracing.Enable != nil && *tracing.Enable {
defaultReferenceFilter += fmt.Sprintf(",%s", constant.OTELClientTraceKey)
}
urlMap.Set(constant.ReferenceFilterKey, commonCfg.MergeValue(ref.Filter, "", defaultReferenceFilter))

for _, v := range ref.Methods {
Expand All @@ -368,7 +378,7 @@ func (opts *ClientOptions) getURLMap() url.Values {

// todo: figure this out
//// GenericLoad ...
//func (opts *ClientOptions) GenericLoad(id string) {
//func (opts *ReferenceOptions) GenericLoad(id string) {
// genericService := generic.NewGenericService(opts.id)
// config.SetConsumerService(genericService)
// opts.id = id
Expand All @@ -377,12 +387,12 @@ func (opts *ClientOptions) getURLMap() url.Values {
//}

// GetInvoker get invoker from ReferenceConfigs
func (opts *ClientOptions) GetInvoker() protocol.Invoker {
return opts.invoker
func (refOpts *ReferenceOptions) GetInvoker() protocol.Invoker {
return refOpts.invoker
}

// postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfigs.
func (opts *ClientOptions) postProcessConfig(url *common.URL) {
func (refOpts *ReferenceOptions) postProcessConfig(url *common.URL) {
for _, p := range extension.GetConfigPostProcessors() {
p.PostProcessReferenceConfig(url)
}
Expand Down
34 changes: 25 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,25 @@ package client

import (
"context"
"fmt"
)

import (
"github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

type Client struct {
invoker protocol.Invoker
info *ClientInfo
info *ClientInfo

cliOpts *ClientOptions
refOpts map[string]*ReferenceOptions
}

type ClientInfo struct {
Expand All @@ -58,8 +60,13 @@ func (cli *Client) call(ctx context.Context, paramsRawVals []interface{}, interf
if err != nil {
return nil, err
}
// todo: move timeout into context or invocation
return cli.invoker.Invoke(ctx, inv), nil

refOption := cli.refOpts[common.ServiceKey(interfaceName, options.Group, options.Version)]
if refOption == nil {
return nil, fmt.Errorf("no service found for %s/%s:%s, please check if the service has been registered", options.Group, interfaceName, options.Version)
}

return refOption.invoker.Invoke(ctx, inv), nil

}

Expand Down Expand Up @@ -95,15 +102,23 @@ func (cli *Client) CallBidiStream(ctx context.Context, interfaceName, methodName
return res.Result(), res.Error()
}

func (cli *Client) Init(info *ClientInfo) error {
func (cli *Client) Init(info *ClientInfo, opts ...ReferenceOption) (string, string, error) {
if info == nil {
return errors.New("ClientInfo is nil")
return "", "", errors.New("ClientInfo is nil")
}

cli.cliOpts.ReferWithInfo(info)
cli.invoker = cli.cliOpts.invoker
newRefOptions := defaultReferenceOptions()
err := newRefOptions.init(cli, opts...)
if err != nil {
return "", "", err
}

ref := newRefOptions.Reference
cli.refOpts[common.ServiceKey(info.InterfaceName, ref.Group, ref.Version)] = newRefOptions

newRefOptions.ReferWithInfo(info)

return nil
return ref.Group, ref.Version, nil
}

func generateInvocation(methodName string, paramsRawVals []interface{}, callType string, opts *CallOptions) (protocol.Invocation, error) {
Expand All @@ -125,5 +140,6 @@ func NewClient(opts ...ClientOption) (*Client, error) {
}
return &Client{
cliOpts: newCliOpts,
refOpts: make(map[string]*ReferenceOptions),
}, nil
}
Loading

0 comments on commit 9ea5129

Please sign in to comment.