Skip to content

Commit

Permalink
Add client and handler implementation for cross-cluster task APIs (#4286
Browse files Browse the repository at this point in the history
)
  • Loading branch information
yycptt authored Jun 30, 2021
1 parent ff0046f commit eead0e5
Show file tree
Hide file tree
Showing 28 changed files with 623 additions and 67 deletions.
15 changes: 15 additions & 0 deletions client/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,21 @@ func (c *clientImpl) ResendReplicationTasks(
return client.ResendReplicationTasks(ctx, request, opts...)
}

func (c *clientImpl) GetCrossClusterTasks(
ctx context.Context,
request *types.GetCrossClusterTasksRequest,
opts ...yarpc.CallOption,
) (*types.GetCrossClusterTasksResponse, error) {
opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContextWithLargeTimeout(ctx)
defer cancel()
return client.GetCrossClusterTasks(ctx, request, opts...)
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
if parent == nil {
return context.WithTimeout(context.Background(), c.timeout)
Expand Down
26 changes: 26 additions & 0 deletions client/admin/errorInjectionClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,29 @@ func (c *errorInjectionClient) ResendReplicationTasks(
}
return clientErr
}

func (c *errorInjectionClient) GetCrossClusterTasks(
ctx context.Context,
request *types.GetCrossClusterTasksRequest,
opts ...yarpc.CallOption,
) (*types.GetCrossClusterTasksResponse, error) {
fakeErr := errors.GenerateFakeError(c.errorRate)

var resp *types.GetCrossClusterTasksResponse
var clientErr error
var forwardCall bool
if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall {
resp, clientErr = c.client.GetCrossClusterTasks(ctx, request, opts...)
}

if fakeErr != nil {
c.logger.Error(msgInjectedFakeErr,
tag.AdminClientOperationGetCrossClusterTasks,
tag.Error(fakeErr),
tag.Bool(forwardCall),
tag.ClientError(clientErr),
)
return nil, fakeErr
}
return resp, clientErr
}
5 changes: 5 additions & 0 deletions client/admin/grpcClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,8 @@ func (g grpcClient) ResetQueue(ctx context.Context, request *types.ResetQueueReq
_, err := g.c.ResetQueue(ctx, proto.FromAdminResetQueueRequest(request), opts...)
return proto.ToError(err)
}

func (g grpcClient) GetCrossClusterTasks(ctx context.Context, request *types.GetCrossClusterTasksRequest, opts ...yarpc.CallOption) (*types.GetCrossClusterTasksResponse, error) {
response, err := g.c.GetCrossClusterTasks(ctx, proto.FromAdminGetCrossClusterTasksRequest(request), opts...)
return proto.ToAdminGetCrossClusterTasksResponse(response), proto.ToError(err)
}
1 change: 1 addition & 0 deletions client/admin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ type Client interface {
RemoveTask(context.Context, *types.RemoveTaskRequest, ...yarpc.CallOption) error
ResendReplicationTasks(context.Context, *types.ResendReplicationTasksRequest, ...yarpc.CallOption) error
ResetQueue(context.Context, *types.ResetQueueRequest, ...yarpc.CallOption) error
GetCrossClusterTasks(context.Context, *types.GetCrossClusterTasksRequest, ...yarpc.CallOption) (*types.GetCrossClusterTasksResponse, error)
}
27 changes: 27 additions & 0 deletions client/admin/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions client/admin/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,20 @@ func (c *metricClient) ResendReplicationTasks(
}
return err
}

func (c *metricClient) GetCrossClusterTasks(
ctx context.Context,
request *types.GetCrossClusterTasksRequest,
opts ...yarpc.CallOption,
) (*types.GetCrossClusterTasksResponse, error) {
c.metricsClient.IncCounter(metrics.AdminClientGetCrossClusterTasksScope, metrics.CadenceClientRequests)

sw := c.metricsClient.StartTimer(metrics.AdminClientGetCrossClusterTasksScope, metrics.CadenceClientLatency)
resp, err := c.client.GetCrossClusterTasks(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.AdminClientGetCrossClusterTasksScope, metrics.CadenceClientFailures)
}
return resp, err
}
15 changes: 15 additions & 0 deletions client/admin/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,18 @@ func (c *retryableClient) ResendReplicationTasks(
}
return backoff.Retry(op, c.policy, c.isRetryable)
}

