Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add frontend DC redirection functionality and policy #1409

Merged
merged 8 commits into from
Jan 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ install-schema-cdc: bins
./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_visibility_active --rf 1
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_active setup-schema -v 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_active update-schema -d ./schema/cassandra/visibility/versioned

@echo Setting up cadence_standby key space
./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_standby --rf 1
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_standby setup-schema -v 0.0
Expand All @@ -202,8 +203,19 @@ install-schema-cdc: bins
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_standby setup-schema -v 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_standby update-schema -d ./schema/cassandra/visibility/versioned

@echo Setting up cadence_other key space
./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_other --rf 1
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_other setup-schema -v 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_other update-schema -d ./schema/cassandra/cadence/versioned
./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_visibility_other --rf 1
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_other setup-schema -v 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_other update-schema -d ./schema/cassandra/visibility/versioned

start-cdc-active: bins
./cadence-server --zone active start

start-cdc-standby: bins
./cadence-server --zone standby start

start-cdc-other: bins
./cadence-server --zone other start
13 changes: 13 additions & 0 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/public"
"github.com/uber/cadence/common/cluster"
)

Expand All @@ -45,6 +46,7 @@ type (
GetHistoryClient() history.Client
GetMatchingClient() matching.Client
GetFrontendClient() frontend.Client
GetPublicClient() public.Client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FrontendClient and PublicClient are same thing. Why do we need separate definitions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because each time the idl on the frontend is updated, developer will have to update the public client first, then get it approved, then update the public sever repo pointing to public client, even for developing features.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried to use client IDL (revert back to prev impl: frontend depends on public client idl)
this will break the redirection handler: although the definition contents are exactly the same, the package name are different. this leads to only 2 solution: either change the idl definition dependency of frontend handler, which will break a lot of code, or use the solution in this PR

created a issue #1414, so in the future, client idl and service idl will be in the same repo

GetRemoteAdminClient(cluster string) admin.Client
GetRemoteFrontendClient(cluster string) frontend.Client
}
Expand All @@ -58,6 +60,7 @@ type (
historyClient history.Client
matchingClient matching.Client
frontendClient frontend.Client
publicClient public.Client
remoteAdminClients map[string]admin.Client
remoteFrontendClients map[string]frontend.Client
}
Expand All @@ -84,6 +87,11 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust
return nil, err
}

publicClient, err := factory.NewPublicClient()
if err != nil {
return nil, err
}

remoteAdminClients := map[string]admin.Client{}
remoteFrontendClients := map[string]frontend.Client{}
for cluster, address := range clusterMetadata.GetAllClientAddress() {
Expand Down Expand Up @@ -119,6 +127,7 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust
historyClient: historyClient,
matchingClient: matchingClient,
frontendClient: frontendClient,
publicClient: publicClient,
remoteAdminClients: remoteAdminClients,
remoteFrontendClients: remoteFrontendClients,
}, nil
Expand All @@ -136,6 +145,10 @@ func (h *clientBeanImpl) GetFrontendClient() frontend.Client {
return h.frontendClient
}

func (h *clientBeanImpl) GetPublicClient() public.Client {
return h.publicClient
}

func (h *clientBeanImpl) GetRemoteAdminClient(cluster string) admin.Client {
client, ok := h.remoteAdminClients[cluster]
if !ok {
Expand Down
17 changes: 17 additions & 0 deletions client/clientBean_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/public"
)

// MockClientBean is an autogenerated mock type for the MockClientBean type
Expand Down Expand Up @@ -83,6 +84,22 @@ func (_m *MockClientBean) GetFrontendClient() frontend.Client {
return r0
}

// GetPublicClient provides a mock function with given fields:
func (_m *MockClientBean) GetPublicClient() public.Client {
ret := _m.Called()

var r0 public.Client
if rf, ok := ret.Get(0).(func() public.Client); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(public.Client)
}
}

return r0
}

