Skip to content

Commit

Permalink
fix: fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
LaurenceLiZhixin committed Apr 3, 2021
2 parents 9202654 + 409c745 commit 6ff62af
Show file tree
Hide file tree
Showing 293 changed files with 696 additions and 1,190 deletions.
6 changes: 2 additions & 4 deletions cluster/cluster_impl/available_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ import (
"github.com/apache/dubbo-go/protocol/mock"
)

var (
availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
var availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))

func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
Expand Down
15 changes: 7 additions & 8 deletions cluster/cluster_impl/base_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (invoker *baseClusterInvoker) GetUrl() *common.URL {
}

func (invoker *baseClusterInvoker) Destroy() {
//this is must atom operation
// this is must atom operation
if invoker.destroyed.CAS(false, true) {
invoker.directory.Destroy()
}
Expand All @@ -69,7 +69,7 @@ func (invoker *baseClusterInvoker) IsAvailable() bool {
return invoker.directory.IsAvailable()
}

//check invokers availables
// check invokers availables
func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
if len(invokers) == 0 {
ip := common.GetLocalIp()
Expand All @@ -78,10 +78,9 @@ func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, in
invocation.MethodName(), invoker.directory.GetUrl().SubURL.Key(), invoker.directory.GetUrl().String(), ip, constant.Version)
}
return nil

}

//check cluster invoker is destroyed or not
// check cluster invoker is destroyed or not
func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
if invoker.destroyed.Load() {
ip := common.GetLocalIp()
Expand All @@ -99,7 +98,7 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p

url := invokers[0].GetUrl()
sticky := url.GetParamBool(constant.STICKY_KEY, false)
//Get the service method sticky config if have
// Get the service method sticky config if have
sticky = url.GetMethodParamBool(invocation.MethodName(), constant.STICKY_KEY, sticky)

if invoker.stickyInvoker != nil && !isInvoked(invoker.stickyInvoker, invokers) {
Expand Down Expand Up @@ -135,7 +134,7 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc

selectedInvoker := lb.Select(invokers, invocation)

//judge if the selected Invoker is invoked and available
// judge if the selected Invoker is invoked and available
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
protocol.SetInvokerUnhealthyStatus(selectedInvoker)
otherInvokers := getOtherInvokers(invokers, selectedInvoker)
Expand Down Expand Up @@ -193,10 +192,10 @@ func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cl
url := invoker.GetUrl()

methodName := invocation.MethodName()
//Get the service loadbalance config
// Get the service loadbalance config
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)

//Get the service method loadbalance config if have
// Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); len(v) > 0 {
lb = v
}
Expand Down
1 change: 1 addition & 0 deletions cluster/cluster_impl/broadcast_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package cluster_impl
import (
"context"
)

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/logger"
Expand Down
6 changes: 2 additions & 4 deletions cluster/cluster_impl/broadcast_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ import (
"github.com/apache/dubbo-go/protocol/mock"
)

var (
broadcastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
var broadcastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))

func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
Expand Down
6 changes: 3 additions & 3 deletions cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
return &protocol.RPCResult{}
}

