Skip to content

Commit

Permalink
Remove dispatcher provider (cadence-workflow#4559)
Browse files Browse the repository at this point in the history
* Remove dispatcher provider

* Make newRPCFactory a method for integration tests
  • Loading branch information
vytautas-karpavicius authored Oct 12, 2021
1 parent b2037bf commit b21e5e0
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 158 deletions.
2 changes: 0 additions & 2 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ func (s *server) startService() common.Daemon {
clusterGroupMetadata.ClusterGroup,
)

params.DispatcherProvider = rpc.NewDispatcherProvider(params.Logger, rpc.NewDNSPeerChooserFactory(s.cfg.PublicClient.RefreshInterval, params.Logger))

advancedVisMode := dc.GetStringProperty(
dynamicconfig.AdvancedVisibilityWritingMode,
common.GetDefaultAdvancedVisibilityWritingMode(params.PersistenceConfig.IsAdvancedVisibilityConfigExist()),
Expand Down
2 changes: 0 additions & 2 deletions common/resource/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/rpc"
)

type (
Expand All @@ -62,7 +61,6 @@ type (
ESClient es.GenericClient
ESConfig *config.ElasticSearchConfig
DynamicConfig dynamicconfig.Client
DispatcherProvider rpc.DispatcherProvider
DCRedirectionPolicy config.DCRedirectionPolicy
PublicClient workflowserviceclient.Interface
ArchivalMetadata archiver.ArchivalMetadata
Expand Down
120 changes: 0 additions & 120 deletions common/rpc/dispatcher_provider.go

This file was deleted.

2 changes: 2 additions & 0 deletions common/rpc/outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
const (
// OutboundPublicClient is the name of configured public client outbound
OutboundPublicClient = "public-client"

crossDCCaller = "cadence-xdc-client"
)

// OutboundsBuilder allows defining outbounds for the dispatcher
Expand Down
46 changes: 17 additions & 29 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ type (
logger log.Logger
clusterMetadata cluster.Metadata
persistenceConfig config.Persistence
dispatcherProvider rpc.DispatcherProvider
messagingClient messaging.Client
domainManager persistence.DomainManager
historyV2Mgr persistence.HistoryManager
Expand Down Expand Up @@ -129,7 +128,6 @@ type (
CadenceParams struct {
ClusterMetadata cluster.Metadata
PersistenceConfig config.Persistence
DispatcherProvider rpc.DispatcherProvider
MessagingClient messaging.Client
DomainManager persistence.DomainManager
HistoryV2Mgr persistence.HistoryManager
Expand Down Expand Up @@ -161,7 +159,6 @@ func NewCadence(params *CadenceParams) Cadence {
logger: params.Logger,
clusterMetadata: params.ClusterMetadata,
persistenceConfig: params.PersistenceConfig,
dispatcherProvider: params.DispatcherProvider,
messagingClient: params.MessagingClient,
domainManager: params.DomainManager,
historyV2Mgr: params.HistoryV2Mgr,
Expand Down Expand Up @@ -398,11 +395,10 @@ func (c *cadenceImpl) startFrontend(hosts map[string][]string, startWG *sync.Wai
params.ThrottledLogger = c.logger
params.UpdateLoggerWithServiceName(service.Frontend)
params.PProfInitializer = newPProfInitializerImpl(c.logger, c.FrontendPProfPort())
params.RPCFactory = newRPCFactory(service.Frontend, c.FrontendAddress(), c.logger, c.clusterMetadata)
params.RPCFactory = c.newRPCFactory(service.Frontend, c.FrontendAddress())
params.MetricScope = tally.NewTestScope(service.Frontend, make(map[string]string))
params.MembershipFactory = newMembershipFactory(params.Name, hosts)
params.ClusterMetadata = c.clusterMetadata
params.DispatcherProvider = c.dispatcherProvider
params.MessagingClient = c.messagingClient
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient())
Expand Down Expand Up @@ -465,26 +461,22 @@ func (c *cadenceImpl) startHistory(
params.ThrottledLogger = c.logger
params.UpdateLoggerWithServiceName(service.History)
params.PProfInitializer = newPProfInitializerImpl(c.logger, pprofPorts[i])
params.RPCFactory = newRPCFactory(service.History, hostport, c.logger, c.clusterMetadata)
params.RPCFactory = c.newRPCFactory(service.History, hostport)
params.MetricScope = tally.NewTestScope(service.History, make(map[string]string))
params.MembershipFactory = newMembershipFactory(params.Name, hosts)
params.ClusterMetadata = c.clusterMetadata
params.DispatcherProvider = c.dispatcherProvider
params.MessagingClient = c.messagingClient
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
integrationClient := newIntegrationConfigClient(dynamicconfig.NewNopClient())
c.overrideHistoryDynamicConfig(integrationClient)
params.DynamicConfig = integrationClient
dispatcher, err := params.DispatcherProvider.GetTChannel(service.Frontend, c.FrontendAddress(), nil)
if err != nil {
c.logger.Fatal("Failed to get dispatcher for history", tag.Error(err))
}
params.PublicClient = cwsc.New(dispatcher.ClientConfig(service.Frontend))
params.PublicClient = cwsc.New(params.RPCFactory.GetDispatcher().ClientConfig(rpc.OutboundPublicClient))
params.ArchivalMetadata = c.archiverMetadata
params.ArchiverProvider = c.archiverProvider
params.ESConfig = c.esConfig
params.ESClient = c.esClient

var err error
params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
if err != nil {
c.logger.Fatal("Failed to copy persistence config for history", tag.Error(err))
Expand Down Expand Up @@ -535,11 +527,10 @@ func (c *cadenceImpl) startMatching(hosts map[string][]string, startWG *sync.Wai
params.ThrottledLogger = c.logger
params.UpdateLoggerWithServiceName(service.Matching)
params.PProfInitializer = newPProfInitializerImpl(c.logger, c.MatchingPProfPort())
params.RPCFactory = newRPCFactory(service.Matching, c.MatchingServiceAddress(), c.logger, c.clusterMetadata)
params.RPCFactory = c.newRPCFactory(service.Matching, c.MatchingServiceAddress())
params.MetricScope = tally.NewTestScope(service.Matching, make(map[string]string))
params.MembershipFactory = newMembershipFactory(params.Name, hosts)
params.ClusterMetadata = c.clusterMetadata
params.DispatcherProvider = c.dispatcherProvider
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient())
params.ArchivalMetadata = c.archiverMetadata
Expand Down Expand Up @@ -578,11 +569,10 @@ func (c *cadenceImpl) startWorker(hosts map[string][]string, startWG *sync.WaitG
params.ThrottledLogger = c.logger
params.UpdateLoggerWithServiceName(service.Worker)
params.PProfInitializer = newPProfInitializerImpl(c.logger, c.WorkerPProfPort())
params.RPCFactory = newRPCFactory(service.Worker, c.WorkerServiceAddress(), c.logger, c.clusterMetadata)
params.RPCFactory = c.newRPCFactory(service.Worker, c.WorkerServiceAddress())
params.MetricScope = tally.NewTestScope(service.Worker, make(map[string]string))
params.MembershipFactory = newMembershipFactory(params.Name, hosts)
params.ClusterMetadata = c.clusterMetadata
params.DispatcherProvider = c.dispatcherProvider
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient())
params.ArchivalMetadata = c.archiverMetadata
Expand All @@ -593,11 +583,7 @@ func (c *cadenceImpl) startWorker(hosts map[string][]string, startWG *sync.WaitG
if err != nil {
c.logger.Fatal("Failed to copy persistence config for worker", tag.Error(err))
}
dispatcher, err := params.DispatcherProvider.GetTChannel(service.Frontend, c.FrontendAddress(), nil)
if err != nil {
c.logger.Fatal("Failed to get dispatcher for worker", tag.Error(err))
}
params.PublicClient = cwsc.New(dispatcher.ClientConfig(service.Frontend))
params.PublicClient = cwsc.New(params.RPCFactory.GetDispatcher().ClientConfig(rpc.OutboundPublicClient))
service := NewService(params)
service.Start()

Expand Down Expand Up @@ -791,14 +777,14 @@ func newPProfInitializerImpl(logger log.Logger, port int) common.PProfInitialize
}
}

func newRPCFactory(serviceName string, tchannelHostPort string, logger log.Logger, cluster cluster.Metadata) common.RPCFactory {
func (c *cadenceImpl) newRPCFactory(serviceName string, tchannelHostPort string) common.RPCFactory {
grpcPortResolver := grpcPortResolver{}
grpcHostPort, err := grpcPortResolver.GetGRPCAddress("", tchannelHostPort)
if err != nil {
logger.Fatal("Failed to obtain GRPC address", tag.Error(err))
c.logger.Fatal("Failed to obtain GRPC address", tag.Error(err))
}

return rpc.NewFactory(logger, rpc.Params{
return rpc.NewFactory(c.logger, rpc.Params{
ServiceName: serviceName,
TChannelAddress: tchannelHostPort,
GRPCAddress: grpcHostPort,
Expand All @@ -808,19 +794,21 @@ func newRPCFactory(serviceName string, tchannelHostPort string, logger log.Logge
},
// For integration tests to generate client out of the same outbound.
OutboundsBuilder: rpc.CombineOutbounds(
&singleTChannelOutbound{serviceName, tchannelHostPort},
rpc.NewCrossDCOutbounds(cluster.GetAllClusterInfo(), rpc.NewDNSPeerChooserFactory(0, logger))),
&singleTChannelOutbound{serviceName, serviceName, tchannelHostPort},
&singleTChannelOutbound{rpc.OutboundPublicClient, service.Frontend, c.FrontendAddress()},
rpc.NewCrossDCOutbounds(c.clusterMetadata.GetAllClusterInfo(), rpc.NewDNSPeerChooserFactory(0, c.logger))),
})
}

type singleTChannelOutbound struct {
serviceName string
address string
outboundName string
serviceName string
address string
}

func (b singleTChannelOutbound) Build(_ *grpc.Transport, tchannel *tchannel.Transport) (yarpc.Outbounds, error) {
return yarpc.Outbounds{
b.serviceName: {
b.outboundName: {
ServiceName: b.serviceName,
Unary: tchannel.NewSingleOutbound(b.address),
},
Expand Down
3 changes: 0 additions & 3 deletions host/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/common/rpc"
)

type (
Expand Down Expand Up @@ -95,7 +94,6 @@ type (
messagingClient messaging.Client
blobstoreClient blobstore.Client
dynamicCollection *dynamicconfig.Collection
dispatcherProvider rpc.DispatcherProvider
archivalMetadata archiver.ArchivalMetadata
archiverProvider provider.ArchiverProvider
serializer persistence.PayloadSerializer
Expand All @@ -122,7 +120,6 @@ func NewService(params *resource.Params) Service {
metricsClient: params.MetricsClient,
messagingClient: params.MessagingClient,
blobstoreClient: params.BlobstoreClient,
dispatcherProvider: params.DispatcherProvider,
dynamicCollection: dynamicconfig.NewCollection(
params.DynamicConfig,
params.Logger,
Expand Down
2 changes: 0 additions & 2 deletions host/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql"
"github.com/uber/cadence/common/persistence/persistence-tests/testcluster"
"github.com/uber/cadence/common/rpc"

// the import is a test dependency
_ "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql/public"
Expand Down Expand Up @@ -139,7 +138,6 @@ func NewCluster(options *TestClusterConfig, logger log.Logger, params persistenc
cadenceParams := &CadenceParams{
ClusterMetadata: params.ClusterMetadata,
PersistenceConfig: pConfig,
DispatcherProvider: rpc.NewDispatcherProvider(logger, rpc.NewDNSPeerChooserFactory(0, logger)),
MessagingClient: messagingClient,
DomainManager: testBase.DomainManager,
HistoryV2Mgr: testBase.HistoryV2Mgr,
Expand Down

0 comments on commit b21e5e0

Please sign in to comment.