Skip to content

Commit

Permalink
feat: finish server layer (#2425)
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima authored Oct 11, 2023
1 parent 1d0c3b4 commit 8682a2e
Show file tree
Hide file tree
Showing 49 changed files with 2,794 additions and 584 deletions.
4 changes: 4 additions & 0 deletions client/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/proxy"
Expand Down Expand Up @@ -252,6 +253,9 @@ func (opts *ClientOptions) refer(srv common.RPCService, info *ClientInfo) {
}
opts.pxy.Implement(srv)
}
// this protocol would be destroyed in graceful_shutdown
// please refer to (https://github.com/apache/dubbo-go/issues/2429)
graceful_shutdown.RegisterProtocol(ref.Protocol)
}

func (opts *ClientOptions) CheckAvailable() bool {
Expand Down
224 changes: 168 additions & 56 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client

import (
"strconv"
"time"
)

import (
Expand All @@ -28,17 +29,21 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common"
commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/proxy"
"dubbo.apache.org/dubbo-go/v3/registry"
)

type ClientOptions struct {
Application *global.ApplicationConfig
Consumer *global.ConsumerConfig
Reference *global.ReferenceConfig
Registries map[string]*global.RegistryConfig
Shutdown *global.ShutdownConfig

pxy *proxy.Proxy
id string
Expand All @@ -54,7 +59,10 @@ type ClientOptions struct {

func defaultClientOptions() *ClientOptions {
return &ClientOptions{
Reference: global.DefaultReferenceConfig(),
Application: global.DefaultApplicationConfig(),
Consumer: global.DefaultConsumerConfig(),
Reference: global.DefaultReferenceConfig(),
Shutdown: global.DefaultShutdownConfig(),
}
}

Expand Down Expand Up @@ -103,6 +111,10 @@ func (cliOpts *ClientOptions) init(opts ...ClientOption) error {

// todo(DMwangnima): move to registry package
// init registries
var emptyRegIDsFlag bool
if ref.RegistryIDs == nil || len(ref.RegistryIDs) <= 0 {
emptyRegIDsFlag = true
}
regs := cliOpts.Registries
if regs != nil {
cliOpts.registriesCompat = make(map[string]*config.RegistryConfig)
Expand All @@ -111,19 +123,26 @@ func (cliOpts *ClientOptions) init(opts ...ClientOption) error {
if err := cliOpts.registriesCompat[key].Init(); err != nil {
return err
}
if emptyRegIDsFlag {
ref.RegistryIDs = append(ref.RegistryIDs, key)
}
}
}
ref.RegistryIDs = commonCfg.TranslateIds(ref.RegistryIDs)

// init graceful_shutdown
graceful_shutdown.Init(graceful_shutdown.WithShutdown_Config(cliOpts.Shutdown))

return commonCfg.Verify(cliOpts)
}

type ClientOption func(*ClientOptions)

// ---------- For user ----------

func WithCheck(check bool) ClientOption {
func WithCheck() ClientOption {
return func(opts *ClientOptions) {
check := true
opts.Reference.Check = &check
}
}
Expand All @@ -134,18 +153,14 @@ func WithURL(url string) ClientOption {
}
}

// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
func WithFilter(filter string) ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Filter = filter
}
}

func WithProtocol(protocol string) ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Protocol = protocol
}
}

// todo(DMwangnima): think about a more ideal configuration style
func WithRegistryIDs(registryIDs []string) ClientOption {
return func(opts *ClientOptions) {
if len(registryIDs) > 0 {
Expand All @@ -154,15 +169,122 @@ func WithRegistryIDs(registryIDs []string) ClientOption {
}
}

func WithCluster(cluster string) ClientOption {
func WithRegistry(key string, opts ...registry.Option) ClientOption {
regOpts := registry.DefaultOptions()
for _, opt := range opts {
opt(regOpts)
}

return func(cliOpts *ClientOptions) {
if cliOpts.Registries == nil {
cliOpts.Registries = make(map[string]*global.RegistryConfig)
}
cliOpts.Registries[key] = regOpts.Registry
}
}

func WithShutdown(opts ...graceful_shutdown.Option) ClientOption {
sdOpts := graceful_shutdown.DefaultOptions()
for _, opt := range opts {
opt(sdOpts)
}

return func(cliOpts *ClientOptions) {
cliOpts.Shutdown = sdOpts.Shutdown
}
}

// ========== Cluster Strategy ==========

func WithClusterAvailable() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Cluster = cluster
opts.Reference.Cluster = constant.ClusterKeyAvailable
}
}

func WithLoadBalance(loadBalance string) ClientOption {
func WithClusterBroadcast() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Loadbalance = loadBalance
opts.Reference.Cluster = constant.ClusterKeyBroadcast
}
}

func WithClusterFailBack() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Cluster = constant.ClusterKeyFailback
}
}

func WithClusterFailFast() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Cluster = constant.ClusterKeyFailfast
}
}

func WithClusterFailOver() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Cluster = constant.ClusterKeyFailover
}
}