//Get the service loadbalance config
// Get the service loadbalance config
url := invokers[0].GetUrl()
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
//Get the service method loadbalance config if have
// Get the service method loadbalance config if have
methodName := invocation.MethodName()
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
lb = v
Expand All @@ -149,7 +149,7 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
loadBalance := extension.GetLoadbalance(lb)
invoked := make([]protocol.Invoker, 0, len(invokers))
ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
//DO INVOKE
// DO INVOKE
result := ivk.Invoke(ctx, invocation)
if result.Error() != nil {
invoker.once.Do(func() {
Expand Down
6 changes: 2 additions & 4 deletions cluster/cluster_impl/failback_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ import (
"github.com/apache/dubbo-go/protocol/mock"
)

var (
failbackUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
var failbackUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))

// registerFailback register failbackCluster to cluster extension.
func registerFailback(invoker *mock.MockInvoker) protocol.Invoker {
Expand Down
1 change: 1 addition & 0 deletions cluster/cluster_impl/failfast_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package cluster_impl
import (
"context"
)

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/protocol"
Expand Down
6 changes: 2 additions & 4 deletions cluster/cluster_impl/failfast_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ import (
"github.com/apache/dubbo-go/protocol/mock"
)

var (
failfastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
var failfastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))

// registerFailfast register failfastCluster to cluster extension.
func registerFailfast(invoker *mock.MockInvoker) protocol.Invoker {
Expand Down
13 changes: 7 additions & 6 deletions cluster/cluster_impl/failover_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
loadBalance := getLoadBalance(invokers[0], invocation)

for i := 0; i <= retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
// Reselect before retry to avoid a change of candidate `invokers`.
// NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
if err := invoker.checkWhetherDestroyed(); err != nil {
return &protocol.RPCResult{Err: err}
Expand All @@ -81,7 +81,7 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
continue
}
invoked = append(invoked, ivk)
//DO INVOKE
// DO INVOKE
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key())
Expand All @@ -105,7 +105,8 @@ func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation pr
"Tried %v times of the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. "+
"Last error is %+v.", methodName, invokerSvc, retries, providers, len(providers), len(invokers),
invokerUrl, ip, constant.Version, result.Error().Error()),
)}
),
}
}

func getRetries(invokers []protocol.Invoker, methodName string) int {
Expand All @@ -114,9 +115,9 @@ func getRetries(invokers []protocol.Invoker, methodName string) int {
}

url := invokers[0].GetUrl()
//get reties
// get reties
retriesConfig := url.GetParam(constant.RETRIES_KEY, constant.DEFAULT_RETRIES)
//Get the service method loadbalance config if have
// Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.RETRIES_KEY, ""); len(v) != 0 {
retriesConfig = v
}
Expand Down
1 change: 1 addition & 0 deletions cluster/cluster_impl/failover_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/url"
"testing"
)

import (
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
Expand Down
7 changes: 4 additions & 3 deletions cluster/cluster_impl/failsafe_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package cluster_impl
import (
"context"
)

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
Expand Down Expand Up @@ -56,9 +57,9 @@ func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation pr

url := invokers[0].GetUrl()
methodName := invocation.MethodName()
//Get the service loadbalance config
// Get the service loadbalance config
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
//Get the service method loadbalance config if have
// Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
lb = v
}
Expand All @@ -68,7 +69,7 @@ func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation pr
var result protocol.Result

ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
//DO INVOKE
// DO INVOKE
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
// ignore
Expand Down
6 changes: 2 additions & 4 deletions cluster/cluster_impl/failsafe_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ import (
"github.com/apache/dubbo-go/protocol/mock"
)

var (
failsafeUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
var failsafeUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))

// registerFailsafe register failsafeCluster to cluster extension.
func registerFailsafe(invoker *mock.MockInvoker) protocol.Invoker {
Expand Down
8 changes: 3 additions & 5 deletions cluster/cluster_impl/forking_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ import (
"github.com/apache/dubbo-go/protocol/mock"
)

var (
forkingUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)
var forkingUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))

func registerForking(mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance(loadbalance.RoundRobin, loadbalance.NewRoundRobinLoadBalance)
Expand All @@ -72,7 +70,7 @@ func TestForkingInvokeSuccess(t *testing.T) {

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
forkingUrl.AddParam(constant.FORKS_KEY, strconv.Itoa(3))
//forkingUrl.AddParam(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_TIMEOUT))
// forkingUrl.AddParam(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_TIMEOUT))

var wg sync.WaitGroup
wg.Add(2)
Expand Down
1 change: 0 additions & 1 deletion cluster/cluster_impl/zone_aware_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invoc
}

func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) {

}

