diff --git a/CHANGELOG.md b/CHANGELOG.md index f0a9345a062..44f7426d51f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 You can find a list of previous releases on the [github releases](https://github.com/uber/cadence/releases) page. ## [Unreleased] +### Added +- Added GRPC support. Cadence server will accept requests on both TChannel and GRPC. With dynamic config flag `system.enableGRPCOutbound` it will also switch to GRPC communication internally between server components. + ### Fixed - Fixed a bug where an error message is always displayed in Cadence UI `persistence max qps reached for list operations` on the workflow list screen (#3958) diff --git a/client/admin/grpcClient.go b/client/admin/grpcClient.go new file mode 100644 index 00000000000..5b08df01a17 --- /dev/null +++ b/client/admin/grpcClient.go @@ -0,0 +1,129 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package admin + +import ( + "context" + + "go.uber.org/yarpc" + + adminv1 "github.com/uber/cadence/.gen/proto/admin/v1" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/common/types/mapper/proto" +) + +type grpcClient struct { + c adminv1.AdminAPIYARPCClient +} + +func NewGRPCClient(c adminv1.AdminAPIYARPCClient) Client { + return grpcClient{c} +} + +func (g grpcClient) AddSearchAttribute(ctx context.Context, request *types.AddSearchAttributeRequest, opts ...yarpc.CallOption) error { + _, err := g.c.AddSearchAttribute(ctx, proto.FromAdminAddSearchAttributeRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) CloseShard(ctx context.Context, request *types.CloseShardRequest, opts ...yarpc.CallOption) error { + _, err := g.c.CloseShard(ctx, proto.FromAdminCloseShardRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) DescribeCluster(ctx context.Context, opts ...yarpc.CallOption) (*types.DescribeClusterResponse, error) { + response, err := g.c.DescribeCluster(ctx, &adminv1.DescribeClusterRequest{}, opts...) + return proto.ToAdminDescribeClusterResponse(response), proto.ToError(err) +} + +func (g grpcClient) DescribeHistoryHost(ctx context.Context, request *types.DescribeHistoryHostRequest, opts ...yarpc.CallOption) (*types.DescribeHistoryHostResponse, error) { + response, err := g.c.DescribeHistoryHost(ctx, proto.FromAdminDescribeHistoryHostRequest(request), opts...) + return proto.ToAdminDescribeHistoryHostResponse(response), proto.ToError(err) +} + +func (g grpcClient) DescribeQueue(ctx context.Context, request *types.DescribeQueueRequest, opts ...yarpc.CallOption) (*types.DescribeQueueResponse, error) { + response, err := g.c.DescribeQueue(ctx, proto.FromAdminDescribeQueueRequest(request), opts...) + return proto.ToAdminDescribeQueueResponse(response), proto.ToError(err) +} + +func (g grpcClient) DescribeWorkflowExecution(ctx context.Context, request *types.AdminDescribeWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.AdminDescribeWorkflowExecutionResponse, error) { + response, err := g.c.DescribeWorkflowExecution(ctx, proto.FromAdminDescribeWorkflowExecutionRequest(request), opts...) + return proto.ToAdminDescribeWorkflowExecutionResponse(response), proto.ToError(err) +} + +func (g grpcClient) GetDLQReplicationMessages(ctx context.Context, request *types.GetDLQReplicationMessagesRequest, opts ...yarpc.CallOption) (*types.GetDLQReplicationMessagesResponse, error) { + response, err := g.c.GetDLQReplicationMessages(ctx, proto.FromAdminGetDLQReplicationMessagesRequest(request), opts...) + return proto.ToAdminGetDLQReplicationMessagesResponse(response), proto.ToError(err) +} + +func (g grpcClient) GetDomainReplicationMessages(ctx context.Context, request *types.GetDomainReplicationMessagesRequest, opts ...yarpc.CallOption) (*types.GetDomainReplicationMessagesResponse, error) { + response, err := g.c.GetDomainReplicationMessages(ctx, proto.FromAdminGetDomainReplicationMessagesRequest(request), opts...) + return proto.ToAdminGetDomainReplicationMessagesResponse(response), proto.ToError(err) +} + +func (g grpcClient) GetReplicationMessages(ctx context.Context, request *types.GetReplicationMessagesRequest, opts ...yarpc.CallOption) (*types.GetReplicationMessagesResponse, error) { + response, err := g.c.GetReplicationMessages(ctx, proto.FromAdminGetReplicationMessagesRequest(request), opts...) + return proto.ToAdminGetReplicationMessagesResponse(response), proto.ToError(err) +} + +func (g grpcClient) GetWorkflowExecutionRawHistoryV2(ctx context.Context, request *types.GetWorkflowExecutionRawHistoryV2Request, opts ...yarpc.CallOption) (*types.GetWorkflowExecutionRawHistoryV2Response, error) { + response, err := g.c.GetWorkflowExecutionRawHistoryV2(ctx, proto.FromAdminGetWorkflowExecutionRawHistoryV2Request(request), opts...) + return proto.ToAdminGetWorkflowExecutionRawHistoryV2Response(response), proto.ToError(err) +} + +func (g grpcClient) MergeDLQMessages(ctx context.Context, request *types.MergeDLQMessagesRequest, opts ...yarpc.CallOption) (*types.MergeDLQMessagesResponse, error) { + response, err := g.c.MergeDLQMessages(ctx, proto.FromAdminMergeDLQMessagesRequest(request), opts...) + return proto.ToAdminMergeDLQMessagesResponse(response), proto.ToError(err) +} + +func (g grpcClient) PurgeDLQMessages(ctx context.Context, request *types.PurgeDLQMessagesRequest, opts ...yarpc.CallOption) error { + _, err := g.c.PurgeDLQMessages(ctx, proto.FromAdminPurgeDLQMessagesRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) ReadDLQMessages(ctx context.Context, request *types.ReadDLQMessagesRequest, opts ...yarpc.CallOption) (*types.ReadDLQMessagesResponse, error) { + response, err := g.c.ReadDLQMessages(ctx, proto.FromAdminReadDLQMessagesRequest(request), opts...) + return proto.ToAdminReadDLQMessagesResponse(response), proto.ToError(err) +} + +func (g grpcClient) ReapplyEvents(ctx context.Context, request *types.ReapplyEventsRequest, opts ...yarpc.CallOption) error { + _, err := g.c.ReapplyEvents(ctx, proto.FromAdminReapplyEventsRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RefreshWorkflowTasks(ctx context.Context, request *types.RefreshWorkflowTasksRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RefreshWorkflowTasks(ctx, proto.FromAdminRefreshWorkflowTasksRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RemoveTask(ctx context.Context, request *types.RemoveTaskRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RemoveTask(ctx, proto.FromAdminRemoveTaskRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) ResendReplicationTasks(ctx context.Context, request *types.ResendReplicationTasksRequest, opts ...yarpc.CallOption) error { + _, err := g.c.ResendReplicationTasks(ctx, proto.FromAdminResendReplicationTasksRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) ResetQueue(ctx context.Context, request *types.ResetQueueRequest, opts ...yarpc.CallOption) error { + _, err := g.c.ResetQueue(ctx, proto.FromAdminResetQueueRequest(request), opts...) + return proto.ToError(err) +} diff --git a/client/clientfactory.go b/client/clientfactory.go index 0cdccf3ae00..dd34cc86b3c 100644 --- a/client/clientfactory.go +++ b/client/clientfactory.go @@ -29,6 +29,11 @@ import ( "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" "github.com/uber/cadence/.gen/go/history/historyserviceclient" "github.com/uber/cadence/.gen/go/matching/matchingserviceclient" + + apiv1 "github.com/uber/cadence/.gen/proto/api/v1" + historyv1 "github.com/uber/cadence/.gen/proto/history/v1" + matchingv1 "github.com/uber/cadence/.gen/proto/matching/v1" + "github.com/uber/cadence/client/admin" "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/client/history" @@ -76,6 +81,7 @@ type ( dynConfig *dynamicconfig.Collection numberOfHistoryShards int logger log.Logger + enableGRPCOutbound bool } ) @@ -88,6 +94,7 @@ func NewRPCClientFactory( numberOfHistoryShards int, logger log.Logger, ) Factory { + enableGRPCOutbound := dc.GetBoolProperty(dynamicconfig.EnableGRPCOutbound, false)() return &rpcClientFactory{ rpcFactory: rpcFactory, monitor: monitor, @@ -95,6 +102,7 @@ func NewRPCClientFactory( dynConfig: dc, numberOfHistoryShards: numberOfHistoryShards, logger: logger, + enableGRPCOutbound: enableGRPCOutbound, } } @@ -125,8 +133,10 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) ( } clientProvider := func(clientKey string) (interface{}, error) { - dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(historyCaller, common.HistoryServiceName, clientKey) - return history.NewThriftClient(historyserviceclient.New(dispatcher.ClientConfig(common.HistoryServiceName))), nil + if cf.enableGRPCOutbound { + return cf.newHistoryGRPCClient(clientKey) + } + return cf.newHistoryThriftClient(clientKey) } client := history.NewClient( @@ -163,8 +173,10 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout( } clientProvider := func(clientKey string) (interface{}, error) { - dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(matchingCaller, common.MatchingServiceName, clientKey) - return matching.NewThriftClient(matchingserviceclient.New(dispatcher.ClientConfig(common.MatchingServiceName))), nil + if cf.enableGRPCOutbound { + return cf.newMatchingGRPCClient(clientKey) + } + return cf.newMatchingThriftClient(clientKey) } client := matching.NewClient( @@ -202,8 +214,10 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeout( } clientProvider := func(clientKey string) (interface{}, error) { - dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(frontendCaller, common.FrontendServiceName, clientKey) - return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName))), nil + if cf.enableGRPCOutbound { + return cf.newFrontendGRPCClient(clientKey) + } + return cf.newFrontendThriftClient(clientKey) } client := frontend.NewClient(timeout, longPollTimeout, common.NewClientCache(keyResolver, clientProvider)) @@ -263,3 +277,57 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndDispatcher( } return client, nil } + +func (cf *rpcClientFactory) newHistoryThriftClient(hostAddress string) (history.Client, error) { + dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(historyCaller, common.HistoryServiceName, hostAddress) + if err != nil { + return nil, err + } + return history.NewThriftClient(historyserviceclient.New(dispatcher.ClientConfig(common.HistoryServiceName))), nil +} + +func (cf *rpcClientFactory) newMatchingThriftClient(hostAddress string) (matching.Client, error) { + dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(matchingCaller, common.MatchingServiceName, hostAddress) + if err != nil { + return nil, err + } + return matching.NewThriftClient(matchingserviceclient.New(dispatcher.ClientConfig(common.MatchingServiceName))), nil +} + +func (cf *rpcClientFactory) newFrontendThriftClient(hostAddress string) (frontend.Client, error) { + dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(frontendCaller, common.FrontendServiceName, hostAddress) + if err != nil { + return nil, err + } + return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName))), nil +} + +func (cf *rpcClientFactory) newHistoryGRPCClient(hostAddress string) (history.Client, error) { + dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(historyCaller, common.HistoryServiceName, hostAddress) + if err != nil { + return nil, err + } + return history.NewGRPCClient(historyv1.NewHistoryAPIYARPCClient(dispatcher.ClientConfig(common.HistoryServiceName))), nil +} + +func (cf *rpcClientFactory) newMatchingGRPCClient(hostAddress string) (matching.Client, error) { + dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(matchingCaller, common.MatchingServiceName, hostAddress) + if err != nil { + return nil, err + } + return matching.NewGRPCClient(matchingv1.NewMatchingAPIYARPCClient(dispatcher.ClientConfig(common.MatchingServiceName))), nil +} + +func (cf *rpcClientFactory) newFrontendGRPCClient(hostAddress string) (frontend.Client, error) { + dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(frontendCaller, common.FrontendServiceName, hostAddress) + if err != nil { + return nil, err + } + config := dispatcher.ClientConfig(common.FrontendServiceName) + return frontend.NewGRPCClient( + apiv1.NewDomainAPIYARPCClient(config), + apiv1.NewWorkflowAPIYARPCClient(config), + apiv1.NewWorkerAPIYARPCClient(config), + apiv1.NewVisibilityAPIYARPCClient(config), + ), nil +} diff --git a/client/frontend/grpcClient.go b/client/frontend/grpcClient.go new file mode 100644 index 00000000000..260ff909964 --- /dev/null +++ b/client/frontend/grpcClient.go @@ -0,0 +1,237 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package frontend + +import ( + "context" + + "go.uber.org/yarpc" + + apiv1 "github.com/uber/cadence/.gen/proto/api/v1" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/common/types/mapper/proto" +) + +type grpcClient struct { + domain apiv1.DomainAPIYARPCClient + workflow apiv1.WorkflowAPIYARPCClient + worker apiv1.WorkerAPIYARPCClient + visibility apiv1.VisibilityAPIYARPCClient +} + +func NewGRPCClient( + domain apiv1.DomainAPIYARPCClient, + workflow apiv1.WorkflowAPIYARPCClient, + worker apiv1.WorkerAPIYARPCClient, + visibility apiv1.VisibilityAPIYARPCClient, +) Client { + return grpcClient{domain, workflow, worker, visibility} +} + +func (g grpcClient) CountWorkflowExecutions(ctx context.Context, request *types.CountWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*types.CountWorkflowExecutionsResponse, error) { + response, err := g.visibility.CountWorkflowExecutions(ctx, proto.FromCountWorkflowExecutionsRequest(request), opts...) + return proto.ToCountWorkflowExecutionsResponse(response), proto.ToError(err) +} + +func (g grpcClient) DeprecateDomain(ctx context.Context, request *types.DeprecateDomainRequest, opts ...yarpc.CallOption) error { + _, err := g.domain.DeprecateDomain(ctx, proto.FromDeprecateDomainRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) DescribeDomain(ctx context.Context, request *types.DescribeDomainRequest, opts ...yarpc.CallOption) (*types.DescribeDomainResponse, error) { + response, err := g.domain.DescribeDomain(ctx, proto.FromDescribeDomainRequest(request), opts...) + return proto.ToDescribeDomainResponse(response), proto.ToError(err) +} + +func (g grpcClient) DescribeTaskList(ctx context.Context, request *types.DescribeTaskListRequest, opts ...yarpc.CallOption) (*types.DescribeTaskListResponse, error) { + response, err := g.workflow.DescribeTaskList(ctx, proto.FromDescribeTaskListRequest(request), opts...) + return proto.ToDescribeTaskListResponse(response), proto.ToError(err) +} + +func (g grpcClient) DescribeWorkflowExecution(ctx context.Context, request *types.DescribeWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.DescribeWorkflowExecutionResponse, error) { + response, err := g.workflow.DescribeWorkflowExecution(ctx, proto.FromDescribeWorkflowExecutionRequest(request), opts...) + return proto.ToDescribeWorkflowExecutionResponse(response), proto.ToError(err) +} + +func (g grpcClient) GetClusterInfo(ctx context.Context, opts ...yarpc.CallOption) (*types.ClusterInfo, error) { + response, err := g.workflow.GetClusterInfo(ctx, &apiv1.GetClusterInfoRequest{}, opts...) + return proto.ToGetClusterInfoResponse(response), proto.ToError(err) +} + +func (g grpcClient) GetSearchAttributes(ctx context.Context, opts ...yarpc.CallOption) (*types.GetSearchAttributesResponse, error) { + response, err := g.visibility.GetSearchAttributes(ctx, &apiv1.GetSearchAttributesRequest{}, opts...) + return proto.ToGetSearchAttributesResponse(response), proto.ToError(err) +} + +func (g grpcClient) GetWorkflowExecutionHistory(ctx context.Context, request *types.GetWorkflowExecutionHistoryRequest, opts ...yarpc.CallOption) (*types.GetWorkflowExecutionHistoryResponse, error) { + response, err := g.workflow.GetWorkflowExecutionHistory(ctx, proto.FromGetWorkflowExecutionHistoryRequest(request), opts...) + return proto.ToGetWorkflowExecutionHistoryResponse(response), proto.ToError(err) +} + +func (g grpcClient) ListArchivedWorkflowExecutions(ctx context.Context, request *types.ListArchivedWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*types.ListArchivedWorkflowExecutionsResponse, error) { + response, err := g.visibility.ListArchivedWorkflowExecutions(ctx, proto.FromListArchivedWorkflowExecutionsRequest(request), opts...) + return proto.ToListArchivedWorkflowExecutionsResponse(response), proto.ToError(err) +} + +func (g grpcClient) ListClosedWorkflowExecutions(ctx context.Context, request *types.ListClosedWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*types.ListClosedWorkflowExecutionsResponse, error) { + response, err := g.visibility.ListClosedWorkflowExecutions(ctx, proto.FromListClosedWorkflowExecutionsRequest(request), opts...) + return proto.ToListClosedWorkflowExecutionsResponse(response), proto.ToError(err) +} + +func (g grpcClient) ListDomains(ctx context.Context, request *types.ListDomainsRequest, opts ...yarpc.CallOption) (*types.ListDomainsResponse, error) { + response, err := g.domain.ListDomains(ctx, proto.FromListDomainsRequest(request), opts...) + return proto.ToListDomainsResponse(response), proto.ToError(err) +} + +func (g grpcClient) ListOpenWorkflowExecutions(ctx context.Context, request *types.ListOpenWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*types.ListOpenWorkflowExecutionsResponse, error) { + response, err := g.visibility.ListOpenWorkflowExecutions(ctx, proto.FromListOpenWorkflowExecutionsRequest(request), opts...) + return proto.ToListOpenWorkflowExecutionsResponse(response), proto.ToError(err) +} + +func (g grpcClient) ListTaskListPartitions(ctx context.Context, request *types.ListTaskListPartitionsRequest, opts ...yarpc.CallOption) (*types.ListTaskListPartitionsResponse, error) { + response, err := g.workflow.ListTaskListPartitions(ctx, proto.FromListTaskListPartitionsRequest(request), opts...) + return proto.ToListTaskListPartitionsResponse(response), proto.ToError(err) +} + +func (g grpcClient) ListWorkflowExecutions(ctx context.Context, request *types.ListWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*types.ListWorkflowExecutionsResponse, error) { + response, err := g.visibility.ListWorkflowExecutions(ctx, proto.FromListWorkflowExecutionsRequest(request), opts...) + return proto.ToListWorkflowExecutionsResponse(response), proto.ToError(err) +} + +func (g grpcClient) PollForActivityTask(ctx context.Context, request *types.PollForActivityTaskRequest, opts ...yarpc.CallOption) (*types.PollForActivityTaskResponse, error) { + response, err := g.worker.PollForActivityTask(ctx, proto.FromPollForActivityTaskRequest(request), opts...) + return proto.ToPollForActivityTaskResponse(response), proto.ToError(err) +} + +func (g grpcClient) PollForDecisionTask(ctx context.Context, request *types.PollForDecisionTaskRequest, opts ...yarpc.CallOption) (*types.PollForDecisionTaskResponse, error) { + response, err := g.worker.PollForDecisionTask(ctx, proto.FromPollForDecisionTaskRequest(request), opts...) + return proto.ToPollForDecisionTaskResponse(response), proto.ToError(err) +} + +func (g grpcClient) QueryWorkflow(ctx context.Context, request *types.QueryWorkflowRequest, opts ...yarpc.CallOption) (*types.QueryWorkflowResponse, error) { + response, err := g.workflow.QueryWorkflow(ctx, proto.FromQueryWorkflowRequest(request), opts...) + return proto.ToQueryWorkflowResponse(response), proto.ToError(err) +} + +func (g grpcClient) RecordActivityTaskHeartbeat(ctx context.Context, request *types.RecordActivityTaskHeartbeatRequest, opts ...yarpc.CallOption) (*types.RecordActivityTaskHeartbeatResponse, error) { + response, err := g.worker.RecordActivityTaskHeartbeat(ctx, proto.FromRecordActivityTaskHeartbeatRequest(request), opts...) + return proto.ToRecordActivityTaskHeartbeatResponse(response), proto.ToError(err) +} + +func (g grpcClient) RecordActivityTaskHeartbeatByID(ctx context.Context, request *types.RecordActivityTaskHeartbeatByIDRequest, opts ...yarpc.CallOption) (*types.RecordActivityTaskHeartbeatResponse, error) { + response, err := g.worker.RecordActivityTaskHeartbeatByID(ctx, proto.FromRecordActivityTaskHeartbeatByIDRequest(request), opts...) + return proto.ToRecordActivityTaskHeartbeatByIDResponse(response), proto.ToError(err) +} + +func (g grpcClient) RegisterDomain(ctx context.Context, request *types.RegisterDomainRequest, opts ...yarpc.CallOption) error { + _, err := g.domain.RegisterDomain(ctx, proto.FromRegisterDomainRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RequestCancelWorkflowExecution(ctx context.Context, request *types.RequestCancelWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + _, err := g.worker.RequestCancelWorkflowExecution(ctx, proto.FromRequestCancelWorkflowExecutionRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) ResetStickyTaskList(ctx context.Context, request *types.ResetStickyTaskListRequest, opts ...yarpc.CallOption) (*types.ResetStickyTaskListResponse, error) { + _, err := g.workflow.ResetStickyTaskList(ctx, proto.FromResetStickyTaskListRequest(request), opts...) + return &types.ResetStickyTaskListResponse{}, proto.ToError(err) +} + +func (g grpcClient) ResetWorkflowExecution(ctx context.Context, request *types.ResetWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.ResetWorkflowExecutionResponse, error) { + response, err := g.workflow.ResetWorkflowExecution(ctx, proto.FromResetWorkflowExecutionRequest(request), opts...) + return proto.ToResetWorkflowExecutionResponse(response), proto.ToError(err) +} + +func (g grpcClient) RespondActivityTaskCanceled(ctx context.Context, request *types.RespondActivityTaskCanceledRequest, opts ...yarpc.CallOption) error { + _, err := g.worker.RespondActivityTaskCanceled(ctx, proto.FromRespondActivityTaskCanceledRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RespondActivityTaskCanceledByID(ctx context.Context, request *types.RespondActivityTaskCanceledByIDRequest, opts ...yarpc.CallOption) error { + _, err := g.worker.RespondActivityTaskCanceledByID(ctx, proto.FromRespondActivityTaskCanceledByIDRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RespondActivityTaskCompleted(ctx context.Context, request *types.RespondActivityTaskCompletedRequest, opts ...yarpc.CallOption) error { + _, err := g.worker.RespondActivityTaskCompleted(ctx, proto.FromRespondActivityTaskCompletedRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RespondActivityTaskCompletedByID(ctx context.Context, request *types.RespondActivityTaskCompletedByIDRequest, opts ...yarpc.CallOption) error { + _, err := g.worker.RespondActivityTaskCompletedByID(ctx, proto.FromRespondActivityTaskCompletedByIDRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RespondActivityTaskFailed(ctx context.Context, request *types.RespondActivityTaskFailedRequest, opts ...yarpc.CallOption) error { + _, err := g.worker.RespondActivityTaskFailed(ctx, proto.FromRespondActivityTaskFailedRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RespondActivityTaskFailedByID(ctx context.Context, request *types.RespondActivityTaskFailedByIDRequest, opts ...yarpc.CallOption) error { + _, err := g.worker.RespondActivityTaskFailedByID(ctx, proto.FromRespondActivityTaskFailedByIDRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RespondDecisionTaskCompleted(ctx context.Context, request *types.RespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption) (*types.RespondDecisionTaskCompletedResponse, error) { + response, err := g.worker.RespondDecisionTaskCompleted(ctx, proto.FromRespondDecisionTaskCompletedRequest(request), opts...) + return proto.ToRespondDecisionTaskCompletedResponse(response), proto.ToError(err) +} + +func (g grpcClient) RespondDecisionTaskFailed(ctx context.Context, request *types.RespondDecisionTaskFailedRequest, opts ...yarpc.CallOption) error { + _, err := g.worker.RespondDecisionTaskFailed(ctx, proto.FromRespondDecisionTaskFailedRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RespondQueryTaskCompleted(ctx context.Context, request *types.RespondQueryTaskCompletedRequest, opts ...yarpc.CallOption) error { + _, err := g.worker.RespondQueryTaskCompleted(ctx, proto.FromRespondQueryTaskCompletedRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) ScanWorkflowExecutions(ctx context.Context, request *types.ListWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*types.ListWorkflowExecutionsResponse, error) { + response, err := g.visibility.ScanWorkflowExecutions(ctx, proto.FromScanWorkflowExecutionsRequest(request), opts...) + return proto.ToScanWorkflowExecutionsResponse(response), proto.ToError(err) +} + +func (g grpcClient) SignalWithStartWorkflowExecution(ctx context.Context, request *types.SignalWithStartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.StartWorkflowExecutionResponse, error) { + response, err := g.workflow.SignalWithStartWorkflowExecution(ctx, proto.FromSignalWithStartWorkflowExecutionRequest(request), opts...) + return proto.ToSignalWithStartWorkflowExecutionResponse(response), proto.ToError(err) +} + +func (g grpcClient) SignalWorkflowExecution(ctx context.Context, request *types.SignalWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + _, err := g.workflow.SignalWorkflowExecution(ctx, proto.FromSignalWorkflowExecutionRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) StartWorkflowExecution(ctx context.Context, request *types.StartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.StartWorkflowExecutionResponse, error) { + response, err := g.workflow.StartWorkflowExecution(ctx, proto.FromStartWorkflowExecutionRequest(request), opts...) + return proto.ToStartWorkflowExecutionResponse(response), proto.ToError(err) +} + +func (g grpcClient) TerminateWorkflowExecution(ctx context.Context, request *types.TerminateWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + _, err := g.workflow.TerminateWorkflowExecution(ctx, proto.FromTerminateWorkflowExecutionRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) UpdateDomain(ctx context.Context, request *types.UpdateDomainRequest, opts ...yarpc.CallOption) (*types.UpdateDomainResponse, error) { + response, err := g.domain.UpdateDomain(ctx, proto.FromUpdateDomainRequest(request), opts...) + return proto.ToUpdateDomainResponse(response), proto.ToError(err) +} diff --git a/client/history/grpcClient.go b/client/history/grpcClient.go new file mode 100644 index 00000000000..f0a2fa9005a --- /dev/null +++ b/client/history/grpcClient.go @@ -0,0 +1,234 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + + "go.uber.org/yarpc" + + historyv1 "github.com/uber/cadence/.gen/proto/history/v1" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/common/types/mapper/proto" +) + +type grpcClient struct { + c historyv1.HistoryAPIYARPCClient +} + +func NewGRPCClient(c historyv1.HistoryAPIYARPCClient) Client { + return grpcClient{c} +} + +func (g grpcClient) CloseShard(ctx context.Context, request *types.CloseShardRequest, opts ...yarpc.CallOption) error { + _, err := g.c.CloseShard(ctx, proto.FromHistoryCloseShardRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) DescribeHistoryHost(ctx context.Context, request *types.DescribeHistoryHostRequest, opts ...yarpc.CallOption) (*types.DescribeHistoryHostResponse, error) { + response, err := g.c.DescribeHistoryHost(ctx, proto.FromHistoryDescribeHistoryHostRequest(request), opts...) + return proto.ToHistoryDescribeHistoryHostResponse(response), proto.ToError(err) +} + +func (g grpcClient) DescribeMutableState(ctx context.Context, request *types.DescribeMutableStateRequest, opts ...yarpc.CallOption) (*types.DescribeMutableStateResponse, error) { + response, err := g.c.DescribeMutableState(ctx, proto.FromHistoryDescribeMutableStateRequest(request), opts...) + return proto.ToHistoryDescribeMutableStateResponse(response), proto.ToError(err) +} + +func (g grpcClient) DescribeQueue(ctx context.Context, request *types.DescribeQueueRequest, opts ...yarpc.CallOption) (*types.DescribeQueueResponse, error) { + response, err := g.c.DescribeQueue(ctx, proto.FromHistoryDescribeQueueRequest(request), opts...) + return proto.ToHistoryDescribeQueueResponse(response), proto.ToError(err) +} + +func (g grpcClient) DescribeWorkflowExecution(ctx context.Context, request *types.HistoryDescribeWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.DescribeWorkflowExecutionResponse, error) { + response, err := g.c.DescribeWorkflowExecution(ctx, proto.FromHistoryDescribeWorkflowExecutionRequest(request), opts...) + return proto.ToHistoryDescribeWorkflowExecutionResponse(response), proto.ToError(err) +} + +func (g grpcClient) GetDLQReplicationMessages(ctx context.Context, request *types.GetDLQReplicationMessagesRequest, opts ...yarpc.CallOption) (*types.GetDLQReplicationMessagesResponse, error) { + response, err := g.c.GetDLQReplicationMessages(ctx, proto.FromHistoryGetDLQReplicationMessagesRequest(request), opts...) + return proto.ToHistoryGetDLQReplicationMessagesResponse(response), proto.ToError(err) +} + +func (g grpcClient) GetMutableState(ctx context.Context, request *types.GetMutableStateRequest, opts ...yarpc.CallOption) (*types.GetMutableStateResponse, error) { + response, err := g.c.GetMutableState(ctx, proto.FromHistoryGetMutableStateRequest(request), opts...) + return proto.ToHistoryGetMutableStateResponse(response), proto.ToError(err) +} + +func (g grpcClient) GetReplicationMessages(ctx context.Context, request *types.GetReplicationMessagesRequest, opts ...yarpc.CallOption) (*types.GetReplicationMessagesResponse, error) { + response, err := g.c.GetReplicationMessages(ctx, proto.FromHistoryGetReplicationMessagesRequest(request), opts...) + return proto.ToHistoryGetReplicationMessagesResponse(response), proto.ToError(err) +} + +func (g grpcClient) MergeDLQMessages(ctx context.Context, request *types.MergeDLQMessagesRequest, opts ...yarpc.CallOption) (*types.MergeDLQMessagesResponse, error) { + response, err := g.c.MergeDLQMessages(ctx, proto.FromHistoryMergeDLQMessagesRequest(request), opts...) + return proto.ToHistoryMergeDLQMessagesResponse(response), proto.ToError(err) +} + +func (g grpcClient) NotifyFailoverMarkers(ctx context.Context, request *types.NotifyFailoverMarkersRequest, opts ...yarpc.CallOption) error { + _, err := g.c.NotifyFailoverMarkers(ctx, proto.FromHistoryNotifyFailoverMarkersRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) PollMutableState(ctx context.Context, request *types.PollMutableStateRequest, opts ...yarpc.CallOption) (*types.PollMutableStateResponse, error) { + response, err := g.c.PollMutableState(ctx, proto.FromHistoryPollMutableStateRequest(request), opts...) + return proto.ToHistoryPollMutableStateResponse(response), proto.ToError(err) +} + +func (g grpcClient) PurgeDLQMessages(ctx context.Context, request *types.PurgeDLQMessagesRequest, opts ...yarpc.CallOption) error { + _, err := g.c.PurgeDLQMessages(ctx, proto.FromHistoryPurgeDLQMessagesRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) QueryWorkflow(ctx context.Context, request *types.HistoryQueryWorkflowRequest, opts ...yarpc.CallOption) (*types.HistoryQueryWorkflowResponse, error) { + response, err := g.c.QueryWorkflow(ctx, proto.FromHistoryQueryWorkflowRequest(request), opts...) + return proto.ToHistoryQueryWorkflowResponse(response), proto.ToError(err) +} + +func (g grpcClient) ReadDLQMessages(ctx context.Context, request *types.ReadDLQMessagesRequest, opts ...yarpc.CallOption) (*types.ReadDLQMessagesResponse, error) { + response, err := g.c.ReadDLQMessages(ctx, proto.FromHistoryReadDLQMessagesRequest(request), opts...) + return proto.ToHistoryReadDLQMessagesResponse(response), proto.ToError(err) +} + +func (g grpcClient) ReapplyEvents(ctx context.Context, request *types.HistoryReapplyEventsRequest, opts ...yarpc.CallOption) error { + _, err := g.c.ReapplyEvents(ctx, proto.FromHistoryReapplyEventsRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RecordActivityTaskHeartbeat(ctx context.Context, request *types.HistoryRecordActivityTaskHeartbeatRequest, opts ...yarpc.CallOption) (*types.RecordActivityTaskHeartbeatResponse, error) { + response, err := g.c.RecordActivityTaskHeartbeat(ctx, proto.FromHistoryRecordActivityTaskHeartbeatRequest(request), opts...) + return proto.ToHistoryRecordActivityTaskHeartbeatResponse(response), proto.ToError(err) +} + +func (g grpcClient) RecordActivityTaskStarted(ctx context.Context, request *types.RecordActivityTaskStartedRequest, opts ...yarpc.CallOption) (*types.RecordActivityTaskStartedResponse, error) { + response, err := g.c.RecordActivityTaskStarted(ctx, proto.FromHistoryRecordActivityTaskStartedRequest(request), opts...) + return proto.ToHistoryRecordActivityTaskStartedResponse(response), proto.ToError(err) +} + +func (g grpcClient) RecordChildExecutionCompleted(ctx context.Context, request *types.RecordChildExecutionCompletedRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RecordChildExecutionCompleted(ctx, proto.FromHistoryRecordChildExecutionCompletedRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RecordDecisionTaskStarted(ctx context.Context, request *types.RecordDecisionTaskStartedRequest, opts ...yarpc.CallOption) (*types.RecordDecisionTaskStartedResponse, error) { + response, err := g.c.RecordDecisionTaskStarted(ctx, proto.FromHistoryRecordDecisionTaskStartedRequest(request), opts...) + return proto.ToHistoryRecordDecisionTaskStartedResponse(response), proto.ToError(err) +} + +func (g grpcClient) RefreshWorkflowTasks(ctx context.Context, request *types.HistoryRefreshWorkflowTasksRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RefreshWorkflowTasks(ctx, proto.FromHistoryRefreshWorkflowTasksRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RemoveSignalMutableState(ctx context.Context, request *types.RemoveSignalMutableStateRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RemoveSignalMutableState(ctx, proto.FromHistoryRemoveSignalMutableStateRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RemoveTask(ctx context.Context, request *types.RemoveTaskRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RemoveTask(ctx, proto.FromHistoryRemoveTaskRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) ReplicateEventsV2(ctx context.Context, request *types.ReplicateEventsV2Request, opts ...yarpc.CallOption) error { + _, err := g.c.ReplicateEventsV2(ctx, proto.FromHistoryReplicateEventsV2Request(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RequestCancelWorkflowExecution(ctx context.Context, request *types.HistoryRequestCancelWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RequestCancelWorkflowExecution(ctx, proto.FromHistoryRequestCancelWorkflowExecutionRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) ResetQueue(ctx context.Context, request *types.ResetQueueRequest, opts ...yarpc.CallOption) error { + _, err := g.c.ResetQueue(ctx, proto.FromHistoryResetQueueRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) ResetStickyTaskList(ctx context.Context, request *types.HistoryResetStickyTaskListRequest, opts ...yarpc.CallOption) (*types.HistoryResetStickyTaskListResponse, error) { + _, err := g.c.ResetStickyTaskList(ctx, proto.FromHistoryResetStickyTaskListRequest(request), opts...) + return &types.HistoryResetStickyTaskListResponse{}, proto.ToError(err) +} + +func (g grpcClient) ResetWorkflowExecution(ctx context.Context, request *types.HistoryResetWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.ResetWorkflowExecutionResponse, error) { + response, err := g.c.ResetWorkflowExecution(ctx, proto.FromHistoryResetWorkflowExecutionRequest(request), opts...) + return proto.ToHistoryResetWorkflowExecutionResponse(response), proto.ToError(err) +} + +func (g grpcClient) RespondActivityTaskCanceled(ctx context.Context, request *types.HistoryRespondActivityTaskCanceledRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RespondActivityTaskCanceled(ctx, proto.FromHistoryRespondActivityTaskCanceledRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RespondActivityTaskCompleted(ctx context.Context, request *types.HistoryRespondActivityTaskCompletedRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RespondActivityTaskCompleted(ctx, proto.FromHistoryRespondActivityTaskCompletedRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RespondActivityTaskFailed(ctx context.Context, request *types.HistoryRespondActivityTaskFailedRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RespondActivityTaskFailed(ctx, proto.FromHistoryRespondActivityTaskFailedRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) RespondDecisionTaskCompleted(ctx context.Context, request *types.HistoryRespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption) (*types.HistoryRespondDecisionTaskCompletedResponse, error) { + response, err := g.c.RespondDecisionTaskCompleted(ctx, proto.FromHistoryRespondDecisionTaskCompletedRequest(request), opts...) + return proto.ToHistoryRespondDecisionTaskCompletedResponse(response), proto.ToError(err) +} + +func (g grpcClient) RespondDecisionTaskFailed(ctx context.Context, request *types.HistoryRespondDecisionTaskFailedRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RespondDecisionTaskFailed(ctx, proto.FromHistoryRespondDecisionTaskFailedRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) ScheduleDecisionTask(ctx context.Context, request *types.ScheduleDecisionTaskRequest, opts ...yarpc.CallOption) error { + _, err := g.c.ScheduleDecisionTask(ctx, proto.FromHistoryScheduleDecisionTaskRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) SignalWithStartWorkflowExecution(ctx context.Context, request *types.HistorySignalWithStartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.StartWorkflowExecutionResponse, error) { + response, err := g.c.SignalWithStartWorkflowExecution(ctx, proto.FromHistorySignalWithStartWorkflowExecutionRequest(request), opts...) + return proto.ToHistorySignalWithStartWorkflowExecutionResponse(response), proto.ToError(err) +} + +func (g grpcClient) SignalWorkflowExecution(ctx context.Context, request *types.HistorySignalWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + _, err := g.c.SignalWorkflowExecution(ctx, proto.FromHistorySignalWorkflowExecutionRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) StartWorkflowExecution(ctx context.Context, request *types.HistoryStartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*types.StartWorkflowExecutionResponse, error) { + response, err := g.c.StartWorkflowExecution(ctx, proto.FromHistoryStartWorkflowExecutionRequest(request), opts...) + return proto.ToHistoryStartWorkflowExecutionResponse(response), proto.ToError(err) +} + +func (g grpcClient) SyncActivity(ctx context.Context, request *types.SyncActivityRequest, opts ...yarpc.CallOption) error { + _, err := g.c.SyncActivity(ctx, proto.FromHistorySyncActivityRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) SyncShardStatus(ctx context.Context, request *types.SyncShardStatusRequest, opts ...yarpc.CallOption) error { + _, err := g.c.SyncShardStatus(ctx, proto.FromHistorySyncShardStatusRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) TerminateWorkflowExecution(ctx context.Context, request *types.HistoryTerminateWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + _, err := g.c.TerminateWorkflowExecution(ctx, proto.FromHistoryTerminateWorkflowExecutionRequest(request), opts...) + return proto.ToError(err) +} diff --git a/client/matching/grpcClient.go b/client/matching/grpcClient.go new file mode 100644 index 00000000000..caaa617dd36 --- /dev/null +++ b/client/matching/grpcClient.go @@ -0,0 +1,84 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "context" + + "go.uber.org/yarpc" + + matchingv1 "github.com/uber/cadence/.gen/proto/matching/v1" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/common/types/mapper/proto" +) + +type grpcClient struct { + c matchingv1.MatchingAPIYARPCClient +} + +func NewGRPCClient(c matchingv1.MatchingAPIYARPCClient) Client { + return grpcClient{c} +} + +func (g grpcClient) AddActivityTask(ctx context.Context, request *types.AddActivityTaskRequest, opts ...yarpc.CallOption) error { + _, err := g.c.AddActivityTask(ctx, proto.FromMatchingAddActivityTaskRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) AddDecisionTask(ctx context.Context, request *types.AddDecisionTaskRequest, opts ...yarpc.CallOption) error { + _, err := g.c.AddDecisionTask(ctx, proto.FromMatchingAddDecisionTaskRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) CancelOutstandingPoll(ctx context.Context, request *types.CancelOutstandingPollRequest, opts ...yarpc.CallOption) error { + _, err := g.c.CancelOutstandingPoll(ctx, proto.FromMatchingCancelOutstandingPollRequest(request), opts...) + return proto.ToError(err) +} + +func (g grpcClient) DescribeTaskList(ctx context.Context, request *types.MatchingDescribeTaskListRequest, opts ...yarpc.CallOption) (*types.DescribeTaskListResponse, error) { + response, err := g.c.DescribeTaskList(ctx, proto.FromMatchingDescribeTaskListRequest(request), opts...) + return proto.ToMatchingDescribeTaskListResponse(response), proto.ToError(err) +} + +func (g grpcClient) ListTaskListPartitions(ctx context.Context, request *types.MatchingListTaskListPartitionsRequest, opts ...yarpc.CallOption) (*types.ListTaskListPartitionsResponse, error) { + response, err := g.c.ListTaskListPartitions(ctx, proto.FromMatchingListTaskListPartitionsRequest(request), opts...) + return proto.ToMatchingListTaskListPartitionsResponse(response), proto.ToError(err) +} + +func (g grpcClient) PollForActivityTask(ctx context.Context, request *types.MatchingPollForActivityTaskRequest, opts ...yarpc.CallOption) (*types.PollForActivityTaskResponse, error) { + response, err := g.c.PollForActivityTask(ctx, proto.FromMatchingPollForActivityTaskRequest(request), opts...) + return proto.ToMatchingPollForActivityTaskResponse(response), proto.ToError(err) +} + +func (g grpcClient) PollForDecisionTask(ctx context.Context, request *types.MatchingPollForDecisionTaskRequest, opts ...yarpc.CallOption) (*types.MatchingPollForDecisionTaskResponse, error) { + response, err := g.c.PollForDecisionTask(ctx, proto.FromMatchingPollForDecisionTaskRequest(request), opts...) + return proto.ToMatchingPollForDecisionTaskResponse(response), proto.ToError(err) +} + +func (g grpcClient) QueryWorkflow(ctx context.Context, request *types.MatchingQueryWorkflowRequest, opts ...yarpc.CallOption) (*types.QueryWorkflowResponse, error) { + response, err := g.c.QueryWorkflow(ctx, proto.FromMatchingQueryWorkflowRequest(request), opts...) + return proto.ToMatchingQueryWorkflowResponse(response), proto.ToError(err) +} + +func (g grpcClient) RespondQueryTaskCompleted(ctx context.Context, request *types.MatchingRespondQueryTaskCompletedRequest, opts ...yarpc.CallOption) error { + _, err := g.c.RespondQueryTaskCompleted(ctx, proto.FromMatchingRespondQueryTaskCompletedRequest(request), opts...) + return proto.ToError(err) +} diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 41f20dd0eb1..70ec76e7ca5 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -127,7 +127,7 @@ func (s *server) startService() common.Daemon { svcCfg := s.cfg.Services[s.name] params.MetricScope = svcCfg.Metrics.NewScope(params.Logger, params.Name) - params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger) + params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger, s.cfg.NewGRPCPorts()) params.MembershipFactory, err = s.cfg.Ringpop.NewFactory( params.RPCFactory.GetDispatcher(), params.Name, diff --git a/common/config/config.go b/common/config/config.go index eb70f8879a6..fd573240213 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -84,6 +84,8 @@ type ( RPC struct { // Port is the port on which the channel will bind to Port int `yaml:"port"` + // GRPCPort is the port on which the grpc listener will bind to + GRPCPort int `yaml:"grpcPort"` // BindOnLocalHost is true if localhost is the bind address BindOnLocalHost bool `yaml:"bindOnLocalHost"` // BindOnIP can be used to bind service on specific ip (eg. `0.0.0.0`) - diff --git a/common/config/rpc.go b/common/config/rpc.go index 4f887839e73..9f3d9952caf 100644 --- a/common/config/rpc.go +++ b/common/config/rpc.go @@ -21,11 +21,15 @@ package config import ( + "errors" "fmt" "net" + "strings" "sync" "go.uber.org/yarpc" + "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/transport/grpc" "go.uber.org/yarpc/transport/tchannel" "github.com/uber/cadence/common/log" @@ -37,7 +41,9 @@ type RPCFactory struct { config *RPC serviceName string ch *tchannel.ChannelTransport + grpc *grpc.Transport logger log.Logger + grpcPorts GRPCPorts sync.Mutex dispatcher *yarpc.Dispatcher @@ -45,12 +51,12 @@ type RPCFactory struct { // NewFactory builds a new RPCFactory // conforming to the underlying configuration -func (cfg *RPC) NewFactory(sName string, logger log.Logger) *RPCFactory { - return newRPCFactory(cfg, sName, logger) +func (cfg *RPC) NewFactory(sName string, logger log.Logger, grpcPorts GRPCPorts) *RPCFactory { + return newRPCFactory(cfg, sName, logger, grpcPorts) } -func newRPCFactory(cfg *RPC, sName string, logger log.Logger) *RPCFactory { - factory := &RPCFactory{config: cfg, serviceName: sName, logger: logger} +func newRPCFactory(cfg *RPC, sName string, logger log.Logger, grpcPorts GRPCPorts) *RPCFactory { + factory := &RPCFactory{config: cfg, serviceName: sName, logger: logger, grpcPorts: grpcPorts} return factory } @@ -63,14 +69,16 @@ func (d *RPCFactory) GetDispatcher() *yarpc.Dispatcher { return d.dispatcher } - d.dispatcher = d.createDispatcher() + d.dispatcher = d.createInboundDispatcher() return d.dispatcher } -// createDispatcher creates a dispatcher for inbound -func (d *RPCFactory) createDispatcher() *yarpc.Dispatcher { +// createInboundDispatcher creates a dispatcher for inbound +func (d *RPCFactory) createInboundDispatcher() *yarpc.Dispatcher { // Setup dispatcher for onebox var err error + inbounds := yarpc.Inbounds{} + hostAddress := fmt.Sprintf("%v:%v", d.getListenIP(), d.config.Port) d.ch, err = tchannel.NewChannelTransport( tchannel.ServiceName(d.serviceName), @@ -78,10 +86,24 @@ func (d *RPCFactory) createDispatcher() *yarpc.Dispatcher { if err != nil { d.logger.Fatal("Failed to create transport channel", tag.Error(err)) } - d.logger.Info("Created RPC dispatcher and listening", tag.Address(hostAddress)) + inbounds = append(inbounds, d.ch.NewInbound()) + d.logger.Info("Listening for TChannel requests", tag.Address(hostAddress)) + + d.grpc = grpc.NewTransport() + if d.config.GRPCPort > 0 { + grpcAddress := fmt.Sprintf("%v:%v", d.getListenIP(), d.config.GRPCPort) + listener, err := net.Listen("tcp", grpcAddress) + if err != nil { + d.logger.Fatal("Failed to listen on GRPC port", tag.Error(err)) + } + + inbounds = append(inbounds, d.grpc.NewInbound(listener)) + d.logger.Info("Listening for GRPC requests", tag.Address(grpcAddress)) + } + return yarpc.NewDispatcher(yarpc.Config{ Name: d.serviceName, - Inbounds: yarpc.Inbounds{d.ch.NewInbound()}, + Inbounds: inbounds, }) } @@ -90,20 +112,44 @@ func (d *RPCFactory) CreateDispatcherForOutbound( callerName string, serviceName string, hostName string, -) *yarpc.Dispatcher { +) (*yarpc.Dispatcher, error) { + return d.createOutboundDispatcher(callerName, serviceName, hostName, d.ch.NewSingleOutbound(hostName)) +} + +// CreateGRPCDispatcherForOutbound creates a dispatcher for GRPC outbound connection +func (d *RPCFactory) CreateGRPCDispatcherForOutbound( + callerName string, + serviceName string, + hostName string, +) (*yarpc.Dispatcher, error) { + grpcAddress, err := d.grpcPorts.GetGRPCAddress(serviceName, hostName) + if err != nil { + d.logger.Error("Failed to create GRPC outbound dispatcher", tag.Error(err)) + return nil, err + } + return d.createOutboundDispatcher(callerName, serviceName, grpcAddress, d.grpc.NewSingleOutbound(grpcAddress)) +} + +func (d *RPCFactory) createOutboundDispatcher( + callerName string, + serviceName string, + hostName string, + outbound transport.UnaryOutbound, +) (*yarpc.Dispatcher, error) { // Setup dispatcher(outbound) for onebox d.logger.Info("Created RPC dispatcher outbound", tag.Address(hostName)) dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: callerName, Outbounds: yarpc.Outbounds{ - serviceName: {Unary: d.ch.NewSingleOutbound(hostName)}, + serviceName: {Unary: outbound}, }, }) if err := dispatcher.Start(); err != nil { - d.logger.Fatal("Failed to create outbound transport channel", tag.Error(err)) + d.logger.Error("Failed to create outbound transport channel", tag.Error(err)) + return nil, err } - return dispatcher + return dispatcher, nil } func (d *RPCFactory) getListenIP() net.IP { @@ -128,3 +174,30 @@ func (d *RPCFactory) getListenIP() net.IP { } return ip } + +type GRPCPorts map[string]int + +func (c *Config) NewGRPCPorts() GRPCPorts { + grpcPorts := map[string]int{} + for service, config := range c.Services { + grpcPorts["cadence-"+service] = config.RPC.GRPCPort + } + return grpcPorts +} + +func (p GRPCPorts) GetGRPCAddress(service, hostAddress string) (string, error) { + port, ok := p[service] + if !ok { + return hostAddress, errors.New("unknown service: " + service) + } + if port == 0 { + return hostAddress, errors.New("GRPC port not configured for service: " + service) + } + + // Drop port if provided + if index := strings.Index(hostAddress, ":"); index > 0 { + hostAddress = hostAddress[:index] + } + + return fmt.Sprintf("%s:%d", hostAddress, port), nil +} diff --git a/common/config/rpc_test.go b/common/config/rpc_test.go new file mode 100644 index 00000000000..2c73ed71374 --- /dev/null +++ b/common/config/rpc_test.go @@ -0,0 +1,53 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package config + +import ( + "testing" + + "github.com/uber/cadence/common" + + "github.com/stretchr/testify/assert" +) + +func TestGRPCPorts(t *testing.T) { + config := Config{ + Services: map[string]Service{ + "frontend": {RPC: RPC{GRPCPort: 9999}}, + "history": {RPC: RPC{}}, + }, + } + grpcPorts := config.NewGRPCPorts() + + _, err := grpcPorts.GetGRPCAddress("some-service", "1.2.3.4") + assert.EqualError(t, err, "unknown service: some-service") + + _, err = grpcPorts.GetGRPCAddress(common.HistoryServiceName, "1.2.3.4") + assert.EqualError(t, err, "GRPC port not configured for service: cadence-history") + + grpcAddress, err := grpcPorts.GetGRPCAddress(common.FrontendServiceName, "1.2.3.4") + assert.Nil(t, err) + assert.Equal(t, grpcAddress, "1.2.3.4:9999") + + grpcAddress, err = grpcPorts.GetGRPCAddress(common.FrontendServiceName, "1.2.3.4:8888") + assert.Nil(t, err) + assert.Equal(t, grpcAddress, "1.2.3.4:9999") +} diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 9e5890c5fae..c601235d028 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -138,6 +138,10 @@ const ( // KeyName: system.requiredDomainDataKeys // Default value: nil RequiredDomainDataKeys + // EnableGRPCOutbound is the key for enabling outbound GRPC traffic + // KeyName: system.enableGRPCOutbound + // Default value: false + EnableGRPCOutbound // BlobSizeLimitError is the per event blob size limit // KeyName: limit.blobSize.error @@ -1216,6 +1220,7 @@ var keys = map[Key]string{ EnableStickyQuery: "system.enableStickyQuery", EnableDebugMode: "system.enableDebugMode", RequiredDomainDataKeys: "system.requiredDomainDataKeys", + EnableGRPCOutbound: "system.enableGRPCOutbound", // size limit BlobSizeLimitError: "limit.blobSize.error", diff --git a/common/rpc.go b/common/rpc.go index dbf4a70f8ae..806da38cc21 100644 --- a/common/rpc.go +++ b/common/rpc.go @@ -54,7 +54,8 @@ type ( // RPCFactory Creates a dispatcher that knows how to transport requests. RPCFactory interface { GetDispatcher() *yarpc.Dispatcher - CreateDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher + CreateDispatcherForOutbound(callerName, serviceName, hostName string) (*yarpc.Dispatcher, error) + CreateGRPCDispatcherForOutbound(callerName, serviceName, hostName string) (*yarpc.Dispatcher, error) } ) diff --git a/config/development.yaml b/config/development.yaml index 6b64bfbea63..121ed67a85f 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -22,6 +22,7 @@ services: frontend: rpc: port: 7933 + grpcPort: 7833 bindOnLocalHost: true metrics: statsd: @@ -33,6 +34,7 @@ services: matching: rpc: port: 7935 + grpcPort: 7835 bindOnLocalHost: true metrics: statsd: @@ -44,6 +46,7 @@ services: history: rpc: port: 7934 + grpcPort: 7834 bindOnLocalHost: true metrics: statsd: diff --git a/config/development_active.yaml b/config/development_active.yaml index 509ead6bac5..281c34d1ecc 100644 --- a/config/development_active.yaml +++ b/config/development_active.yaml @@ -22,6 +22,7 @@ services: frontend: rpc: port: 7933 + grpcPort: 7833 bindOnLocalHost: true metrics: statsd: @@ -33,6 +34,7 @@ services: matching: rpc: port: 7935 + grpcPort: 7835 bindOnLocalHost: true metrics: statsd: @@ -44,6 +46,7 @@ services: history: rpc: port: 7934 + grpcPort: 7834 bindOnLocalHost: true metrics: statsd: diff --git a/config/development_es.yaml b/config/development_es.yaml index 1bcd35a3f23..be0659ac298 100644 --- a/config/development_es.yaml +++ b/config/development_es.yaml @@ -30,6 +30,7 @@ services: frontend: rpc: port: 7933 + grpcPort: 7833 bindOnLocalHost: true metrics: statsd: @@ -41,6 +42,7 @@ services: matching: rpc: port: 7935 + grpcPort: 7835 bindOnLocalHost: true metrics: statsd: @@ -52,6 +54,7 @@ services: history: rpc: port: 7934 + grpcPort: 7834 bindOnLocalHost: true metrics: statsd: diff --git a/config/development_es_v7.yaml b/config/development_es_v7.yaml index ccffcdeb99c..8b9700ac5af 100644 --- a/config/development_es_v7.yaml +++ b/config/development_es_v7.yaml @@ -31,6 +31,7 @@ services: frontend: rpc: port: 7933 + grpcPort: 7833 bindOnLocalHost: true metrics: statsd: @@ -42,6 +43,7 @@ services: matching: rpc: port: 7935 + grpcPort: 7835 bindOnLocalHost: true metrics: statsd: @@ -53,6 +55,7 @@ services: history: rpc: port: 7934 + grpcPort: 7834 bindOnLocalHost: true metrics: statsd: diff --git a/config/development_mysql.yaml b/config/development_mysql.yaml index 5fa7f534241..d8b348c16d2 100644 --- a/config/development_mysql.yaml +++ b/config/development_mysql.yaml @@ -36,6 +36,7 @@ services: frontend: rpc: port: 7933 + grpcPort: 7833 bindOnLocalHost: true metrics: statsd: @@ -47,6 +48,7 @@ services: matching: rpc: port: 7935 + grpcPort: 7835 bindOnLocalHost: true metrics: statsd: @@ -58,6 +60,7 @@ services: history: rpc: port: 7934 + grpcPort: 7834 bindOnLocalHost: true metrics: statsd: diff --git a/config/development_other.yaml b/config/development_other.yaml index a770bc59c9b..41561a8afac 100644 --- a/config/development_other.yaml +++ b/config/development_other.yaml @@ -22,6 +22,7 @@ services: frontend: rpc: port: 9933 + grpcPort: 9833 bindOnLocalHost: true metrics: statsd: @@ -33,6 +34,7 @@ services: matching: rpc: port: 9935 + grpcPort: 9835 bindOnLocalHost: true metrics: statsd: @@ -44,6 +46,7 @@ services: history: rpc: port: 9934 + grpcPort: 9834 bindOnLocalHost: true metrics: statsd: diff --git a/config/development_postgres.yaml b/config/development_postgres.yaml index 119d718af76..81d73c61c73 100644 --- a/config/development_postgres.yaml +++ b/config/development_postgres.yaml @@ -36,6 +36,7 @@ services: frontend: rpc: port: 7933 + grpcPort: 7833 bindOnLocalHost: true metrics: statsd: @@ -47,6 +48,7 @@ services: matching: rpc: port: 7935 + grpcPort: 7835 bindOnLocalHost: true metrics: statsd: @@ -58,6 +60,7 @@ services: history: rpc: port: 7934 + grpcPort: 7834 bindOnLocalHost: true metrics: statsd: diff --git a/config/development_prometheus.yaml b/config/development_prometheus.yaml index e0dac721522..1dad4551f6e 100644 --- a/config/development_prometheus.yaml +++ b/config/development_prometheus.yaml @@ -22,6 +22,7 @@ services: frontend: rpc: port: 7933 + grpcPort: 7833 bindOnLocalHost: true metrics: prometheus: @@ -33,6 +34,7 @@ services: matching: rpc: port: 7935 + grpcPort: 7835 bindOnLocalHost: true metrics: prometheus: @@ -44,6 +46,7 @@ services: history: rpc: port: 7934 + grpcPort: 7834 bindOnLocalHost: true metrics: prometheus: diff --git a/config/development_scylla.yaml b/config/development_scylla.yaml index 73212d9bcd4..83a78fd9ca5 100644 --- a/config/development_scylla.yaml +++ b/config/development_scylla.yaml @@ -22,6 +22,7 @@ services: frontend: rpc: port: 7933 + grpcPort: 7833 bindOnLocalHost: true metrics: statsd: @@ -33,6 +34,7 @@ services: matching: rpc: port: 7935 + grpcPort: 7835 bindOnLocalHost: true metrics: statsd: @@ -44,6 +46,7 @@ services: history: rpc: port: 7934 + grpcPort: 7834 bindOnLocalHost: true metrics: statsd: diff --git a/config/development_standby.yaml b/config/development_standby.yaml index 63ea7ba31b5..8e5f0291239 100644 --- a/config/development_standby.yaml +++ b/config/development_standby.yaml @@ -22,6 +22,7 @@ services: frontend: rpc: port: 8933 + grpcPort: 8833 bindOnLocalHost: true metrics: statsd: @@ -33,6 +34,7 @@ services: matching: rpc: port: 8935 + grpcPort: 8835 bindOnLocalHost: true metrics: statsd: @@ -44,6 +46,7 @@ services: history: rpc: port: 8934 + grpcPort: 8834 bindOnLocalHost: true metrics: statsd: diff --git a/config/dynamicconfig/development.yaml b/config/dynamicconfig/development.yaml index 1a662b85810..ba04c456a60 100644 --- a/config/dynamicconfig/development.yaml +++ b/config/dynamicconfig/development.yaml @@ -7,3 +7,6 @@ system.minRetentionDays: history.EnableConsistentQueryByDomain: - value: true constraints: {} +system.enableGRPCOutbound: +- value: true + constraints: {} diff --git a/config/dynamicconfig/development_es.yaml b/config/dynamicconfig/development_es.yaml index fd3b0bb2c57..6e19573b054 100644 --- a/config/dynamicconfig/development_es.yaml +++ b/config/dynamicconfig/development_es.yaml @@ -36,3 +36,6 @@ frontend.validSearchAttributes: Passed: 4 system.minRetentionDays: - value: 0 +system.enableGRPCOutbound: + - value: true + constraints: {} diff --git a/host/dynamicconfig.go b/host/dynamicconfig.go index 19d261c0091..4fdc6f3aaad 100644 --- a/host/dynamicconfig.go +++ b/host/dynamicconfig.go @@ -44,6 +44,7 @@ var ( dynamicconfig.ReplicationTaskFetcherErrorRetryWait: 50 * time.Millisecond, dynamicconfig.ReplicationTaskProcessorErrorRetryWait: time.Millisecond, dynamicconfig.EnableConsistentQueryByDomain: true, + dynamicconfig.EnableGRPCOutbound: true, dynamicconfig.MinRetentionDays: 0, } ) diff --git a/host/onebox.go b/host/onebox.go index 658eca7d296..ce8c878f363 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -24,6 +24,8 @@ import ( "context" "encoding/json" "fmt" + "net" + "strconv" "sync" "time" @@ -33,6 +35,7 @@ import ( cwsc "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" "go.uber.org/yarpc" "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/transport/grpc" "go.uber.org/yarpc/transport/tchannel" "github.com/uber/cadence/client" @@ -784,6 +787,7 @@ func newPProfInitializerImpl(logger log.Logger, port int) common.PProfInitialize type rpcFactoryImpl struct { ch *tchannel.ChannelTransport + grpc *grpc.Transport serviceName string hostPort string logger log.Logger @@ -814,15 +818,29 @@ func (c *rpcFactoryImpl) GetDispatcher() *yarpc.Dispatcher { func (c *rpcFactoryImpl) createDispatcher() *yarpc.Dispatcher { // Setup dispatcher for onebox + inbounds := yarpc.Inbounds{} var err error c.ch, err = tchannel.NewChannelTransport( tchannel.ServiceName(c.serviceName), tchannel.ListenAddr(c.hostPort)) if err != nil { c.logger.Fatal("Failed to create transport channel", tag.Error(err)) } + inbounds = append(inbounds, c.ch.NewInbound()) + + grpcHostPort, err := getGRPCAddress(c.hostPort) + if err != nil { + c.logger.Fatal("Failed to obtain GRPC address", tag.Error(err)) + } + c.grpc = grpc.NewTransport() + listener, err := net.Listen("tcp", grpcHostPort) + if err != nil { + c.logger.Fatal("Failed to listen for GRPC request", tag.Error(err)) + } + inbounds = append(inbounds, c.grpc.NewInbound(listener)) + return yarpc.NewDispatcher(yarpc.Config{ Name: c.serviceName, - Inbounds: yarpc.Inbounds{c.ch.NewInbound()}, + Inbounds: inbounds, // For integration tests to generate client out of the same outbound. Outbounds: yarpc.Outbounds{ c.serviceName: {Unary: c.ch.NewSingleOutbound(c.hostPort)}, @@ -842,7 +860,7 @@ func (vm *versionMiddleware) Handle(ctx context.Context, req *transport.Request, } func (c *rpcFactoryImpl) CreateDispatcherForOutbound( - callerName, serviceName, hostName string) *yarpc.Dispatcher { + callerName, serviceName, hostName string) (*yarpc.Dispatcher, error) { // Setup dispatcher(outbound) for onebox d := yarpc.NewDispatcher(yarpc.Config{ Name: callerName, @@ -851,7 +869,48 @@ func (c *rpcFactoryImpl) CreateDispatcherForOutbound( }, }) if err := d.Start(); err != nil { - c.logger.Fatal("Failed to create outbound transport channel", tag.Error(err)) + c.logger.Error("Failed to create outbound transport channel", tag.Error(err)) + return nil, err + } + return d, nil +} + +func (c *rpcFactoryImpl) CreateGRPCDispatcherForOutbound( + callerName, serviceName, hostName string) (*yarpc.Dispatcher, error) { + grpcAddress, err := getGRPCAddress(hostName) + if err != nil { + c.logger.Error("Failed to obtain GRPC address", tag.Error(err)) + return nil, err + } + + // Setup dispatcher(outbound) for onebox + d := yarpc.NewDispatcher(yarpc.Config{ + Name: callerName, + Outbounds: yarpc.Outbounds{ + serviceName: {Unary: c.grpc.NewSingleOutbound(grpcAddress)}, + }, + }) + if err := d.Start(); err != nil { + c.logger.Error("Failed to create outbound GRPC", tag.Error(err)) + return nil, err + } + return d, nil +} + +const gprcPortOffset = 10 + +func getGRPCAddress(hostPort string) (string, error) { + fmt.Println() + host, port, err := net.SplitHostPort(hostPort) + if err != nil { + return "", err } - return d + + portInt, err := strconv.Atoi(port) + if err != nil { + return "", err + } + + grpcAddress := net.JoinHostPort(host, strconv.Itoa(portInt+gprcPortOffset)) + return grpcAddress, nil } diff --git a/service/frontend/adminGrpcHandler.go b/service/frontend/adminGrpcHandler.go new file mode 100644 index 00000000000..62454f6cebb --- /dev/null +++ b/service/frontend/adminGrpcHandler.go @@ -0,0 +1,132 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package frontend + +import ( + "context" + + "go.uber.org/yarpc" + + adminv1 "github.com/uber/cadence/.gen/proto/admin/v1" + "github.com/uber/cadence/common/types/mapper/proto" +) + +type adminGRPCHandler struct { + h AdminHandler +} + +func newAdminGRPCHandler(h AdminHandler) adminGRPCHandler { + return adminGRPCHandler{h} +} + +func (g adminGRPCHandler) register(dispatcher *yarpc.Dispatcher) { + dispatcher.Register(adminv1.BuildAdminAPIYARPCProcedures(g)) +} + +func (g adminGRPCHandler) AddSearchAttribute(ctx context.Context, request *adminv1.AddSearchAttributeRequest) (*adminv1.AddSearchAttributeResponse, error) { + err := g.h.AddSearchAttribute(withGRPCTag(ctx), proto.ToAdminAddSearchAttributeRequest(request)) + return &adminv1.AddSearchAttributeResponse{}, proto.FromError(err) +} + +func (g adminGRPCHandler) CloseShard(ctx context.Context, request *adminv1.CloseShardRequest) (*adminv1.CloseShardResponse, error) { + err := g.h.CloseShard(withGRPCTag(ctx), proto.ToAdminCloseShardRequest(request)) + return &adminv1.CloseShardResponse{}, proto.FromError(err) +} + +func (g adminGRPCHandler) DescribeCluster(ctx context.Context, _ *adminv1.DescribeClusterRequest) (*adminv1.DescribeClusterResponse, error) { + response, err := g.h.DescribeCluster(withGRPCTag(ctx)) + return proto.FromAdminDescribeClusterResponse(response), proto.FromError(err) +} + +func (g adminGRPCHandler) DescribeHistoryHost(ctx context.Context, request *adminv1.DescribeHistoryHostRequest) (*adminv1.DescribeHistoryHostResponse, error) { + response, err := g.h.DescribeHistoryHost(withGRPCTag(ctx), proto.ToAdminDescribeHistoryHostRequest(request)) + return proto.FromAdminDescribeHistoryHostResponse(response), proto.FromError(err) +} + +func (g adminGRPCHandler) DescribeQueue(ctx context.Context, request *adminv1.DescribeQueueRequest) (*adminv1.DescribeQueueResponse, error) { + response, err := g.h.DescribeQueue(withGRPCTag(ctx), proto.ToAdminDescribeQueueRequest(request)) + return proto.FromAdminDescribeQueueResponse(response), proto.FromError(err) +} + +func (g adminGRPCHandler) DescribeWorkflowExecution(ctx context.Context, request *adminv1.DescribeWorkflowExecutionRequest) (*adminv1.DescribeWorkflowExecutionResponse, error) { + response, err := g.h.DescribeWorkflowExecution(withGRPCTag(ctx), proto.ToAdminDescribeWorkflowExecutionRequest(request)) + return proto.FromAdminDescribeWorkflowExecutionResponse(response), proto.FromError(err) +} + +func (g adminGRPCHandler) GetDLQReplicationMessages(ctx context.Context, request *adminv1.GetDLQReplicationMessagesRequest) (*adminv1.GetDLQReplicationMessagesResponse, error) { + response, err := g.h.GetDLQReplicationMessages(withGRPCTag(ctx), proto.ToAdminGetDLQReplicationMessagesRequest(request)) + return proto.FromAdminGetDLQReplicationMessagesResponse(response), proto.FromError(err) +} + +func (g adminGRPCHandler) GetDomainReplicationMessages(ctx context.Context, request *adminv1.GetDomainReplicationMessagesRequest) (*adminv1.GetDomainReplicationMessagesResponse, error) { + response, err := g.h.GetDomainReplicationMessages(withGRPCTag(ctx), proto.ToAdminGetDomainReplicationMessagesRequest(request)) + return proto.FromAdminGetDomainReplicationMessagesResponse(response), proto.FromError(err) +} + +func (g adminGRPCHandler) GetReplicationMessages(ctx context.Context, request *adminv1.GetReplicationMessagesRequest) (*adminv1.GetReplicationMessagesResponse, error) { + response, err := g.h.GetReplicationMessages(withGRPCTag(ctx), proto.ToAdminGetReplicationMessagesRequest(request)) + return proto.FromAdminGetReplicationMessagesResponse(response), proto.FromError(err) +} + +func (g adminGRPCHandler) GetWorkflowExecutionRawHistoryV2(ctx context.Context, request *adminv1.GetWorkflowExecutionRawHistoryV2Request) (*adminv1.GetWorkflowExecutionRawHistoryV2Response, error) { + response, err := g.h.GetWorkflowExecutionRawHistoryV2(withGRPCTag(ctx), proto.ToAdminGetWorkflowExecutionRawHistoryV2Request(request)) + return proto.FromAdminGetWorkflowExecutionRawHistoryV2Response(response), proto.FromError(err) +} + +func (g adminGRPCHandler) MergeDLQMessages(ctx context.Context, request *adminv1.MergeDLQMessagesRequest) (*adminv1.MergeDLQMessagesResponse, error) { + response, err := g.h.MergeDLQMessages(withGRPCTag(ctx), proto.ToAdminMergeDLQMessagesRequest(request)) + return proto.FromAdminMergeDLQMessagesResponse(response), proto.FromError(err) +} + +func (g adminGRPCHandler) PurgeDLQMessages(ctx context.Context, request *adminv1.PurgeDLQMessagesRequest) (*adminv1.PurgeDLQMessagesResponse, error) { + err := g.h.PurgeDLQMessages(withGRPCTag(ctx), proto.ToAdminPurgeDLQMessagesRequest(request)) + return &adminv1.PurgeDLQMessagesResponse{}, proto.FromError(err) +} + +func (g adminGRPCHandler) ReadDLQMessages(ctx context.Context, request *adminv1.ReadDLQMessagesRequest) (*adminv1.ReadDLQMessagesResponse, error) { + response, err := g.h.ReadDLQMessages(withGRPCTag(ctx), proto.ToAdminReadDLQMessagesRequest(request)) + return proto.FromAdminReadDLQMessagesResponse(response), proto.FromError(err) +} + +func (g adminGRPCHandler) ReapplyEvents(ctx context.Context, request *adminv1.ReapplyEventsRequest) (*adminv1.ReapplyEventsResponse, error) { + err := g.h.ReapplyEvents(withGRPCTag(ctx), proto.ToAdminReapplyEventsRequest(request)) + return &adminv1.ReapplyEventsResponse{}, proto.FromError(err) +} + +func (g adminGRPCHandler) RefreshWorkflowTasks(ctx context.Context, request *adminv1.RefreshWorkflowTasksRequest) (*adminv1.RefreshWorkflowTasksResponse, error) { + err := g.h.RefreshWorkflowTasks(withGRPCTag(ctx), proto.ToAdminRefreshWorkflowTasksRequest(request)) + return &adminv1.RefreshWorkflowTasksResponse{}, proto.FromError(err) +} + +func (g adminGRPCHandler) RemoveTask(ctx context.Context, request *adminv1.RemoveTaskRequest) (*adminv1.RemoveTaskResponse, error) { + err := g.h.RemoveTask(withGRPCTag(ctx), proto.ToAdminRemoveTaskRequest(request)) + return &adminv1.RemoveTaskResponse{}, proto.FromError(err) +} + +func (g adminGRPCHandler) ResendReplicationTasks(ctx context.Context, request *adminv1.ResendReplicationTasksRequest) (*adminv1.ResendReplicationTasksResponse, error) { + err := g.h.ResendReplicationTasks(withGRPCTag(ctx), proto.ToAdminResendReplicationTasksRequest(request)) + return &adminv1.ResendReplicationTasksResponse{}, proto.FromError(err) +} + +func (g adminGRPCHandler) ResetQueue(ctx context.Context, request *adminv1.ResetQueueRequest) (*adminv1.ResetQueueResponse, error) { + err := g.h.ResetQueue(withGRPCTag(ctx), proto.ToAdminResetQueueRequest(request)) + return &adminv1.ResetQueueResponse{}, proto.FromError(err) +} diff --git a/service/frontend/grpcHandler.go b/service/frontend/grpcHandler.go new file mode 100644 index 00000000000..c2200b24977 --- /dev/null +++ b/service/frontend/grpcHandler.go @@ -0,0 +1,246 @@ +// Copyright (c) 2021 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package frontend + +import ( + "context" + + "go.uber.org/yarpc" + + apiv1 "github.com/uber/cadence/.gen/proto/api/v1" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/types/mapper/proto" +) + +type grpcHandler struct { + h Handler +} + +func newGrpcHandler(h Handler) grpcHandler { + return grpcHandler{h} +} + +func (g grpcHandler) register(dispatcher *yarpc.Dispatcher) { + dispatcher.Register(apiv1.BuildDomainAPIYARPCProcedures(g)) + dispatcher.Register(apiv1.BuildWorkflowAPIYARPCProcedures(g)) + dispatcher.Register(apiv1.BuildWorkerAPIYARPCProcedures(g)) + dispatcher.Register(apiv1.BuildVisibilityAPIYARPCProcedures(g)) + dispatcher.Register(apiv1.BuildMetaAPIYARPCProcedures(g)) +} + +func (g grpcHandler) Health(ctx context.Context, _ *apiv1.HealthRequest) (*apiv1.HealthResponse, error) { + response, err := g.h.Health(withGRPCTag(ctx)) + return proto.FromHealthResponse(response), proto.FromError(err) +} + +func (g grpcHandler) CountWorkflowExecutions(ctx context.Context, request *apiv1.CountWorkflowExecutionsRequest) (*apiv1.CountWorkflowExecutionsResponse, error) { + response, err := g.h.CountWorkflowExecutions(withGRPCTag(ctx), proto.ToCountWorkflowExecutionsRequest(request)) + return proto.FromCountWorkflowExecutionsResponse(response), proto.FromError(err) +} + +func (g grpcHandler) DeprecateDomain(ctx context.Context, request *apiv1.DeprecateDomainRequest) (*apiv1.DeprecateDomainResponse, error) { + err := g.h.DeprecateDomain(withGRPCTag(ctx), proto.ToDeprecateDomainRequest(request)) + return &apiv1.DeprecateDomainResponse{}, proto.FromError(err) +} + +func (g grpcHandler) DescribeDomain(ctx context.Context, request *apiv1.DescribeDomainRequest) (*apiv1.DescribeDomainResponse, error) { + response, err := g.h.DescribeDomain(withGRPCTag(ctx), proto.ToDescribeDomainRequest(request)) + return proto.FromDescribeDomainResponse(response), proto.FromError(err) +} + +func (g grpcHandler) DescribeTaskList(ctx context.Context, request *apiv1.DescribeTaskListRequest) (*apiv1.DescribeTaskListResponse, error) { + response, err := g.h.DescribeTaskList(withGRPCTag(ctx), proto.ToDescribeTaskListRequest(request)) + return proto.FromDescribeTaskListResponse(response), proto.FromError(err) +} + +func (g grpcHandler) DescribeWorkflowExecution(ctx context.Context, request *apiv1.DescribeWorkflowExecutionRequest) (*apiv1.DescribeWorkflowExecutionResponse, error) { + response, err := g.h.DescribeWorkflowExecution(withGRPCTag(ctx), proto.ToDescribeWorkflowExecutionRequest(request)) + return proto.FromDescribeWorkflowExecutionResponse(response), proto.FromError(err) +} + +func (g grpcHandler) GetClusterInfo(ctx context.Context, _ *apiv1.GetClusterInfoRequest) (*apiv1.GetClusterInfoResponse, error) { + response, err := g.h.GetClusterInfo(withGRPCTag(ctx)) + return proto.FromGetClusterInfoResponse(response), proto.FromError(err) +} + +func (g grpcHandler) GetSearchAttributes(ctx context.Context, _ *apiv1.GetSearchAttributesRequest) (*apiv1.GetSearchAttributesResponse, error) { + response, err := g.h.GetSearchAttributes(withGRPCTag(ctx)) + return proto.FromGetSearchAttributesResponse(response), proto.FromError(err) +} + +func (g grpcHandler) GetWorkflowExecutionHistory(ctx context.Context, request *apiv1.GetWorkflowExecutionHistoryRequest) (*apiv1.GetWorkflowExecutionHistoryResponse, error) { + response, err := g.h.GetWorkflowExecutionHistory(withGRPCTag(ctx), proto.ToGetWorkflowExecutionHistoryRequest(request)) + return proto.FromGetWorkflowExecutionHistoryResponse(response), proto.FromError(err) +} + +func (g grpcHandler) ListArchivedWorkflowExecutions(ctx context.Context, request *apiv1.ListArchivedWorkflowExecutionsRequest) (*apiv1.ListArchivedWorkflowExecutionsResponse, error) { + response, err := g.h.ListArchivedWorkflowExecutions(withGRPCTag(ctx), proto.ToListArchivedWorkflowExecutionsRequest(request)) + return proto.FromListArchivedWorkflowExecutionsResponse(response), proto.FromError(err) +} + +func (g grpcHandler) ListClosedWorkflowExecutions(ctx context.Context, request *apiv1.ListClosedWorkflowExecutionsRequest) (*apiv1.ListClosedWorkflowExecutionsResponse, error) { + response, err := g.h.ListClosedWorkflowExecutions(withGRPCTag(ctx), proto.ToListClosedWorkflowExecutionsRequest(request)) + return proto.FromListClosedWorkflowExecutionsResponse(response), proto.FromError(err) +} + +func (g grpcHandler) ListDomains(ctx context.Context, request *apiv1.ListDomainsRequest) (*apiv1.ListDomainsResponse, error) { + response, err := g.h.ListDomains(withGRPCTag(ctx), proto.ToListDomainsRequest(request)) + return proto.FromListDomainsResponse(response), proto.FromError(err) +} + +func (g grpcHandler) ListOpenWorkflowExecutions(ctx context.Context, request *apiv1.ListOpenWorkflowExecutionsRequest) (*apiv1.ListOpenWorkflowExecutionsResponse, error) { + response, err := g.h.ListOpenWorkflowExecutions(withGRPCTag(ctx), proto.ToListOpenWorkflowExecutionsRequest(request)) + return proto.FromListOpenWorkflowExecutionsResponse(response), proto.FromError(err) +} + +func (g grpcHandler) ListTaskListPartitions(ctx context.Context, request *apiv1.ListTaskListPartitionsRequest) (*apiv1.ListTaskListPartitionsResponse, error) { + response, err := g.h.ListTaskListPartitions(withGRPCTag(ctx), proto.ToListTaskListPartitionsRequest(request)) + return proto.FromListTaskListPartitionsResponse(response), proto.FromError(err) +} + +func (g grpcHandler) ListWorkflowExecutions(ctx context.Context, request *apiv1.ListWorkflowExecutionsRequest) (*apiv1.ListWorkflowExecutionsResponse, error) { + response, err := g.h.ListWorkflowExecutions(withGRPCTag(ctx), proto.ToListWorkflowExecutionsRequest(request)) + return proto.FromListWorkflowExecutionsResponse(response), proto.FromError(err) +} + +func (g grpcHandler) PollForActivityTask(ctx context.Context, request *apiv1.PollForActivityTaskRequest) (*apiv1.PollForActivityTaskResponse, error) { + response, err := g.h.PollForActivityTask(withGRPCTag(ctx), proto.ToPollForActivityTaskRequest(request)) + return proto.FromPollForActivityTaskResponse(response), proto.FromError(err) +} + +func (g grpcHandler) PollForDecisionTask(ctx context.Context, request *apiv1.PollForDecisionTaskRequest) (*apiv1.PollForDecisionTaskResponse, error) { + response, err := g.h.PollForDecisionTask(withGRPCTag(ctx), proto.ToPollForDecisionTaskRequest(request)) + return proto.FromPollForDecisionTaskResponse(response), proto.FromError(err) +} + +func (g grpcHandler) QueryWorkflow(ctx context.Context, request *apiv1.QueryWorkflowRequest) (*apiv1.QueryWorkflowResponse, error) { + response, err := g.h.QueryWorkflow(withGRPCTag(ctx), proto.ToQueryWorkflowRequest(request)) + return proto.FromQueryWorkflowResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RecordActivityTaskHeartbeat(ctx context.Context, request *apiv1.RecordActivityTaskHeartbeatRequest) (*apiv1.RecordActivityTaskHeartbeatResponse, error) { + response, err := g.h.RecordActivityTaskHeartbeat(withGRPCTag(ctx), proto.ToRecordActivityTaskHeartbeatRequest(request)) + return proto.FromRecordActivityTaskHeartbeatResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RecordActivityTaskHeartbeatByID(ctx context.Context, request *apiv1.RecordActivityTaskHeartbeatByIDRequest) (*apiv1.RecordActivityTaskHeartbeatByIDResponse, error) { + response, err := g.h.RecordActivityTaskHeartbeatByID(withGRPCTag(ctx), proto.ToRecordActivityTaskHeartbeatByIDRequest(request)) + return proto.FromRecordActivityTaskHeartbeatByIDResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RegisterDomain(ctx context.Context, request *apiv1.RegisterDomainRequest) (*apiv1.RegisterDomainResponse, error) { + err := g.h.RegisterDomain(withGRPCTag(ctx), proto.ToRegisterDomainRequest(request)) + return &apiv1.RegisterDomainResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RequestCancelWorkflowExecution(ctx context.Context, request *apiv1.RequestCancelWorkflowExecutionRequest) (*apiv1.RequestCancelWorkflowExecutionResponse, error) { + err := g.h.RequestCancelWorkflowExecution(withGRPCTag(ctx), proto.ToRequestCancelWorkflowExecutionRequest(request)) + return &apiv1.RequestCancelWorkflowExecutionResponse{}, proto.FromError(err) +} + +func (g grpcHandler) ResetStickyTaskList(ctx context.Context, request *apiv1.ResetStickyTaskListRequest) (*apiv1.ResetStickyTaskListResponse, error) { + _, err := g.h.ResetStickyTaskList(withGRPCTag(ctx), proto.ToResetStickyTaskListRequest(request)) + return &apiv1.ResetStickyTaskListResponse{}, proto.FromError(err) +} + +func (g grpcHandler) ResetWorkflowExecution(ctx context.Context, request *apiv1.ResetWorkflowExecutionRequest) (*apiv1.ResetWorkflowExecutionResponse, error) { + response, err := g.h.ResetWorkflowExecution(withGRPCTag(ctx), proto.ToResetWorkflowExecutionRequest(request)) + return proto.FromResetWorkflowExecutionResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RespondActivityTaskCanceled(ctx context.Context, request *apiv1.RespondActivityTaskCanceledRequest) (*apiv1.RespondActivityTaskCanceledResponse, error) { + err := g.h.RespondActivityTaskCanceled(withGRPCTag(ctx), proto.ToRespondActivityTaskCanceledRequest(request)) + return &apiv1.RespondActivityTaskCanceledResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RespondActivityTaskCanceledByID(ctx context.Context, request *apiv1.RespondActivityTaskCanceledByIDRequest) (*apiv1.RespondActivityTaskCanceledByIDResponse, error) { + err := g.h.RespondActivityTaskCanceledByID(withGRPCTag(ctx), proto.ToRespondActivityTaskCanceledByIDRequest(request)) + return &apiv1.RespondActivityTaskCanceledByIDResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RespondActivityTaskCompleted(ctx context.Context, request *apiv1.RespondActivityTaskCompletedRequest) (*apiv1.RespondActivityTaskCompletedResponse, error) { + err := g.h.RespondActivityTaskCompleted(withGRPCTag(ctx), proto.ToRespondActivityTaskCompletedRequest(request)) + return &apiv1.RespondActivityTaskCompletedResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RespondActivityTaskCompletedByID(ctx context.Context, request *apiv1.RespondActivityTaskCompletedByIDRequest) (*apiv1.RespondActivityTaskCompletedByIDResponse, error) { + err := g.h.RespondActivityTaskCompletedByID(withGRPCTag(ctx), proto.ToRespondActivityTaskCompletedByIDRequest(request)) + return &apiv1.RespondActivityTaskCompletedByIDResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RespondActivityTaskFailed(ctx context.Context, request *apiv1.RespondActivityTaskFailedRequest) (*apiv1.RespondActivityTaskFailedResponse, error) { + err := g.h.RespondActivityTaskFailed(withGRPCTag(ctx), proto.ToRespondActivityTaskFailedRequest(request)) + return &apiv1.RespondActivityTaskFailedResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RespondActivityTaskFailedByID(ctx context.Context, request *apiv1.RespondActivityTaskFailedByIDRequest) (*apiv1.RespondActivityTaskFailedByIDResponse, error) { + err := g.h.RespondActivityTaskFailedByID(withGRPCTag(ctx), proto.ToRespondActivityTaskFailedByIDRequest(request)) + return &apiv1.RespondActivityTaskFailedByIDResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RespondDecisionTaskCompleted(ctx context.Context, request *apiv1.RespondDecisionTaskCompletedRequest) (*apiv1.RespondDecisionTaskCompletedResponse, error) { + response, err := g.h.RespondDecisionTaskCompleted(withGRPCTag(ctx), proto.ToRespondDecisionTaskCompletedRequest(request)) + return proto.FromRespondDecisionTaskCompletedResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RespondDecisionTaskFailed(ctx context.Context, request *apiv1.RespondDecisionTaskFailedRequest) (*apiv1.RespondDecisionTaskFailedResponse, error) { + err := g.h.RespondDecisionTaskFailed(withGRPCTag(ctx), proto.ToRespondDecisionTaskFailedRequest(request)) + return &apiv1.RespondDecisionTaskFailedResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RespondQueryTaskCompleted(ctx context.Context, request *apiv1.RespondQueryTaskCompletedRequest) (*apiv1.RespondQueryTaskCompletedResponse, error) { + err := g.h.RespondQueryTaskCompleted(withGRPCTag(ctx), proto.ToRespondQueryTaskCompletedRequest(request)) + return &apiv1.RespondQueryTaskCompletedResponse{}, proto.FromError(err) +} + +func (g grpcHandler) ScanWorkflowExecutions(ctx context.Context, request *apiv1.ScanWorkflowExecutionsRequest) (*apiv1.ScanWorkflowExecutionsResponse, error) { + response, err := g.h.ScanWorkflowExecutions(withGRPCTag(ctx), proto.ToScanWorkflowExecutionsRequest(request)) + return proto.FromScanWorkflowExecutionsResponse(response), proto.FromError(err) +} + +func (g grpcHandler) SignalWithStartWorkflowExecution(ctx context.Context, request *apiv1.SignalWithStartWorkflowExecutionRequest) (*apiv1.SignalWithStartWorkflowExecutionResponse, error) { + response, err := g.h.SignalWithStartWorkflowExecution(withGRPCTag(ctx), proto.ToSignalWithStartWorkflowExecutionRequest(request)) + return proto.FromSignalWithStartWorkflowExecutionResponse(response), proto.FromError(err) +} + +func (g grpcHandler) SignalWorkflowExecution(ctx context.Context, request *apiv1.SignalWorkflowExecutionRequest) (*apiv1.SignalWorkflowExecutionResponse, error) { + err := g.h.SignalWorkflowExecution(withGRPCTag(ctx), proto.ToSignalWorkflowExecutionRequest(request)) + return &apiv1.SignalWorkflowExecutionResponse{}, proto.FromError(err) +} + +func (g grpcHandler) StartWorkflowExecution(ctx context.Context, request *apiv1.StartWorkflowExecutionRequest) (*apiv1.StartWorkflowExecutionResponse, error) { + response, err := g.h.StartWorkflowExecution(withGRPCTag(ctx), proto.ToStartWorkflowExecutionRequest(request)) + return proto.FromStartWorkflowExecutionResponse(response), proto.FromError(err) +} + +func (g grpcHandler) TerminateWorkflowExecution(ctx context.Context, request *apiv1.TerminateWorkflowExecutionRequest) (*apiv1.TerminateWorkflowExecutionResponse, error) { + err := g.h.TerminateWorkflowExecution(withGRPCTag(ctx), proto.ToTerminateWorkflowExecutionRequest(request)) + return &apiv1.TerminateWorkflowExecutionResponse{}, proto.FromError(err) +} + +func (g grpcHandler) UpdateDomain(ctx context.Context, request *apiv1.UpdateDomainRequest) (*apiv1.UpdateDomainResponse, error) { + response, err := g.h.UpdateDomain(withGRPCTag(ctx), proto.ToUpdateDomainRequest(request)) + return proto.FromUpdateDomainResponse(response), proto.FromError(err) +} + +func withGRPCTag(ctx context.Context) context.Context { + return metrics.TagContext(ctx, metrics.GPRCTransportTag()) +} diff --git a/service/frontend/service.go b/service/frontend/service.go index f3b155532e2..924eb4e7b26 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -259,11 +259,17 @@ func (s *Service) Start() { thriftHandler := NewThriftHandler(handler) thriftHandler.register(s.GetDispatcher()) + grpcHandler := newGrpcHandler(handler) + grpcHandler.register(s.GetDispatcher()) + s.adminHandler = NewAdminHandler(s, s.params, s.config) adminThriftHandler := NewAdminThriftHandler(s.adminHandler) adminThriftHandler.register(s.GetDispatcher()) + adminGRPCHandler := newAdminGRPCHandler(s.adminHandler) + adminGRPCHandler.register(s.GetDispatcher()) + // must start resource first s.Resource.Start() s.handler.Start() diff --git a/service/history/grpcHandler.go b/service/history/grpcHandler.go new file mode 100644 index 00000000000..215279dc8c0 --- /dev/null +++ b/service/history/grpcHandler.go @@ -0,0 +1,249 @@ +// Copyright (c) 2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + + "go.uber.org/yarpc" + + apiv1 "github.com/uber/cadence/.gen/proto/api/v1" + historyv1 "github.com/uber/cadence/.gen/proto/history/v1" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/types/mapper/proto" +) + +type grpcHandler struct { + h Handler +} + +func newGRPCHandler(h Handler) grpcHandler { + return grpcHandler{h} +} + +func (g grpcHandler) register(dispatcher *yarpc.Dispatcher) { + dispatcher.Register(historyv1.BuildHistoryAPIYARPCProcedures(g)) + dispatcher.Register(apiv1.BuildMetaAPIYARPCProcedures(g)) +} + +func (g grpcHandler) Health(ctx context.Context, _ *apiv1.HealthRequest) (*apiv1.HealthResponse, error) { + response, err := g.h.Health(withGRPCTag(ctx)) + return proto.FromHealthResponse(response), proto.FromError(err) +} + +func (g grpcHandler) CloseShard(ctx context.Context, request *historyv1.CloseShardRequest) (*historyv1.CloseShardResponse, error) { + err := g.h.CloseShard(withGRPCTag(ctx), proto.ToHistoryCloseShardRequest(request)) + return &historyv1.CloseShardResponse{}, proto.FromError(err) +} + +func (g grpcHandler) DescribeHistoryHost(ctx context.Context, request *historyv1.DescribeHistoryHostRequest) (*historyv1.DescribeHistoryHostResponse, error) { + response, err := g.h.DescribeHistoryHost(withGRPCTag(ctx), proto.ToHistoryDescribeHistoryHostRequest(request)) + return proto.FromHistoryDescribeHistoryHostResponse(response), proto.FromError(err) +} + +func (g grpcHandler) DescribeMutableState(ctx context.Context, request *historyv1.DescribeMutableStateRequest) (*historyv1.DescribeMutableStateResponse, error) { + response, err := g.h.DescribeMutableState(withGRPCTag(ctx), proto.ToHistoryDescribeMutableStateRequest(request)) + return proto.FromHistoryDescribeMutableStateResponse(response), proto.FromError(err) +} + +func (g grpcHandler) DescribeQueue(ctx context.Context, request *historyv1.DescribeQueueRequest) (*historyv1.DescribeQueueResponse, error) { + response, err := g.h.DescribeQueue(withGRPCTag(ctx), proto.ToHistoryDescribeQueueRequest(request)) + return proto.FromHistoryDescribeQueueResponse(response), proto.FromError(err) +} + +func (g grpcHandler) DescribeWorkflowExecution(ctx context.Context, request *historyv1.DescribeWorkflowExecutionRequest) (*historyv1.DescribeWorkflowExecutionResponse, error) { + response, err := g.h.DescribeWorkflowExecution(withGRPCTag(ctx), proto.ToHistoryDescribeWorkflowExecutionRequest(request)) + return proto.FromHistoryDescribeWorkflowExecutionResponse(response), proto.FromError(err) +} + +func (g grpcHandler) GetDLQReplicationMessages(ctx context.Context, request *historyv1.GetDLQReplicationMessagesRequest) (*historyv1.GetDLQReplicationMessagesResponse, error) { + response, err := g.h.GetDLQReplicationMessages(withGRPCTag(ctx), proto.ToHistoryGetDLQReplicationMessagesRequest(request)) + return proto.FromHistoryGetDLQReplicationMessagesResponse(response), proto.FromError(err) +} + +func (g grpcHandler) GetMutableState(ctx context.Context, request *historyv1.GetMutableStateRequest) (*historyv1.GetMutableStateResponse, error) { + response, err := g.h.GetMutableState(withGRPCTag(ctx), proto.ToHistoryGetMutableStateRequest(request)) + return proto.FromHistoryGetMutableStateResponse(response), proto.FromError(err) +} + +func (g grpcHandler) GetReplicationMessages(ctx context.Context, request *historyv1.GetReplicationMessagesRequest) (*historyv1.GetReplicationMessagesResponse, error) { + response, err := g.h.GetReplicationMessages(withGRPCTag(ctx), proto.ToHistoryGetReplicationMessagesRequest(request)) + return proto.FromHistoryGetReplicationMessagesResponse(response), proto.FromError(err) +} + +func (g grpcHandler) MergeDLQMessages(ctx context.Context, request *historyv1.MergeDLQMessagesRequest) (*historyv1.MergeDLQMessagesResponse, error) { + response, err := g.h.MergeDLQMessages(withGRPCTag(ctx), proto.ToHistoryMergeDLQMessagesRequest(request)) + return proto.FromHistoryMergeDLQMessagesResponse(response), proto.FromError(err) +} + +func (g grpcHandler) NotifyFailoverMarkers(ctx context.Context, request *historyv1.NotifyFailoverMarkersRequest) (*historyv1.NotifyFailoverMarkersResponse, error) { + err := g.h.NotifyFailoverMarkers(withGRPCTag(ctx), proto.ToHistoryNotifyFailoverMarkersRequest(request)) + return &historyv1.NotifyFailoverMarkersResponse{}, proto.FromError(err) +} + +func (g grpcHandler) PollMutableState(ctx context.Context, request *historyv1.PollMutableStateRequest) (*historyv1.PollMutableStateResponse, error) { + response, err := g.h.PollMutableState(withGRPCTag(ctx), proto.ToHistoryPollMutableStateRequest(request)) + return proto.FromHistoryPollMutableStateResponse(response), proto.FromError(err) +} + +func (g grpcHandler) PurgeDLQMessages(ctx context.Context, request *historyv1.PurgeDLQMessagesRequest) (*historyv1.PurgeDLQMessagesResponse, error) { + err := g.h.PurgeDLQMessages(withGRPCTag(ctx), proto.ToHistoryPurgeDLQMessagesRequest(request)) + return &historyv1.PurgeDLQMessagesResponse{}, proto.FromError(err) +} + +func (g grpcHandler) QueryWorkflow(ctx context.Context, request *historyv1.QueryWorkflowRequest) (*historyv1.QueryWorkflowResponse, error) { + response, err := g.h.QueryWorkflow(withGRPCTag(ctx), proto.ToHistoryQueryWorkflowRequest(request)) + return proto.FromHistoryQueryWorkflowResponse(response), proto.FromError(err) +} + +func (g grpcHandler) ReadDLQMessages(ctx context.Context, request *historyv1.ReadDLQMessagesRequest) (*historyv1.ReadDLQMessagesResponse, error) { + response, err := g.h.ReadDLQMessages(withGRPCTag(ctx), proto.ToHistoryReadDLQMessagesRequest(request)) + return proto.FromHistoryReadDLQMessagesResponse(response), proto.FromError(err) +} + +func (g grpcHandler) ReapplyEvents(ctx context.Context, request *historyv1.ReapplyEventsRequest) (*historyv1.ReapplyEventsResponse, error) { + err := g.h.ReapplyEvents(withGRPCTag(ctx), proto.ToHistoryReapplyEventsRequest(request)) + return &historyv1.ReapplyEventsResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RecordActivityTaskHeartbeat(ctx context.Context, request *historyv1.RecordActivityTaskHeartbeatRequest) (*historyv1.RecordActivityTaskHeartbeatResponse, error) { + response, err := g.h.RecordActivityTaskHeartbeat(withGRPCTag(ctx), proto.ToHistoryRecordActivityTaskHeartbeatRequest(request)) + return proto.FromHistoryRecordActivityTaskHeartbeatResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RecordActivityTaskStarted(ctx context.Context, request *historyv1.RecordActivityTaskStartedRequest) (*historyv1.RecordActivityTaskStartedResponse, error) { + response, err := g.h.RecordActivityTaskStarted(withGRPCTag(ctx), proto.ToHistoryRecordActivityTaskStartedRequest(request)) + return proto.FromHistoryRecordActivityTaskStartedResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RecordChildExecutionCompleted(ctx context.Context, request *historyv1.RecordChildExecutionCompletedRequest) (*historyv1.RecordChildExecutionCompletedResponse, error) { + err := g.h.RecordChildExecutionCompleted(withGRPCTag(ctx), proto.ToHistoryRecordChildExecutionCompletedRequest(request)) + return &historyv1.RecordChildExecutionCompletedResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RecordDecisionTaskStarted(ctx context.Context, request *historyv1.RecordDecisionTaskStartedRequest) (*historyv1.RecordDecisionTaskStartedResponse, error) { + response, err := g.h.RecordDecisionTaskStarted(withGRPCTag(ctx), proto.ToHistoryRecordDecisionTaskStartedRequest(request)) + return proto.FromHistoryRecordDecisionTaskStartedResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RefreshWorkflowTasks(ctx context.Context, request *historyv1.RefreshWorkflowTasksRequest) (*historyv1.RefreshWorkflowTasksResponse, error) { + err := g.h.RefreshWorkflowTasks(withGRPCTag(ctx), proto.ToHistoryRefreshWorkflowTasksRequest(request)) + return &historyv1.RefreshWorkflowTasksResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RemoveSignalMutableState(ctx context.Context, request *historyv1.RemoveSignalMutableStateRequest) (*historyv1.RemoveSignalMutableStateResponse, error) { + err := g.h.RemoveSignalMutableState(withGRPCTag(ctx), proto.ToHistoryRemoveSignalMutableStateRequest(request)) + return &historyv1.RemoveSignalMutableStateResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RemoveTask(ctx context.Context, request *historyv1.RemoveTaskRequest) (*historyv1.RemoveTaskResponse, error) { + err := g.h.RemoveTask(withGRPCTag(ctx), proto.ToHistoryRemoveTaskRequest(request)) + return &historyv1.RemoveTaskResponse{}, proto.FromError(err) +} + +func (g grpcHandler) ReplicateEventsV2(ctx context.Context, request *historyv1.ReplicateEventsV2Request) (*historyv1.ReplicateEventsV2Response, error) { + err := g.h.ReplicateEventsV2(withGRPCTag(ctx), proto.ToHistoryReplicateEventsV2Request(request)) + return &historyv1.ReplicateEventsV2Response{}, proto.FromError(err) +} + +func (g grpcHandler) RequestCancelWorkflowExecution(ctx context.Context, request *historyv1.RequestCancelWorkflowExecutionRequest) (*historyv1.RequestCancelWorkflowExecutionResponse, error) { + err := g.h.RequestCancelWorkflowExecution(withGRPCTag(ctx), proto.ToHistoryRequestCancelWorkflowExecutionRequest(request)) + return &historyv1.RequestCancelWorkflowExecutionResponse{}, proto.FromError(err) +} + +func (g grpcHandler) ResetQueue(ctx context.Context, request *historyv1.ResetQueueRequest) (*historyv1.ResetQueueResponse, error) { + err := g.h.ResetQueue(withGRPCTag(ctx), proto.ToHistoryResetQueueRequest(request)) + return &historyv1.ResetQueueResponse{}, proto.FromError(err) +} + +func (g grpcHandler) ResetStickyTaskList(ctx context.Context, request *historyv1.ResetStickyTaskListRequest) (*historyv1.ResetStickyTaskListResponse, error) { + _, err := g.h.ResetStickyTaskList(withGRPCTag(ctx), proto.ToHistoryResetStickyTaskListRequest(request)) + return &historyv1.ResetStickyTaskListResponse{}, proto.FromError(err) +} + +func (g grpcHandler) ResetWorkflowExecution(ctx context.Context, request *historyv1.ResetWorkflowExecutionRequest) (*historyv1.ResetWorkflowExecutionResponse, error) { + response, err := g.h.ResetWorkflowExecution(withGRPCTag(ctx), proto.ToHistoryResetWorkflowExecutionRequest(request)) + return proto.FromHistoryResetWorkflowExecutionResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RespondActivityTaskCanceled(ctx context.Context, request *historyv1.RespondActivityTaskCanceledRequest) (*historyv1.RespondActivityTaskCanceledResponse, error) { + err := g.h.RespondActivityTaskCanceled(withGRPCTag(ctx), proto.ToHistoryRespondActivityTaskCanceledRequest(request)) + return &historyv1.RespondActivityTaskCanceledResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RespondActivityTaskCompleted(ctx context.Context, request *historyv1.RespondActivityTaskCompletedRequest) (*historyv1.RespondActivityTaskCompletedResponse, error) { + err := g.h.RespondActivityTaskCompleted(withGRPCTag(ctx), proto.ToHistoryRespondActivityTaskCompletedRequest(request)) + return &historyv1.RespondActivityTaskCompletedResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RespondActivityTaskFailed(ctx context.Context, request *historyv1.RespondActivityTaskFailedRequest) (*historyv1.RespondActivityTaskFailedResponse, error) { + err := g.h.RespondActivityTaskFailed(withGRPCTag(ctx), proto.ToHistoryRespondActivityTaskFailedRequest(request)) + return &historyv1.RespondActivityTaskFailedResponse{}, proto.FromError(err) +} + +func (g grpcHandler) RespondDecisionTaskCompleted(ctx context.Context, request *historyv1.RespondDecisionTaskCompletedRequest) (*historyv1.RespondDecisionTaskCompletedResponse, error) { + response, err := g.h.RespondDecisionTaskCompleted(withGRPCTag(ctx), proto.ToHistoryRespondDecisionTaskCompletedRequest(request)) + return proto.FromHistoryRespondDecisionTaskCompletedResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RespondDecisionTaskFailed(ctx context.Context, request *historyv1.RespondDecisionTaskFailedRequest) (*historyv1.RespondDecisionTaskFailedResponse, error) { + err := g.h.RespondDecisionTaskFailed(withGRPCTag(ctx), proto.ToHistoryRespondDecisionTaskFailedRequest(request)) + return &historyv1.RespondDecisionTaskFailedResponse{}, proto.FromError(err) +} + +func (g grpcHandler) ScheduleDecisionTask(ctx context.Context, request *historyv1.ScheduleDecisionTaskRequest) (*historyv1.ScheduleDecisionTaskResponse, error) { + err := g.h.ScheduleDecisionTask(withGRPCTag(ctx), proto.ToHistoryScheduleDecisionTaskRequest(request)) + return &historyv1.ScheduleDecisionTaskResponse{}, proto.FromError(err) +} + +func (g grpcHandler) SignalWithStartWorkflowExecution(ctx context.Context, request *historyv1.SignalWithStartWorkflowExecutionRequest) (*historyv1.SignalWithStartWorkflowExecutionResponse, error) { + response, err := g.h.SignalWithStartWorkflowExecution(withGRPCTag(ctx), proto.ToHistorySignalWithStartWorkflowExecutionRequest(request)) + return proto.FromHistorySignalWithStartWorkflowExecutionResponse(response), proto.FromError(err) +} + +func (g grpcHandler) SignalWorkflowExecution(ctx context.Context, request *historyv1.SignalWorkflowExecutionRequest) (*historyv1.SignalWorkflowExecutionResponse, error) { + err := g.h.SignalWorkflowExecution(withGRPCTag(ctx), proto.ToHistorySignalWorkflowExecutionRequest(request)) + return &historyv1.SignalWorkflowExecutionResponse{}, proto.FromError(err) +} + +func (g grpcHandler) StartWorkflowExecution(ctx context.Context, request *historyv1.StartWorkflowExecutionRequest) (*historyv1.StartWorkflowExecutionResponse, error) { + response, err := g.h.StartWorkflowExecution(withGRPCTag(ctx), proto.ToHistoryStartWorkflowExecutionRequest(request)) + return proto.FromHistoryStartWorkflowExecutionResponse(response), proto.FromError(err) +} + +func (g grpcHandler) SyncActivity(ctx context.Context, request *historyv1.SyncActivityRequest) (*historyv1.SyncActivityResponse, error) { + err := g.h.SyncActivity(withGRPCTag(ctx), proto.ToHistorySyncActivityRequest(request)) + return &historyv1.SyncActivityResponse{}, proto.FromError(err) +} + +func (g grpcHandler) SyncShardStatus(ctx context.Context, request *historyv1.SyncShardStatusRequest) (*historyv1.SyncShardStatusResponse, error) { + err := g.h.SyncShardStatus(withGRPCTag(ctx), proto.ToHistorySyncShardStatusRequest(request)) + return &historyv1.SyncShardStatusResponse{}, proto.FromError(err) +} + +func (g grpcHandler) TerminateWorkflowExecution(ctx context.Context, request *historyv1.TerminateWorkflowExecutionRequest) (*historyv1.TerminateWorkflowExecutionResponse, error) { + err := g.h.TerminateWorkflowExecution(withGRPCTag(ctx), proto.ToHistoryTerminateWorkflowExecutionRequest(request)) + return &historyv1.TerminateWorkflowExecutionResponse{}, proto.FromError(err) +} + +func withGRPCTag(ctx context.Context) context.Context { + return metrics.TagContext(ctx, metrics.GPRCTransportTag()) +} diff --git a/service/history/service.go b/service/history/service.go index 62a9251de8c..b9913e77680 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -127,6 +127,9 @@ func (s *Service) Start() { thriftHandler := NewThriftHandler(s.handler) thriftHandler.register(s.GetDispatcher()) + grpcHandler := newGRPCHandler(s.handler) + grpcHandler.register(s.GetDispatcher()) + // must start resource first s.Resource.Start() s.handler.Start() diff --git a/service/matching/grpcHandler.go b/service/matching/grpcHandler.go new file mode 100644 index 00000000000..c476056af92 --- /dev/null +++ b/service/matching/grpcHandler.go @@ -0,0 +1,99 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "context" + + "go.uber.org/yarpc" + + apiv1 "github.com/uber/cadence/.gen/proto/api/v1" + matchingv1 "github.com/uber/cadence/.gen/proto/matching/v1" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/types/mapper/proto" +) + +type grpcHandler struct { + h Handler +} + +func newGRPCHandler(h Handler) grpcHandler { + return grpcHandler{h} +} + +func (g grpcHandler) register(dispatcher *yarpc.Dispatcher) { + dispatcher.Register(matchingv1.BuildMatchingAPIYARPCProcedures(g)) + dispatcher.Register(apiv1.BuildMetaAPIYARPCProcedures(g)) +} + +func (g grpcHandler) Health(ctx context.Context, _ *apiv1.HealthRequest) (*apiv1.HealthResponse, error) { + response, err := g.h.Health(withGRPCTag(ctx)) + return proto.FromHealthResponse(response), proto.FromError(err) +} + +func (g grpcHandler) AddActivityTask(ctx context.Context, request *matchingv1.AddActivityTaskRequest) (*matchingv1.AddActivityTaskResponse, error) { + err := g.h.AddActivityTask(withGRPCTag(ctx), proto.ToMatchingAddActivityTaskRequest(request)) + return &matchingv1.AddActivityTaskResponse{}, proto.FromError(err) +} + +func (g grpcHandler) AddDecisionTask(ctx context.Context, request *matchingv1.AddDecisionTaskRequest) (*matchingv1.AddDecisionTaskResponse, error) { + err := g.h.AddDecisionTask(withGRPCTag(ctx), proto.ToMatchingAddDecisionTaskRequest(request)) + return &matchingv1.AddDecisionTaskResponse{}, proto.FromError(err) +} + +func (g grpcHandler) CancelOutstandingPoll(ctx context.Context, request *matchingv1.CancelOutstandingPollRequest) (*matchingv1.CancelOutstandingPollResponse, error) { + err := g.h.CancelOutstandingPoll(withGRPCTag(ctx), proto.ToMatchingCancelOutstandingPollRequest(request)) + return &matchingv1.CancelOutstandingPollResponse{}, proto.FromError(err) +} + +func (g grpcHandler) DescribeTaskList(ctx context.Context, request *matchingv1.DescribeTaskListRequest) (*matchingv1.DescribeTaskListResponse, error) { + response, err := g.h.DescribeTaskList(withGRPCTag(ctx), proto.ToMatchingDescribeTaskListRequest(request)) + return proto.FromMatchingDescribeTaskListResponse(response), proto.FromError(err) +} + +func (g grpcHandler) ListTaskListPartitions(ctx context.Context, request *matchingv1.ListTaskListPartitionsRequest) (*matchingv1.ListTaskListPartitionsResponse, error) { + response, err := g.h.ListTaskListPartitions(withGRPCTag(ctx), proto.ToMatchingListTaskListPartitionsRequest(request)) + return proto.FromMatchingListTaskListPartitionsResponse(response), proto.FromError(err) +} + +func (g grpcHandler) PollForActivityTask(ctx context.Context, request *matchingv1.PollForActivityTaskRequest) (*matchingv1.PollForActivityTaskResponse, error) { + response, err := g.h.PollForActivityTask(withGRPCTag(ctx), proto.ToMatchingPollForActivityTaskRequest(request)) + return proto.FromMatchingPollForActivityTaskResponse(response), proto.FromError(err) +} + +func (g grpcHandler) PollForDecisionTask(ctx context.Context, request *matchingv1.PollForDecisionTaskRequest) (*matchingv1.PollForDecisionTaskResponse, error) { + response, err := g.h.PollForDecisionTask(withGRPCTag(ctx), proto.ToMatchingPollForDecisionTaskRequest(request)) + return proto.FromMatchingPollForDecisionTaskResponse(response), proto.FromError(err) +} + +func (g grpcHandler) QueryWorkflow(ctx context.Context, request *matchingv1.QueryWorkflowRequest) (*matchingv1.QueryWorkflowResponse, error) { + response, err := g.h.QueryWorkflow(withGRPCTag(ctx), proto.ToMatchingQueryWorkflowRequest(request)) + return proto.FromMatchingQueryWorkflowResponse(response), proto.FromError(err) +} + +func (g grpcHandler) RespondQueryTaskCompleted(ctx context.Context, request *matchingv1.RespondQueryTaskCompletedRequest) (*matchingv1.RespondQueryTaskCompletedResponse, error) { + err := g.h.RespondQueryTaskCompleted(withGRPCTag(ctx), proto.ToMatchingRespondQueryTaskCompletedRequest(request)) + return &matchingv1.RespondQueryTaskCompletedResponse{}, proto.FromError(err) +} + +func withGRPCTag(ctx context.Context) context.Context { + return metrics.TagContext(ctx, metrics.GPRCTransportTag()) +} diff --git a/service/matching/service.go b/service/matching/service.go index 785af094efe..933f054c317 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -94,6 +94,9 @@ func (s *Service) Start() { thriftHandler := NewThriftHandler(s.handler) thriftHandler.register(s.GetDispatcher()) + grpcHandler := newGRPCHandler(s.handler) + grpcHandler.register(s.GetDispatcher()) + // must start base service first s.Resource.Start() s.handler.Start()