Skip to content

Commit

Permalink
Revert "Use common dispatcher for public client outbound (cadence-wor…
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Oct 4, 2021
1 parent a094a33 commit d19cae1
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 154 deletions.
25 changes: 22 additions & 3 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/authorization"
"github.com/uber/cadence/common/blobstore/filestore"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
Expand Down Expand Up @@ -150,10 +151,9 @@ func (s *server) startService() common.Daemon {
log.Fatalf("error creating rpc factory params: %v", err)
}
params.RPCFactory = rpc.NewFactory(params.Logger, rpcParams)
dispatcher := params.RPCFactory.GetDispatcher()

params.MembershipFactory, err = s.cfg.Ringpop.NewFactory(
dispatcher,
params.RPCFactory.GetDispatcher(),
params.Name,
params.Logger,
)
Expand All @@ -175,6 +175,10 @@ func (s *server) startService() common.Daemon {
clusterGroupMetadata.ClusterGroup,
)

if len(s.cfg.PublicClient.HostPort) == 0 {
log.Fatalf("need to provide an endpoint config for PublicClient")
}

params.DispatcherProvider = rpc.NewDispatcherProvider(params.Logger, rpc.NewDNSPeerChooserFactory(s.cfg.PublicClient.RefreshInterval, params.Logger))

advancedVisMode := dc.GetStringProperty(
Expand Down Expand Up @@ -211,7 +215,22 @@ func (s *server) startService() common.Daemon {
}
}

params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(rpc.OutboundPublicClient))
var options *rpc.DispatcherOptions
if s.cfg.Authorization.OAuthAuthorizer.Enable {
clusterName := s.cfg.ClusterGroupMetadata.CurrentClusterName
authProvider, err := authorization.GetAuthProviderClient(s.cfg.ClusterGroupMetadata.ClusterGroup[clusterName].AuthorizationProvider.PrivateKey)
if err != nil {
log.Fatalf("failed to create AuthProvider: %v", err.Error())
}
options = &rpc.DispatcherOptions{
AuthProvider: authProvider,
}
}
dispatcher, err := params.DispatcherProvider.GetTChannel(service.Frontend, s.cfg.PublicClient.HostPort, options)
if err != nil {
log.Fatalf("failed to construct dispatcher: %v", err)
}
params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(service.Frontend))

params.ArchivalMetadata = archiver.NewArchivalMetadata(
dc,
Expand Down
45 changes: 0 additions & 45 deletions common/rpc/outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,12 @@
package rpc

import (
"fmt"

"github.com/uber/cadence/common/authorization"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/service"

"go.uber.org/yarpc"
"go.uber.org/yarpc/api/middleware"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"
)

const (
// OutboundPublicClient is the name of configured public client outbound
OutboundPublicClient = "public-client"
)

// OutboundsBuilder allows defining outbounds for the dispatcher
type OutboundsBuilder interface {
Build(*grpc.Transport, *tchannel.Transport) (yarpc.Outbounds, error)
}

func newPublicClientOutbound(config *config.Config) (publicClientOutbound, error) {
if len(config.PublicClient.HostPort) == 0 {
return publicClientOutbound{}, fmt.Errorf("need to provide an endpoint config for PublicClient")
}

var authMiddleware *authOutboundMiddleware
if config.Authorization.OAuthAuthorizer.Enable {
clusterName := config.ClusterGroupMetadata.CurrentClusterName
clusterInfo := config.ClusterGroupMetadata.ClusterGroup[clusterName]
authProvider, err := authorization.GetAuthProviderClient(clusterInfo.AuthorizationProvider.PrivateKey)
if err != nil {
return publicClientOutbound{}, fmt.Errorf("create AuthProvider: %v", err)
}
authMiddleware = &authOutboundMiddleware{authProvider}
}

return publicClientOutbound{config.PublicClient.HostPort, authMiddleware}, nil
}

type publicClientOutbound struct {
address string
authMiddleware *authOutboundMiddleware
}

func (b publicClientOutbound) Build(_ *grpc.Transport, tchannel *tchannel.Transport) (yarpc.Outbounds, error) {
return yarpc.Outbounds{
OutboundPublicClient: {
ServiceName: service.Frontend,
Unary: middleware.ApplyUnaryOutbound(tchannel.NewSingleOutbound(b.address), b.authMiddleware),
},
}, nil
}
90 changes: 0 additions & 90 deletions common/rpc/outbounds_test.go

This file was deleted.

9 changes: 1 addition & 8 deletions common/rpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,14 @@ func NewParams(serviceName string, config *config.Config) (Params, error) {

listenIP, err := getListenIP(serviceConfig.RPC)
if err != nil {
return Params{}, fmt.Errorf("get listen IP: %v", err)
return Params{}, fmt.Errorf("failed to get listen IP: %v", err)
}

publicClientOutbound, err := newPublicClientOutbound(config)
if err != nil {
return Params{}, fmt.Errorf("public client outbound: %v", err)
}

return Params{
ServiceName: serviceName,
TChannelAddress: fmt.Sprintf("%v:%v", listenIP, serviceConfig.RPC.Port),
GRPCAddress: fmt.Sprintf("%v:%v", listenIP, serviceConfig.RPC.GRPCPort),
GRPCMaxMsgSize: serviceConfig.RPC.GRPCMaxMsgSize,
HostAddressMapper: NewGRPCPorts(config),
OutboundsBuilder: publicClientOutbound,
}, nil
}

Expand Down
11 changes: 3 additions & 8 deletions common/rpc/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,17 @@ import (
func TestNewParams(t *testing.T) {
serviceName := service.Frontend
makeConfig := func(svc config.Service) *config.Config {
return &config.Config{
PublicClient: config.PublicClient{HostPort: "localhost:9999"},
Services: map[string]config.Service{"frontend": svc}}
return &config.Config{Services: map[string]config.Service{"frontend": svc}}
}

_, err := NewParams(serviceName, &config.Config{})
assert.EqualError(t, err, "no config section for service: frontend")

_, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, BindOnIP: "1.2.3.4"}}))
assert.EqualError(t, err, "get listen IP: bindOnLocalHost and bindOnIP are mutually exclusive")
assert.EqualError(t, err, "failed to get listen IP: bindOnLocalHost and bindOnIP are mutually exclusive")

_, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "invalidIP"}}))
assert.EqualError(t, err, "get listen IP: unable to parse bindOnIP value or it is not an IPv4 address: invalidIP")

_, err = NewParams(serviceName, &config.Config{Services: map[string]config.Service{"frontend": {}}})
assert.EqualError(t, err, "public client outbound: need to provide an endpoint config for PublicClient")
assert.EqualError(t, err, "failed to get listen IP: unable to parse bindOnIP value or it is not an IPv4 address: invalidIP")

params, err := NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, Port: 1111, GRPCPort: 2222, GRPCMaxMsgSize: 3333}}))
assert.NoError(t, err)
Expand Down

0 comments on commit d19cae1

Please sign in to comment.