func matchParam(target, key, def string, invoker protocol.Invoker) bool {
Expand Down
22 changes: 13 additions & 9 deletions cluster/cluster_impl/zone_aware_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
// defer ctrl.Finish()

mockResult := &protocol.RPCResult{
Attrs: map[string]interface{}{constant.PREFERRED_KEY: "true"},
Rest: rest{tried: 0, success: true}}
Rest: rest{tried: 0, success: true},
}

var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -82,7 +83,7 @@ func TestZoneWareInvokerWithWeightSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
// defer ctrl.Finish()

w1 := "50"
w2 := "200"
Expand All @@ -100,15 +101,17 @@ func TestZoneWareInvokerWithWeightSuccess(t *testing.T) {
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{
Attrs: map[string]interface{}{constant.WEIGHT_KEY: w1},
Rest: rest{tried: 0, success: true}}
Rest: rest{tried: 0, success: true},
}
}).MaxTimes(100)
} else {
url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w2)
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{
Attrs: map[string]interface{}{constant.WEIGHT_KEY: w2},
Rest: rest{tried: 0, success: true}}
Rest: rest{tried: 0, success: true},
}
}).MaxTimes(100)
}
invokers = append(invokers, invoker)
Expand All @@ -135,12 +138,12 @@ func TestZoneWareInvokerWithWeightSuccess(t *testing.T) {
}

func TestZoneWareInvokerWithZoneSuccess(t *testing.T) {
var zoneArray = []string{"hangzhou", "shanghai"}
zoneArray := []string{"hangzhou", "shanghai"}

ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
// defer ctrl.Finish()

var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
Expand All @@ -155,7 +158,8 @@ func TestZoneWareInvokerWithZoneSuccess(t *testing.T) {
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{
Attrs: map[string]interface{}{constant.ZONE_KEY: zoneValue},
Rest: rest{tried: 0, success: true}}
Rest: rest{tried: 0, success: true},
}
})
invokers = append(invokers, invoker)
}
Expand All @@ -178,7 +182,7 @@ func TestZoneWareInvokerWithZoneForceFail(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
//defer ctrl.Finish()
// defer ctrl.Finish()

var invokers []protocol.Invoker
for i := 0; i < 2; i++ {
Expand Down
1 change: 0 additions & 1 deletion cluster/directory/base_directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func TestNewBaseDirectory(t *testing.T) {
}

func TestBuildRouterChain(t *testing.T) {

regURL := url
regURL.AddParam(constant.INTERFACE_KEY, "mock-app")
directory := NewBaseDirectory(regURL)
Expand Down
2 changes: 1 addition & 1 deletion cluster/directory/static_directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory {
return dir
}

//for-loop invokers ,if all invokers is available ,then it means directory is available
// for-loop invokers ,if all invokers is available ,then it means directory is available
func (dir *staticDirectory) IsAvailable() bool {
if len(dir.invokers) == 0 {
return false
Expand Down
5 changes: 3 additions & 2 deletions cluster/loadbalance/consistent_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"strconv"
"strings"
)

import (
gxsort "github.com/dubbogo/gost/sort"
)

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
Expand All @@ -56,8 +58,7 @@ func init() {
}

// ConsistentHashLoadBalance implementation of load balancing: using consistent hashing
type ConsistentHashLoadBalance struct {
}
type ConsistentHashLoadBalance struct{}

// NewConsistentHashLoadBalance creates NewConsistentHashLoadBalance
//
Expand Down
3 changes: 1 addition & 2 deletions cluster/loadbalance/least_active.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func init() {
extension.SetLoadbalance(LeastActive, NewLeastActiveLoadBalance)
}

type leastActiveLoadBalance struct {
}
type leastActiveLoadBalance struct{}

// NewLeastActiveLoadBalance returns a least active load balance.
//
Expand Down
Loading

0 comments on commit 6ff62af

Please sign in to comment.