func (c *retryableClient) GetCrossClusterTasks(
ctx context.Context,
request *types.GetCrossClusterTasksRequest,
opts ...yarpc.CallOption,
) (*types.GetCrossClusterTasksResponse, error) {
var resp *types.GetCrossClusterTasksResponse
op := func() error {
var err error
resp, err = c.client.GetCrossClusterTasks(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}
5 changes: 5 additions & 0 deletions client/admin/thriftClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,8 @@ func (t thriftClient) ResetQueue(ctx context.Context, request *types.ResetQueueR
err := t.c.ResetQueue(ctx, thrift.FromResetQueueRequest(request), opts...)
return thrift.ToError(err)
}

func (t thriftClient) GetCrossClusterTasks(ctx context.Context, request *types.GetCrossClusterTasksRequest, opts ...yarpc.CallOption) (*types.GetCrossClusterTasksResponse, error) {
response, err := t.c.GetCrossClusterTasks(ctx, thrift.FromGetCrossClusterTasksRequest(request), opts...)
return thrift.ToGetCrossClusterTasksResponse(response), thrift.ToError(err)
}
124 changes: 117 additions & 7 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/yarpc"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/future"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -823,13 +824,7 @@ func (c *clientImpl) GetReplicationMessages(
}

// preserve 5% timeout to return partial of the result if context is timing out
now := time.Now()
deadline, ok := ctx.Deadline()
if !ok {
deadline = now.Add(c.timeout)
}
requestTimeout := time.Duration(math.Ceil(float64(deadline.Sub(now)) * 0.95))
requestContext, cancel := context.WithTimeout(ctx, requestTimeout)
requestContext, cancel := c.createChildContext(ctx, 0.05)
defer cancel()

var wg sync.WaitGroup
Expand Down Expand Up @@ -1020,6 +1015,121 @@ func (c *clientImpl) NotifyFailoverMarkers(
return nil
}

func (c *clientImpl) GetCrossClusterTasks(
ctx context.Context,
request *types.GetCrossClusterTasksRequest,
opts ...yarpc.CallOption,
) (*types.GetCrossClusterTasksResponse, error) {
requestByClient := make(map[Client]*types.GetCrossClusterTasksRequest)
for _, shardID := range request.GetShardIDs() {
client, err := c.getClientForShardID(int(shardID))
if err != nil {
return nil, err
}

if _, ok := requestByClient[client]; !ok {
requestByClient[client] = &types.GetCrossClusterTasksRequest{
TargetCluster: request.TargetCluster,
}
}
requestByClient[client].ShardIDs = append(requestByClient[client].ShardIDs, shardID)
}

// preserve 5% timeout to return partial of the result if context is timing out
ctx, cancel := c.createChildContext(ctx, 0.05)
defer cancel()

futureByClient := make(map[Client]future.Future, len(requestByClient))
for client, req := range requestByClient {
future, settable := future.NewFuture()
go func(ctx context.Context, client Client, req *types.GetCrossClusterTasksRequest) {
settable.Set(client.GetCrossClusterTasks(ctx, req))
}(ctx, client, req)

futureByClient[client] = future
}

response := &types.GetCrossClusterTasksResponse{
TasksByShard: make(map[int32][]*types.CrossClusterTaskRequest),
}
var err error
for _, future := range futureByClient {
var resp *types.GetCrossClusterTasksResponse
if futureErr := future.Get(ctx, &resp); futureErr != nil {
c.logger.Error("Failed to get cross cluster tasks", tag.Error(futureErr))
// TODO: return error for each shard and perform backoff at shard level.
// and ensure every shardID in request has a response (either tasks or failed cause).
//
// for _, failedShardID := range requestByClient[client].ShardIDs {
// response.FailedCauseByShard[failedShardID] = ...
// }
//
// for now following the pattern for getting replication tasks:
// ignore errors other than service busy, so that task fetcher in target
// cluster can slow down.
if err == nil && common.IsServiceBusyError(futureErr) {
err = futureErr
}
} else {
for shardID, tasks := range resp.TasksByShard {
response.TasksByShard[shardID] = tasks
}
}
}
// not using a waitGroup for created goroutines as once all futures are unblocked,
// those goroutines will eventually be completed

return response, err
}

func (c *clientImpl) RespondCrossClusterTasksCompleted(
ctx context.Context,
request *types.RespondCrossClusterTasksCompletedRequest,
opts ...yarpc.CallOption,
) (*types.RespondCrossClusterTasksCompletedResponse, error) {
client, err := c.getClientForShardID(int(request.GetShardID()))
if err != nil {
return nil, err
}
opts = common.AggregateYarpcOptions(ctx, opts...)

var response *types.RespondCrossClusterTasksCompletedResponse
op := func(ctx context.Context, client Client) error {
var err error
ctx, cancel := c.createContext(ctx)
defer cancel()
response, err = client.RespondCrossClusterTasksCompleted(ctx, request, opts...)
return err
}

err = c.executeWithRedirect(ctx, client, op)
if err != nil {
return nil, err
}
return response, nil
}

func (c *clientImpl) createChildContext(
parent context.Context,
tailroom float64,
) (context.Context, context.CancelFunc) {
if parent == nil {
return nil, func() {}
}
if parent.Err() != nil {
return parent, func() {}
}

now := time.Now()
deadline, ok := parent.Deadline()
if !ok || deadline.Before(now) {
return parent, func() {}
}

newDeadline := now.Add(time.Duration(math.Ceil(float64(deadline.Sub(now)) * (1.0 - tailroom))))
return context.WithDeadline(parent, newDeadline)
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
if parent == nil {
return context.WithTimeout(context.Background(), c.timeout)
Expand Down
52 changes: 52 additions & 0 deletions client/history/errorInjectionClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,3 +1050,55 @@ func (c *errorInjectionClient) NotifyFailoverMarkers(
}
return clientErr
}

func (c *errorInjectionClient) GetCrossClusterTasks(
ctx context.Context,
request *types.GetCrossClusterTasksRequest,
opts ...yarpc.CallOption,
) (*types.GetCrossClusterTasksResponse, error) {
fakeErr := errors.GenerateFakeError(c.errorRate)

var resp *types.GetCrossClusterTasksResponse
var clientErr error
var forwardCall bool
if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall {
resp, clientErr = c.client.GetCrossClusterTasks(ctx, request, opts...)
}

if fakeErr != nil {
c.logger.Error(msgInjectedFakeErr,
tag.HistoryClientOperationGetCrossClusterTasks,
tag.Error(fakeErr),
tag.Bool(forwardCall),
tag.ClientError(clientErr),
)
return nil, fakeErr
}
return resp, clientErr
}

func (c *errorInjectionClient) RespondCrossClusterTasksCompleted(
ctx context.Context,
request *types.RespondCrossClusterTasksCompletedRequest,
opts ...yarpc.CallOption,
) (*types.RespondCrossClusterTasksCompletedResponse, error) {
fakeErr := errors.GenerateFakeError(c.errorRate)

var resp *types.RespondCrossClusterTasksCompletedResponse
var clientErr error
var forwardCall bool
if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall {
resp, clientErr = c.client.RespondCrossClusterTasksCompleted(ctx, request, opts...)
}

if fakeErr != nil {
c.logger.Error(msgInjectedFakeErr,
tag.HistoryClientOperationRespondCrossClusterTasksCompleted,
tag.Error(fakeErr),
tag.Bool(forwardCall),
tag.ClientError(clientErr),
)
return nil, fakeErr
}
return resp, clientErr
}
10 changes: 10 additions & 0 deletions client/history/grpcClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func (g grpcClient) DescribeWorkflowExecution(ctx context.Context, request *type
return proto.ToHistoryDescribeWorkflowExecutionResponse(response), proto.ToError(err)
}

func (g grpcClient) GetCrossClusterTasks(ctx context.Context, request *types.GetCrossClusterTasksRequest, opts ...yarpc.CallOption) (*types.GetCrossClusterTasksResponse, error) {
response, err := g.c.GetCrossClusterTasks(ctx, proto.FromHistoryGetCrossClusterTasksRequest(request), opts...)
return proto.ToHistoryGetCrossClusterTasksResponse(response), proto.ToError(err)
}

func (g grpcClient) GetDLQReplicationMessages(ctx context.Context, request *types.GetDLQReplicationMessagesRequest, opts ...yarpc.CallOption) (*types.GetDLQReplicationMessagesResponse, error) {
response, err := g.c.GetDLQReplicationMessages(ctx, proto.FromHistoryGetDLQReplicationMessagesRequest(request), opts...)
return proto.ToHistoryGetDLQReplicationMessagesResponse(response), proto.ToError(err)
Expand Down Expand Up @@ -188,6 +193,11 @@ func (g grpcClient) RespondActivityTaskFailed(ctx context.Context, request *type
return proto.ToError(err)
}

func (g grpcClient) RespondCrossClusterTasksCompleted(ctx context.Context, request *types.RespondCrossClusterTasksCompletedRequest, opts ...yarpc.CallOption) (*types.RespondCrossClusterTasksCompletedResponse, error) {
response, err := g.c.RespondCrossClusterTasksCompleted(ctx, proto.FromHistoryRespondCrossClusterTasksCompletedRequest(request), opts...)
return proto.ToHistoryRespondCrossClusterTasksCompletedResponse(response), proto.ToError(err)
}

func (g grpcClient) RespondDecisionTaskCompleted(ctx context.Context, request *types.HistoryRespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption) (*types.HistoryRespondDecisionTaskCompletedResponse, error) {
response, err := g.c.RespondDecisionTaskCompleted(ctx, proto.FromHistoryRespondDecisionTaskCompletedRequest(request), opts...)
return proto.ToHistoryRespondDecisionTaskCompletedResponse(response), proto.ToError(err)
Expand Down
2 changes: 2 additions & 0 deletions client/history/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Client interface {
DescribeMutableState(context.Context, *types.DescribeMutableStateRequest, ...yarpc.CallOption) (*types.DescribeMutableStateResponse, error)
DescribeQueue(context.Context, *types.DescribeQueueRequest, ...yarpc.CallOption) (*types.DescribeQueueResponse, error)
DescribeWorkflowExecution(context.Context, *types.HistoryDescribeWorkflowExecutionRequest, ...yarpc.CallOption) (*types.DescribeWorkflowExecutionResponse, error)
GetCrossClusterTasks(context.Context, *types.GetCrossClusterTasksRequest, ...yarpc.CallOption) (*types.GetCrossClusterTasksResponse, error)
GetDLQReplicationMessages(context.Context, *types.GetDLQReplicationMessagesRequest, ...yarpc.CallOption) (*types.GetDLQReplicationMessagesResponse, error)
GetMutableState(context.Context, *types.GetMutableStateRequest, ...yarpc.CallOption) (*types.GetMutableStateResponse, error)
GetReplicationMessages(context.Context, *types.GetReplicationMessagesRequest, ...yarpc.CallOption) (*types.GetReplicationMessagesResponse, error)
Expand All @@ -62,6 +63,7 @@ type Client interface {
RespondActivityTaskCanceled(context.Context, *types.HistoryRespondActivityTaskCanceledRequest, ...yarpc.CallOption) error
RespondActivityTaskCompleted(context.Context, *types.HistoryRespondActivityTaskCompletedRequest, ...yarpc.CallOption) error
RespondActivityTaskFailed(context.Context, *types.HistoryRespondActivityTaskFailedRequest, ...yarpc.CallOption) error
RespondCrossClusterTasksCompleted(context.Context, *types.RespondCrossClusterTasksCompletedRequest, ...yarpc.CallOption) (*types.RespondCrossClusterTasksCompletedResponse, error)
RespondDecisionTaskCompleted(context.Context, *types.HistoryRespondDecisionTaskCompletedRequest, ...yarpc.CallOption) (*types.HistoryRespondDecisionTaskCompletedResponse, error)
RespondDecisionTaskFailed(context.Context, *types.HistoryRespondDecisionTaskFailedRequest, ...yarpc.CallOption) error
ScheduleDecisionTask(context.Context, *types.ScheduleDecisionTaskRequest, ...yarpc.CallOption) error
Expand Down
Loading

0 comments on commit eead0e5

Please sign in to comment.