diff --git a/client/clientfactory.go b/client/clientfactory.go index e61dc9d8812..e2192381676 100644 --- a/client/clientfactory.go +++ b/client/clientfactory.go @@ -51,7 +51,6 @@ import ( const ( frontendCaller = "cadence-frontend-client" - historyCaller = "history-service-client" matchingCaller = "matching-service-client" ) @@ -134,17 +133,23 @@ func (cf *rpcClientFactory) createKeyResolver(serviceName string) (func(key stri } func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) { - keyResolver, err := cf.createKeyResolver(service.History) - if err != nil { - return nil, err + var rawClient history.Client + var addressMapper history.AddressMapperFn + outboundConfig := cf.rpcFactory.GetDispatcher().ClientConfig(service.History) + if isGRPCOutbound(outboundConfig) { + rawClient = history.NewGRPCClient(historyv1.NewHistoryAPIYARPCClient(outboundConfig)) + addressMapper = func(address string) (string, error) { + return cf.rpcFactory.ReplaceGRPCPort(service.History, address) + } + } else { + rawClient = history.NewThriftClient(historyserviceclient.New(outboundConfig)) } - clientProvider := func(clientKey string) (interface{}, error) { - if cf.enableGRPCOutbound { - return cf.newHistoryGRPCClient(clientKey) - } - return cf.newHistoryThriftClient(clientKey) + resolver, err := cf.monitor.GetResolver(service.History) + if err != nil { + return nil, err } + peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, resolver, addressMapper) supportedMessageSize := cf.rpcFactory.GetMaxMessageSize() maxSizeConfig := cf.dynConfig.GetIntProperty(dynamicconfig.GRPCMaxSizeInByte, supportedMessageSize) @@ -159,7 +164,8 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) ( cf.numberOfHistoryShards, maxSizeConfig, timeout, - common.NewClientCache(keyResolver, clientProvider), + rawClient, + peerResolver, cf.logger, ) if errorRate := 
cf.dynConfig.GetFloat64Property(dynamicconfig.HistoryErrorInjectionRate, 0)(); errorRate != 0 { @@ -278,14 +284,6 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndConfig( return client, nil } -func (cf *rpcClientFactory) newHistoryThriftClient(hostAddress string) (history.Client, error) { - dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(historyCaller, service.History, hostAddress) - if err != nil { - return nil, err - } - return history.NewThriftClient(historyserviceclient.New(dispatcher.ClientConfig(service.History))), nil -} - func (cf *rpcClientFactory) newMatchingThriftClient(hostAddress string) (matching.Client, error) { dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(matchingCaller, service.Matching, hostAddress) if err != nil { @@ -302,14 +300,6 @@ func (cf *rpcClientFactory) newFrontendThriftClient(hostAddress string) (fronten return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(service.Frontend))), nil } -func (cf *rpcClientFactory) newHistoryGRPCClient(hostAddress string) (history.Client, error) { - dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(historyCaller, service.History, hostAddress) - if err != nil { - return nil, err - } - return history.NewGRPCClient(historyv1.NewHistoryAPIYARPCClient(dispatcher.ClientConfig(service.History))), nil -} - func (cf *rpcClientFactory) newMatchingGRPCClient(hostAddress string) (matching.Client, error) { dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(matchingCaller, service.Matching, hostAddress) if err != nil { diff --git a/client/history/client.go b/client/history/client.go index 3eb326555f8..d2b5c197dfe 100644 --- a/client/history/client.go +++ b/client/history/client.go @@ -49,7 +49,8 @@ type ( rpcMaxSizeInBytes dynamicconfig.IntPropertyFn // This value currently only used in GetReplicationMessage API tokenSerializer common.TaskTokenSerializer timeout time.Duration - clients common.ClientCache + client Client + 
peerResolver PeerResolver logger log.Logger } @@ -64,7 +65,8 @@ func NewClient( numberOfShards int, rpcMaxSizeInBytes dynamicconfig.IntPropertyFn, timeout time.Duration, - clients common.ClientCache, + client Client, + peerResolver PeerResolver, logger log.Logger, ) Client { return &clientImpl{ @@ -72,7 +74,8 @@ func NewClient( rpcMaxSizeInBytes: rpcMaxSizeInBytes, tokenSerializer: common.NewJSONTaskTokenSerializer(), timeout: timeout, - clients: clients, + client: client, + peerResolver: peerResolver, logger: logger, } } @@ -82,20 +85,20 @@ func (c *clientImpl) StartWorkflowExecution( request *types.HistoryStartWorkflowExecutionRequest, opts ...yarpc.CallOption, ) (*types.StartWorkflowExecutionResponse, error) { - client, err := c.getClientForWorkflowID(request.StartRequest.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.StartRequest.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.StartWorkflowExecutionResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.StartWorkflowExecution(ctx, request, opts...) + response, err = c.client.StartWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -107,20 +110,20 @@ func (c *clientImpl) GetMutableState( request *types.GetMutableStateRequest, opts ...yarpc.CallOption, ) (*types.GetMutableStateResponse, error) { - client, err := c.getClientForWorkflowID(request.Execution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) 
var response *types.GetMutableStateResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.GetMutableState(ctx, request, opts...) + response, err = c.client.GetMutableState(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -132,20 +135,20 @@ func (c *clientImpl) PollMutableState( request *types.PollMutableStateRequest, opts ...yarpc.CallOption, ) (*types.PollMutableStateResponse, error) { - client, err := c.getClientForWorkflowID(request.Execution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.PollMutableStateResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.PollMutableState(ctx, request, opts...) + response, err = c.client.PollMutableState(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -159,18 +162,14 @@ func (c *clientImpl) DescribeHistoryHost( ) (*types.DescribeHistoryHostResponse, error) { var err error - var client Client + var peer string if request.ShardIDForHost != nil { - client, err = c.getClientForShardID(int(request.GetShardIDForHost())) + peer, err = c.peerResolver.FromShardID(int(request.GetShardIDForHost())) } else if request.ExecutionForHost != nil { - client, err = c.getClientForWorkflowID(request.ExecutionForHost.GetWorkflowID()) + peer, err = c.peerResolver.FromWorkflowID(request.ExecutionForHost.GetWorkflowID()) } else { - ret, err := c.clients.GetClientForClientKey(request.GetHostAddress()) - if err != nil { - return nil, err - } - client = ret.(Client) + peer, err = c.peerResolver.FromHostAddress(request.GetHostAddress()) } if err != nil { return nil, err @@ -178,14 +177,14 @@ func (c *clientImpl) DescribeHistoryHost( opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.DescribeHistoryHostResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.DescribeHistoryHost(ctx, request, opts...) + response, err = c.client.DescribeHistoryHost(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -197,20 +196,20 @@ func (c *clientImpl) RemoveTask( request *types.RemoveTaskRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForShardID(int(request.GetShardID())) + peer, err := c.peerResolver.FromShardID(int(request.GetShardID())) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) 
- op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - err = client.RemoveTask(ctx, request, opts...) + err = c.client.RemoveTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -219,20 +218,20 @@ func (c *clientImpl) CloseShard( request *types.CloseShardRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForShardID(int(request.GetShardID())) + peer, err := c.peerResolver.FromShardID(int(request.GetShardID())) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - err = client.CloseShard(ctx, request, opts...) + err = c.client.CloseShard(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return err } @@ -244,20 +243,20 @@ func (c *clientImpl) ResetQueue( request *types.ResetQueueRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForShardID(int(request.GetShardID())) + peer, err := c.peerResolver.FromShardID(int(request.GetShardID())) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - err = client.ResetQueue(ctx, request, opts...) + err = c.client.ResetQueue(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return err } @@ -269,21 +268,21 @@ func (c *clientImpl) DescribeQueue( request *types.DescribeQueueRequest, opts ...yarpc.CallOption, ) (*types.DescribeQueueResponse, error) { - client, err := c.getClientForShardID(int(request.GetShardID())) + peer, err := c.peerResolver.FromShardID(int(request.GetShardID())) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.DescribeQueueResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.DescribeQueue(ctx, request, opts...) + response, err = c.client.DescribeQueue(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -295,20 +294,20 @@ func (c *clientImpl) DescribeMutableState( request *types.DescribeMutableStateRequest, opts ...yarpc.CallOption, ) (*types.DescribeMutableStateResponse, error) { - client, err := c.getClientForWorkflowID(request.Execution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.DescribeMutableStateResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.DescribeMutableState(ctx, request, opts...) + response, err = c.client.DescribeMutableState(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -320,20 +319,20 @@ func (c *clientImpl) ResetStickyTaskList( request *types.HistoryResetStickyTaskListRequest, opts ...yarpc.CallOption, ) (*types.HistoryResetStickyTaskListResponse, error) { - client, err := c.getClientForWorkflowID(request.Execution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.HistoryResetStickyTaskListResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.ResetStickyTaskList(ctx, request, opts...) + response, err = c.client.ResetStickyTaskList(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -345,20 +344,20 @@ func (c *clientImpl) DescribeWorkflowExecution( request *types.HistoryDescribeWorkflowExecutionRequest, opts ...yarpc.CallOption, ) (*types.DescribeWorkflowExecutionResponse, error) { - client, err := c.getClientForWorkflowID(request.Request.Execution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.Request.Execution.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.DescribeWorkflowExecutionResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.DescribeWorkflowExecution(ctx, request, opts...) + response, err = c.client.DescribeWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -370,20 +369,20 @@ func (c *clientImpl) RecordDecisionTaskStarted( request *types.RecordDecisionTaskStartedRequest, opts ...yarpc.CallOption, ) (*types.RecordDecisionTaskStartedResponse, error) { - client, err := c.getClientForWorkflowID(request.WorkflowExecution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.RecordDecisionTaskStartedResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.RecordDecisionTaskStarted(ctx, request, opts...) + response, err = c.client.RecordDecisionTaskStarted(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -395,20 +394,20 @@ func (c *clientImpl) RecordActivityTaskStarted( request *types.RecordActivityTaskStartedRequest, opts ...yarpc.CallOption, ) (*types.RecordActivityTaskStartedResponse, error) { - client, err := c.getClientForWorkflowID(request.WorkflowExecution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.RecordActivityTaskStartedResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.RecordActivityTaskStarted(ctx, request, opts...) + response, err = c.client.RecordActivityTaskStarted(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -424,19 +423,19 @@ func (c *clientImpl) RespondDecisionTaskCompleted( if err != nil { return nil, err } - client, err := c.getClientForWorkflowID(taskToken.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.HistoryRespondDecisionTaskCompletedResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.RespondDecisionTaskCompleted(ctx, request, opts...) + response, err = c.client.RespondDecisionTaskCompleted(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return response, err } @@ -449,17 +448,17 @@ func (c *clientImpl) RespondDecisionTaskFailed( if err != nil { return err } - client, err := c.getClientForWorkflowID(taskToken.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.RespondDecisionTaskFailed(ctx, request, opts...) + return c.client.RespondDecisionTaskFailed(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
} - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -472,17 +471,17 @@ func (c *clientImpl) RespondActivityTaskCompleted( if err != nil { return err } - client, err := c.getClientForWorkflowID(taskToken.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.RespondActivityTaskCompleted(ctx, request, opts...) + return c.client.RespondActivityTaskCompleted(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -495,17 +494,17 @@ func (c *clientImpl) RespondActivityTaskFailed( if err != nil { return err } - client, err := c.getClientForWorkflowID(taskToken.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.RespondActivityTaskFailed(ctx, request, opts...) + return c.client.RespondActivityTaskFailed(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -518,17 +517,17 @@ func (c *clientImpl) RespondActivityTaskCanceled( if err != nil { return err } - client, err := c.getClientForWorkflowID(taskToken.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) 
- op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.RespondActivityTaskCanceled(ctx, request, opts...) + return c.client.RespondActivityTaskCanceled(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -541,20 +540,20 @@ func (c *clientImpl) RecordActivityTaskHeartbeat( if err != nil { return nil, err } - client, err := c.getClientForWorkflowID(taskToken.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.RecordActivityTaskHeartbeatResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.RecordActivityTaskHeartbeat(ctx, request, opts...) + response, err = c.client.RecordActivityTaskHeartbeat(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -566,17 +565,17 @@ func (c *clientImpl) RequestCancelWorkflowExecution( request *types.HistoryRequestCancelWorkflowExecutionRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForWorkflowID(request.CancelRequest.WorkflowExecution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.CancelRequest.WorkflowExecution.WorkflowID) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.RequestCancelWorkflowExecution(ctx, request, opts...) 
+ return c.client.RequestCancelWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } - return c.executeWithRedirect(ctx, client, op) + return c.executeWithRedirect(ctx, peer, op) } func (c *clientImpl) SignalWorkflowExecution( @@ -584,17 +583,17 @@ func (c *clientImpl) SignalWorkflowExecution( request *types.HistorySignalWorkflowExecutionRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForWorkflowID(request.SignalRequest.WorkflowExecution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.SignalRequest.WorkflowExecution.WorkflowID) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.SignalWorkflowExecution(ctx, request, opts...) + return c.client.SignalWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -604,20 +603,20 @@ func (c *clientImpl) SignalWithStartWorkflowExecution( request *types.HistorySignalWithStartWorkflowExecutionRequest, opts ...yarpc.CallOption, ) (*types.StartWorkflowExecutionResponse, error) { - client, err := c.getClientForWorkflowID(request.SignalWithStartRequest.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.SignalWithStartRequest.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.StartWorkflowExecutionResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.SignalWithStartWorkflowExecution(ctx, request, opts...) 
+ response, err = c.client.SignalWithStartWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -630,16 +629,16 @@ func (c *clientImpl) RemoveSignalMutableState( request *types.RemoveSignalMutableStateRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForWorkflowID(request.WorkflowExecution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID) if err != nil { return err } - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.RemoveSignalMutableState(ctx, request) + return c.client.RemoveSignalMutableState(ctx, request, yarpc.WithShardKey(peer)) } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -649,17 +648,17 @@ func (c *clientImpl) TerminateWorkflowExecution( request *types.HistoryTerminateWorkflowExecutionRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForWorkflowID(request.TerminateRequest.WorkflowExecution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.TerminateRequest.WorkflowExecution.WorkflowID) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.TerminateWorkflowExecution(ctx, request, opts...) + return c.client.TerminateWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
} - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -668,19 +667,19 @@ func (c *clientImpl) ResetWorkflowExecution( request *types.HistoryResetWorkflowExecutionRequest, opts ...yarpc.CallOption, ) (*types.ResetWorkflowExecutionResponse, error) { - client, err := c.getClientForWorkflowID(request.ResetRequest.WorkflowExecution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.ResetRequest.WorkflowExecution.WorkflowID) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.ResetWorkflowExecutionResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.ResetWorkflowExecution(ctx, request, opts...) + response, err = c.client.ResetWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -692,17 +691,17 @@ func (c *clientImpl) ScheduleDecisionTask( request *types.ScheduleDecisionTaskRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForWorkflowID(request.WorkflowExecution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.ScheduleDecisionTask(ctx, request, opts...) + return c.client.ScheduleDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
} - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -711,17 +710,17 @@ func (c *clientImpl) RecordChildExecutionCompleted( request *types.RecordChildExecutionCompletedRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForWorkflowID(request.WorkflowExecution.WorkflowID) + peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.RecordChildExecutionCompleted(ctx, request, opts...) + return c.client.RecordChildExecutionCompleted(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -730,17 +729,17 @@ func (c *clientImpl) ReplicateEventsV2( request *types.ReplicateEventsV2Request, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForWorkflowID(request.WorkflowExecution.GetWorkflowID()) + peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.GetWorkflowID()) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.ReplicateEventsV2(ctx, request, opts...) + return c.client.ReplicateEventsV2(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
} - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -751,18 +750,18 @@ func (c *clientImpl) SyncShardStatus( ) error { // we do not have a workflow ID here, instead, we have something even better - client, err := c.getClientForShardID(int(request.GetShardID())) + peer, err := c.peerResolver.FromShardID(int(request.GetShardID())) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.SyncShardStatus(ctx, request, opts...) + return c.client.SyncShardStatus(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -772,17 +771,17 @@ func (c *clientImpl) SyncActivity( opts ...yarpc.CallOption, ) error { - client, err := c.getClientForWorkflowID(request.GetWorkflowID()) + peer, err := c.peerResolver.FromWorkflowID(request.GetWorkflowID()) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.SyncActivity(ctx, request, opts...) + return c.client.SyncActivity(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
} - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -791,20 +790,20 @@ func (c *clientImpl) QueryWorkflow( request *types.HistoryQueryWorkflowRequest, opts ...yarpc.CallOption, ) (*types.HistoryQueryWorkflowResponse, error) { - client, err := c.getClientForWorkflowID(request.GetRequest().GetExecution().GetWorkflowID()) + peer, err := c.peerResolver.FromWorkflowID(request.GetRequest().GetExecution().GetWorkflowID()) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.HistoryQueryWorkflowResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.QueryWorkflow(ctx, request, opts...) + response, err = c.client.QueryWorkflow(ctx, request, append(opts, yarpc.WithShardKey(peer))...) return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -816,36 +815,36 @@ func (c *clientImpl) GetReplicationMessages( request *types.GetReplicationMessagesRequest, opts ...yarpc.CallOption, ) (*types.GetReplicationMessagesResponse, error) { - requestsByClient := make(map[Client]*types.GetReplicationMessagesRequest) + requestsByPeer := make(map[string]*types.GetReplicationMessagesRequest) for _, token := range request.Tokens { - client, err := c.getClientForShardID(int(token.GetShardID())) + peer, err := c.peerResolver.FromShardID(int(token.GetShardID())) if err != nil { return nil, err } - if _, ok := requestsByClient[client]; !ok { - requestsByClient[client] = &types.GetReplicationMessagesRequest{ + if _, ok := requestsByPeer[peer]; !ok { + requestsByPeer[peer] = &types.GetReplicationMessagesRequest{ ClusterName: request.ClusterName, } } - req := requestsByClient[client] + req := requestsByPeer[peer] req.Tokens = append(req.Tokens, token) } var wg 
sync.WaitGroup - wg.Add(len(requestsByClient)) - respChan := make(chan *getReplicationMessagesWithSize, len(requestsByClient)) + wg.Add(len(requestsByPeer)) + respChan := make(chan *getReplicationMessagesWithSize, len(requestsByPeer)) errChan := make(chan error, 1) - for client, req := range requestsByClient { - go func(ctx context.Context, client Client, request *types.GetReplicationMessagesRequest) { + for peer, req := range requestsByPeer { + go func(ctx context.Context, peer string, request *types.GetReplicationMessagesRequest) { defer wg.Done() requestContext, cancel := common.CreateChildContext(ctx, 0.05) defer cancel() requestContext, responseInfo := rpc.ContextWithResponseInfo(requestContext) - resp, err := client.GetReplicationMessages(requestContext, request, opts...) + resp, err := c.client.GetReplicationMessages(requestContext, request, append(opts, yarpc.WithShardKey(peer))...) if err != nil { c.logger.Warn("Failed to get replication tasks from client", tag.Error(err)) // Returns service busy error to notify replication @@ -861,7 +860,7 @@ func (c *clientImpl) GetReplicationMessages( response: resp, size: responseInfo.Size, } - }(ctx, client, req) + }(ctx, peer, req) } wg.Wait() @@ -897,15 +896,15 @@ func (c *clientImpl) GetDLQReplicationMessages( ) (*types.GetDLQReplicationMessagesResponse, error) { // All workflow IDs are in the same shard per request workflowID := request.GetTaskInfos()[0].GetWorkflowID() - client, err := c.getClientForWorkflowID(workflowID) + peer, err := c.peerResolver.FromWorkflowID(workflowID) if err != nil { return nil, err } - return client.GetDLQReplicationMessages( + return c.client.GetDLQReplicationMessages( ctx, request, - opts..., + append(opts, yarpc.WithShardKey(peer))..., ) } @@ -914,17 +913,17 @@ func (c *clientImpl) ReapplyEvents( request *types.HistoryReapplyEventsRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForWorkflowID(request.GetRequest().GetWorkflowExecution().GetWorkflowID()) + 
peer, err := c.peerResolver.FromWorkflowID(request.GetRequest().GetWorkflowExecution().GetWorkflowID()) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.ReapplyEvents(ctx, request, opts...) + return c.client.ReapplyEvents(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -934,12 +933,12 @@ func (c *clientImpl) ReadDLQMessages( opts ...yarpc.CallOption, ) (*types.ReadDLQMessagesResponse, error) { - client, err := c.getClientForShardID(int(request.GetShardID())) + peer, err := c.peerResolver.FromShardID(int(request.GetShardID())) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) - return client.ReadDLQMessages(ctx, request, opts...) + return c.client.ReadDLQMessages(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } func (c *clientImpl) PurgeDLQMessages( @@ -948,12 +947,12 @@ func (c *clientImpl) PurgeDLQMessages( opts ...yarpc.CallOption, ) error { - client, err := c.getClientForShardID(int(request.GetShardID())) + peer, err := c.peerResolver.FromShardID(int(request.GetShardID())) if err != nil { return err } opts = common.AggregateYarpcOptions(ctx, opts...) - return client.PurgeDLQMessages(ctx, request, opts...) + return c.client.PurgeDLQMessages(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } func (c *clientImpl) MergeDLQMessages( @@ -962,12 +961,12 @@ func (c *clientImpl) MergeDLQMessages( opts ...yarpc.CallOption, ) (*types.MergeDLQMessagesResponse, error) { - client, err := c.getClientForShardID(int(request.GetShardID())) + peer, err := c.peerResolver.FromShardID(int(request.GetShardID())) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) 
- return client.MergeDLQMessages(ctx, request, opts...) + return c.client.MergeDLQMessages(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } func (c *clientImpl) RefreshWorkflowTasks( @@ -975,16 +974,16 @@ func (c *clientImpl) RefreshWorkflowTasks( request *types.HistoryRefreshWorkflowTasksRequest, opts ...yarpc.CallOption, ) error { - client, err := c.getClientForWorkflowID(request.GetRequest().GetExecution().GetWorkflowID()) + peer, err := c.peerResolver.FromWorkflowID(request.GetRequest().GetExecution().GetWorkflowID()) if err != nil { return err } - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { ctx, cancel := c.createContext(ctx) defer cancel() - return client.RefreshWorkflowTasks(ctx, request, opts...) + return c.client.RefreshWorkflowTasks(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) return err } @@ -993,40 +992,40 @@ func (c *clientImpl) NotifyFailoverMarkers( request *types.NotifyFailoverMarkersRequest, opts ...yarpc.CallOption, ) error { - requestsByClient := make(map[Client]*types.NotifyFailoverMarkersRequest) + requestsByPeer := make(map[string]*types.NotifyFailoverMarkersRequest) for _, token := range request.GetFailoverMarkerTokens() { marker := token.GetFailoverMarker() - client, err := c.getClientForDomainID(marker.GetDomainID()) + peer, err := c.peerResolver.FromDomainID(marker.GetDomainID()) if err != nil { return err } - if _, ok := requestsByClient[client]; !ok { - requestsByClient[client] = &types.NotifyFailoverMarkersRequest{ + if _, ok := requestsByPeer[peer]; !ok { + requestsByPeer[peer] = &types.NotifyFailoverMarkersRequest{ FailoverMarkerTokens: []*types.FailoverMarkerToken{}, } } - req := requestsByClient[client] + req := requestsByPeer[peer] req.FailoverMarkerTokens = append(req.FailoverMarkerTokens, token) } var wg sync.WaitGroup - wg.Add(len(requestsByClient)) 
- respChan := make(chan error, len(requestsByClient)) - for client, req := range requestsByClient { - go func(client Client, request *types.NotifyFailoverMarkersRequest) { + wg.Add(len(requestsByPeer)) + respChan := make(chan error, len(requestsByPeer)) + for peer, req := range requestsByPeer { + go func(peer string, request *types.NotifyFailoverMarkersRequest) { defer wg.Done() ctx, cancel := c.createContext(ctx) defer cancel() - err := client.NotifyFailoverMarkers( + err := c.client.NotifyFailoverMarkers( ctx, request, - opts..., + append(opts, yarpc.WithShardKey(peer))..., ) respChan <- err - }(client, req) + }(peer, req) } wg.Wait() @@ -1045,44 +1044,44 @@ func (c *clientImpl) GetCrossClusterTasks( request *types.GetCrossClusterTasksRequest, opts ...yarpc.CallOption, ) (*types.GetCrossClusterTasksResponse, error) { - requestByClient := make(map[Client]*types.GetCrossClusterTasksRequest) + requestByPeer := make(map[string]*types.GetCrossClusterTasksRequest) for _, shardID := range request.GetShardIDs() { - client, err := c.getClientForShardID(int(shardID)) + peer, err := c.peerResolver.FromShardID(int(shardID)) if err != nil { return nil, err } - if _, ok := requestByClient[client]; !ok { - requestByClient[client] = &types.GetCrossClusterTasksRequest{ + if _, ok := requestByPeer[peer]; !ok { + requestByPeer[peer] = &types.GetCrossClusterTasksRequest{ TargetCluster: request.TargetCluster, } } - requestByClient[client].ShardIDs = append(requestByClient[client].ShardIDs, shardID) + requestByPeer[peer].ShardIDs = append(requestByPeer[peer].ShardIDs, shardID) } // preserve 5% timeout to return partial of the result if context is timing out ctx, cancel := common.CreateChildContext(ctx, 0.05) defer cancel() - futureByClient := make(map[Client]future.Future, len(requestByClient)) - for client, req := range requestByClient { + futureByPeer := make(map[string]future.Future, len(requestByPeer)) + for peer, req := range requestByPeer { future, settable := future.NewFuture() 
- go func(ctx context.Context, client Client, req *types.GetCrossClusterTasksRequest) { - settable.Set(client.GetCrossClusterTasks(ctx, req)) - }(ctx, client, req) + go func(ctx context.Context, peer string, req *types.GetCrossClusterTasksRequest) { + settable.Set(c.client.GetCrossClusterTasks(ctx, req, yarpc.WithShardKey(peer))) + }(ctx, peer, req) - futureByClient[client] = future + futureByPeer[peer] = future } response := &types.GetCrossClusterTasksResponse{ TasksByShard: make(map[int32][]*types.CrossClusterTaskRequest), FailedCauseByShard: make(map[int32]types.GetTaskFailedCause), } - for client, future := range futureByClient { + for peer, future := range futureByPeer { var resp *types.GetCrossClusterTasksResponse if futureErr := future.Get(ctx, &resp); futureErr != nil { c.logger.Error("Failed to get cross cluster tasks", tag.Error(futureErr)) - for _, failedShardID := range requestByClient[client].ShardIDs { + for _, failedShardID := range requestByPeer[peer].ShardIDs { response.FailedCauseByShard[failedShardID] = common.ConvertErrToGetTaskFailedCause(futureErr) } } else { @@ -1105,22 +1104,22 @@ func (c *clientImpl) RespondCrossClusterTasksCompleted( request *types.RespondCrossClusterTasksCompletedRequest, opts ...yarpc.CallOption, ) (*types.RespondCrossClusterTasksCompletedResponse, error) { - client, err := c.getClientForShardID(int(request.GetShardID())) + peer, err := c.peerResolver.FromShardID(int(request.GetShardID())) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) var response *types.RespondCrossClusterTasksCompletedResponse - op := func(ctx context.Context, client Client) error { + op := func(ctx context.Context, peer string) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() - response, err = client.RespondCrossClusterTasksCompleted(ctx, request, opts...) + response, err = c.client.RespondCrossClusterTasksCompleted(ctx, request, append(opts, yarpc.WithShardKey(peer))...) 
return err } - err = c.executeWithRedirect(ctx, client, op) + err = c.executeWithRedirect(ctx, peer, op) if err != nil { return nil, err } @@ -1132,12 +1131,12 @@ func (c *clientImpl) GetFailoverInfo( request *types.GetFailoverInfoRequest, opts ...yarpc.CallOption, ) (*types.GetFailoverInfoResponse, error) { - client, err := c.getClientForDomainID(request.GetDomainID()) + peer, err := c.peerResolver.FromDomainID(request.GetDomainID()) if err != nil { return nil, err } opts = common.AggregateYarpcOptions(ctx, opts...) - return client.GetFailoverInfo(ctx, request, opts...) + return c.client.GetFailoverInfo(ctx, request, append(opts, yarpc.WithShardKey(peer))...) } func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) { @@ -1147,28 +1146,10 @@ func (c *clientImpl) createContext(parent context.Context) (context.Context, con return context.WithTimeout(parent, c.timeout) } -func (c *clientImpl) getClientForWorkflowID(workflowID string) (Client, error) { - key := common.WorkflowIDToHistoryShard(workflowID, c.numberOfShards) - return c.getClientForShardID(key) -} - -func (c *clientImpl) getClientForDomainID(domainID string) (Client, error) { - key := common.DomainIDToHistoryShard(domainID, c.numberOfShards) - return c.getClientForShardID(key) -} - -func (c *clientImpl) getClientForShardID(shardID int) (Client, error) { - client, err := c.clients.GetClientForKey(string(rune(shardID))) - if err != nil { - return nil, err - } - return client.(Client), nil -} - func (c *clientImpl) executeWithRedirect( ctx context.Context, - client Client, - op func(ctx context.Context, client Client) error, + peer string, + op func(ctx context.Context, peer string) error, ) error { var err error if ctx == nil { @@ -1180,15 +1161,14 @@ redirectLoop: if err != nil { break redirectLoop } - err = op(ctx, client) + err = op(ctx, peer) if err != nil { if s, ok := err.(*types.ShardOwnershipLostError); ok { // TODO: consider emitting a metric for number of 
redirects - ret, err := c.clients.GetClientForClientKey(s.GetOwner()) + peer, err = c.peerResolver.FromHostAddress(s.GetOwner()) if err != nil { return err } - client = ret.(Client) continue redirectLoop } } diff --git a/client/history/peerResolver.go b/client/history/peerResolver.go new file mode 100644 index 00000000000..c3e04474bd1 --- /dev/null +++ b/client/history/peerResolver.go @@ -0,0 +1,84 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/membership" +) + +// PeerResolver is used to resolve history peers. +// Those are deployed instances of Cadence history services that participate in the cluster ring. +// The resulting peer is simply an address of form ip:port where RPC calls can be routed to. 
+type PeerResolver struct { + numberOfShards int + membership membership.ServiceResolver + addressMapper AddressMapperFn +} + +type AddressMapperFn func(string) (string, error) + +// NewPeerResolver creates a new history peer resolver. +func NewPeerResolver(numberOfShards int, membership membership.ServiceResolver, addressMapper AddressMapperFn) PeerResolver { + return PeerResolver{ + numberOfShards: numberOfShards, + membership: membership, + addressMapper: addressMapper, + } +} + +// FromWorkflowID resolves the history peer responsible for a given workflowID. +// WorkflowID is converted to logical shardID using a consistent hash function. +// FromShardID is used for further resolving. +func (pr PeerResolver) FromWorkflowID(workflowID string) (string, error) { + shardID := common.WorkflowIDToHistoryShard(workflowID, pr.numberOfShards) + return pr.FromShardID(shardID) +} + +// FromDomainID resolves the history peer responsible for a given domainID. +// DomainID is converted to logical shardID using a consistent hash function. +// FromShardID is used for further resolving. +func (pr PeerResolver) FromDomainID(domainID string) (string, error) { + shardID := common.DomainIDToHistoryShard(domainID, pr.numberOfShards) + return pr.FromShardID(shardID) +} + +// FromShardID resolves the history peer responsible for a given logical shardID. +// It uses our membership provider to look up which instance currently owns the given shard. +// FromHostAddress is used for further resolving. +func (pr PeerResolver) FromShardID(shardID int) (string, error) { + shardIDString := string(rune(shardID)) + host, err := pr.membership.Lookup(shardIDString) + if err != nil { + return "", err + } + return pr.FromHostAddress(host.GetAddress()) +} + +// FromHostAddress resolves the final history peer responsible for the given host address. +// The address may be used as is, or processed with an additional address mapper.
+// In case of gRPC transport, the port within the address is replaced with gRPC port. +func (pr PeerResolver) FromHostAddress(hostAddress string) (string, error) { + if pr.addressMapper == nil { + return hostAddress, nil + } + return pr.addressMapper(hostAddress) +} diff --git a/client/history/peerResolver_test.go b/client/history/peerResolver_test.go new file mode 100644 index 00000000000..94488c70d66 --- /dev/null +++ b/client/history/peerResolver_test.go @@ -0,0 +1,78 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package history + +import ( + "testing" + + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/membership" +) + +func TestPeerResolver(t *testing.T) { + numShards := 123 + controller := gomock.NewController(t) + serviceResolver := membership.NewMockServiceResolver(controller) + serviceResolver.EXPECT().Lookup(string(rune(common.DomainIDToHistoryShard("domainID", numShards)))).Return(membership.NewHostInfo("domainHost:thriftPort", nil), nil) + serviceResolver.EXPECT().Lookup(string(rune(common.WorkflowIDToHistoryShard("workflowID", numShards)))).Return(membership.NewHostInfo("workflowHost:thriftPort", nil), nil) + serviceResolver.EXPECT().Lookup(string(rune(99))).Return(membership.NewHostInfo("shardHost:thriftPort", nil), nil) + serviceResolver.EXPECT().Lookup(string(rune(11))).Return(nil, assert.AnError) + + r := NewPeerResolver(numShards, serviceResolver, fakeAddressMapper) + + peer, err := r.FromDomainID("domainID") + assert.NoError(t, err) + assert.Equal(t, "domainHost:grpcPort", peer) + + peer, err = r.FromWorkflowID("workflowID") + assert.NoError(t, err) + assert.Equal(t, "workflowHost:grpcPort", peer) + + peer, err = r.FromShardID(99) + assert.NoError(t, err) + assert.Equal(t, "shardHost:grpcPort", peer) + + _, err = r.FromShardID(11) + assert.Error(t, err) + + _, err = r.FromHostAddress("invalid address") + assert.Error(t, err) + + r = NewPeerResolver(numShards, nil, nil) + peer, err = r.FromHostAddress("no mapper") + assert.NoError(t, err) + assert.Equal(t, "no mapper", peer) +} + +func fakeAddressMapper(address string) (string, error) { + switch address { + case "domainHost:thriftPort": + return "domainHost:grpcPort", nil + case "workflowHost:thriftPort": + return "workflowHost:grpcPort", nil + case "shardHost:thriftPort": + return "shardHost:grpcPort", nil + } + return "", assert.AnError +} diff --git a/cmd/server/cadence/server.go 
b/cmd/server/cadence/server.go index 094af808521..80e8f60cd3f 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -145,7 +145,7 @@ func (s *server) startService() common.Daemon { params.MetricScope = svcCfg.Metrics.NewScope(params.Logger, params.Name) - rpcParams, err := rpc.NewParams(params.Name, s.cfg) + rpcParams, err := rpc.NewParams(params.Name, s.cfg, dc) if err != nil { log.Fatalf("error creating rpc factory params: %v", err) } diff --git a/common/rpc/outbounds.go b/common/rpc/outbounds.go index 295f7c682c2..4526cb41b0e 100644 --- a/common/rpc/outbounds.go +++ b/common/rpc/outbounds.go @@ -21,6 +21,7 @@ package rpc import ( + "crypto/tls" "fmt" "github.com/uber/cadence/common/authorization" @@ -31,6 +32,7 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/api/middleware" "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/peer/direct" "go.uber.org/yarpc/transport/grpc" "go.uber.org/yarpc/transport/tchannel" ) @@ -167,3 +169,37 @@ func (b crossDCOutbounds) Build(grpcTransport *grpc.Transport, tchannelTransport } return outbounds, nil } + +type directOutbound struct { + serviceName string + grpcEnabled bool + tlsConfig *tls.Config +} + +func NewDirectOutbound(serviceName string, grpcEnabled bool, tlsConfig *tls.Config) OutboundsBuilder { + return directOutbound{serviceName, grpcEnabled, tlsConfig} +} + +func (o directOutbound) Build(grpc *grpc.Transport, tchannel *tchannel.Transport) (yarpc.Outbounds, error) { + var outbound transport.UnaryOutbound + if o.grpcEnabled { + directChooser, err := direct.New(direct.Configuration{}, createDialer(grpc, o.tlsConfig)) + if err != nil { + return nil, err + } + outbound = grpc.NewOutbound(directChooser) + } else { + directChooser, err := direct.New(direct.Configuration{}, tchannel) + if err != nil { + return nil, err + } + outbound = tchannel.NewOutbound(directChooser) + } + + return yarpc.Outbounds{ + o.serviceName: { + ServiceName: o.serviceName, + Unary: 
middleware.ApplyUnaryOutbound(outbound, &responseInfoMiddleware{}), + }, + }, nil +} diff --git a/common/rpc/outbounds_test.go b/common/rpc/outbounds_test.go index 0fbd51eb097..954681560b6 100644 --- a/common/rpc/outbounds_test.go +++ b/common/rpc/outbounds_test.go @@ -151,6 +151,21 @@ func TestCrossDCOutbounds(t *testing.T) { assert.NotNil(t, outbounds["cluster-B"].Unary) } +func TestDirectOutbound(t *testing.T) { + grpc := &grpc.Transport{} + tchannel := &tchannel.Transport{} + + outbounds, err := NewDirectOutbound("cadence-history", false, nil).Build(grpc, tchannel) + assert.NoError(t, err) + assert.Equal(t, "cadence-history", outbounds["cadence-history"].ServiceName) + assert.NotNil(t, outbounds["cadence-history"].Unary) + + outbounds, err = NewDirectOutbound("cadence-history", true, nil).Build(grpc, tchannel) + assert.NoError(t, err) + assert.Equal(t, "cadence-history", outbounds["cadence-history"].ServiceName) + assert.NotNil(t, outbounds["cadence-history"].Unary) +} + func tempFile(t *testing.T, content string) string { f, err := ioutil.TempFile("", "") require.NoError(t, err) diff --git a/common/rpc/params.go b/common/rpc/params.go index 04375c7753a..7c7453f9d50 100644 --- a/common/rpc/params.go +++ b/common/rpc/params.go @@ -26,6 +26,7 @@ import ( "net" "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/service" "go.uber.org/yarpc" @@ -49,7 +50,7 @@ type Params struct { } // NewParams creates parameters for rpc.Factory from the given config -func NewParams(serviceName string, config *config.Config) (Params, error) { +func NewParams(serviceName string, config *config.Config, dc *dynamicconfig.Collection) (Params, error) { serviceConfig, err := config.GetServiceConfig(serviceName) if err != nil { return Params{}, err @@ -76,6 +77,8 @@ func NewParams(serviceName string, config *config.Config) (Params, error) { } } + enableGRPCOutbound := dc.GetBoolProperty(dynamicconfig.EnableGRPCOutbound, 
true)() + publicClientOutbound, err := newPublicClientOutbound(config) if err != nil { return Params{}, fmt.Errorf("public client outbound: %v", err) @@ -87,9 +90,12 @@ func NewParams(serviceName string, config *config.Config) (Params, error) { GRPCAddress: fmt.Sprintf("%v:%v", listenIP, serviceConfig.RPC.GRPCPort), GRPCMaxMsgSize: serviceConfig.RPC.GRPCMaxMsgSize, HostAddressMapper: NewGRPCPorts(config), - OutboundsBuilder: publicClientOutbound, - InboundTLS: inboundTLS, - OutboundTLS: outboundTLS, + OutboundsBuilder: CombineOutbounds( + NewDirectOutbound(service.History, enableGRPCOutbound, outboundTLS[service.History]), + publicClientOutbound, + ), + InboundTLS: inboundTLS, + OutboundTLS: outboundTLS, InboundMiddleware: yarpc.InboundMiddleware{ Unary: &inboundMetricsMiddleware{}, }, diff --git a/common/rpc/params_test.go b/common/rpc/params_test.go index c5ac13a1fd3..067a8c77d49 100644 --- a/common/rpc/params_test.go +++ b/common/rpc/params_test.go @@ -27,39 +27,41 @@ import ( "github.com/stretchr/testify/assert" "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/service" ) func TestNewParams(t *testing.T) { serviceName := service.Frontend + dc := dynamicconfig.NewNopCollection() makeConfig := func(svc config.Service) *config.Config { return &config.Config{ PublicClient: config.PublicClient{HostPort: "localhost:9999"}, Services: map[string]config.Service{"frontend": svc}} } - _, err := NewParams(serviceName, &config.Config{}) + _, err := NewParams(serviceName, &config.Config{}, dc) assert.EqualError(t, err, "no config section for service: frontend") - _, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, BindOnIP: "1.2.3.4"}})) + _, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, BindOnIP: "1.2.3.4"}}), dc) assert.EqualError(t, err, "get listen IP: bindOnLocalHost and bindOnIP are mutually exclusive") - _, 
err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "invalidIP"}})) + _, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "invalidIP"}}), dc) assert.EqualError(t, err, "get listen IP: unable to parse bindOnIP value or it is not an IPv4 address: invalidIP") - _, err = NewParams(serviceName, &config.Config{Services: map[string]config.Service{"frontend": {}}}) + _, err = NewParams(serviceName, &config.Config{Services: map[string]config.Service{"frontend": {}}}, dc) assert.EqualError(t, err, "public client outbound: need to provide an endpoint config for PublicClient") - _, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, TLS: config.TLS{Enabled: true, CertFile: "invalid", KeyFile: "invalid"}}})) + _, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, TLS: config.TLS{Enabled: true, CertFile: "invalid", KeyFile: "invalid"}}}), dc) assert.EqualError(t, err, "inbound TLS config: open invalid: no such file or directory") _, err = NewParams(serviceName, &config.Config{Services: map[string]config.Service{ "frontend": {RPC: config.RPC{BindOnLocalHost: true}}, "history": {RPC: config.RPC{TLS: config.TLS{Enabled: true, CaFile: "invalid"}}}, - }}) + }}, dc) assert.EqualError(t, err, "outbound cadence-history TLS config: open invalid: no such file or directory") - params, err := NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, Port: 1111, GRPCPort: 2222, GRPCMaxMsgSize: 3333}})) + params, err := NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, Port: 1111, GRPCPort: 2222, GRPCMaxMsgSize: 3333}}), dc) assert.NoError(t, err) assert.Equal(t, "127.0.0.1:1111", params.TChannelAddress) assert.Equal(t, "127.0.0.1:2222", params.GRPCAddress) @@ -67,11 +69,11 @@ func TestNewParams(t *testing.T) { assert.Nil(t, params.InboundTLS) assert.IsType(t, GRPCPorts{}, 
params.HostAddressMapper) - params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "1.2.3.4", GRPCPort: 2222}})) + params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "1.2.3.4", GRPCPort: 2222}}), dc) assert.NoError(t, err) assert.Equal(t, "1.2.3.4:2222", params.GRPCAddress) - params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{GRPCPort: 2222, TLS: config.TLS{Enabled: true}}})) + params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{GRPCPort: 2222, TLS: config.TLS{Enabled: true}}}), dc) assert.NoError(t, err) ip, port, err := net.SplitHostPort(params.GRPCAddress) assert.NoError(t, err) diff --git a/host/client.go b/host/client.go index dc443599b66..b73efc901c3 100644 --- a/host/client.go +++ b/host/client.go @@ -49,15 +49,15 @@ type HistoryClient interface { // NewAdminClient creates a client to cadence admin client func NewAdminClient(d *yarpc.Dispatcher) AdminClient { - return admin.NewThriftClient(adminserviceclient.New(d.ClientConfig(service.Frontend))) + return admin.NewThriftClient(adminserviceclient.New(d.ClientConfig(testOutboundName(service.Frontend)))) } // NewFrontendClient creates a client to cadence frontend client func NewFrontendClient(d *yarpc.Dispatcher) FrontendClient { - return frontend.NewThriftClient(workflowserviceclient.New(d.ClientConfig(service.Frontend))) + return frontend.NewThriftClient(workflowserviceclient.New(d.ClientConfig(testOutboundName(service.Frontend)))) } // NewHistoryClient creates a client to cadence history service client func NewHistoryClient(d *yarpc.Dispatcher) HistoryClient { - return history.NewThriftClient(historyserviceclient.New(d.ClientConfig(service.History))) + return history.NewThriftClient(historyserviceclient.New(d.ClientConfig(testOutboundName(service.History)))) } diff --git a/host/onebox.go b/host/onebox.go index e6e1d1157d7..85a988a3a2f 100644 --- a/host/onebox.go +++ b/host/onebox.go 
@@ -794,12 +794,19 @@ func (c *cadenceImpl) newRPCFactory(serviceName string, tchannelHostPort string) }, // For integration tests to generate client out of the same outbound. OutboundsBuilder: rpc.CombineOutbounds( - &singleTChannelOutbound{serviceName, serviceName, tchannelHostPort}, + &singleTChannelOutbound{testOutboundName(serviceName), serviceName, tchannelHostPort}, &singleTChannelOutbound{rpc.OutboundPublicClient, service.Frontend, c.FrontendAddress()}, - rpc.NewCrossDCOutbounds(c.clusterMetadata.GetAllClusterInfo(), rpc.NewDNSPeerChooserFactory(0, c.logger))), + rpc.NewCrossDCOutbounds(c.clusterMetadata.GetAllClusterInfo(), rpc.NewDNSPeerChooserFactory(0, c.logger)), + rpc.NewDirectOutbound(service.History, true, nil), + ), }) } +// testOutboundName prefixes the outbound name with "test-" so that it does not clash with real Cadence outbounds. +func testOutboundName(name string) string { + return "test-" + name +} + type singleTChannelOutbound struct { outboundName string serviceName string