Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/3.0' into feature/support_custom…
Browse files Browse the repository at this point in the history
…_service_group_name_on_nacos
  • Loading branch information
ChangedenCZD committed Oct 18, 2021
2 parents ec7214c + 0aac960 commit c7ac72f
Show file tree
Hide file tree
Showing 352 changed files with 9,541 additions and 13,867 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!-- Thanks for sending a pull request!
Read https://github.com/apache/dubbo-go/blob/master/contributing.md before commit pull request.
Read https://github.com/apache/dubbo-go/blob/master/CONTRIBUTING.md before commit pull request.
-->

**What this PR does**:
Expand Down
4 changes: 2 additions & 2 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ updates:
directory: "/" # Location of package manifests
schedule:
interval: "weekly"
target-branch: "develop"
target-branch: "3.0"

- package-ecosystem: "github-actions"
# Workflow files stored in the
# default location of `.github/workflows`
directory: "/"
schedule:
interval: "weekly"
target-branch: "develop"
target-branch: "3.0"
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ classes
vendor/
logs/
.vscode/
cache
log/

# unit test
remoting/zookeeper/zookeeper-4unittest/
config_center/zookeeper/zookeeper-4unittest/
registry/zookeeper/zookeeper-4unittest/
metadata/report/zookeeper/zookeeper-4unittest/
config_center/apollo/mockDubbog.properties.json

