From dc2d9bf3d513e1cfa2b6daf3ac11a67309d7aec5 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Fri, 25 Jan 2019 17:33:32 -0800 Subject: [PATCH] Add frontend DC redirection functionality and policy (#1409) * Add new frontend API redirection layer * Wire DC redirection policy config --- Makefile | 12 + client/clientBean.go | 13 + client/clientBean_mock.go | 17 + client/clientfactory.go | 40 +- client/frontend/client.go | 20 +- client/frontend/interface.go | 2 +- client/frontend/metricClient.go | 21 +- client/frontend/retryableClient.go | 18 +- client/public/client.go | 566 ++++++++++++++++ client/public/interface.go | 30 + client/public/retryableClient.go | 475 ++++++++++++++ cmd/server/server.go | 2 + common/cluster/metadata.go | 8 +- common/metrics/defs.go | 3 + common/service/config/config.go | 8 + common/service/service.go | 35 +- common/util.go | 13 + config/development.yaml | 4 + config/development_active.yaml | 18 + config/development_other.yaml | 143 ++++ config/development_standby.yaml | 17 + docker/config_template.yaml | 4 + host/onebox.go | 7 +- service/frontend/adminHandler.go | 2 +- service/frontend/dcRedirectionHandler.go | 613 ++++++++++++++++++ service/frontend/dcRedirectionPolicy.go | 153 +++++ service/frontend/dcRedirectionPolicy_test.go | 240 +++++++ service/frontend/service.go | 24 +- service/frontend/workflowHandler.go | 9 +- service/frontend/workflowHandler_test.go | 4 +- service/history/handler.go | 21 +- service/history/historyEngine.go | 4 +- service/matching/handler.go | 2 +- service/worker/service.go | 16 +- service/worker/sysworkflow/archival_client.go | 6 +- service/worker/sysworkflow/sysworker.go | 8 +- 36 files changed, 2520 insertions(+), 58 deletions(-) create mode 100644 client/public/client.go create mode 100644 client/public/interface.go create mode 100644 client/public/retryableClient.go create mode 100644 config/development_other.yaml create mode 100644 service/frontend/dcRedirectionHandler.go create mode 100644 service/frontend/dcRedirectionPolicy.go create mode 100644 service/frontend/dcRedirectionPolicy_test.go diff --git a/Makefile b/Makefile index e4620643e2e..7e68cd6173f 100644 --- a/Makefile +++ b/Makefile @@ -194,6 +194,7 @@ install-schema-cdc: bins ./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_visibility_active --rf 1 ./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_active setup-schema -v 0.0 ./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_active update-schema -d ./schema/cassandra/visibility/versioned + @echo Setting up cadence_standby key space ./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_standby --rf 1 ./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_standby setup-schema -v 0.0 @@ -202,8 +203,19 @@ install-schema-cdc: bins ./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_standby setup-schema -v 0.0 ./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_standby update-schema -d ./schema/cassandra/visibility/versioned + @echo Setting up cadence_other key space + ./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_other --rf 1 + ./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_other setup-schema -v 0.0 + ./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_other update-schema -d ./schema/cassandra/cadence/versioned + ./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_visibility_other --rf 1 + ./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_other setup-schema -v 0.0 + ./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_other update-schema -d 
./schema/cassandra/visibility/versioned + start-cdc-active: bins ./cadence-server --zone active start start-cdc-standby: bins ./cadence-server --zone standby start + +start-cdc-other: bins + ./cadence-server --zone other start diff --git a/client/clientBean.go b/client/clientBean.go index fc3b8757f86..a8d2fa6f58c 100644 --- a/client/clientBean.go +++ b/client/clientBean.go @@ -32,6 +32,7 @@ import ( "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/client/history" "github.com/uber/cadence/client/matching" + "github.com/uber/cadence/client/public" "github.com/uber/cadence/common/cluster" ) @@ -45,6 +46,7 @@ type ( GetHistoryClient() history.Client GetMatchingClient() matching.Client GetFrontendClient() frontend.Client + GetPublicClient() public.Client GetRemoteAdminClient(cluster string) admin.Client GetRemoteFrontendClient(cluster string) frontend.Client } @@ -58,6 +60,7 @@ type ( historyClient history.Client matchingClient matching.Client frontendClient frontend.Client + publicClient public.Client remoteAdminClients map[string]admin.Client remoteFrontendClients map[string]frontend.Client } @@ -84,6 +87,11 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust return nil, err } + publicClient, err := factory.NewPublicClient() + if err != nil { + return nil, err + } + remoteAdminClients := map[string]admin.Client{} remoteFrontendClients := map[string]frontend.Client{} for cluster, address := range clusterMetadata.GetAllClientAddress() { @@ -119,6 +127,7 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust historyClient: historyClient, matchingClient: matchingClient, frontendClient: frontendClient, + publicClient: publicClient, remoteAdminClients: remoteAdminClients, remoteFrontendClients: remoteFrontendClients, }, nil @@ -136,6 +145,10 @@ func (h *clientBeanImpl) GetFrontendClient() frontend.Client { return h.frontendClient } +func (h *clientBeanImpl) GetPublicClient() public.Client { + return h.publicClient +} + func (h *clientBeanImpl) GetRemoteAdminClient(cluster string) admin.Client { client, ok := h.remoteAdminClients[cluster] if !ok { diff --git a/client/clientBean_mock.go b/client/clientBean_mock.go index af40570080e..89812ffefba 100644 --- a/client/clientBean_mock.go +++ b/client/clientBean_mock.go @@ -26,6 +26,7 @@ import ( "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/client/history" "github.com/uber/cadence/client/matching" + "github.com/uber/cadence/client/public" ) // MockClientBean is an autogenerated mock type for the MockClientBean type @@ -83,6 +84,22 @@ func (_m *MockClientBean) GetFrontendClient() frontend.Client { return r0 } +// GetPublicClient provides a mock function with given fields: +func (_m *MockClientBean) GetPublicClient() public.Client { + ret := _m.Called() + + var r0 public.Client + if rf, ok := ret.Get(0).(func() public.Client); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(public.Client) + } + } + + return r0 +} + // GetRemoteAdminClient provides a mock function with given fields: _a0 func (_m *MockClientBean) GetRemoteAdminClient(_a0 string) admin.Client { ret := _m.Called(_a0) diff --git a/client/clientfactory.go b/client/clientfactory.go index 1069f46b575..2642941934b 100644 --- a/client/clientfactory.go +++ b/client/clientfactory.go @@ -26,19 +26,22 @@ import ( "go.uber.org/yarpc" "github.com/uber/cadence/.gen/go/admin/adminserviceclient" + "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" 
"github.com/uber/cadence/.gen/go/history/historyserviceclient" "github.com/uber/cadence/.gen/go/matching/matchingserviceclient" "github.com/uber/cadence/client/admin" "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/client/history" "github.com/uber/cadence/client/matching" + "github.com/uber/cadence/client/public" "github.com/uber/cadence/common" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/metrics" - "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + publicClientInterface "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" ) const ( + publicCaller = "cadence-public-client" frontendCaller = "cadence-frontend-client" historyCaller = "history-service-client" matchingCaller = "matching-service-client" @@ -54,10 +57,12 @@ type Factory interface { NewHistoryClient() (history.Client, error) NewMatchingClient() (matching.Client, error) NewFrontendClient() (frontend.Client, error) + NewPublicClient() (public.Client, error) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) NewMatchingClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (matching.Client, error) NewFrontendClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (frontend.Client, error) + NewPublicClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (public.Client, error) NewAdminClientWithTimeoutAndDispatcher(rpcName string, timeout time.Duration, dispatcher *yarpc.Dispatcher) (admin.Client, error) NewFrontendClientWithTimeoutAndDispatcher(rpcName string, timeout time.Duration, longPollTimeout time.Duration, dispatcher *yarpc.Dispatcher) (frontend.Client, error) @@ -93,6 +98,10 @@ func (cf *rpcClientFactory) NewFrontendClient() (frontend.Client, error) { return cf.NewFrontendClientWithTimeout(frontend.DefaultTimeout, frontend.DefaultLongPollTimeout) } +func (cf *rpcClientFactory) NewPublicClient() (public.Client, error) { + return cf.NewPublicClientWithTimeout(public.DefaultTimeout, public.DefaultLongPollTimeout) +} + func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) { resolver, err := cf.monitor.GetResolver(common.HistoryServiceName) if err != nil { @@ -179,6 +188,35 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeout( return client, nil } +func (cf *rpcClientFactory) NewPublicClientWithTimeout( + timeout time.Duration, + longPollTimeout time.Duration, +) (public.Client, error) { + + // public client and frontend client are essentially the same, + // except the interface definition + resolver, err := cf.monitor.GetResolver(common.FrontendServiceName) + if err != nil { + return nil, err + } + + keyResolver := func(key string) (string, error) { + host, err := resolver.Lookup(key) + if err != nil { + return "", err + } + return host.GetAddress(), nil + } + + clientProvider := func(clientKey string) (interface{}, error) { + dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(publicCaller, common.FrontendServiceName, clientKey) + return publicClientInterface.New(dispatcher.ClientConfig(common.FrontendServiceName)), nil + } + + client := public.NewClient(timeout, longPollTimeout, common.NewClientCache(keyResolver, clientProvider)) + return client, nil +} + func (cf *rpcClientFactory) NewAdminClientWithTimeoutAndDispatcher( rpcName string, timeout time.Duration, diff --git a/client/frontend/client.go b/client/frontend/client.go index 3ba19e81228..09fd00b4b83 100644 --- a/client/frontend/client.go +++ b/client/frontend/client.go @@ 
-27,9 +27,9 @@ import ( "go.uber.org/yarpc" "github.com/pborman/uuid" + "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" + "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" - "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" - "go.uber.org/cadence/.gen/go/shared" ) var _ Client = (*clientImpl)(nil) @@ -316,6 +316,22 @@ func (c *clientImpl) ResetStickyTaskList( return client.ResetStickyTaskList(ctx, request, opts...) } +func (c *clientImpl) ResetWorkflowExecution( + ctx context.Context, + request *shared.ResetWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*shared.ResetWorkflowExecutionResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.ResetWorkflowExecution(ctx, request, opts...) +} + func (c *clientImpl) RespondActivityTaskCanceled( ctx context.Context, request *shared.RespondActivityTaskCanceledRequest, diff --git a/client/frontend/interface.go b/client/frontend/interface.go index d0d33a3cef5..df8990d6ff5 100644 --- a/client/frontend/interface.go +++ b/client/frontend/interface.go @@ -21,7 +21,7 @@ package frontend import ( - "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" ) // Client is the interface exposed by frontend service client diff --git a/client/frontend/metricClient.go b/client/frontend/metricClient.go index f9a04f9c353..a1bc7d70e9a 100644 --- a/client/frontend/metricClient.go +++ b/client/frontend/metricClient.go @@ -22,9 +22,8 @@ package frontend import ( "context" - + "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common/metrics" - "go.uber.org/cadence/.gen/go/shared" "go.uber.org/yarpc" ) @@ -331,6 +330,24 @@ func (c *metricClient) ResetStickyTaskList( return resp, err } +func (c *metricClient) ResetWorkflowExecution( + ctx context.Context, + request *shared.ResetWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*shared.ResetWorkflowExecutionResponse, error) { + + c.metricsClient.IncCounter(metrics.FrontendClientResetWorkflowExecutionScope, metrics.CadenceClientRequests) + + sw := c.metricsClient.StartTimer(metrics.FrontendClientResetWorkflowExecutionScope, metrics.CadenceClientLatency) + resp, err := c.client.ResetWorkflowExecution(ctx, request, opts...) 
+ sw.Stop() + + if err != nil { + c.metricsClient.IncCounter(metrics.FrontendClientResetWorkflowExecutionScope, metrics.CadenceClientFailures) + } + return resp, err +} + func (c *metricClient) RespondActivityTaskCanceled( ctx context.Context, request *shared.RespondActivityTaskCanceledRequest, diff --git a/client/frontend/retryableClient.go b/client/frontend/retryableClient.go index 83bb65fb911..31ec781c18b 100644 --- a/client/frontend/retryableClient.go +++ b/client/frontend/retryableClient.go @@ -25,8 +25,8 @@ import ( "go.uber.org/yarpc" + "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common/backoff" - "go.uber.org/cadence/.gen/go/shared" ) var _ Client = (*retryableClient)(nil) @@ -290,6 +290,22 @@ func (c *retryableClient) ResetStickyTaskList( return resp, err } +func (c *retryableClient) ResetWorkflowExecution( + ctx context.Context, + request *shared.ResetWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*shared.ResetWorkflowExecutionResponse, error) { + + var resp *shared.ResetWorkflowExecutionResponse + op := func() error { + var err error + resp, err = c.client.ResetWorkflowExecution(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) RespondActivityTaskCanceled( ctx context.Context, request *shared.RespondActivityTaskCanceledRequest, diff --git a/client/public/client.go b/client/public/client.go new file mode 100644 index 00000000000..c3e2e50fd1b --- /dev/null +++ b/client/public/client.go @@ -0,0 +1,566 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package public + +import ( + "context" + "time" + + "go.uber.org/yarpc" + + "github.com/pborman/uuid" + "github.com/uber/cadence/common" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/.gen/go/shared" +) + +var _ Client = (*clientImpl)(nil) + +const ( + // DefaultTimeout is the default timeout used to make calls + DefaultTimeout = 10 * time.Second + // DefaultLongPollTimeout is the long poll default timeout used to make calls + DefaultLongPollTimeout = time.Minute * 3 +) + +type clientImpl struct { + timeout time.Duration + longPollTimeout time.Duration + clients common.ClientCache +} + +// NewClient creates a new frontend service TChannel client +func NewClient( + timeout time.Duration, + longPollTimeout time.Duration, + clients common.ClientCache, +) Client { + return &clientImpl{ + timeout: timeout, + longPollTimeout: longPollTimeout, + clients: clients, + } +} + +func (c *clientImpl) DeprecateDomain( + ctx context.Context, + request *shared.DeprecateDomainRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.DeprecateDomain(ctx, request, opts...) +} + +func (c *clientImpl) DescribeDomain( + ctx context.Context, + request *shared.DescribeDomainRequest, + opts ...yarpc.CallOption, +) (*shared.DescribeDomainResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.DescribeDomain(ctx, request, opts...) +} + +func (c *clientImpl) DescribeTaskList( + ctx context.Context, + request *shared.DescribeTaskListRequest, + opts ...yarpc.CallOption, +) (*shared.DescribeTaskListResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.DescribeTaskList(ctx, request, opts...) +} + +func (c *clientImpl) DescribeWorkflowExecution( + ctx context.Context, + request *shared.DescribeWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*shared.DescribeWorkflowExecutionResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.DescribeWorkflowExecution(ctx, request, opts...) +} + +func (c *clientImpl) GetWorkflowExecutionHistory( + ctx context.Context, + request *shared.GetWorkflowExecutionHistoryRequest, + opts ...yarpc.CallOption, +) (*shared.GetWorkflowExecutionHistoryResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.GetWorkflowExecutionHistory(ctx, request, opts...) +} + +func (c *clientImpl) ListClosedWorkflowExecutions( + ctx context.Context, + request *shared.ListClosedWorkflowExecutionsRequest, + opts ...yarpc.CallOption, +) (*shared.ListClosedWorkflowExecutionsResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) 
+ client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.ListClosedWorkflowExecutions(ctx, request, opts...) +} + +func (c *clientImpl) ListDomains( + ctx context.Context, + request *shared.ListDomainsRequest, + opts ...yarpc.CallOption, +) (*shared.ListDomainsResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.ListDomains(ctx, request, opts...) +} + +func (c *clientImpl) ListOpenWorkflowExecutions( + ctx context.Context, + request *shared.ListOpenWorkflowExecutionsRequest, + opts ...yarpc.CallOption, +) (*shared.ListOpenWorkflowExecutionsResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.ListOpenWorkflowExecutions(ctx, request, opts...) +} + +func (c *clientImpl) PollForActivityTask( + ctx context.Context, + request *shared.PollForActivityTaskRequest, + opts ...yarpc.CallOption, +) (*shared.PollForActivityTaskResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createLongPollContext(ctx) + defer cancel() + return client.PollForActivityTask(ctx, request, opts...) +} + +func (c *clientImpl) PollForDecisionTask( + ctx context.Context, + request *shared.PollForDecisionTaskRequest, + opts ...yarpc.CallOption, +) (*shared.PollForDecisionTaskResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createLongPollContext(ctx) + defer cancel() + return client.PollForDecisionTask(ctx, request, opts...) +} + +func (c *clientImpl) QueryWorkflow( + ctx context.Context, + request *shared.QueryWorkflowRequest, + opts ...yarpc.CallOption, +) (*shared.QueryWorkflowResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.QueryWorkflow(ctx, request, opts...) +} + +func (c *clientImpl) RecordActivityTaskHeartbeat( + ctx context.Context, + request *shared.RecordActivityTaskHeartbeatRequest, + opts ...yarpc.CallOption, +) (*shared.RecordActivityTaskHeartbeatResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RecordActivityTaskHeartbeat(ctx, request, opts...) +} + +func (c *clientImpl) RecordActivityTaskHeartbeatByID( + ctx context.Context, + request *shared.RecordActivityTaskHeartbeatByIDRequest, + opts ...yarpc.CallOption, +) (*shared.RecordActivityTaskHeartbeatResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RecordActivityTaskHeartbeatByID(ctx, request, opts...) +} + +func (c *clientImpl) RegisterDomain( + ctx context.Context, + request *shared.RegisterDomainRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) 
+ client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RegisterDomain(ctx, request, opts...) +} + +func (c *clientImpl) RequestCancelWorkflowExecution( + ctx context.Context, + request *shared.RequestCancelWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RequestCancelWorkflowExecution(ctx, request, opts...) +} + +func (c *clientImpl) ResetStickyTaskList( + ctx context.Context, + request *shared.ResetStickyTaskListRequest, + opts ...yarpc.CallOption, +) (*shared.ResetStickyTaskListResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.ResetStickyTaskList(ctx, request, opts...) +} + +func (c *clientImpl) RespondActivityTaskCanceled( + ctx context.Context, + request *shared.RespondActivityTaskCanceledRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RespondActivityTaskCanceled(ctx, request, opts...) +} + +func (c *clientImpl) RespondActivityTaskCanceledByID( + ctx context.Context, + request *shared.RespondActivityTaskCanceledByIDRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RespondActivityTaskCanceledByID(ctx, request, opts...) +} + +func (c *clientImpl) RespondActivityTaskCompleted( + ctx context.Context, + request *shared.RespondActivityTaskCompletedRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RespondActivityTaskCompleted(ctx, request, opts...) +} + +func (c *clientImpl) RespondActivityTaskCompletedByID( + ctx context.Context, + request *shared.RespondActivityTaskCompletedByIDRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RespondActivityTaskCompletedByID(ctx, request, opts...) +} + +func (c *clientImpl) RespondActivityTaskFailed( + ctx context.Context, + request *shared.RespondActivityTaskFailedRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RespondActivityTaskFailed(ctx, request, opts...) +} + +func (c *clientImpl) RespondActivityTaskFailedByID( + ctx context.Context, + request *shared.RespondActivityTaskFailedByIDRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) 
+ client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RespondActivityTaskFailedByID(ctx, request, opts...) +} + +func (c *clientImpl) RespondDecisionTaskCompleted( + ctx context.Context, + request *shared.RespondDecisionTaskCompletedRequest, + opts ...yarpc.CallOption, +) (*shared.RespondDecisionTaskCompletedResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RespondDecisionTaskCompleted(ctx, request, opts...) +} + +func (c *clientImpl) RespondDecisionTaskFailed( + ctx context.Context, + request *shared.RespondDecisionTaskFailedRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RespondDecisionTaskFailed(ctx, request, opts...) +} + +func (c *clientImpl) RespondQueryTaskCompleted( + ctx context.Context, + request *shared.RespondQueryTaskCompletedRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RespondQueryTaskCompleted(ctx, request, opts...) +} + +func (c *clientImpl) SignalWithStartWorkflowExecution( + ctx context.Context, + request *shared.SignalWithStartWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*shared.StartWorkflowExecutionResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.SignalWithStartWorkflowExecution(ctx, request, opts...) +} + +func (c *clientImpl) SignalWorkflowExecution( + ctx context.Context, + request *shared.SignalWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.SignalWorkflowExecution(ctx, request, opts...) +} + +func (c *clientImpl) StartWorkflowExecution( + ctx context.Context, + request *shared.StartWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*shared.StartWorkflowExecutionResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.StartWorkflowExecution(ctx, request, opts...) +} + +func (c *clientImpl) TerminateWorkflowExecution( + ctx context.Context, + request *shared.TerminateWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) error { + + opts = common.AggregateYarpcOptions(ctx, opts...) + client, err := c.getRandomClient() + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.TerminateWorkflowExecution(ctx, request, opts...) +} + +func (c *clientImpl) UpdateDomain( + ctx context.Context, + request *shared.UpdateDomainRequest, + opts ...yarpc.CallOption, +) (*shared.UpdateDomainResponse, error) { + + opts = common.AggregateYarpcOptions(ctx, opts...) 
+ client, err := c.getRandomClient() + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.UpdateDomain(ctx, request, opts...) +} + +func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) { + if parent == nil { + return context.WithTimeout(context.Background(), c.timeout) + } + return context.WithTimeout(parent, c.timeout) +} + +func (c *clientImpl) createLongPollContext(parent context.Context) (context.Context, context.CancelFunc) { + if parent == nil { + return context.WithTimeout(context.Background(), c.longPollTimeout) + } + return context.WithTimeout(parent, c.longPollTimeout) +} + +func (c *clientImpl) getRandomClient() (workflowserviceclient.Interface, error) { + // generate a random shard key to do load balancing + key := uuid.New() + client, err := c.clients.GetClientForKey(key) + if err != nil { + return nil, err + } + + return client.(workflowserviceclient.Interface), nil +} diff --git a/client/public/interface.go b/client/public/interface.go new file mode 100644 index 00000000000..917aeabafc5 --- /dev/null +++ b/client/public/interface.go @@ -0,0 +1,30 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package public + +import ( + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" +) + +// Client is the interface exposed by frontend service client +type Client interface { + workflowserviceclient.Interface +} diff --git a/client/public/retryableClient.go b/client/public/retryableClient.go new file mode 100644 index 00000000000..3090493adc0 --- /dev/null +++ b/client/public/retryableClient.go @@ -0,0 +1,475 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package public + +import ( + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/common/backoff" + "go.uber.org/cadence/.gen/go/shared" +) + +var _ Client = (*retryableClient)(nil) + +type retryableClient struct { + client Client + policy backoff.RetryPolicy + isRetryable backoff.IsRetryable +} + +// NewRetryableClient creates a new instance of Client with retry policy +func NewRetryableClient(client Client, policy backoff.RetryPolicy, isRetryable backoff.IsRetryable) Client { + return &retryableClient{ + client: client, + policy: policy, + isRetryable: isRetryable, + } +} + +func (c *retryableClient) DeprecateDomain( + ctx context.Context, + request *shared.DeprecateDomainRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.DeprecateDomain(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) DescribeDomain( + ctx context.Context, + request *shared.DescribeDomainRequest, + opts ...yarpc.CallOption, +) (*shared.DescribeDomainResponse, error) { + + var resp *shared.DescribeDomainResponse + op := func() error { + var err error + resp, err = c.client.DescribeDomain(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) DescribeTaskList( + ctx context.Context, + request *shared.DescribeTaskListRequest, + opts ...yarpc.CallOption, +) (*shared.DescribeTaskListResponse, error) { + + var resp *shared.DescribeTaskListResponse + op := func() error { + var err error + resp, err = c.client.DescribeTaskList(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) DescribeWorkflowExecution( + ctx context.Context, + request *shared.DescribeWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*shared.DescribeWorkflowExecutionResponse, error) { + + var resp *shared.DescribeWorkflowExecutionResponse + op := func() error { + var err error + resp, err = c.client.DescribeWorkflowExecution(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) GetWorkflowExecutionHistory( + ctx context.Context, + request *shared.GetWorkflowExecutionHistoryRequest, + opts ...yarpc.CallOption, +) (*shared.GetWorkflowExecutionHistoryResponse, error) { + + var resp *shared.GetWorkflowExecutionHistoryResponse + op := func() error { + var err error + resp, err = c.client.GetWorkflowExecutionHistory(ctx, request, opts...) 
+ return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) ListClosedWorkflowExecutions( + ctx context.Context, + request *shared.ListClosedWorkflowExecutionsRequest, + opts ...yarpc.CallOption, +) (*shared.ListClosedWorkflowExecutionsResponse, error) { + + var resp *shared.ListClosedWorkflowExecutionsResponse + op := func() error { + var err error + resp, err = c.client.ListClosedWorkflowExecutions(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) ListDomains( + ctx context.Context, + request *shared.ListDomainsRequest, + opts ...yarpc.CallOption, +) (*shared.ListDomainsResponse, error) { + + var resp *shared.ListDomainsResponse + op := func() error { + var err error + resp, err = c.client.ListDomains(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) ListOpenWorkflowExecutions( + ctx context.Context, + request *shared.ListOpenWorkflowExecutionsRequest, + opts ...yarpc.CallOption, +) (*shared.ListOpenWorkflowExecutionsResponse, error) { + + var resp *shared.ListOpenWorkflowExecutionsResponse + op := func() error { + var err error + resp, err = c.client.ListOpenWorkflowExecutions(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) PollForActivityTask( + ctx context.Context, + request *shared.PollForActivityTaskRequest, + opts ...yarpc.CallOption, +) (*shared.PollForActivityTaskResponse, error) { + + var resp *shared.PollForActivityTaskResponse + op := func() error { + var err error + resp, err = c.client.PollForActivityTask(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) PollForDecisionTask( + ctx context.Context, + request *shared.PollForDecisionTaskRequest, + opts ...yarpc.CallOption, +) (*shared.PollForDecisionTaskResponse, error) { + + var resp *shared.PollForDecisionTaskResponse + op := func() error { + var err error + resp, err = c.client.PollForDecisionTask(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) QueryWorkflow( + ctx context.Context, + request *shared.QueryWorkflowRequest, + opts ...yarpc.CallOption, +) (*shared.QueryWorkflowResponse, error) { + + var resp *shared.QueryWorkflowResponse + op := func() error { + var err error + resp, err = c.client.QueryWorkflow(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RecordActivityTaskHeartbeat( + ctx context.Context, + request *shared.RecordActivityTaskHeartbeatRequest, + opts ...yarpc.CallOption, +) (*shared.RecordActivityTaskHeartbeatResponse, error) { + + var resp *shared.RecordActivityTaskHeartbeatResponse + op := func() error { + var err error + resp, err = c.client.RecordActivityTaskHeartbeat(ctx, request, opts...) 
+ return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RecordActivityTaskHeartbeatByID( + ctx context.Context, + request *shared.RecordActivityTaskHeartbeatByIDRequest, + opts ...yarpc.CallOption, +) (*shared.RecordActivityTaskHeartbeatResponse, error) { + + var resp *shared.RecordActivityTaskHeartbeatResponse + op := func() error { + var err error + resp, err = c.client.RecordActivityTaskHeartbeatByID(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RegisterDomain( + ctx context.Context, + request *shared.RegisterDomainRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.RegisterDomain(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RequestCancelWorkflowExecution( + ctx context.Context, + request *shared.RequestCancelWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.RequestCancelWorkflowExecution(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) ResetStickyTaskList( + ctx context.Context, + request *shared.ResetStickyTaskListRequest, + opts ...yarpc.CallOption, +) (*shared.ResetStickyTaskListResponse, error) { + + var resp *shared.ResetStickyTaskListResponse + op := func() error { + var err error + resp, err = c.client.ResetStickyTaskList(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RespondActivityTaskCanceled( + ctx context.Context, + request *shared.RespondActivityTaskCanceledRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.RespondActivityTaskCanceled(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RespondActivityTaskCanceledByID( + ctx context.Context, + request *shared.RespondActivityTaskCanceledByIDRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.RespondActivityTaskCanceledByID(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RespondActivityTaskCompleted( + ctx context.Context, + request *shared.RespondActivityTaskCompletedRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.RespondActivityTaskCompleted(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RespondActivityTaskCompletedByID( + ctx context.Context, + request *shared.RespondActivityTaskCompletedByIDRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.RespondActivityTaskCompletedByID(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RespondActivityTaskFailed( + ctx context.Context, + request *shared.RespondActivityTaskFailedRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.RespondActivityTaskFailed(ctx, request, opts...) 
+ } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RespondActivityTaskFailedByID( + ctx context.Context, + request *shared.RespondActivityTaskFailedByIDRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.RespondActivityTaskFailedByID(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RespondDecisionTaskCompleted( + ctx context.Context, + request *shared.RespondDecisionTaskCompletedRequest, + opts ...yarpc.CallOption, +) (*shared.RespondDecisionTaskCompletedResponse, error) { + + var resp *shared.RespondDecisionTaskCompletedResponse + op := func() error { + var err error + resp, err = c.client.RespondDecisionTaskCompleted(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RespondDecisionTaskFailed( + ctx context.Context, + request *shared.RespondDecisionTaskFailedRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.RespondDecisionTaskFailed(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) RespondQueryTaskCompleted( + ctx context.Context, + request *shared.RespondQueryTaskCompletedRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.RespondQueryTaskCompleted(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) SignalWithStartWorkflowExecution( + ctx context.Context, + request *shared.SignalWithStartWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*shared.StartWorkflowExecutionResponse, error) { + + var resp *shared.StartWorkflowExecutionResponse + op := func() error { + var err error + resp, err = c.client.SignalWithStartWorkflowExecution(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) SignalWorkflowExecution( + ctx context.Context, + request *shared.SignalWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.SignalWorkflowExecution(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) StartWorkflowExecution( + ctx context.Context, + request *shared.StartWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*shared.StartWorkflowExecutionResponse, error) { + + var resp *shared.StartWorkflowExecutionResponse + op := func() error { + var err error + resp, err = c.client.StartWorkflowExecution(ctx, request, opts...) + return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) TerminateWorkflowExecution( + ctx context.Context, + request *shared.TerminateWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) error { + + op := func() error { + return c.client.TerminateWorkflowExecution(ctx, request, opts...) + } + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) UpdateDomain( + ctx context.Context, + request *shared.UpdateDomainRequest, + opts ...yarpc.CallOption, +) (*shared.UpdateDomainResponse, error) { + + var resp *shared.UpdateDomainResponse + op := func() error { + var err error + resp, err = c.client.UpdateDomain(ctx, request, opts...) 
+ return err + } + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} diff --git a/cmd/server/server.go b/cmd/server/server.go index a0a31a14b37..b11eaf7f084 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -120,6 +120,8 @@ func (s *server) startService() common.Daemon { enableGlobalDomain := dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, s.cfg.ClustersInfo.EnableGlobalDomain) enableArchival := dc.GetBoolProperty(dynamicconfig.EnableArchival, s.cfg.Archival.Enabled) + params.DCRedirectionPolicy = s.cfg.DCRedirectionPolicy + params.ClusterMetadata = cluster.NewMetadata( enableGlobalDomain, s.cfg.ClustersInfo.FailoverVersionIncrement, diff --git a/common/cluster/metadata.go b/common/cluster/metadata.go index 83c92c84808..835f36bdad1 100644 --- a/common/cluster/metadata.go +++ b/common/cluster/metadata.go @@ -124,8 +124,12 @@ func NewMetadata( if len(initialFailoverVersionClusters) != len(clusterInitialFailoverVersions) { panic("Cluster to initial failover versions have duplicate initial versions") } - if len(initialFailoverVersionClusters) != len(clusterToAddress) { - panic("Cluster to address size is different than Cluster to initial failover versions") + + // only check whether a cluster in cluster -> initial failover versions exists in cluster -> address + for clusterName := range clusterInitialFailoverVersions { + if _, ok := clusterToAddress[clusterName]; !ok { + panic("Cluster -> initial failover version does not have an address") + } } defaultArchivalBucketSet := len(defaultArchivalBucket) != 0 diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 8fd3285aecb..eea130416e8 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -308,6 +308,8 @@ const ( FrontendClientRequestCancelWorkflowExecutionScope // FrontendClientResetStickyTaskListScope tracks RPC calls to frontend service FrontendClientResetStickyTaskListScope + // FrontendClientResetWorkflowExecutionScope tracks RPC calls to frontend service + FrontendClientResetWorkflowExecutionScope // FrontendClientRespondActivityTaskCanceledScope tracks RPC calls to frontend service FrontendClientRespondActivityTaskCanceledScope // FrontendClientRespondActivityTaskCanceledByIDScope tracks RPC calls to frontend service @@ -759,6 +761,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ FrontendClientRegisterDomainScope: {operation: "FrontendClientRegisterDomain", tags: map[string]string{CadenceRoleTagName: FrontendRoleTagValue}}, FrontendClientRequestCancelWorkflowExecutionScope: {operation: "FrontendClientRequestCancelWorkflowExecution", tags: map[string]string{CadenceRoleTagName: FrontendRoleTagValue}}, FrontendClientResetStickyTaskListScope: {operation: "FrontendClientResetStickyTaskList", tags: map[string]string{CadenceRoleTagName: FrontendRoleTagValue}}, + FrontendClientResetWorkflowExecutionScope: {operation: "FrontendClientResetWorkflowExecution", tags: map[string]string{CadenceRoleTagName: FrontendRoleTagValue}}, FrontendClientRespondActivityTaskCanceledScope: {operation: "FrontendClientRespondActivityTaskCanceled", tags: map[string]string{CadenceRoleTagName: FrontendRoleTagValue}}, FrontendClientRespondActivityTaskCanceledByIDScope: {operation: "FrontendClientRespondActivityTaskCanceledByID", tags: map[string]string{CadenceRoleTagName: FrontendRoleTagValue}}, FrontendClientRespondActivityTaskCompletedScope: {operation: "FrontendClientRespondActivityTaskCompleted", tags: map[string]string{CadenceRoleTagName: FrontendRoleTagValue}}, diff --git 
a/common/service/config/config.go b/common/service/config/config.go index 69edac1919b..33e92d5fe0e 100644 --- a/common/service/config/config.go +++ b/common/service/config/config.go @@ -43,6 +43,8 @@ type ( Log Logger `yaml:"log"` // ClustersInfo is the config containing all valid clusters and active cluster ClustersInfo ClustersInfo `yaml:"clustersInfo"` + // DCRedirectionPolicy contains the frontend datacenter redirection policy + DCRedirectionPolicy DCRedirectionPolicy `yaml:"dcRedirectionPolicy"` // Services is a map of service name to service config items Services map[string]Service `yaml:"services"` // Kafka is the config for connecting to kafka @@ -214,6 +216,12 @@ type ( RPCAddress string `yaml:"rpcAddress"` } + // DCRedirectionPolicy contains the frontend datacenter redirection policy + DCRedirectionPolicy struct { + Policy string `yaml:"policy"` + ToDC string `yaml:"toDC"` + } + // Metrics contains the config items for metrics subsystem Metrics struct { // M3 is the configuration for m3 metrics reporter diff --git a/common/service/service.go b/common/service/service.go index 567e0ef830a..7e63659b3aa 100644 --- a/common/service/service.go +++ b/common/service/service.go @@ -42,7 +42,7 @@ import ( "github.com/uber-common/bark" "github.com/uber-go/tally" "github.com/uber/cadence/common/elasticsearch" - ringpop "github.com/uber/ringpop-go" + "github.com/uber/ringpop-go" "go.uber.org/yarpc" ) @@ -57,22 +57,23 @@ type ( // BootstrapParams holds the set of parameters // needed to bootstrap a service BootstrapParams struct { - Name string - Logger bark.Logger - MetricScope tally.Scope - RingpopFactory RingpopFactory - RPCFactory common.RPCFactory - PProfInitializer common.PProfInitializer - PersistenceConfig config.Persistence - ClusterMetadata cluster.Metadata - ReplicatorConfig config.Replicator - MetricsClient metrics.Client - MessagingClient messaging.Client - ESClient *elastic.Client - ESConfig *elasticsearch.Config - DynamicConfig dynamicconfig.Client - DispatcherProvider client.DispatcherProvider - BlobstoreClient blobstore.Client + Name string + Logger bark.Logger + MetricScope tally.Scope + RingpopFactory RingpopFactory + RPCFactory common.RPCFactory + PProfInitializer common.PProfInitializer + PersistenceConfig config.Persistence + ClusterMetadata cluster.Metadata + ReplicatorConfig config.Replicator + MetricsClient metrics.Client + MessagingClient messaging.Client + ESClient *elastic.Client + ESConfig *elasticsearch.Config + DynamicConfig dynamicconfig.Client + DispatcherProvider client.DispatcherProvider + BlobstoreClient blobstore.Client + DCRedirectionPolicy config.DCRedirectionPolicy } // RingpopFactory provides a bootstrapped ringpop diff --git a/common/util.go b/common/util.go index c182656b352..1e1a9882d06 100644 --- a/common/util.go +++ b/common/util.go @@ -59,6 +59,10 @@ const ( frontendServiceOperationMaxInterval = 5 * time.Second frontendServiceOperationExpirationInterval = 15 * time.Second + publicClientOperationInitialInterval = 200 * time.Millisecond + publicClientOperationMaxInterval = 5 * time.Second + publicClientOperationExpirationInterval = 15 * time.Second + adminServiceOperationInitialInterval = 200 * time.Millisecond adminServiceOperationMaxInterval = 5 * time.Second adminServiceOperationExpirationInterval = 15 * time.Second @@ -157,6 +161,15 @@ func CreateAdminServiceRetryPolicy() backoff.RetryPolicy { return policy } +// CreatePublicClientRetryPolicy creates a retry policy for calls to frontend service +func CreatePublicClientRetryPolicy() 
backoff.RetryPolicy { + policy := backoff.NewExponentialRetryPolicy(publicClientOperationInitialInterval) + policy.SetMaximumInterval(publicClientOperationMaxInterval) + policy.SetExpirationInterval(publicClientOperationExpirationInterval) + + return policy +} + // CreateKafkaOperationRetryPolicy creates a retry policy for kafka operation func CreateKafkaOperationRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(retryKafkaOperationInitialInterval) diff --git a/config/development.yaml b/config/development.yaml index 0911751087a..a0fb7164a7f 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -77,6 +77,10 @@ clustersInfo: rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:7933" +dcRedirectionPolicy: + policy: "noop" + toDC: "" + archival: enabled: true filestore: diff --git a/config/development_active.yaml b/config/development_active.yaml index f3f600e7f8e..57f98af760e 100644 --- a/config/development_active.yaml +++ b/config/development_active.yaml @@ -73,6 +73,7 @@ clustersInfo: clusterInitialFailoverVersion: active: 1 standby: 0 + other: 2 clusterAddress: active: rpcName: "cadence-frontend" @@ -80,6 +81,13 @@ clustersInfo: standby: rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:8933" + other: + rpcName: "cadence-frontend" + rpcAddress: "127.0.0.1:9933" + +dcRedirectionPolicy: + policy: "noop" + toDC: "" kafka: clusters: @@ -99,6 +107,12 @@ kafka: cluster: test standby-dlq: cluster: test + other: + cluster: test + other-retry: + cluster: test + other-dlq: + cluster: test cadence-cluster-topics: active: topic: active @@ -108,6 +122,10 @@ kafka: topic: standby retry-topic: standby-retry dlq-topic: standby-dlq + other: + topic: other + retry-topic: other-retry + dlq-topic: other-dlq archival: enabled: true diff --git a/config/development_other.yaml b/config/development_other.yaml new file mode 100644 index 00000000000..3ae52615fdc --- /dev/null +++ b/config/development_other.yaml @@ -0,0 +1,143 @@ +persistence: + defaultStore: cass-default + visibilityStore: cass-visibility + numHistoryShards: 1 + datastores: + cass-default: + cassandra: + hosts: "127.0.0.1" + keyspace: "cadence_other" + consistency: "One" + cass-visibility: + cassandra: + hosts: "127.0.0.1" + keyspace: "cadence_visibility_other" + consistency: "One" + +ringpop: + name: cadence_other + bootstrapMode: hosts + bootstrapHosts: ["127.0.0.1:9933", "127.0.0.1:9934", "127.0.0.1:9935", "127.0.0.1:9940"] + maxJoinDuration: 30s + +services: + frontend: + rpc: + port: 9933 + bindOnLocalHost: true + metrics: + statsd: + hostPort: "127.0.0.1:8125" + prefix: "cadence_other" + pprof: + port: 9936 + + matching: + rpc: + port: 9935 + bindOnLocalHost: true + metrics: + statsd: + hostPort: "127.0.0.1:8125" + prefix: "cadence_other" + pprof: + port: 9938 + + history: + rpc: + port: 9934 + bindOnLocalHost: true + metrics: + statsd: + hostPort: "127.0.0.1:8125" + prefix: "cadence_other" + pprof: + port: 9937 + + worker: + rpc: + port: 9940 + bindOnLocalHost: true + metrics: + statsd: + hostPort: "127.0.0.1:8125" + prefix: "cadence_other" + pprof: + port: 9941 + +clustersInfo: + enableGlobalDomain: true + failoverVersionIncrement: 10 + masterClusterName: "active" + currentClusterName: "other" + clusterInitialFailoverVersion: + active: 1 + other: 2 + clusterAddress: + active: + rpcName: "cadence-frontend" + rpcAddress: "127.0.0.1:7933" + standby: + rpcName: "cadence-frontend" + rpcAddress: "127.0.0.1:8933" + other: + rpcName: "cadence-frontend" + rpcAddress: "127.0.0.1:9933" + +dcRedirectionPolicy: 
+ policy: "forwarding" + toDC: "standby" + +kafka: + clusters: + test: + brokers: + - 127.0.0.1:9092 + topics: + active: + cluster: test + active-retry: + cluster: test + active-dlq: + cluster: test + standby: + cluster: test + standby-retry: + cluster: test + standby-dlq: + cluster: test + other: + cluster: test + other-retry: + cluster: test + other-dlq: + cluster: test + cadence-cluster-topics: + active: + topic: active + retry-topic: active-retry + dlq-topic: active-dlq + standby: + topic: standby + retry-topic: standby-retry + dlq-topic: standby-dlq + other: + topic: other + retry-topic: other-retry + dlq-topic: other-dlq + +archival: + enabled: true + filestore: + storeDirectory: "/tmp/dev_standby/blobstore/" + defaultBucket: + name: "cadence-development" + owner: "cadence" + retentionDays: 10 + customBuckets: + - name: "custom-bucket-1" + owner: "custom-owner-1" + retentionDays: 10 + - name: "custom-bucket-2" + owner: "custom-owner-2" + retentionDays: 5 diff --git a/config/development_standby.yaml b/config/development_standby.yaml index d2a7e5be321..48db2c02d79 100644 --- a/config/development_standby.yaml +++ b/config/development_standby.yaml @@ -80,6 +80,13 @@ clustersInfo: standby: rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:8933" + other: + rpcName: "cadence-frontend" + rpcAddress: "127.0.0.1:9933" + +dcRedirectionPolicy: + policy: "forwarding" + toDC: "other" kafka: clusters: @@ -99,6 +106,12 @@ kafka: cluster: test standby-dlq: cluster: test + other: + cluster: test + other-retry: + cluster: test + other-dlq: + cluster: test cadence-cluster-topics: active: topic: active @@ -108,6 +121,10 @@ kafka: topic: standby retry-topic: standby-retry dlq-topic: standby-dlq + other: + topic: other + retry-topic: other-retry + dlq-topic: other-dlq archival: enabled: true diff --git a/docker/config_template.yaml b/docker/config_template.yaml index 7954c9f6401..8ccef5f2313 100644 --- a/docker/config_template.yaml +++ b/docker/config_template.yaml @@ -73,6 +73,10 @@ clustersInfo: rpcName: "cadence-frontend" rpcAddress: "127.0.0.1:7933" +dcRedirectionPolicy: + policy: "noop" + toDC: "" + archival: enableArchival: true blobstore: diff --git a/host/onebox.go b/host/onebox.go index ff81fe78080..1b3f0566d33 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -35,6 +35,7 @@ import ( "github.com/uber-go/tally" "github.com/uber/cadence/.gen/go/admin/adminserviceclient" "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" + "github.com/uber/cadence/.gen/go/cadence/workflowserviceserver" "github.com/uber/cadence/client" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" @@ -284,6 +285,7 @@ func (c *cadenceImpl) GetFrontendService() service.Service { func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup) { params := new(service.BootstrapParams) + params.DCRedirectionPolicy = config.DCRedirectionPolicy{} params.Name = common.FrontendServiceName params.Logger = c.logger params.PProfInitializer = newPProfInitializerImpl(c.logger, c.FrontendPProfPort()) @@ -321,11 +323,14 @@ func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup) { c.frontEndService, c.numberOfHistoryShards, c.metadataMgr, c.historyMgr, c.historyV2Mgr) c.frontendHandler = frontend.NewWorkflowHandler( c.frontEndService, frontend.NewConfig(dynamicconfig.NewNopCollection()), - c.metadataMgr, c.historyMgr, c.historyV2Mgr, c.visibilityMgr, kafkaProducer, params.BlobstoreClient) + c.metadataMgr, c.historyMgr, c.historyV2Mgr, c.visibilityMgr, kafkaProducer, + 
params.BlobstoreClient, + ) err = c.frontendHandler.Start() if err != nil { c.logger.WithField("error", err).Fatal("Failed to start frontend") } + c.frontEndService.GetDispatcher().Register(workflowserviceserver.New(c.frontendHandler)) err = c.adminHandler.Start() if err != nil { c.logger.WithField("error", err).Fatal("Failed to start admin") diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index 3ba4d7a2d14..90e8511689d 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -89,7 +89,7 @@ func (adh *AdminHandler) Start() error { adh.Service.GetDispatcher().Register(adminserviceserver.New(adh)) adh.Service.Start() - adh.history = adh.Service.GetClientBean().GetHistoryClient() + adh.history = adh.GetClientBean().GetHistoryClient() adh.metricsClient = adh.Service.GetMetricsClient() adh.startWG.Done() return nil diff --git a/service/frontend/dcRedirectionHandler.go b/service/frontend/dcRedirectionHandler.go new file mode 100644 index 00000000000..97a027d1787 --- /dev/null +++ b/service/frontend/dcRedirectionHandler.go @@ -0,0 +1,613 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
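The new dcRedirectionHandler.go below applies one pattern to every WorkflowService API: APIs addressed by a domain name ask the redirection policy for a target DC directly, task-token based APIs (the RecordActivityTaskHeartbeat / RespondActivityTask* / RespondDecisionTask* / RespondQueryTaskCompleted family) first deserialize the token to recover the domain ID, and domain CRUD APIs are passed straight through. If the target DC is the current cluster, the wrapped WorkflowHandler serves the call; otherwise the call is forwarded through the client bean's remote frontend client for that cluster. A minimal sketch of that pattern, using local stand-in types rather than the real Cadence interfaces (illustrative only, not part of the patch):

package redirectionexample

import (
	"context"
	"fmt"
)

// Stand-ins for the Cadence request/response and client types used by the
// real DCRedirectionHandlerImpl.
type DescribeRequest struct{ Domain string }
type DescribeResponse struct{ FromCluster string }

type redirectionPolicy interface {
	GetTargetDatacenterByName(domain string) (string, error)
}

type frontendAPI interface {
	DescribeWorkflowExecution(ctx context.Context, req *DescribeRequest) (*DescribeResponse, error)
}

// redirectionHandler mirrors the shape of the handler in this file: resolve a
// target DC from the policy, serve locally when it matches the current
// cluster, and forward to the remote frontend client otherwise.
type redirectionHandler struct {
	currentCluster string
	policy         redirectionPolicy
	local          frontendAPI
	remotes        map[string]frontendAPI
}

func (h *redirectionHandler) DescribeWorkflowExecution(
	ctx context.Context, req *DescribeRequest,
) (*DescribeResponse, error) {
	targetDC, err := h.policy.GetTargetDatacenterByName(req.Domain)
	if err != nil {
		return nil, err
	}
	if targetDC == h.currentCluster {
		return h.local.DescribeWorkflowExecution(ctx, req)
	}
	remote, ok := h.remotes[targetDC]
	if !ok {
		return nil, fmt.Errorf("no frontend client for cluster %q", targetDC)
	}
	return remote.DescribeWorkflowExecution(ctx, req)
}

The handler in the patch repeats this shape per API instead of abstracting it, which keeps every method a plain, easily audited passthrough.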
+
+package frontend
+
+import (
+	"context"
+	"github.com/uber/cadence/.gen/go/shared"
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/service"
+)
+
+type (
+	// DCRedirectionHandlerImpl is a simple wrapper over the frontend service, doing redirection based on policy
+	DCRedirectionHandlerImpl struct {
+		currentClusteName string
+		redirectionPolicy DCRedirectionPolicy
+		tokenSerializer   common.TaskTokenSerializer
+		sevice            service.Service
+		frontendHandler   *WorkflowHandler
+	}
+)
+
+// NewDCRedirectionHandler creates a DC redirection handler for the cadence frontend service
+func NewDCRedirectionHandler(currentClusteName string, redirectionPolicy DCRedirectionPolicy,
+	sevice service.Service, frontendHandler *WorkflowHandler) *DCRedirectionHandlerImpl {
+	return &DCRedirectionHandlerImpl{
+		currentClusteName: currentClusteName,
+		redirectionPolicy: redirectionPolicy,
+		tokenSerializer:   common.NewJSONTaskTokenSerializer(),
+		sevice:            sevice,
+		frontendHandler:   frontendHandler,
+	}
+}
+
+// Start starts the handler
+func (handler *DCRedirectionHandlerImpl) Start() error {
+	return handler.frontendHandler.Start()
+}
+
+// Stop stops the handler
+func (handler *DCRedirectionHandlerImpl) Stop() {
+	handler.frontendHandler.Stop()
+}
+
+// Domain APIs; domain APIs do not require redirection
+
+// DeprecateDomain API call
+func (handler *DCRedirectionHandlerImpl) DeprecateDomain(
+	ctx context.Context,
+	request *shared.DeprecateDomainRequest,
+) error {
+
+	return handler.frontendHandler.DeprecateDomain(ctx, request)
+}
+
+// DescribeDomain API call
+func (handler *DCRedirectionHandlerImpl) DescribeDomain(
+	ctx context.Context,
+	request *shared.DescribeDomainRequest,
+) (*shared.DescribeDomainResponse, error) {
+
+	return handler.frontendHandler.DescribeDomain(ctx, request)
+}
+
+// ListDomains API call
+func (handler *DCRedirectionHandlerImpl) ListDomains(
+	ctx context.Context,
+	request *shared.ListDomainsRequest,
+) (*shared.ListDomainsResponse, error) {
+
+	return handler.frontendHandler.ListDomains(ctx, request)
+}
+
+// RegisterDomain API call
+func (handler *DCRedirectionHandlerImpl) RegisterDomain(
+	ctx context.Context,
+	request *shared.RegisterDomainRequest,
+) error {
+
+	return handler.frontendHandler.RegisterDomain(ctx, request)
+}
+
+// UpdateDomain API call
+func (handler *DCRedirectionHandlerImpl) UpdateDomain(
+	ctx context.Context,
+	request *shared.UpdateDomainRequest,
+) (*shared.UpdateDomainResponse, error) {
+
+	return handler.frontendHandler.UpdateDomain(ctx, request)
+}
+
+// Other APIs
+
+// DescribeTaskList API call
+func (handler *DCRedirectionHandlerImpl) DescribeTaskList(
+	ctx context.Context,
+	request *shared.DescribeTaskListRequest,
+) (*shared.DescribeTaskListResponse, error) {
+
+	targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain())
+	if err != nil {
+		return nil, err
+	}
+
+	if targetDC == handler.currentClusteName {
+		return handler.frontendHandler.DescribeTaskList(ctx, request)
+	}
+
+	return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).DescribeTaskList(ctx, request)
+}
+
+// DescribeWorkflowExecution API call
+func (handler *DCRedirectionHandlerImpl) DescribeWorkflowExecution(
+	ctx context.Context,
+	request *shared.DescribeWorkflowExecutionRequest,
+) (*shared.DescribeWorkflowExecutionResponse, error) {
+
+	targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain())
+	if err != nil {
+		return nil, err
+	}
+
+	if targetDC == handler.currentClusteName {
+ return handler.frontendHandler.DescribeWorkflowExecution(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).DescribeWorkflowExecution(ctx, request) +} + +// GetWorkflowExecutionHistory API call +func (handler *DCRedirectionHandlerImpl) GetWorkflowExecutionHistory( + ctx context.Context, + request *shared.GetWorkflowExecutionHistoryRequest, +) (*shared.GetWorkflowExecutionHistoryResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.GetWorkflowExecutionHistory(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).GetWorkflowExecutionHistory(ctx, request) +} + +// ListClosedWorkflowExecutions API call +func (handler *DCRedirectionHandlerImpl) ListClosedWorkflowExecutions( + ctx context.Context, + request *shared.ListClosedWorkflowExecutionsRequest, +) (*shared.ListClosedWorkflowExecutionsResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.ListClosedWorkflowExecutions(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).ListClosedWorkflowExecutions(ctx, request) +} + +// ListOpenWorkflowExecutions API call +func (handler *DCRedirectionHandlerImpl) ListOpenWorkflowExecutions( + ctx context.Context, + request *shared.ListOpenWorkflowExecutionsRequest, +) (*shared.ListOpenWorkflowExecutionsResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.ListOpenWorkflowExecutions(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).ListOpenWorkflowExecutions(ctx, request) +} + +// PollForActivityTask API call +func (handler *DCRedirectionHandlerImpl) PollForActivityTask( + ctx context.Context, + request *shared.PollForActivityTaskRequest, +) (*shared.PollForActivityTaskResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.PollForActivityTask(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).PollForActivityTask(ctx, request) +} + +// PollForDecisionTask API call +func (handler *DCRedirectionHandlerImpl) PollForDecisionTask( + ctx context.Context, + request *shared.PollForDecisionTaskRequest, +) (*shared.PollForDecisionTaskResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.PollForDecisionTask(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).PollForDecisionTask(ctx, request) +} + +// QueryWorkflow API call +func (handler *DCRedirectionHandlerImpl) QueryWorkflow( + ctx context.Context, + request *shared.QueryWorkflowRequest, +) (*shared.QueryWorkflowResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { 
+ return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.QueryWorkflow(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).QueryWorkflow(ctx, request) +} + +// RecordActivityTaskHeartbeat API call +func (handler *DCRedirectionHandlerImpl) RecordActivityTaskHeartbeat( + ctx context.Context, + request *shared.RecordActivityTaskHeartbeatRequest, +) (*shared.RecordActivityTaskHeartbeatResponse, error) { + + token, err := handler.tokenSerializer.Deserialize(request.TaskToken) + if err != nil { + return nil, err + } + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByID(token.DomainID) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RecordActivityTaskHeartbeat(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RecordActivityTaskHeartbeat(ctx, request) +} + +// RecordActivityTaskHeartbeatByID API call +func (handler *DCRedirectionHandlerImpl) RecordActivityTaskHeartbeatByID( + ctx context.Context, + request *shared.RecordActivityTaskHeartbeatByIDRequest, +) (*shared.RecordActivityTaskHeartbeatResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RecordActivityTaskHeartbeatByID(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RecordActivityTaskHeartbeatByID(ctx, request) +} + +// RequestCancelWorkflowExecution API call +func (handler *DCRedirectionHandlerImpl) RequestCancelWorkflowExecution( + ctx context.Context, + request *shared.RequestCancelWorkflowExecutionRequest, +) error { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RequestCancelWorkflowExecution(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RequestCancelWorkflowExecution(ctx, request) +} + +// ResetStickyTaskList API call +func (handler *DCRedirectionHandlerImpl) ResetStickyTaskList( + ctx context.Context, + request *shared.ResetStickyTaskListRequest, +) (*shared.ResetStickyTaskListResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.ResetStickyTaskList(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).ResetStickyTaskList(ctx, request) +} + +// ResetWorkflowExecution API call +func (handler *DCRedirectionHandlerImpl) ResetWorkflowExecution( + ctx context.Context, + request *shared.ResetWorkflowExecutionRequest, +) (*shared.ResetWorkflowExecutionResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.ResetWorkflowExecution(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).ResetWorkflowExecution(ctx, request) +} + +// RespondActivityTaskCanceled API call +func (handler *DCRedirectionHandlerImpl) RespondActivityTaskCanceled( + ctx context.Context, + request 
*shared.RespondActivityTaskCanceledRequest, +) error { + + token, err := handler.tokenSerializer.Deserialize(request.TaskToken) + if err != nil { + return err + } + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByID(token.DomainID) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RespondActivityTaskCanceled(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RespondActivityTaskCanceled(ctx, request) +} + +// RespondActivityTaskCanceledByID API call +func (handler *DCRedirectionHandlerImpl) RespondActivityTaskCanceledByID( + ctx context.Context, + request *shared.RespondActivityTaskCanceledByIDRequest, +) error { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RespondActivityTaskCanceledByID(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RespondActivityTaskCanceledByID(ctx, request) +} + +// RespondActivityTaskCompleted API call +func (handler *DCRedirectionHandlerImpl) RespondActivityTaskCompleted( + ctx context.Context, + request *shared.RespondActivityTaskCompletedRequest, +) error { + + token, err := handler.tokenSerializer.Deserialize(request.TaskToken) + if err != nil { + return err + } + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByID(token.DomainID) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RespondActivityTaskCompleted(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RespondActivityTaskCompleted(ctx, request) +} + +// RespondActivityTaskCompletedByID API call +func (handler *DCRedirectionHandlerImpl) RespondActivityTaskCompletedByID( + ctx context.Context, + request *shared.RespondActivityTaskCompletedByIDRequest, +) error { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RespondActivityTaskCompletedByID(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RespondActivityTaskCompletedByID(ctx, request) +} + +// RespondActivityTaskFailed API call +func (handler *DCRedirectionHandlerImpl) RespondActivityTaskFailed( + ctx context.Context, + request *shared.RespondActivityTaskFailedRequest, +) error { + + token, err := handler.tokenSerializer.Deserialize(request.TaskToken) + if err != nil { + return err + } + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByID(token.DomainID) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RespondActivityTaskFailed(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RespondActivityTaskFailed(ctx, request) +} + +// RespondActivityTaskFailedByID API call +func (handler *DCRedirectionHandlerImpl) RespondActivityTaskFailedByID( + ctx context.Context, + request *shared.RespondActivityTaskFailedByIDRequest, +) error { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RespondActivityTaskFailedByID(ctx, request) + } + + 
return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RespondActivityTaskFailedByID(ctx, request) +} + +// RespondDecisionTaskCompleted API call +func (handler *DCRedirectionHandlerImpl) RespondDecisionTaskCompleted( + ctx context.Context, + request *shared.RespondDecisionTaskCompletedRequest, +) (*shared.RespondDecisionTaskCompletedResponse, error) { + + token, err := handler.tokenSerializer.Deserialize(request.TaskToken) + if err != nil { + return nil, err + } + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByID(token.DomainID) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RespondDecisionTaskCompleted(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RespondDecisionTaskCompleted(ctx, request) +} + +// RespondDecisionTaskFailed API call +func (handler *DCRedirectionHandlerImpl) RespondDecisionTaskFailed( + ctx context.Context, + request *shared.RespondDecisionTaskFailedRequest, +) error { + + token, err := handler.tokenSerializer.Deserialize(request.TaskToken) + if err != nil { + return err + } + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByID(token.DomainID) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RespondDecisionTaskFailed(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RespondDecisionTaskFailed(ctx, request) +} + +// RespondQueryTaskCompleted API call +func (handler *DCRedirectionHandlerImpl) RespondQueryTaskCompleted( + ctx context.Context, + request *shared.RespondQueryTaskCompletedRequest, +) error { + + token, err := handler.tokenSerializer.DeserializeQueryTaskToken(request.TaskToken) + if err != nil { + return err + } + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByID(token.DomainID) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.RespondQueryTaskCompleted(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).RespondQueryTaskCompleted(ctx, request) +} + +// SignalWithStartWorkflowExecution API call +func (handler *DCRedirectionHandlerImpl) SignalWithStartWorkflowExecution( + ctx context.Context, + request *shared.SignalWithStartWorkflowExecutionRequest, +) (*shared.StartWorkflowExecutionResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.SignalWithStartWorkflowExecution(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).SignalWithStartWorkflowExecution(ctx, request) +} + +// SignalWorkflowExecution API call +func (handler *DCRedirectionHandlerImpl) SignalWorkflowExecution( + ctx context.Context, + request *shared.SignalWorkflowExecutionRequest, +) error { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.SignalWorkflowExecution(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).SignalWorkflowExecution(ctx, request) +} + +// StartWorkflowExecution API call +func (handler *DCRedirectionHandlerImpl) StartWorkflowExecution( + ctx context.Context, + 
request *shared.StartWorkflowExecutionRequest, +) (*shared.StartWorkflowExecutionResponse, error) { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return nil, err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.StartWorkflowExecution(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).StartWorkflowExecution(ctx, request) +} + +// TerminateWorkflowExecution API call +func (handler *DCRedirectionHandlerImpl) TerminateWorkflowExecution( + ctx context.Context, + request *shared.TerminateWorkflowExecutionRequest, +) error { + + targetDC, err := handler.redirectionPolicy.GetTargetDatacenterByName(request.GetDomain()) + if err != nil { + return err + } + + if targetDC == handler.currentClusteName { + return handler.frontendHandler.TerminateWorkflowExecution(ctx, request) + } + + return handler.sevice.GetClientBean().GetRemoteFrontendClient(targetDC).TerminateWorkflowExecution(ctx, request) +} diff --git a/service/frontend/dcRedirectionPolicy.go b/service/frontend/dcRedirectionPolicy.go new file mode 100644 index 00000000000..a24d68beb88 --- /dev/null +++ b/service/frontend/dcRedirectionPolicy.go @@ -0,0 +1,153 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
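dcRedirectionPolicy.go below defines the two policies the frontend can run with: NoopRedirectionPolicy always answers with the current cluster, while ForwardingDCRedirectionPolicy answers with the configured toDC only when the domain is a global domain replicated to more than one cluster, is not replicated to the current cluster, and is replicated to toDC; in every other case the call stays local. A condensed sketch of that decision, with plain values standing in for the domain cache entry (illustrative only, not part of the patch):

package redirectionexample

// forwardingTarget restates the decision made by
// ForwardingDCRedirectionPolicy.getTargetDatacenter in this file.
func forwardingTarget(fromDC, toDC string, isGlobalDomain bool, replicationClusters []string) string {
	if !isGlobalDomain || len(replicationClusters) <= 1 {
		// local domains, and global domains pinned to a single cluster,
		// are always served by the current cluster
		return fromDC
	}
	inReplication := map[string]bool{}
	for _, name := range replicationClusters {
		inReplication[name] = true
	}
	// forward only when the current cluster cannot serve the domain
	// but the configured destination can
	if !inReplication[fromDC] && inReplication[toDC] {
		return toDC
	}
	return fromDC
}

For example, with fromDC "other", toDC "standby" and a global domain replicated to "active" and "standby", the call is forwarded to "standby"; add "other" to the replication list and it stays local.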
+
+package frontend
+
+import (
+	"fmt"
+
+	"github.com/uber/cadence/common/cache"
+	"github.com/uber/cadence/common/cluster"
+	"github.com/uber/cadence/common/service/config"
+)
+
+const (
+	// DCRedirectionPolicyDefault means no redirection
+	DCRedirectionPolicyDefault = ""
+	// DCRedirectionPolicyNoop means no redirection
+	DCRedirectionPolicyNoop = "noop"
+	// DCRedirectionPolicyForwarding means forwarding from one DC to another DC
+	DCRedirectionPolicyForwarding = "forwarding"
+)
+
+type (
+	// DCRedirectionPolicy is DC redirection policy interface
+	DCRedirectionPolicy interface {
+		GetTargetDatacenterByName(domainName string) (string, error)
+		GetTargetDatacenterByID(domainID string) (string, error)
+	}
+
+	// NoopRedirectionPolicy is a DC redirection policy which does nothing
+	NoopRedirectionPolicy struct {
+		currentClusteName string
+	}
+
+	// ForwardingDCRedirectionPolicy is a DC redirection policy which forwards
+	// API calls if the domain is effectively global, fromDC is the current cluster,
+	// fromDC is not in the replication config and toDC is in the replication config
+	ForwardingDCRedirectionPolicy struct {
+		fromDC      string
+		toDC        string
+		domainCache cache.DomainCache
+	}
+)
+
+// RedirectionPolicyGenerator generates the corresponding redirection policy
+func RedirectionPolicyGenerator(clusterMetadata cluster.Metadata,
+	domainCache cache.DomainCache, policy config.DCRedirectionPolicy) DCRedirectionPolicy {
+	switch policy.Policy {
+	case DCRedirectionPolicyDefault:
+		// default policy, noop
+		return NewNoopRedirectionPolicy(clusterMetadata.GetCurrentClusterName())
+	case DCRedirectionPolicyNoop:
+		return NewNoopRedirectionPolicy(clusterMetadata.GetCurrentClusterName())
+	case DCRedirectionPolicyForwarding:
+		currentClusterName := clusterMetadata.GetCurrentClusterName()
+		clusterAddress := clusterMetadata.GetAllClientAddress()
+		if _, ok := clusterAddress[policy.ToDC]; !ok {
+			panic(fmt.Sprintf("Incorrect to DC: %v", policy.ToDC))
+		}
+		return NewForwardingDCRedirectionPolicy(
+			currentClusterName, policy.ToDC, domainCache,
+		)
+	default:
+		panic(fmt.Sprintf("Unknown DC redirection policy %v", policy.Policy))
+	}
+}
+
+// NewNoopRedirectionPolicy creates a DC redirection policy which does nothing
+func NewNoopRedirectionPolicy(currentClusteName string) *NoopRedirectionPolicy {
+	return &NoopRedirectionPolicy{
+		currentClusteName: currentClusteName,
+	}
+}
+
+// GetTargetDatacenterByName gets the target cluster name by domain name
+func (policy *NoopRedirectionPolicy) GetTargetDatacenterByName(domainName string) (string, error) {
+	return policy.currentClusteName, nil
+}
+
+// GetTargetDatacenterByID gets the target cluster name by domain ID
+func (policy *NoopRedirectionPolicy) GetTargetDatacenterByID(domainID string) (string, error) {
+	return policy.currentClusteName, nil
+}
+
+// NewForwardingDCRedirectionPolicy creates a datacenter redirection policy forwarding API calls
+func NewForwardingDCRedirectionPolicy(fromDC string, toDC string, domainCache cache.DomainCache) *ForwardingDCRedirectionPolicy {
+	return &ForwardingDCRedirectionPolicy{
+		fromDC:      fromDC,
+		toDC:        toDC,
+		domainCache: domainCache,
+	}
+}
+
+// GetTargetDatacenterByName gets the target cluster name by domain name
+func (policy *ForwardingDCRedirectionPolicy) GetTargetDatacenterByName(domainName string) (string, error) {
+	domainEntry, err := policy.domainCache.GetDomain(domainName)
+	if err != nil {
+		return "", err
+	}
+
+	return policy.getTargetDatacenter(domainEntry), nil
+}
+
+// GetTargetDatacenterByID gets the target cluster name by domain ID
+func (policy *ForwardingDCRedirectionPolicy) GetTargetDatacenterByID(domainID string) (string, error) { + domainEntry, err := policy.domainCache.GetDomainByID(domainID) + if err != nil { + return "", err + } + + return policy.getTargetDatacenter(domainEntry), nil +} + +func (policy *ForwardingDCRedirectionPolicy) getTargetDatacenter(domainEntry *cache.DomainCacheEntry) string { + if !domainEntry.IsGlobalDomain() { + return policy.fromDC + } + + if len(domainEntry.GetReplicationConfig().Clusters) == 1 { + // do not do dc redirection if domain is only targeting at 1 dc (effectively local domain) + return policy.fromDC + } + + replicationClusterNames := map[string]struct{}{} + for _, clusterConfig := range domainEntry.GetReplicationConfig().Clusters { + replicationClusterNames[clusterConfig.ClusterName] = struct{}{} + } + + _, containsFromDC := replicationClusterNames[policy.fromDC] + _, containsToDC := replicationClusterNames[policy.toDC] + + if !containsFromDC && containsToDC { + return policy.toDC + } + return policy.fromDC +} diff --git a/service/frontend/dcRedirectionPolicy_test.go b/service/frontend/dcRedirectionPolicy_test.go new file mode 100644 index 00000000000..7c5793aecf7 --- /dev/null +++ b/service/frontend/dcRedirectionPolicy_test.go @@ -0,0 +1,240 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
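The test file below covers both policies. The noop suite expects the current cluster for any domain, and the forwarding suite drives four domain shapes through mocked persistence and the domain cache: a local domain, a global domain replicated to a single cluster, and a global domain whose replication config includes the current cluster all resolve to the current cluster, while a global domain replicated only to other clusters (including toDC) resolves to toDC. The same scenarios, restated as a table-driven test against the simplified forwardingTarget helper sketched earlier (illustrative only, not part of the patch):

package redirectionexample

import "testing"

// The cluster names here are plain stand-ins for the
// cluster.TestCurrentClusterName / cluster.TestAlternativeClusterName
// constants used by the real suite.
func TestForwardingTargetScenarios(t *testing.T) {
	const fromDC, toDC = "active", "standby"
	cases := []struct {
		name     string
		isGlobal bool
		clusters []string
		want     string
	}{
		{"local domain", false, []string{fromDC}, fromDC},
		{"global domain, single replication cluster", true, []string{toDC}, fromDC},
		{"global domain, current cluster in replication", true, []string{fromDC, toDC}, fromDC},
		{"global domain, forwarded", true, []string{"some-other-cluster", toDC}, toDC},
	}
	for _, c := range cases {
		if got := forwardingTarget(fromDC, toDC, c.isGlobal, c.clusters); got != c.want {
			t.Errorf("%s: got %q, want %q", c.name, got, c.want)
		}
	}
}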
+ +package frontend + +import ( + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "github.com/uber-common/bark" + "github.com/uber-go/tally" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/mocks" + "github.com/uber/cadence/common/persistence" + "os" + "testing" +) + +type ( + noopDCRedirectionPolicySuite struct { + suite.Suite + currentClusteName string + noopDCRedirectionPolicy *NoopRedirectionPolicy + } + + forwardingDCRedirectionPolicySuite struct { + logger bark.Logger + suite.Suite + fromDC string + toDC string + mockMetadataMgr *mocks.MetadataManager + mockClusterMetadata *mocks.ClusterMetadata + forwardingDCRedirectionPolicy *ForwardingDCRedirectionPolicy + } +) + +func TestNoopDCRedirectionPolicySuite(t *testing.T) { + s := new(noopDCRedirectionPolicySuite) + suite.Run(t, s) +} + +func (s *noopDCRedirectionPolicySuite) SetupSuite() { + if testing.Verbose() { + log.SetOutput(os.Stdout) + } +} + +func (s *noopDCRedirectionPolicySuite) TearDownSuite() { + +} + +func (s *noopDCRedirectionPolicySuite) SetupTest() { + s.currentClusteName = cluster.TestCurrentClusterName + s.noopDCRedirectionPolicy = NewNoopRedirectionPolicy(s.currentClusteName) +} + +func (s *noopDCRedirectionPolicySuite) TearDownTest() { + +} + +func (s *noopDCRedirectionPolicySuite) TestGetTargetDatacenter() { + domainName := "some random domain name" + domainID := "some random domain ID" + + targetCluster, err := s.noopDCRedirectionPolicy.GetTargetDatacenterByID(domainID) + s.Nil(err) + s.Equal(s.currentClusteName, targetCluster) + + targetCluster, err = s.noopDCRedirectionPolicy.GetTargetDatacenterByName(domainName) + s.Nil(err) + s.Equal(s.currentClusteName, targetCluster) +} + +func TestForwardingDCRedirectionPolicySuite(t *testing.T) { + s := new(forwardingDCRedirectionPolicySuite) + suite.Run(t, s) +} + +func (s *forwardingDCRedirectionPolicySuite) SetupSuite() { + if testing.Verbose() { + log.SetOutput(os.Stdout) + } +} + +func (s *forwardingDCRedirectionPolicySuite) TearDownSuite() { + +} + +func (s *forwardingDCRedirectionPolicySuite) SetupTest() { + s.fromDC = cluster.TestCurrentClusterName + s.toDC = cluster.TestAlternativeClusterName + log2 := log.New() + log2.Level = log.DebugLevel + s.logger = bark.NewLoggerFromLogrus(log2) + s.mockMetadataMgr = &mocks.MetadataManager{} + s.mockClusterMetadata = &mocks.ClusterMetadata{} + s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(true) + domainCache := cache.NewDomainCache( + s.mockMetadataMgr, + s.mockClusterMetadata, + metrics.NewClient(tally.NoopScope, metrics.Frontend), + s.logger, + ) + s.forwardingDCRedirectionPolicy = NewForwardingDCRedirectionPolicy( + s.fromDC, s.toDC, domainCache, + ) +} + +func (s *forwardingDCRedirectionPolicySuite) TearDownTest() { + +} + +func (s *forwardingDCRedirectionPolicySuite) TestGetTargetDatacenter_LocalDomain() { + domainName := "some random domain name" + domainID := "some random domain ID" + domainRecord := &persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ID: domainID, Name: domainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName}, + }, + }, + IsGlobalDomain: false, + TableVersion: 
persistence.DomainTableVersionV1, + } + + s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(domainRecord, nil) + + targetCluster, err := s.forwardingDCRedirectionPolicy.GetTargetDatacenterByID(domainID) + s.Nil(err) + s.Equal(s.fromDC, targetCluster) + + targetCluster, err = s.forwardingDCRedirectionPolicy.GetTargetDatacenterByName(domainName) + s.Nil(err) + s.Equal(s.fromDC, targetCluster) +} + +func (s *forwardingDCRedirectionPolicySuite) TestGetTargetDatacenter_GlobalDomain_OneReplicationCluster() { + domainName := "some random domain name" + domainID := "some random domain ID" + domainRecord := &persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ID: domainID, Name: domainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestAlternativeClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName}, + }, + }, + IsGlobalDomain: true, + TableVersion: persistence.DomainTableVersionV1, + } + + s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(domainRecord, nil) + + targetCluster, err := s.forwardingDCRedirectionPolicy.GetTargetDatacenterByID(domainID) + s.Nil(err) + s.Equal(s.fromDC, targetCluster) + + targetCluster, err = s.forwardingDCRedirectionPolicy.GetTargetDatacenterByName(domainName) + s.Nil(err) + s.Equal(s.fromDC, targetCluster) +} + +func (s *forwardingDCRedirectionPolicySuite) TestGetTargetDatacenter_GlobalDomain_NoFowarding() { + domainName := "some random domain name" + domainID := "some random domain ID" + domainRecord := &persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ID: domainID, Name: domainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestAlternativeClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName}, + &persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName}, + }, + }, + IsGlobalDomain: true, + TableVersion: persistence.DomainTableVersionV1, + } + + s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(domainRecord, nil) + + targetCluster, err := s.forwardingDCRedirectionPolicy.GetTargetDatacenterByID(domainID) + s.Nil(err) + s.Equal(s.fromDC, targetCluster) + + targetCluster, err = s.forwardingDCRedirectionPolicy.GetTargetDatacenterByName(domainName) + s.Nil(err) + s.Equal(s.fromDC, targetCluster) +} + +func (s *forwardingDCRedirectionPolicySuite) TestGetTargetDatacenter_GlobalDomain_Fowarding() { + domainName := "some random domain name" + domainID := "some random domain ID" + domainRecord := &persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ID: domainID, Name: domainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestAlternativeClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ClusterName: "some other random cluster"}, + &persistence.ClusterReplicationConfig{ClusterName: cluster.TestAlternativeClusterName}, + }, + }, + IsGlobalDomain: true, + TableVersion: persistence.DomainTableVersionV1, + } + + s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(domainRecord, nil) + + targetCluster, err := s.forwardingDCRedirectionPolicy.GetTargetDatacenterByID(domainID) + s.Nil(err) + 
s.Equal(s.toDC, targetCluster)
+
+	targetCluster, err = s.forwardingDCRedirectionPolicy.GetTargetDatacenterByName(domainName)
+	s.Nil(err)
+	s.Equal(s.toDC, targetCluster)
+}
diff --git a/service/frontend/service.go b/service/frontend/service.go
index 361dcf5bd8c..fdf8fee4e01 100644
--- a/service/frontend/service.go
+++ b/service/frontend/service.go
@@ -21,6 +21,8 @@ package frontend
 
 import (
+	"fmt"
+
 	"github.com/uber/cadence/.gen/go/cadence/workflowserviceserver"
 	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/messaging"
 	"github.com/uber/cadence/common/mocks"
@@ -137,8 +139,28 @@ func (s *Service) Start() {
 		kafkaProducer = &mocks.KafkaProducer{}
 	}
 
-	wfHandler := NewWorkflowHandler(base, s.config, metadata, history, historyV2, visibility, kafkaProducer, params.BlobstoreClient)
+	wfHandler := NewWorkflowHandler(base, s.config, metadata, history, historyV2, visibility,
+		kafkaProducer, params.BlobstoreClient)
 	wfHandler.Start()
+	switch params.DCRedirectionPolicy.Policy {
+	case DCRedirectionPolicyDefault:
+		base.GetDispatcher().Register(workflowserviceserver.New(wfHandler))
+	case DCRedirectionPolicyNoop:
+		base.GetDispatcher().Register(workflowserviceserver.New(wfHandler))
+	case DCRedirectionPolicyForwarding:
+		dcRedirectionPolicy := RedirectionPolicyGenerator(
+			base.GetClusterMetadata(),
+			wfHandler.domainCache,
+			params.DCRedirectionPolicy,
+		)
+		currentClusteName := base.GetClusterMetadata().GetCurrentClusterName()
+		dcRediectionHandle := NewDCRedirectionHandler(
+			currentClusteName, dcRedirectionPolicy, base, wfHandler,
+		)
+		base.GetDispatcher().Register(workflowserviceserver.New(dcRediectionHandle))
+	default:
+		panic(fmt.Sprintf("Unknown DC redirection policy %v", params.DCRedirectionPolicy.Policy))
+	}
 	adminHandler := NewAdminHandler(base, pConfig.NumHistoryShards, metadata, history, historyV2)
 	adminHandler.Start()
diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go
index 3545d0b048f..3b6e490d7c3 100644
--- a/service/frontend/workflowHandler.go
+++ b/service/frontend/workflowHandler.go
@@ -155,13 +155,16 @@ func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persi
 // Start starts the handler
 func (wh *WorkflowHandler) Start() error {
-	wh.Service.GetDispatcher().Register(workflowserviceserver.New(wh))
+	// previously this method registered the workflow service handler here, via
+	// wh.Service.GetDispatcher().Register(workflowserviceserver.New(wh));
+	// with the introduction of the DC redirection handler,
+	// that registration is now done in service.go
 	wh.Service.GetDispatcher().Register(metaserver.New(wh))
 	wh.Service.Start()
 	wh.domainCache.Start()
-	wh.history = wh.Service.GetClientBean().GetHistoryClient()
-	wh.matchingRawClient = wh.Service.GetClientBean().GetMatchingClient()
+	wh.history = wh.GetClientBean().GetHistoryClient()
+	wh.matchingRawClient = wh.GetClientBean().GetMatchingClient()
 	wh.matching = matching.NewRetryableClient(wh.matchingRawClient, common.CreateMatchingServiceRetryPolicy(),
 		common.IsWhitelistServiceTransientError)
 	wh.metricsClient = wh.Service.GetMetricsClient()
diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go
index 06019f0c965..d79ee1c6ce1 100644
--- a/service/frontend/workflowHandler_test.go
+++ b/service/frontend/workflowHandler_test.go
@@ -23,8 +23,6 @@ package frontend
 import (
 	"context"
 	"errors"
-	"github.com/uber/cadence/common/blobstore"
-	"github.com/uber/cadence/common/persistence"
"log" "os" "testing" @@ -39,10 +37,12 @@ import ( "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/client" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/blobstore" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/mocks" + "github.com/uber/cadence/common/persistence" cs "github.com/uber/cadence/common/service" dc "github.com/uber/cadence/common/service/dynamicconfig" ) diff --git a/service/history/handler.go b/service/history/handler.go index 7e7cea7dc4a..36b7d83f70b 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -25,10 +25,8 @@ import ( "fmt" "sync" - "github.com/uber-common/bark" - "github.com/uber/cadence/common/logging" - "github.com/pborman/uuid" + "github.com/uber-common/bark" "github.com/uber/cadence/.gen/go/health" "github.com/uber/cadence/.gen/go/health/metaserver" hist "github.com/uber/cadence/.gen/go/history" @@ -37,8 +35,10 @@ import ( "github.com/uber/cadence/client/frontend" hc "github.com/uber/cadence/client/history" "github.com/uber/cadence/client/matching" + "github.com/uber/cadence/client/public" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" @@ -61,6 +61,7 @@ type ( historyServiceClient hc.Client matchingServiceClient matching.Client frontendServiceClient frontend.Client + publicClient public.Client hServiceResolver membership.ServiceResolver controller *shardController tokenSerializer common.TaskTokenSerializer @@ -120,23 +121,29 @@ func (h *Handler) Start() error { h.Service.Start() h.matchingServiceClient = matching.NewRetryableClient( - h.Service.GetClientBean().GetMatchingClient(), + h.GetClientBean().GetMatchingClient(), common.CreateMatchingServiceRetryPolicy(), common.IsWhitelistServiceTransientError, ) h.historyServiceClient = hc.NewRetryableClient( - h.Service.GetClientBean().GetHistoryClient(), + h.GetClientBean().GetHistoryClient(), common.CreateHistoryServiceRetryPolicy(), common.IsWhitelistServiceTransientError, ) h.frontendServiceClient = frontend.NewRetryableClient( - h.Service.GetClientBean().GetFrontendClient(), + h.GetClientBean().GetFrontendClient(), common.CreateFrontendServiceRetryPolicy(), common.IsWhitelistServiceTransientError, ) + h.publicClient = public.NewRetryableClient( + h.GetClientBean().GetPublicClient(), + common.CreatePublicClientRetryPolicy(), + common.IsWhitelistServiceTransientError, + ) + hServiceResolver, err1 := h.GetMembershipMonitor().GetResolver(common.HistoryServiceName) if err1 != nil { h.Service.GetLogger().Fatalf("Unable to get history service resolver: ", err1) @@ -190,7 +197,7 @@ func (h *Handler) Stop() { // CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard func (h *Handler) CreateEngine(context ShardContext) Engine { return NewEngineWithShardContext(context, h.visibilityMgr, h.matchingServiceClient, h.historyServiceClient, - h.frontendServiceClient, h.historyEventNotifier, h.publisher, h.visibilityProducer, h.config) + h.frontendServiceClient, h.publicClient, h.historyEventNotifier, h.publisher, h.visibilityProducer, h.config) } // Health is for health check diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 015b256c9bb..1373e0ba190 100644 --- 
a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -25,6 +25,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/uber/cadence/client/public" "time" "github.com/uber/cadence/service/worker/sysworkflow" @@ -136,6 +137,7 @@ func NewEngineWithShardContext( matching matching.Client, historyClient hc.Client, frontendClient frontend.Client, + publicClient public.Client, historyEventNotifier historyEventNotifier, publisher messaging.Producer, visibilityProducer messaging.Producer, @@ -167,7 +169,7 @@ func NewEngineWithShardContext( metricsClient: shard.GetMetricsClient(), historyEventNotifier: historyEventNotifier, config: config, - archivalClient: sysworkflow.NewArchivalClient(frontendClient, shard.GetConfig().NumSysWorkflows), + archivalClient: sysworkflow.NewArchivalClient(publicClient, shard.GetConfig().NumSysWorkflows), } txProcessor := newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, visibilityProducer, matching, historyClient, logger) diff --git a/service/matching/handler.go b/service/matching/handler.go index 08c9e831c5a..cb48c88a467 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -79,7 +79,7 @@ func (h *Handler) Start() error { h.domainCache.Start() h.metricsClient = h.Service.GetMetricsClient() h.engine = NewEngine( - h.taskPersistence, h.Service.GetClientBean().GetHistoryClient(), h.config, h.Service.GetLogger(), h.Service.GetMetricsClient(), h.domainCache, + h.taskPersistence, h.GetClientBean().GetHistoryClient(), h.config, h.Service.GetLogger(), h.Service.GetMetricsClient(), h.domainCache, ) h.startWG.Done() return nil diff --git a/service/worker/service.go b/service/worker/service.go index 06147edcbab..3e9d7a37a3f 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -22,6 +22,7 @@ package worker import ( "context" + "github.com/uber/cadence/client/public" "time" "github.com/uber/cadence/common/cache" @@ -29,7 +30,6 @@ import ( "github.com/uber-common/bark" "github.com/uber-go/tally" - "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/common" "github.com/uber/cadence/common/metrics" persistencefactory "github.com/uber/cadence/common/persistence/persistence-factory" @@ -170,28 +170,28 @@ func (s *Service) startIndexer(params *service.BootstrapParams, base service.Ser func (s *Service) startSysWorker(base service.Service, log bark.Logger, scope tally.Scope) { - frontendClient := frontend.NewRetryableClient( - base.GetClientBean().GetFrontendClient(), - common.CreateFrontendServiceRetryPolicy(), + publicClient := public.NewRetryableClient( + base.GetClientBean().GetPublicClient(), + common.CreatePublicClientRetryPolicy(), common.IsWhitelistServiceTransientError, ) - s.waitForFrontendStart(frontendClient, log) - sysWorker := sysworkflow.NewSysWorker(frontendClient, scope, s.params.BlobstoreClient) + s.waitForFrontendStart(publicClient, log) + sysWorker := sysworkflow.NewSysWorker(publicClient, scope, s.params.BlobstoreClient) if err := sysWorker.Start(); err != nil { sysWorker.Stop() log.Fatalf("failed to start sysworker: %v", err) } } -func (s *Service) waitForFrontendStart(frontendClient frontend.Client, log bark.Logger) { +func (s *Service) waitForFrontendStart(publicClient public.Client, log bark.Logger) { name := sysworkflow.Domain request := &shared.DescribeDomainRequest{ Name: &name, } for i := 0; i < FrontendRetryLimit; i++ { - if _, err := frontendClient.DescribeDomain(context.Background(), request); err == nil { + if _, err := 
publicClient.DescribeDomain(context.Background(), request); err == nil { return } <-time.After(PollingDelay) diff --git a/service/worker/sysworkflow/archival_client.go b/service/worker/sysworkflow/archival_client.go index 9e1ee3c0291..ce46839f113 100644 --- a/service/worker/sysworkflow/archival_client.go +++ b/service/worker/sysworkflow/archival_client.go @@ -24,7 +24,7 @@ import ( "context" "errors" "fmt" - "github.com/uber/cadence/client/frontend" + "github.com/uber/cadence/client/public" "github.com/uber/cadence/common/service/dynamicconfig" "go.uber.org/cadence/client" "math/rand" @@ -64,9 +64,9 @@ type ( ) // NewArchivalClient creates a new ArchivalClient -func NewArchivalClient(frontendClient frontend.Client, numSWFn dynamicconfig.IntPropertyFn) ArchivalClient { +func NewArchivalClient(publicClient public.Client, numSWFn dynamicconfig.IntPropertyFn) ArchivalClient { return &archivalClient{ - cadenceClient: client.NewClient(frontendClient, Domain, &client.Options{}), + cadenceClient: client.NewClient(publicClient, Domain, &client.Options{}), numSWFn: numSWFn, } } diff --git a/service/worker/sysworkflow/sysworker.go b/service/worker/sysworkflow/sysworker.go index fe65a238ec8..c5207215931 100644 --- a/service/worker/sysworkflow/sysworker.go +++ b/service/worker/sysworkflow/sysworker.go @@ -23,7 +23,7 @@ package sysworkflow import ( "context" "github.com/uber-go/tally" - "github.com/uber/cadence/client/frontend" + "github.com/uber/cadence/client/public" "github.com/uber/cadence/common/blobstore" "go.uber.org/cadence/activity" "go.uber.org/cadence/worker" @@ -47,10 +47,10 @@ func init() { } // NewSysWorker returns a new SysWorker -func NewSysWorker(frontendClient frontend.Client, scope tally.Scope, blobstoreClient blobstore.Client) *SysWorker { +func NewSysWorker(publicClient public.Client, scope tally.Scope, blobstoreClient blobstore.Client) *SysWorker { logger, _ := zap.NewProduction() actCtx := context.WithValue(context.Background(), blobstoreClientKey, blobstoreClient) - actCtx = context.WithValue(actCtx, frontendClientKey, frontendClient) + actCtx = context.WithValue(actCtx, frontendClientKey, publicClient) wo := worker.Options{ Logger: logger, MetricsScope: scope.SubScope(SystemWorkflowScope), @@ -58,7 +58,7 @@ func NewSysWorker(frontendClient frontend.Client, scope tally.Scope, blobstoreCl } return &SysWorker{ // TODO: after we do task list fan out workers should listen on all task lists - worker: worker.New(frontendClient, Domain, DecisionTaskList, wo), + worker: worker.New(publicClient, Domain, DecisionTaskList, wo), } }
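
Taken together, the configuration and wiring added here work as follows: every config file gains a dcRedirectionPolicy block with a policy ("", "noop" or "forwarding") and a toDC; the development files set up a noop active cluster, a standby cluster forwarding to "other", and a new "other" cluster forwarding to "standby". At startup the frontend service registers the plain WorkflowHandler for the empty and "noop" policies and wraps it in the DC redirection handler for "forwarding", panicking if toDC is not a configured cluster address or if the policy string is unknown. A small sketch of that validation, with the yaml tags assumed for illustration (the real struct lives in common/service/config and is not shown in this section):

package redirectionexample

import "fmt"

// DCRedirectionPolicyConfig mirrors the shape of the dcRedirectionPolicy
// block added to the YAML files above; the yaml tags are assumptions.
type DCRedirectionPolicyConfig struct {
	Policy string `yaml:"policy"` // "", "noop", or "forwarding"
	ToDC   string `yaml:"toDC"`   // destination cluster, used only by "forwarding"
}

// validateDCRedirectionPolicy restates the checks performed at boot by
// RedirectionPolicyGenerator and the frontend service Start(); the real code
// panics instead of returning an error.
func validateDCRedirectionPolicy(cfg DCRedirectionPolicyConfig, clusterAddress map[string]string) error {
	switch cfg.Policy {
	case "", "noop":
		return nil
	case "forwarding":
		if _, ok := clusterAddress[cfg.ToDC]; !ok {
			return fmt.Errorf("toDC %q is not a configured cluster", cfg.ToDC)
		}
		return nil
	default:
		return fmt.Errorf("unknown DC redirection policy %q", cfg.Policy)
	}
}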