Skip to content

Commit

Permalink
Add TaskListPartitionConfig message to proto
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Oct 14, 2024
1 parent a32159c commit f13e95d
Show file tree
Hide file tree
Showing 38 changed files with 1,126 additions and 481 deletions.
843 changes: 664 additions & 179 deletions .gen/proto/matching/v1/service.pb.go

Large diffs are not rendered by default.

268 changes: 137 additions & 131 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *clientImpl) AddActivityTask(
ctx context.Context,
request *types.AddActivityTaskRequest,
opts ...yarpc.CallOption,
) error {
) (*types.AddActivityTaskResponse, error) {
partition := c.loadBalancer.PickWritePartition(
request.GetDomainUUID(),
*request.GetTaskList(),
Expand All @@ -65,7 +65,7 @@ func (c *clientImpl) AddActivityTask(
request.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return err
return nil, err
}
return c.client.AddActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
}
Expand All @@ -74,7 +74,7 @@ func (c *clientImpl) AddDecisionTask(
ctx context.Context,
request *types.AddDecisionTaskRequest,
opts ...yarpc.CallOption,
) error {
) (*types.AddDecisionTaskResponse, error) {
partition := c.loadBalancer.PickWritePartition(
request.GetDomainUUID(),
*request.GetTaskList(),
Expand All @@ -84,7 +84,7 @@ func (c *clientImpl) AddDecisionTask(
request.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return err
return nil, err
}
return c.client.AddDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
}
Expand Down
138 changes: 70 additions & 68 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,74 +58,6 @@ func TestClient_withoutResponse(t *testing.T) {
mock func(*MockPeerResolver, *MockLoadBalancer, *MockClient)
wantError bool
}{
{
name: "AddActivityTask",
op: func(c Client) error {
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil)
},
},
{
name: "AddActivityTask - Error in resolving peer",
op: func(c Client) error {
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
},
wantError: true,
},
{
name: "AddActivityTask - Error while adding activity task",
op: func(c Client) error {
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(assert.AnError)
},
wantError: true,
},
{
name: "AddDecisionTask",
op: func(c Client) error {
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil)
},
},
{
name: "AddDecisionTask - Error in resolving peer",
op: func(c Client) error {
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
},
wantError: true,
},
{
name: "AddDecisionTask - Error while adding decision task",
op: func(c Client) error {
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(assert.AnError)
},
wantError: true,
},
{
name: "RespondQueryTaskCompleted",
op: func(c Client) error {
Expand Down Expand Up @@ -220,6 +152,76 @@ func TestClient_withResponse(t *testing.T) {
want any
wantError bool
}{
{
name: "AddActivityTask",
op: func(c Client) (any, error) {
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.AddActivityTaskResponse{}, nil)
},
want: &types.AddActivityTaskResponse{},
},
{
name: "AddActivityTask - Error in resolving peer",
op: func(c Client) (any, error) {
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
},
wantError: true,
},
{
name: "AddActivityTask - Error while adding activity task",
op: func(c Client) (any, error) {
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
wantError: true,
},
{
name: "AddDecisionTask",
op: func(c Client) (any, error) {
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.AddDecisionTaskResponse{}, nil)
},
want: &types.AddDecisionTaskResponse{},
},
{
name: "AddDecisionTask - Error in resolving peer",
op: func(c Client) (any, error) {
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", assert.AnError)
},
wantError: true,
},
{
name: "AddDecisionTask - Error while adding decision task",
op: func(c Client) (any, error) {
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
wantError: true,
},
{
name: "PollForActivityTask",
op: func(c Client) (any, error) {
Expand Down
4 changes: 2 additions & 2 deletions client/matching/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import (

// Client is the interface exposed by types service client
type Client interface {
AddActivityTask(context.Context, *types.AddActivityTaskRequest, ...yarpc.CallOption) error
AddDecisionTask(context.Context, *types.AddDecisionTaskRequest, ...yarpc.CallOption) error
AddActivityTask(context.Context, *types.AddActivityTaskRequest, ...yarpc.CallOption) (*types.AddActivityTaskResponse, error)
AddDecisionTask(context.Context, *types.AddDecisionTaskRequest, ...yarpc.CallOption) (*types.AddDecisionTaskResponse, error)
CancelOutstandingPoll(context.Context, *types.CancelOutstandingPollRequest, ...yarpc.CallOption) error
DescribeTaskList(context.Context, *types.MatchingDescribeTaskListRequest, ...yarpc.CallOption) (*types.DescribeTaskListResponse, error)
ListTaskListPartitions(context.Context, *types.MatchingListTaskListPartitionsRequest, ...yarpc.CallOption) (*types.ListTaskListPartitionsResponse, error)
Expand Down
14 changes: 8 additions & 6 deletions client/matching/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions client/templates/thrift.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ import (
func (g {{$decorator}}) {{$method.Declaration}} {
{{- if eq $method.Name "CountDLQMessages"}}
return nil, thrift.ToError(&types.BadRequestError{Message: "Feature not supported on TChannel"})
{{- else if or (eq $method.Name "AddDecisionTask") (eq $method.Name "AddActivityTask")}}
{{(index $method.Results 1).Name}} = g.c.{{$method.Name}}({{(index $method.Params 0).Name}}, thrift.From{{$prefix}}{{$Request}}({{(index $method.Params 1).Name}}), {{(index $method.Params 2).Pass}})
if {{(index $method.Results 1).Name}} != nil {
return nil, {{(index $method.Results 1).Name}}
}
return &types.{{$method.Name}}Response{}, nil
{{- else}}
{{- if eq (len $method.Params) 2}}
{{- if eq (len $method.Results) 1}}
Expand Down
8 changes: 4 additions & 4 deletions client/wrappers/errorinjectors/matching_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions client/wrappers/grpc/matching_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions client/wrappers/metered/matching_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions client/wrappers/metered/metered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ func TestMatching(t *testing.T) {
metricsClient := metrics.NewClient(testScope, metrics.ServiceIdx(0))

clientMock.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)
Return(&types.AddActivityTaskResponse{}, nil).Times(1)

retryableClient := NewMatchingClient(
clientMock,
metricsClient)

err := retryableClient.AddActivityTask(context.Background(), &types.AddActivityTaskRequest{
_, err := retryableClient.AddActivityTask(context.Background(), &types.AddActivityTaskRequest{
ForwardedFrom: "test",
TaskList: &types.TaskList{Name: "test"},
})
Expand Down Expand Up @@ -132,13 +132,13 @@ func TestMatching(t *testing.T) {
metricsClient := metrics.NewClient(testScope, metrics.ServiceIdx(0))

clientMock.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)
Return(&types.AddActivityTaskResponse{}, nil).Times(1)

retryableClient := NewMatchingClient(
clientMock,
metricsClient)

err := retryableClient.AddActivityTask(context.Background(), &types.AddActivityTaskRequest{
_, err := retryableClient.AddActivityTask(context.Background(), &types.AddActivityTaskRequest{
ForwardedFrom: "",
TaskList: &types.TaskList{Name: "test"},
})
Expand All @@ -153,13 +153,13 @@ func TestMatching(t *testing.T) {
metricsClient := metrics.NewClient(testScope, metrics.ServiceIdx(0))

clientMock.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)
Return(&types.AddActivityTaskResponse{}, nil).Times(1)

retryableClient := NewMatchingClient(
clientMock,
metricsClient)

err := retryableClient.AddActivityTask(context.Background(), &types.AddActivityTaskRequest{
_, err := retryableClient.AddActivityTask(context.Background(), &types.AddActivityTaskRequest{
ForwardedFrom: "",
TaskList: &types.TaskList{Name: common.ReservedTaskListPrefix + "test"},
})
Expand Down
Loading

0 comments on commit f13e95d

Please sign in to comment.