// GetRemoteAdminClient provides a mock function with given fields: _a0
func (_m *MockClientBean) GetRemoteAdminClient(_a0 string) admin.Client {
ret := _m.Called(_a0)
Expand Down
40 changes: 39 additions & 1 deletion client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@ import (
"go.uber.org/yarpc"

"github.com/uber/cadence/.gen/go/admin/adminserviceclient"
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber/cadence/.gen/go/history/historyserviceclient"
"github.com/uber/cadence/.gen/go/matching/matchingserviceclient"
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/public"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/metrics"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
publicClientInterface "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
)

const (
publicCaller = "cadence-public-client"
frontendCaller = "cadence-frontend-client"
historyCaller = "history-service-client"
matchingCaller = "matching-service-client"
Expand All @@ -54,10 +57,12 @@ type Factory interface {
NewHistoryClient() (history.Client, error)
NewMatchingClient() (matching.Client, error)
NewFrontendClient() (frontend.Client, error)
NewPublicClient() (public.Client, error)

NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error)
NewMatchingClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (matching.Client, error)
NewFrontendClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (frontend.Client, error)
NewPublicClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (public.Client, error)

NewAdminClientWithTimeoutAndDispatcher(rpcName string, timeout time.Duration, dispatcher *yarpc.Dispatcher) (admin.Client, error)
NewFrontendClientWithTimeoutAndDispatcher(rpcName string, timeout time.Duration, longPollTimeout time.Duration, dispatcher *yarpc.Dispatcher) (frontend.Client, error)
Expand Down Expand Up @@ -93,6 +98,10 @@ func (cf *rpcClientFactory) NewFrontendClient() (frontend.Client, error) {
return cf.NewFrontendClientWithTimeout(frontend.DefaultTimeout, frontend.DefaultLongPollTimeout)
}

func (cf *rpcClientFactory) NewPublicClient() (public.Client, error) {
return cf.NewPublicClientWithTimeout(public.DefaultTimeout, public.DefaultLongPollTimeout)
}

func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) {
resolver, err := cf.monitor.GetResolver(common.HistoryServiceName)
if err != nil {
Expand Down Expand Up @@ -179,6 +188,35 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeout(
return client, nil
}

func (cf *rpcClientFactory) NewPublicClientWithTimeout(
timeout time.Duration,
longPollTimeout time.Duration,
) (public.Client, error) {

// public client and frontend client are essentially the same,
// except the interface definition
resolver, err := cf.monitor.GetResolver(common.FrontendServiceName)
if err != nil {
return nil, err
}

keyResolver := func(key string) (string, error) {
host, err := resolver.Lookup(key)
if err != nil {
return "", err
}
return host.GetAddress(), nil
}

clientProvider := func(clientKey string) (interface{}, error) {
dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(publicCaller, common.FrontendServiceName, clientKey)
return publicClientInterface.New(dispatcher.ClientConfig(common.FrontendServiceName)), nil
}

client := public.NewClient(timeout, longPollTimeout, common.NewClientCache(keyResolver, clientProvider))
return client, nil
}

func (cf *rpcClientFactory) NewAdminClientWithTimeoutAndDispatcher(
rpcName string,
timeout time.Duration,
Expand Down
20 changes: 18 additions & 2 deletions client/frontend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
"go.uber.org/yarpc"

"github.com/pborman/uuid"
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/shared"
)

var _ Client = (*clientImpl)(nil)
Expand Down Expand Up @@ -316,6 +316,22 @@ func (c *clientImpl) ResetStickyTaskList(
return client.ResetStickyTaskList(ctx, request, opts...)
}

func (c *clientImpl) ResetWorkflowExecution(
ctx context.Context,
request *shared.ResetWorkflowExecutionRequest,
opts ...yarpc.CallOption,
) (*shared.ResetWorkflowExecutionResponse, error) {

opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.ResetWorkflowExecution(ctx, request, opts...)
}

func (c *clientImpl) RespondActivityTaskCanceled(
ctx context.Context,
request *shared.RespondActivityTaskCanceledRequest,
Expand Down
2 changes: 1 addition & 1 deletion client/frontend/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
package frontend

import (
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
)

// Client is the interface exposed by frontend service client
Expand Down
21 changes: 19 additions & 2 deletions client/frontend/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ package frontend

import (
"context"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/metrics"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/yarpc"
)

Expand Down Expand Up @@ -331,6 +330,24 @@ func (c *metricClient) ResetStickyTaskList(
return resp, err
}

func (c *metricClient) ResetWorkflowExecution(
ctx context.Context,
request *shared.ResetWorkflowExecutionRequest,
opts ...yarpc.CallOption,
) (*shared.ResetWorkflowExecutionResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientResetWorkflowExecutionScope, metrics.CadenceClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientResetWorkflowExecutionScope, metrics.CadenceClientLatency)
resp, err := c.client.ResetWorkflowExecution(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientResetWorkflowExecutionScope, metrics.CadenceClientFailures)
}
return resp, err
}

func (c *metricClient) RespondActivityTaskCanceled(
ctx context.Context,
request *shared.RespondActivityTaskCanceledRequest,
Expand Down
18 changes: 17 additions & 1 deletion client/frontend/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (

"go.uber.org/yarpc"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/backoff"
"go.uber.org/cadence/.gen/go/shared"
)

var _ Client = (*retryableClient)(nil)
Expand Down Expand Up @@ -290,6 +290,22 @@ func (c *retryableClient) ResetStickyTaskList(
return resp, err
}

func (c *retryableClient) ResetWorkflowExecution(
ctx context.Context,
request *shared.ResetWorkflowExecutionRequest,
opts ...yarpc.CallOption,
) (*shared.ResetWorkflowExecutionResponse, error) {

var resp *shared.ResetWorkflowExecutionResponse
op := func() error {
var err error
resp, err = c.client.ResetWorkflowExecution(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) RespondActivityTaskCanceled(
ctx context.Context,
request *shared.RespondActivityTaskCanceledRequest,
Expand Down
Loading