Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Use common dispatcher for public client outbound (#4523)" #4534

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/authorization"
"github.com/uber/cadence/common/blobstore/filestore"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
Expand Down Expand Up @@ -150,10 +151,9 @@ func (s *server) startService() common.Daemon {
log.Fatalf("error creating rpc factory params: %v", err)
}
params.RPCFactory = rpc.NewFactory(params.Logger, rpcParams)
dispatcher := params.RPCFactory.GetDispatcher()

params.MembershipFactory, err = s.cfg.Ringpop.NewFactory(
dispatcher,
params.RPCFactory.GetDispatcher(),
params.Name,
params.Logger,
)
Expand All @@ -175,6 +175,10 @@ func (s *server) startService() common.Daemon {
clusterGroupMetadata.ClusterGroup,
)

if len(s.cfg.PublicClient.HostPort) == 0 {
log.Fatalf("need to provide an endpoint config for PublicClient")
}

params.DispatcherProvider = rpc.NewDispatcherProvider(params.Logger, rpc.NewDNSPeerChooserFactory(s.cfg.PublicClient.RefreshInterval, params.Logger))

advancedVisMode := dc.GetStringProperty(
Expand Down Expand Up @@ -211,7 +215,22 @@ func (s *server) startService() common.Daemon {
}
}

params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(rpc.OutboundPublicClient))
var options *rpc.DispatcherOptions
if s.cfg.Authorization.OAuthAuthorizer.Enable {
clusterName := s.cfg.ClusterGroupMetadata.CurrentClusterName
authProvider, err := authorization.GetAuthProviderClient(s.cfg.ClusterGroupMetadata.ClusterGroup[clusterName].AuthorizationProvider.PrivateKey)
if err != nil {
log.Fatalf("failed to create AuthProvider: %v", err.Error())
}
options = &rpc.DispatcherOptions{
AuthProvider: authProvider,
}
}
dispatcher, err := params.DispatcherProvider.GetTChannel(service.Frontend, s.cfg.PublicClient.HostPort, options)
if err != nil {
log.Fatalf("failed to construct dispatcher: %v", err)
}
params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(service.Frontend))

params.ArchivalMetadata = archiver.NewArchivalMetadata(
dc,
Expand Down
45 changes: 0 additions & 45 deletions common/rpc/outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,12 @@
package rpc

import (
"fmt"

"github.com/uber/cadence/common/authorization"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/service"

"go.uber.org/yarpc"
"go.uber.org/yarpc/api/middleware"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"
)

const (
// OutboundPublicClient is the name of configured public client outbound
OutboundPublicClient = "public-client"
)

// OutboundsBuilder allows defining outbounds for the dispatcher
type OutboundsBuilder interface {
Build(*grpc.Transport, *tchannel.Transport) (yarpc.Outbounds, error)
}

func newPublicClientOutbound(config *config.Config) (publicClientOutbound, error) {
if len(config.PublicClient.HostPort) == 0 {
return publicClientOutbound{}, fmt.Errorf("need to provide an endpoint config for PublicClient")
}

var authMiddleware *authOutboundMiddleware
if config.Authorization.OAuthAuthorizer.Enable {
clusterName := config.ClusterGroupMetadata.CurrentClusterName
clusterInfo := config.ClusterGroupMetadata.ClusterGroup[clusterName]
authProvider, err := authorization.GetAuthProviderClient(clusterInfo.AuthorizationProvider.PrivateKey)
if err != nil {
return publicClientOutbound{}, fmt.Errorf("create AuthProvider: %v", err)
}
authMiddleware = &authOutboundMiddleware{authProvider}
}

return publicClientOutbound{config.PublicClient.HostPort, authMiddleware}, nil
}

type publicClientOutbound struct {
address string
authMiddleware *authOutboundMiddleware
}

func (b publicClientOutbound) Build(_ *grpc.Transport, tchannel *tchannel.Transport) (yarpc.Outbounds, error) {
return yarpc.Outbounds{
OutboundPublicClient: {
ServiceName: service.Frontend,
Unary: middleware.ApplyUnaryOutbound(tchannel.NewSingleOutbound(b.address), b.authMiddleware),
},
}, nil
}
90 changes: 0 additions & 90 deletions common/rpc/outbounds_test.go

This file was deleted.

9 changes: 1 addition & 8 deletions common/rpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,14 @@ func NewParams(serviceName string, config *config.Config) (Params, error) {

listenIP, err := getListenIP(serviceConfig.RPC)
if err != nil {
return Params{}, fmt.Errorf("get listen IP: %v", err)
return Params{}, fmt.Errorf("failed to get listen IP: %v", err)
}

publicClientOutbound, err := newPublicClientOutbound(config)
if err != nil {
return Params{}, fmt.Errorf("public client outbound: %v", err)
}

return Params{
ServiceName: serviceName,
TChannelAddress: fmt.Sprintf("%v:%v", listenIP, serviceConfig.RPC.Port),
GRPCAddress: fmt.Sprintf("%v:%v", listenIP, serviceConfig.RPC.GRPCPort),
GRPCMaxMsgSize: serviceConfig.RPC.GRPCMaxMsgSize,
HostAddressMapper: NewGRPCPorts(config),
OutboundsBuilder: publicClientOutbound,
}, nil
}

Expand Down
11 changes: 3 additions & 8 deletions common/rpc/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,17 @@ import (
func TestNewParams(t *testing.T) {
serviceName := service.Frontend
makeConfig := func(svc config.Service) *config.Config {
return &config.Config{
PublicClient: config.PublicClient{HostPort: "localhost:9999"},
Services: map[string]config.Service{"frontend": svc}}
return &config.Config{Services: map[string]config.Service{"frontend": svc}}
}

_, err := NewParams(serviceName, &config.Config{})
assert.EqualError(t, err, "no config section for service: frontend")

_, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, BindOnIP: "1.2.3.4"}}))
assert.EqualError(t, err, "get listen IP: bindOnLocalHost and bindOnIP are mutually exclusive")
assert.EqualError(t, err, "failed to get listen IP: bindOnLocalHost and bindOnIP are mutually exclusive")

_, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "invalidIP"}}))
assert.EqualError(t, err, "get listen IP: unable to parse bindOnIP value or it is not an IPv4 address: invalidIP")

_, err = NewParams(serviceName, &config.Config{Services: map[string]config.Service{"frontend": {}}})
assert.EqualError(t, err, "public client outbound: need to provide an endpoint config for PublicClient")
assert.EqualError(t, err, "failed to get listen IP: unable to parse bindOnIP value or it is not an IPv4 address: invalidIP")

params, err := NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, Port: 1111, GRPCPort: 2222, GRPCMaxMsgSize: 3333}}))
assert.NoError(t, err)
Expand Down