func WithClusterFailSafe() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Cluster = constant.ClusterKeyFailsafe
}
}

func WithClusterForking() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Cluster = constant.ClusterKeyForking
}
}

func WithClusterZoneAware() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Cluster = constant.ClusterKeyZoneAware
}
}

func WithClusterAdaptiveService() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Cluster = constant.ClusterKeyAdaptiveService
}
}

// ========== LoadBalance Strategy ==========

func WithLoadBalanceConsistentHashing() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Loadbalance = constant.LoadBalanceKeyConsistentHashing
}
}

func WithLoadBalanceLeastActive() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Loadbalance = constant.LoadBalanceKeyLeastActive
}
}

func WithLoadBalanceRandom() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Loadbalance = constant.LoadBalanceKeyRandom
}
}

func WithLoadBalanceRoundRobin() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Loadbalance = constant.LoadBalanceKeyRoundRobin
}
}

func WithLoadBalanceP2C() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Loadbalance = constant.LoadBalanceKeyP2C
}
}

func WithLoadBalanceXDSRingHash() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Loadbalance = constant.LoadBalanceKeyLeastActive
}
}

Expand All @@ -184,45 +306,51 @@ func WithVersion(version string) ClientOption {
}
}

func WithSerialization(serialization string) ClientOption {
func WithJSON() ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Serialization = serialization
opts.Reference.Serialization = constant.JSONSerialization
}
}

func WithProviderBy(providedBy string) ClientOption {
func WithProvidedBy(providedBy string) ClientOption {
return func(opts *ClientOptions) {
opts.Reference.ProvidedBy = providedBy
}
}

func WithAsync(async bool) ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Async = async
}
}
// todo(DMwangnima): implement this functionality
//func WithAsync() ClientOption {
// return func(opts *ClientOptions) {
// opts.Reference.Async = true
// }
//}

func WithParams(params map[string]string) ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Params = params
}
}

func WithGeneric(generic string) ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Generic = generic
}
}
// todo(DMwangnima): implement this functionality
//func WithGeneric(generic bool) ClientOption {
// return func(opts *ClientOptions) {
// if generic {
// opts.Reference.Generic = "true"
// } else {
// opts.Reference.Generic = "false"
// }
// }
//}

func WithSticky(sticky bool) ClientOption {
return func(opts *ClientOptions) {
opts.Reference.Sticky = sticky
}
}

func WithRequestTimeout(timeout string) ClientOption {
func WithRequestTimeout(timeout time.Duration) ClientOption {
return func(opts *ClientOptions) {
opts.Reference.RequestTimeout = timeout
opts.Reference.RequestTimeout = timeout.String()
}
}

Expand All @@ -245,51 +373,35 @@ func WithMeshProviderPort(port int) ClientOption {
}

// ---------- For framework ----------
// These functions should not be invoked by users

func WithRegistryConfig(key string, opts ...global.RegistryOption) ClientOption {
regCfg := new(global.RegistryConfig)
for _, opt := range opts {
opt(regCfg)
}

func SetRegistries(regs map[string]*global.RegistryConfig) ClientOption {
return func(opts *ClientOptions) {
if opts.Registries == nil {
opts.Registries = make(map[string]*global.RegistryConfig)
}
opts.Registries[key] = regCfg
opts.Registries = regs
}
}

func WithApplicationConfig(opts ...global.ApplicationOption) ClientOption {
appCfg := new(global.ApplicationConfig)
for _, opt := range opts {
opt(appCfg)
}

func SetApplication(application *global.ApplicationConfig) ClientOption {
return func(opts *ClientOptions) {
opts.Application = appCfg
opts.Application = application
}
}

func WithConsumerConfig(opts ...global.ConsumerOption) ClientOption {
conCfg := new(global.ConsumerConfig)
for _, opt := range opts {
opt(conCfg)
}

func SetConsumer(consumer *global.ConsumerConfig) ClientOption {
return func(opts *ClientOptions) {
opts.Consumer = conCfg
opts.Consumer = consumer
}
}

func WithReferenceConfig(opts ...global.ReferenceOption) ClientOption {
refCfg := new(global.ReferenceConfig)
for _, opt := range opts {
opt(refCfg)
func SetReference(reference *global.ReferenceConfig) ClientOption {
return func(opts *ClientOptions) {
opts.Reference = reference
}
}

func SetShutdown(shutdown *global.ShutdownConfig) ClientOption {
return func(opts *ClientOptions) {
opts.Reference = refCfg
opts.Shutdown = shutdown
}
}

Expand Down
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ const (
CallHTTPTypeKey = "call-http-type"
CallHTTP = "http"
CallHTTP2 = "http2"
ServiceInfoKey = "service-info"
)

const (
Expand Down
1 change: 1 addition & 0 deletions common/constant/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ const (
Hessian2Serialization = "hessian2"
ProtobufSerialization = "protobuf"
MsgpackSerialization = "msgpack"
JSONSerialization = "json"
)
Loading

0 comments on commit 8682a2e

Please sign in to comment.