Skip to content

Commit

Permalink
Merge branch '3.0' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks committed May 21, 2021
2 parents cc74aa5 + 0f83b1a commit 40082d4
Show file tree
Hide file tree
Showing 17 changed files with 218 additions and 69 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Finished List:
* [Etcd(Local)](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/metadata/report/etcd/report.go)
* [Consul(Local)](https://github.com/apache/dubbo-go/pull/633)
* [Zookeeper(Remoting)](https://github.com/apache/dubbo-go/pull/1161)

- Tool
* [Dubbo-go-cli](https://github.com/apache/dubbo-go/pull/818)

Expand Down Expand Up @@ -313,3 +313,6 @@ If you are using [apache/dubbo-go](https://github.com/apache/dubbo-go) and think
</tbody>
</table>
</div>

[MORE USER CASE](https://github.com/apache/dubbo-go/issues/2)

5 changes: 4 additions & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ Apache License, Version 2.0
* [Etcd(Local)](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/metadata/report/etcd/report.go)
* [Consul(Local)](https://github.com/apache/dubbo-go/pull/633)
* [Zookeeper(Remoting)](https://github.com/apache/dubbo-go/pull/1161)

- 工具箱
* [Dubbo-go-cli](https://github.com/apache/dubbo-go/pull/818)

Expand Down Expand Up @@ -311,3 +311,6 @@ dubbogo 社区已经开通微信公众号,可在微信搜索 "dubbogo示土区
</tbody>
</table>
</div>

[更多用户示例](https://github.com/apache/dubbo-go/issues/2)

2 changes: 1 addition & 1 deletion cluster/cluster_impl/available_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ func NewAvailableCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *availableCluster) Join(directory cluster.Directory) protocol.Invoker {
return NewAvailableClusterInvoker(directory)
return buildInterceptorChain(NewAvailableClusterInvoker(directory))
}
19 changes: 0 additions & 19 deletions cluster/cluster_impl/base_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package cluster_impl

import (
"context"
)

import (
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
Expand All @@ -40,7 +36,6 @@ type baseClusterInvoker struct {
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
interceptor cluster.ClusterInterceptor
}

func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
Expand Down Expand Up @@ -165,20 +160,6 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
return nil
}

func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
if invoker.interceptor != nil {
invoker.interceptor.BeforeInvoker(ctx, invocation)

result := invoker.interceptor.DoInvoke(ctx, invocation)

invoker.interceptor.AfterInvoker(ctx, invocation)

return result
}

return nil
}

func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
for _, i := range invoked {
if i == selectedInvoker {
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_impl/broadcast_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewBroadcastCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newBroadcastClusterInvoker(directory)
return buildInterceptorChain(newBroadcastClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failback_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailbackCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailbackClusterInvoker(directory)
return buildInterceptorChain(newFailbackClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failfast_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailFastCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailFastClusterInvoker(directory)
return buildInterceptorChain(newFailFastClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failover_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailoverCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailoverClusterInvoker(directory)
return buildInterceptorChain(newFailoverClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failsafe_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewFailsafeCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailsafeClusterInvoker(directory)
return buildInterceptorChain(newFailsafeClusterInvoker(directory))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/forking_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func NewForkingCluster() cluster.Cluster {

// Join returns a baseClusterInvoker instance
func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {
return newForkingClusterInvoker(directory)
return buildInterceptorChain(newForkingClusterInvoker(directory))
}
76 changes: 76 additions & 0 deletions cluster/cluster_impl/interceptor_invoker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cluster_impl

import (
"context"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

// InterceptorInvoker mocks cluster interceptor as an invoker
type InterceptorInvoker struct {
next protocol.Invoker
interceptor cluster.Interceptor
}

// GetURL is used to get url from InterceptorInvoker
func (i *InterceptorInvoker) GetURL() *common.URL {
return i.next.GetURL()
}

// IsAvailable is used to get available status
func (i *InterceptorInvoker) IsAvailable() bool {
return i.next.IsAvailable()
}

// Invoke is used to call service method by invocation
func (i *InterceptorInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
return i.interceptor.Invoke(ctx, i.next, invocation)
}

// Destroy will destroy invoker
func (i *InterceptorInvoker) Destroy() {
i.next.Destroy()
}

func buildInterceptorChain(invoker protocol.Invoker, builtins ...cluster.Interceptor) protocol.Invoker {
// The order of interceptors is from left to right, so loading from right to left
next := invoker
interceptors := extension.GetClusterInterceptors()
if len(interceptors) != 0 {
for i := len(interceptors) - 1; i >= 0; i-- {
v := &InterceptorInvoker{next: next, interceptor: interceptors[i]}
next = v
}
}

if builtins != nil && len(builtins) > 0 {
for i := len(builtins) - 1; i >= 0; i-- {
v := &InterceptorInvoker{next: next, interceptor: builtins[i]}
next = v
}
}

return next
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ func NewMockCluster() cluster.Cluster {

// nolint
func (cluster *mockCluster) Join(directory cluster.Directory) protocol.Invoker {
return protocol.NewBaseInvoker(directory.GetURL())
return buildInterceptorChain(protocol.NewBaseInvoker(directory.GetURL()))
}
2 changes: 1 addition & 1 deletion cluster/cluster_impl/zone_aware_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ func NewZoneAwareCluster() cluster.Cluster {

// Join returns a zoneAwareClusterInvoker instance
func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newZoneAwareClusterInvoker(directory)
return buildInterceptorChain(newZoneAwareClusterInvoker(directory), getZoneAwareInterceptor())
}
57 changes: 57 additions & 0 deletions cluster/cluster_impl/zone_aware_cluster_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cluster_impl

import (
"context"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type zoneAwareInterceptor struct {
}

func (z *zoneAwareInterceptor) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
force := ctx.Value(key)

if force != nil {
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
}
default:
// ignore
}
}

return invoker.Invoke(ctx, invocation)
}

func getZoneAwareInterceptor() cluster.Interceptor {
return &zoneAwareInterceptor{}
}
31 changes: 3 additions & 28 deletions cluster/cluster_impl/zone_aware_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,14 @@ type zoneAwareClusterInvoker struct {
}

func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoke := &zoneAwareClusterInvoker{
invoker := &zoneAwareClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
// add local to interceptor
invoke.interceptor = invoke
return invoke
return invoker
}

// nolint
func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)

err := invoker.checkInvokers(invokers, invocation)
Expand Down Expand Up @@ -104,29 +102,6 @@ func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation
}
}

func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) {
key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
force := ctx.Value(key)

if force != nil {
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
}
default:
// ignore
}
}
}

func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) {
}

func matchParam(target, key, def string, invoker protocol.Invoker) bool {
return target == invoker.GetURL().GetParam(key, def)
}
16 changes: 5 additions & 11 deletions cluster/cluster_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,9 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

// ClusterInterceptor
// Extension - ClusterInterceptor
type ClusterInterceptor interface {
// Before DoInvoke method
BeforeInvoker(ctx context.Context, invocation protocol.Invocation)

// After DoInvoke method
AfterInvoker(ctx context.Context, invocation protocol.Invocation)

// Corresponding cluster invoke
DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result
// Interceptor
// Extension - Interceptor
type Interceptor interface {
// Invoke is the core function of a cluster interceptor, it determines the process of the interceptor
Invoke(context.Context, protocol.Invoker, protocol.Invocation) protocol.Result
}
Loading

0 comments on commit 40082d4

Please sign in to comment.