# vim stuff
*~
Expand Down
11 changes: 2 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,10 @@ prepareLic:
$(GO_LICENSE_CHECKER) -version || (wget https://github.com/lsm-dev/license-header-checker/releases/download/v1.2.0/$(GO_LICENSE_CHECKER_DIR).zip -O $(GO_LICENSE_CHECKER_DIR).zip && unzip -o $(GO_LICENSE_CHECKER_DIR).zip && mkdir -p $(GO_PATH)/bin/ && cp $(GO_LICENSE_CHECKER_DIR)/64bit/license-header-checker $(GO_PATH)/bin/)
ls /tmp/tools/license/license.txt || wget -P $(LICENSE_DIR) https://github.com/dubbogo/resources/raw/master/tools/license/license.txt

prepareZk:
ls $(ZK_JAR) || (mkdir -p $(ZK_JAR_PATH)&& wget -P $(ZK_JAR_PATH) https://github.com/dubbogo/resources/raw/master/zookeeper-4unitest/contrib/fatjar/${ZK_JAR_NAME})
@for i in $(ZK_TEST_LIST); do \
mkdir -p $$i$(ZK_FATJAR_BASE);\
cp ${ZK_JAR} $$i$(ZK_FATJAR_BASE);\
done

prepare: prepareZk prepareLic
prepare: prepareLic

.PHONE: test
test: clean prepareZk
test: clean
$(GO_TEST) ./... -coverprofile=coverage.txt -covermode=atomic

deps: prepare
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ If you are using [apache/dubbo-go](https://github.com/apache/dubbo-go) and think
<img width="222px" src="https://gw.alicdn.com/tfs/TB1HPATMrrpK1RjSZTEXXcWAVXa-260-74.png">
</a>
</td>
<td align="center" valign="middle">
<a href="https://www.autohome.com.cn" target="_blank">
<img width="222px" src="https://avatars.githubusercontent.com/u/18279051?s=200&v=4">
</a>
</td>
</tr>
<tr></tr>
</tbody>
Expand Down
5 changes: 5 additions & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ go get dubbo.apache.org/dubbo-go/v3
<img width="222px" src="https://gw.alicdn.com/tfs/TB1HPATMrrpK1RjSZTEXXcWAVXa-260-74.png">
</a>
</td>
<td align="center" valign="middle">
<a href="https://www.autohome.com.cn" target="_blank">
<img width="222px" src="https://avatars.githubusercontent.com/u/18279051?s=200&v=4">
</a>
</td>
</tr>
<tr></tr>
</tbody>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,30 @@
* limitations under the License.
*/

package cluster_impl
package available

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type availableCluster struct{}

const available = "available"

func init() {
extension.SetCluster(available, NewAvailableCluster)
extension.SetCluster(constant.ClusterKeyAvailable, NewAvailableCluster)
}

type cluster struct{}

// NewAvailableCluster returns a cluster instance
//
// Obtain available service providers
func NewAvailableCluster() cluster.Cluster {
return &availableCluster{}
func NewAvailableCluster() clusterpkg.Cluster {
return &cluster{}
}

// Join returns a baseClusterInvoker instance
func (cluster *availableCluster) Join(directory cluster.Directory) protocol.Invoker {
return buildInterceptorChain(NewAvailableClusterInvoker(directory))
func (cluster *cluster) Join(directory directory.Directory) protocol.Invoker {
return clusterpkg.BuildInterceptorChain(NewClusterInvoker(directory))
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cluster_impl
package available

import (
"context"
Expand All @@ -27,29 +27,30 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/cluster/base"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type availableClusterInvoker struct {
baseClusterInvoker
type clusterInvoker struct {
base.ClusterInvoker
}

// NewAvailableClusterInvoker returns a cluster invoker instance
func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &availableClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
// NewClusterInvoker returns a cluster invoker instance
func NewClusterInvoker(directory directory.Directory) protocol.Invoker {
return &clusterInvoker{
ClusterInvoker: base.NewClusterInvoker(directory),
}
}

func (invoker *availableClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.Directory.List(invocation)
err := invoker.CheckInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
}

err = invoker.checkWhetherDestroyed()
err = invoker.CheckWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cluster_impl
package available

import (
"context"
Expand All @@ -26,12 +26,14 @@ import (

import (
"github.com/golang/mock/gomock"

"github.com/stretchr/testify/assert"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory/static"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
Expand All @@ -44,14 +46,14 @@ var availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))

func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
extension.SetLoadbalance("random", random.NewLoadBalance)
availableCluster := NewAvailableCluster()

invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)
invoker.EXPECT().GetUrl().Return(availableUrl)

staticDir := directory.NewStaticDirectory(invokers)
staticDir := static.NewDirectory(invokers)
clusterInvoker := availableCluster.Join(staticDir)
return clusterInvoker
}
Expand All @@ -63,7 +65,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(invoker)

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,77 +15,79 @@
* limitations under the License.
*/

package cluster_impl
package base

import (
perrors "github.com/pkg/errors"

"go.uber.org/atomic"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type baseClusterInvoker struct {
directory cluster.Directory
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
type ClusterInvoker struct {
Directory directory.Directory
AvailableCheck bool
Destroyed *atomic.Bool
StickyInvoker protocol.Invoker
}

func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
return baseClusterInvoker{
directory: directory,
availablecheck: true,
destroyed: atomic.NewBool(false),
func NewClusterInvoker(directory directory.Directory) ClusterInvoker {
return ClusterInvoker{
Directory: directory,
AvailableCheck: true,
Destroyed: atomic.NewBool(false),
}
}

func (invoker *baseClusterInvoker) GetURL() *common.URL {
return invoker.directory.GetURL()
func (invoker *ClusterInvoker) GetURL() *common.URL {
return invoker.Directory.GetURL()
}

func (invoker *baseClusterInvoker) Destroy() {
func (invoker *ClusterInvoker) Destroy() {
// this is must atom operation
if invoker.destroyed.CAS(false, true) {
invoker.directory.Destroy()
if invoker.Destroyed.CAS(false, true) {
invoker.Directory.Destroy()
}
}

func (invoker *baseClusterInvoker) IsAvailable() bool {
if invoker.stickyInvoker != nil {
return invoker.stickyInvoker.IsAvailable()
func (invoker *ClusterInvoker) IsAvailable() bool {
if invoker.StickyInvoker != nil {
return invoker.StickyInvoker.IsAvailable()
}
return invoker.directory.IsAvailable()
return invoker.Directory.IsAvailable()
}

// check invokers availables
func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
// CheckInvokers checks invokers' status if is available or not
func (invoker *ClusterInvoker) CheckInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
if len(invokers) == 0 {
ip := common.GetLocalIp()
return perrors.Errorf("Failed to invoke the method %v. No provider available for the service %v from "+
"registry %v on the consumer %v using the dubbo version %v .Please check if the providers have been started and registered.",
invocation.MethodName(), invoker.directory.GetURL().SubURL.Key(), invoker.directory.GetURL().String(), ip, constant.Version)
invocation.MethodName(), invoker.Directory.GetURL().SubURL.Key(), invoker.Directory.GetURL().String(), ip, constant.Version)
}
return nil
}

// check cluster invoker is destroyed or not
func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
if invoker.destroyed.Load() {
// CheckWhetherDestroyed checks if cluster invoker was destroyed or not
func (invoker *ClusterInvoker) CheckWhetherDestroyed() error {
if invoker.Destroyed.Load() {
ip := common.GetLocalIp()
return perrors.Errorf("Rpc cluster invoker for %v on consumer %v use dubbo version %v is now destroyed! can not invoke any more. ",
invoker.directory.GetURL().Service(), ip, constant.Version)
invoker.Directory.GetURL().Service(), ip, constant.Version)
}
return nil
}

func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
func (invoker *ClusterInvoker) DoSelect(lb loadbalance.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
var selectedInvoker protocol.Invoker
if len(invokers) <= 0 {
return selectedInvoker
Expand All @@ -96,24 +98,24 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
// Get the service method sticky config if have
sticky = url.GetMethodParamBool(invocation.MethodName(), constant.STICKY_KEY, sticky)

if invoker.stickyInvoker != nil && !isInvoked(invoker.stickyInvoker, invokers) {
invoker.stickyInvoker = nil
if invoker.StickyInvoker != nil && !isInvoked(invoker.StickyInvoker, invokers) {
invoker.StickyInvoker = nil
}

if sticky && invoker.availablecheck &&
invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() &&
(invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
return invoker.stickyInvoker
if sticky && invoker.AvailableCheck &&
invoker.StickyInvoker != nil && invoker.StickyInvoker.IsAvailable() &&
(invoked == nil || !isInvoked(invoker.StickyInvoker, invoked)) {
return invoker.StickyInvoker
}

selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)
if sticky {
invoker.stickyInvoker = selectedInvoker
invoker.StickyInvoker = selectedInvoker
}
return selectedInvoker
}

func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
func (invoker *ClusterInvoker) doSelectInvoker(lb loadbalance.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
if len(invokers) == 0 {
return nil
}
Expand All @@ -130,7 +132,7 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
selectedInvoker := lb.Select(invokers, invocation)

// judge if the selected Invoker is invoked and available
if (!selectedInvoker.IsAvailable() && invoker.availablecheck) || isInvoked(selectedInvoker, invoked) {
if (!selectedInvoker.IsAvailable() && invoker.AvailableCheck) || isInvoked(selectedInvoker, invoked) {
protocol.SetInvokerUnhealthyStatus(selectedInvoker)
otherInvokers := getOtherInvokers(invokers, selectedInvoker)
// do reselect
Expand Down Expand Up @@ -169,7 +171,7 @@ func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) boo
return false
}

func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance {
func GetLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) loadbalance.LoadBalance {
url := invoker.GetURL()

methodName := invocation.MethodName()
Expand Down
Loading

0 comments on commit c7ac72f

Please sign in to comment.