Skip to content

Commit

Permalink
Use direct outbound for history client (cadence-workflow#4619)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Nov 8, 2021
1 parent 2fa2787 commit 04cd354
Show file tree
Hide file tree
Showing 11 changed files with 459 additions and 261 deletions.
42 changes: 16 additions & 26 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (

const (
frontendCaller = "cadence-frontend-client"
historyCaller = "history-service-client"
matchingCaller = "matching-service-client"
)

Expand Down Expand Up @@ -134,17 +133,23 @@ func (cf *rpcClientFactory) createKeyResolver(serviceName string) (func(key stri
}

func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) {
keyResolver, err := cf.createKeyResolver(service.History)
if err != nil {
return nil, err
var rawClient history.Client
var addressMapper history.AddressMapperFn
outboundConfig := cf.rpcFactory.GetDispatcher().ClientConfig(service.History)
if isGRPCOutbound(outboundConfig) {
rawClient = history.NewGRPCClient(historyv1.NewHistoryAPIYARPCClient(outboundConfig))
addressMapper = func(address string) (string, error) {
return cf.rpcFactory.ReplaceGRPCPort(service.History, address)
}
} else {
rawClient = history.NewThriftClient(historyserviceclient.New(outboundConfig))
}

clientProvider := func(clientKey string) (interface{}, error) {
if cf.enableGRPCOutbound {
return cf.newHistoryGRPCClient(clientKey)
}
return cf.newHistoryThriftClient(clientKey)
resolver, err := cf.monitor.GetResolver(service.History)
if err != nil {
return nil, err
}
peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, resolver, addressMapper)

supportedMessageSize := cf.rpcFactory.GetMaxMessageSize()
maxSizeConfig := cf.dynConfig.GetIntProperty(dynamicconfig.GRPCMaxSizeInByte, supportedMessageSize)
Expand All @@ -159,7 +164,8 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
cf.numberOfHistoryShards,
maxSizeConfig,
timeout,
common.NewClientCache(keyResolver, clientProvider),
rawClient,
peerResolver,
cf.logger,
)
if errorRate := cf.dynConfig.GetFloat64Property(dynamicconfig.HistoryErrorInjectionRate, 0)(); errorRate != 0 {
Expand Down Expand Up @@ -278,14 +284,6 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndConfig(
return client, nil
}

func (cf *rpcClientFactory) newHistoryThriftClient(hostAddress string) (history.Client, error) {
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(historyCaller, service.History, hostAddress)
if err != nil {
return nil, err
}
return history.NewThriftClient(historyserviceclient.New(dispatcher.ClientConfig(service.History))), nil
}

func (cf *rpcClientFactory) newMatchingThriftClient(hostAddress string) (matching.Client, error) {
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(matchingCaller, service.Matching, hostAddress)
if err != nil {
Expand All @@ -302,14 +300,6 @@ func (cf *rpcClientFactory) newFrontendThriftClient(hostAddress string) (fronten
return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(service.Frontend))), nil
}

func (cf *rpcClientFactory) newHistoryGRPCClient(hostAddress string) (history.Client, error) {
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(historyCaller, service.History, hostAddress)
if err != nil {
return nil, err
}
return history.NewGRPCClient(historyv1.NewHistoryAPIYARPCClient(dispatcher.ClientConfig(service.History))), nil
}

func (cf *rpcClientFactory) newMatchingGRPCClient(hostAddress string) (matching.Client, error) {
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(matchingCaller, service.Matching, hostAddress)
if err != nil {
Expand Down
Loading

0 comments on commit 04cd354

Please sign in to comment.