Skip to content

Commit

Permalink
Refactor PeerChooserFactory out of DispatcherProvider (cadence-workfl…
Browse files Browse the repository at this point in the history
…ow#4508)

* Refactor PeerChooserFactory out of DispatcherProvider

* Update cmd/server/cadence/server.go

Co-authored-by: Mantas Šidlauskas <sidlauskas.mantas@gmail.com>
  • Loading branch information
vytautas-karpavicius and mantas-sidlauskas authored Sep 24, 2021
1 parent a53f4c9 commit e45753a
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 30 deletions.
6 changes: 3 additions & 3 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@ func (s *server) startService() common.Daemon {
clusterGroupMetadata.ClusterGroup,
)

if s.cfg.PublicClient.HostPort != "" {
params.DispatcherProvider = rpc.NewDNSYarpcDispatcherProvider(params.Logger, s.cfg.PublicClient.RefreshInterval)
} else {
if len(s.cfg.PublicClient.HostPort) == 0 {
log.Fatalf("need to provide an endpoint config for PublicClient")
}

params.DispatcherProvider = rpc.NewDispatcherProvider(params.Logger, rpc.NewDNSPeerChooserFactory(s.cfg.PublicClient.RefreshInterval, params.Logger))

advancedVisMode := dc.GetStringProperty(
dynamicconfig.AdvancedVisibilityWritingMode,
common.GetDefaultAdvancedVisibilityWritingMode(params.PersistenceConfig.IsAdvancedVisibilityConfigExist()),
Expand Down
42 changes: 16 additions & 26 deletions common/rpc/dispatcher_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package rpc

import (
"context"
"time"

clientworker "go.uber.org/cadence/worker"

Expand All @@ -32,14 +31,12 @@ import (

"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/peer/roundrobin"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"
)

const (
defaultRefreshInterval = time.Second * 10
crossDCCaller = "cadence-xdc-client"
crossDCCaller = "cadence-xdc-client"
)

type (
Expand All @@ -53,24 +50,21 @@ type (
GetGRPC(name string, address string, options *DispatcherOptions) (*yarpc.Dispatcher, error)
}

dnsDispatcherProvider struct {
interval time.Duration
logger log.Logger
dispatcherProvider struct {
pcf PeerChooserFactory
logger log.Logger
}
)

// NewDNSYarpcDispatcherProvider create a dispatcher provider which handles with IP address
func NewDNSYarpcDispatcherProvider(logger log.Logger, interval time.Duration) DispatcherProvider {
if interval <= 0 {
interval = defaultRefreshInterval
}
return &dnsDispatcherProvider{
interval: interval,
logger: logger,
// NewDispatcherProvider create a dispatcher provider which handles with IP address
func NewDispatcherProvider(logger log.Logger, pcf PeerChooserFactory) DispatcherProvider {
return &dispatcherProvider{
pcf: pcf,
logger: logger,
}
}

func (p *dnsDispatcherProvider) GetTChannel(serviceName string, address string, options *DispatcherOptions) (*yarpc.Dispatcher, error) {
func (p *dispatcherProvider) GetTChannel(serviceName string, address string, options *DispatcherOptions) (*yarpc.Dispatcher, error) {
tchanTransport, err := tchannel.NewTransport(
tchannel.ServiceName(serviceName),
// this aim to get rid of the annoying popup about accepting incoming network connections
Expand All @@ -80,34 +74,30 @@ func (p *dnsDispatcherProvider) GetTChannel(serviceName string, address string,
return nil, err
}

peerList := roundrobin.New(tchanTransport)
peerListUpdater, err := newDNSUpdater(peerList, address, p.interval, p.logger)
peerChooser, err := p.pcf.CreatePeerChooser(tchanTransport, address)
if err != nil {
return nil, err
}
peerListUpdater.Start()
outbound := tchanTransport.NewOutbound(peerList)
outbound := tchanTransport.NewOutbound(peerChooser)

p.logger.Info("Creating TChannel dispatcher outbound", tag.Address(address))
return p.createOutboundDispatcher(serviceName, outbound, options)
}

func (p *dnsDispatcherProvider) GetGRPC(serviceName string, address string, options *DispatcherOptions) (*yarpc.Dispatcher, error) {
func (p *dispatcherProvider) GetGRPC(serviceName string, address string, options *DispatcherOptions) (*yarpc.Dispatcher, error) {
grpcTransport := grpc.NewTransport()

peerList := roundrobin.New(grpcTransport)
peerListUpdater, err := newDNSUpdater(peerList, address, p.interval, p.logger)
peerChooser, err := p.pcf.CreatePeerChooser(grpcTransport, address)
if err != nil {
return nil, err
}
peerListUpdater.Start()
outbound := grpcTransport.NewOutbound(peerList)
outbound := grpcTransport.NewOutbound(peerChooser)

p.logger.Info("Creating GRPC dispatcher outbound", tag.Address(address))
return p.createOutboundDispatcher(serviceName, outbound, options)
}

func (p *dnsDispatcherProvider) createOutboundDispatcher(serviceName string, outbound transport.UnaryOutbound, options *DispatcherOptions) (*yarpc.Dispatcher, error) {
func (p *dispatcherProvider) createOutboundDispatcher(serviceName string, outbound transport.UnaryOutbound, options *DispatcherOptions) (*yarpc.Dispatcher, error) {
cfg := yarpc.Config{
Name: crossDCCaller,
Outbounds: yarpc.Outbounds{
Expand Down
60 changes: 60 additions & 0 deletions common/rpc/peer_chooser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpc

import (
"time"

"github.com/uber/cadence/common/log"

"go.uber.org/yarpc/api/peer"
"go.uber.org/yarpc/peer/roundrobin"
)

const defaultDNSRefreshInterval = time.Second * 10

type (
PeerChooserFactory interface {
CreatePeerChooser(transport peer.Transport, address string) (peer.Chooser, error)
}
dnsPeerChooserFactory struct {
interval time.Duration
logger log.Logger
}
)

func NewDNSPeerChooserFactory(interval time.Duration, logger log.Logger) *dnsPeerChooserFactory {
if interval <= 0 {
interval = defaultDNSRefreshInterval
}

return &dnsPeerChooserFactory{interval, logger}
}

func (f *dnsPeerChooserFactory) CreatePeerChooser(transport peer.Transport, address string) (peer.Chooser, error) {
peerList := roundrobin.New(transport)
peerListUpdater, err := newDNSUpdater(peerList, address, f.interval, f.logger)
if err != nil {
return nil, err
}
peerListUpdater.Start()
return peerList, nil
}
82 changes: 82 additions & 0 deletions common/rpc/peer_chooser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpc

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/yarpc/api/peer"
"go.uber.org/yarpc/api/transport"

"github.com/uber/cadence/common/log"
)

func TestDNSPeerChooserFactory(t *testing.T) {
logger := log.NewNoop()
ctx := context.Background()
interval := 100 * time.Millisecond

// Ensure default interval is set
factory := NewDNSPeerChooserFactory(0, logger)
assert.Equal(t, defaultDNSRefreshInterval, factory.interval)

factory = NewDNSPeerChooserFactory(interval, logger)
peerTransport := &fakePeerTransport{}

// Ensure invalid address returns error
_, err := factory.CreatePeerChooser(peerTransport, "invalid address")
assert.EqualError(t, err, "incorrect DNS:Port format")

chooser, err := factory.CreatePeerChooser(peerTransport, "localhost:1234")
require.NoError(t, err)

require.NoError(t, chooser.Start())
require.True(t, chooser.IsRunning())

// Wait for refresh
time.Sleep(interval)

peer, _, err := chooser.Choose(ctx, &transport.Request{})
require.NoError(t, err)
require.NotNil(t, peer)
assert.Equal(t, "fakePeer", peer.Identifier())
}

type (
fakePeerTransport struct{}
fakePeer struct{}
)

func (t *fakePeerTransport) RetainPeer(peer.Identifier, peer.Subscriber) (peer.Peer, error) {
return &fakePeer{}, nil
}
func (t *fakePeerTransport) ReleasePeer(peer.Identifier, peer.Subscriber) error {
return nil
}

func (p *fakePeer) Identifier() string { return "fakePeer" }
func (p *fakePeer) Status() peer.Status { return peer.Status{ConnectionStatus: peer.Available} }
func (p *fakePeer) StartRequest() {}
func (p *fakePeer) EndRequest() {}
2 changes: 1 addition & 1 deletion host/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func NewCluster(options *TestClusterConfig, logger log.Logger, params persistenc
cadenceParams := &CadenceParams{
ClusterMetadata: params.ClusterMetadata,
PersistenceConfig: pConfig,
DispatcherProvider: rpc.NewDNSYarpcDispatcherProvider(logger, 0),
DispatcherProvider: rpc.NewDispatcherProvider(logger, rpc.NewDNSPeerChooserFactory(0, logger)),
MessagingClient: messagingClient,
DomainManager: testBase.DomainManager,
HistoryV2Mgr: testBase.HistoryV2Mgr,
Expand Down

0 comments on commit e45753a

Please sign in to comment.